diff --git a/v3/as_drivers/hd44780/alcd.py b/v3/as_drivers/hd44780/alcd.py index 2b1c76c..908a322 100644 --- a/v3/as_drivers/hd44780/alcd.py +++ b/v3/as_drivers/hd44780/alcd.py @@ -100,9 +100,9 @@ async def runlcd(self): # Periodically check for changed text and update LCD if for row in range(self.rows): if self.dirty[row]: msg = self[row] + self.dirty[row] = False self.lcd_byte(LCD.LCD_LINES[row], LCD.CMD) for thisbyte in msg: self.lcd_byte(ord(thisbyte), LCD.CHR) await asyncio.sleep_ms(0) # Reshedule ASAP - self.dirty[row] = False await asyncio.sleep_ms(20) # Give other coros a look-in diff --git a/v3/as_drivers/htu21d/htu21d_mc.py b/v3/as_drivers/htu21d/htu21d_mc.py index 5d19ba0..385ec48 100644 --- a/v3/as_drivers/htu21d/htu21d_mc.py +++ b/v3/as_drivers/htu21d/htu21d_mc.py @@ -11,7 +11,6 @@ import asyncio from micropython import const -_ADDRESS = const(0x40) # HTU21D Address _PAUSE_MS = const(60) # HTU21D acquisition delay _READ_USER_REG = const(0xE7) @@ -25,10 +24,11 @@ class HTU21D: START_TEMP_MEASURE = b"\xF3" # Commands START_HUMD_MEASURE = b"\xF5" - def __init__(self, i2c, read_delay=10): + def __init__(self, i2c, read_delay=10, address=0x40): self.i2c = i2c - if _ADDRESS not in self.i2c.scan(): + if address not in self.i2c.scan(): raise OSError("No HTU21D device found.") + self.address = address self.temperature = None self.humidity = None asyncio.create_task(self._run(read_delay)) @@ -46,9 +46,9 @@ def __iter__(self): # Await 1st reading yield from asyncio.sleep(0) async def _get_data(self, cmd, divisor=0x131 << 15, bit=1 << 23): - self.i2c.writeto(_ADDRESS, cmd) # Start reading + self.i2c.writeto(self.address, cmd) # Start reading await asyncio.sleep_ms(_PAUSE_MS) # Wait for device - value = self.i2c.readfrom(_ADDRESS, 3) # Read result, check CRC8 + value = self.i2c.readfrom(self.address, 3) # Read result, check CRC8 data, crc = ustruct.unpack(">HB", value) remainder = (data << 8) | crc while bit > 128: @@ -61,4 +61,4 @@ async def _get_data(self, cmd, divisor=0x131 << 15, bit=1 << 23): return data & 0xFFFC # Clear the status bits def user_register(self): # Read the user register byte (should be 2) - return self.i2c.readfrom_mem(_ADDRESS, _READ_USER_REG, 1)[0] + return self.i2c.readfrom_mem(self.address, _READ_USER_REG, 1)[0] diff --git a/v3/docs/DRIVERS.md b/v3/docs/DRIVERS.md index 75a3ce6..dcaf218 100644 --- a/v3/docs/DRIVERS.md +++ b/v3/docs/DRIVERS.md @@ -26,11 +26,16 @@ MicroPython's `asyncio` when used in a microcontroller context. 6.1 [Encoder class](./DRIVERS.md#61-encoder-class) 7. [Ringbuf Queue](./DRIVERS.md#7-ringbuf-queue) A MicroPython optimised queue primitive. 8. [Delay_ms class](./DRIVERS.md#8-delay_ms-class) A flexible retriggerable delay with callback or Event interface. - 9. [Additional functions](./DRIVERS.md#9-additional-functions) - 9.1 [launch](./DRIVERS.md#91-launch) Run a coro or callback interchangeably. - 9.2 [set_global_exception](./DRIVERS.md#92-set_global_exception) Simplify debugging with a global exception handler. + 9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between tasks. + 9.1 [Further examples](./DRIVERS.md#91-further-examples) + 9.2 [User agents](./DRIVERS.md#92-user-agents) User defined Agent classes. + 9.3 [Wildcard subscriptions](./DRIVERS.md#93-wildcard-subscriptions) + 9.4 [Notes](./DRIVERS.md#9-notes) + 10. [Additional functions](./DRIVERS.md#10-additional-functions) + 10.1 [launch](./DRIVERS.md#101-launch) Run a coro or callback interchangeably. + 10.2 [set_global_exception](./DRIVERS.md#102-set_global_exception) Simplify debugging with a global exception handler. -###### [Tutorial](./TUTORIAL.md#contents) +###### [asyncio Tutorial](./TUTORIAL.md#contents) # 1. Introduction @@ -1023,6 +1028,9 @@ def add_item(q, data): # 8. Delay_ms class +```python +from primitives import Delay_ms # delay_ms.py +``` This implements the software equivalent of a retriggerable monostable or a watchdog timer. It has an internal boolean `running` state. When instantiated the `Delay_ms` instance does nothing, with `running` `False` until triggered. @@ -1062,7 +1070,9 @@ Synchronous methods: the `Task` instance. This allows the `Task` to be cancelled or awaited. 6. `callback` args `func=None`, `args=()`. Allows the callable and its args to be assigned, reassigned or disabled at run time. - 7. `deinit` No args. Cancels the running task. See [Object scope](./TUTORIAL.md#44-object-scope). + 7. `deinit` No args. Cancels the running task. To avoid a memory leak this + should be called before allowing a `Delay_ms` object to go out of scope. See + [Object scope](./TUTORIAL.md#44-object-scope). 8. `clear` No args. Clears the `Event` described in `wait` below. 9. `set` No args. Sets the `Event` described in `wait` below. @@ -1126,9 +1136,271 @@ finally: ``` ###### [Contents](./DRIVERS.md#0-contents) -# 9. Additional functions +# 9. Message Broker + +```python +from primitives import Broker, broker # broker.py +``` +The `Broker` class provides a flexible means of messaging between running tasks. +It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task +publishes to a topic. Objects subscribed to that topic will receive the message. +This enables one to one, one to many, many to one or many to many messaging. + +A task subscribes to a topic via an `agent`: this term describes a set of Python +types which may be used in this role. An `agent` is stored by the broker. When +the broker publishes a message, every `agent` subscribed to the message topic +will be triggered. In the simplest case the `agent` is a `Queue` instance: the +broker puts the topic and message onto the subscriber's queue for retrieval. + +More advanced agents can perform actions in response to a message, such as +calling a function, launching a `task` or lighting an LED. + +Agents may be subscribed and unsubscribed dynamically. The publishing task has +no "knowledge" of the number or type of agents subscribed to a topic. The module +is not threadsafe: `Broker` methods should not be called from a hard ISR or from +another thread. + +A `Broker` instance `broker` is provided. Where multiple modules issue +```python +from primitives import broker +``` +all will see the same instance, facilitating message passing between modules. + +#### Broker methods + +All are synchronous. +* Constructor This has no args. +* `subscribe(topic, agent, *args)` Passed `agent` will be triggered by messages +with a matching `topic`. Any additional args will be passed to the `agent` when +it is triggered. +* `unsubscribe(topic, agent, *args)` The `agent` will stop being triggered. If +args were passed on subscription, the same args must be passed. +* `publish(topic, message=None)` All `agent` instances subscribed to `topic` +will be triggered, receiving `topic` and `message` plus any further args that +were passed to `subscribe`. + +The `topic` arg is typically a string but may be any hashable object. A +`message` is an arbitrary Python object. Where string topics are used, wildcard +subscriptions are possible. + +#### Broker class variable + +* `Verbose=True` Enables printing of debug messages. + +#### Agent types + +An `agent` may be an instance of any of the following types. Args refers to any +arguments passed to the `agent` on subscription. + +* `RingbufQueue` Received messages are queued as a 2-tuple `(topic, message)` +assuming no subscription args - otheriwse `(topic, message, (args...))`. +* `Queue` Received messages are queued as described above. +* `function` Called when a message is received. Args: `topic`, `message` plus any +further subscription args. +* `bound method` Called when a message is received. Args: `topic`, `message` +plus any further args. +* `coroutine` Converted to a `task` when a message is received. Args: `topic`, +`message` plus any further subscription args. +* `bound coroutine` Converted to a `task` when a message is received. Args: `topic`, +`message` plus any further subscription args. +* `Event` Set when a message is received. +* `user_agent` Instance of a user class. See user agents below. + +Note that synchronous `agent` instances must run to completion quickly otherwise +the `publish` method will be slowed. See [Notes](./DRIVERS.md#93-notes) for +further details on queue behaviour. + +#### example +```py +import asyncio +from primitives import broker, RingbufQueue + +async def sender(t): + for x in range(t): + await asyncio.sleep(1) + broker.publish("foo_topic", f"test {x}") + +async def receiver(): + queue = RingbufQueue(20) + broker.subscribe("foo_topic", queue) + async for topic, message in queue: + print(topic, message) + +async def main(): + rx = asyncio.create_task(receiver()) + await sender(10) + await asyncio.sleep(2) + rx.cancel() + +asyncio.run(main()) +``` +## 9.1 Further examples + +An interesting application is to extend MQTT into the Python code +(see [mqtt_as](https://github.com/peterhinch/micropython-mqtt/tree/master)). +This is as simple as: +```py +async def messages(client): + async for topic, msg, retained in client.queue: + broker.publish(topic.decode(), msg.decode()) +``` +Assuming the MQTT client is subscribed to multiple topics, message strings are +directed to agents, each dedicated to handling a topic. An `agent` might operate +an interface or queue the message for a running task. + +The following illustrates a use case for passing args to an `agent` (pin nos. +are for Pyoard 1.1). +```py +import asyncio +from primitives import broker +from machine import Pin +red = Pin("A13", Pin.OUT, value=0) # Pin nos. for Pyboard V1.1 +green = Pin("A14", Pin.OUT, value=0) + +async def flash(): + broker.publish("led", 1) + await asyncio.sleep(1) + broker.publish("led", 0) + +def recv(topic, message, led): + led(message) # Light or extinguish an LED + +async def main(): + broker.subscribe("led", recv, red) + broker.subscribe("led", recv, green) + for _ in range(10): + await flash() + await asyncio.sleep(1) + broker.unsubscribe("led", recv, green) # Arg(s) must be passed + for _ in range(3): + await flash() + await asyncio.sleep(1) + +asyncio.run(main()) +``` +A task can wait on multiple topics using a `RingbufQueue`: +```python +import asyncio +from primitives import broker, RingbufQueue + + +async def receiver(): + q = RingbufQueue(10) + broker.subscribe("foo_topic", q) + broker.subscribe("bar_topic", q) + async for topic, message in q: + print(f"Received Topic: {topic} Message: {message}") + + +async def sender(t): + for x in range(t): + await asyncio.sleep(1) + broker.publish("foo_topic", f"test {x}") + broker.publish("bar_topic", f"test {x}") + broker.publish("ignore me", f"test {x}") + + +async def main(): + rx = asyncio.create_task(receiver()) + await sender(10) + await asyncio.sleep(2) + rx.cancel() + + +asyncio.run(main()) +``` +here the `receiver` task waits on two topics. The asynchronous iterator returns +messages as they are published. + +## 9.2 User agents + +An `agent` can be an instance of a user class. The class must be a subclass of +`Agent`, and it must support a synchronous `.put` method. Arguments are `topic` +and `message`, followed by any further args passed on subscription. The method +should run to completion quickly. + +```py +import asyncio +from primitives import broker, Agent + +class MyAgent(Agent): + def put(sef, topic, message, arg): + print(f"User agent. Topic: {topic} Message: {message} Arg: {arg}") + +async def sender(t): + for x in range(t): + await asyncio.sleep(1) + broker.publish("foo_topic", f"test {x}") + +async def main(): + broker.subscribe("foo_topic", MyAgent(), 42) + await sender(10) + +asyncio.run(main()) +``` +## 9.3 Wildcard subscriptions + +In the case of publications whose topics are strings, a single call to +`.subscribe` can subscribe an `agent` to multiple topics. This is by wildcard +matching. By default exact matching is used, however this can be changed to use +regular expressions as in this code fragment: +```py +from primitives import Broker, RegExp +broker.subscribe(RegExp(".*_topic"), some_agent) +``` +In this case `some_agent` would be triggered by publications to `foo_topic` or +`bar_topic` because the string `".*_topic"` matches these by the rules of +regular expressions. + +## 9.4 Notes + +#### The publish/subscribe model + +As in the real world, publication carries no guarantee of readership. If at the +time of publication there are no tasks with subscribed `agent` instances, the +message will silently be lost. + +#### agent arguments + +Arguments must be hashable objects. Mutable objects such as lists and +dictionaries are not permitted. If an object can be added to a `set` it is +valid. In general, interfaces such as `Pin` instances are OK. + +#### agent uniqueness + +An `agent` can be subscribed to multiple `topic`s. An `agent` may be subscribed +to a `topic` multiple times only if each instance has different arguments. + +#### queues + +If a message causes a queue to fill, a message will silently be lost. It is the +responsibility of the subscriber to avoid this. In the case of a `Queue` +instance the lost message is the one causing the overflow. In the case of +`RingbufQueue` the oldest message in the queue is discarded. In some +applications this behaviour is preferable. In general `RingbufQueue` is +preferred as it is optimised for microcontroller use and supports retrieval by +an asynchronous iterator. + +If either queue type is subscribed with args, a publication will create a queue +entry that is a 3-tuple `(topic, message, (args...))`. There is no obvious use +case for this. + +#### exceptions + +An `agent` instance is owned by a subscribing tasks but is executed by a +publishing task. If a function used as an `agent` throws an exception, the +traceback will point to a `Broker.publish` call. + +The `Broker` class throws a `ValueError` if `.subscribe` is called with an +invalid `agent` type. There are a number of non-fatal conditions which can occur +such as a queue overflow or an attempt to unsubscribe an `agent` twice. The +`Broker` will report these if `Broker.Verbose=True`. + +###### [Contents](./DRIVERS.md#0-contents) + +# 10. Additional functions -## 9.1 Launch +## 10.1 Launch Import as follows: ```python @@ -1140,7 +1412,7 @@ runs it and returns the callback's return value. If a coro is passed, it is converted to a `task` and run asynchronously. The return value is the `task` instance. A usage example is in `primitives/switch.py`. -## 9.2 set_global_exception +## 10.2 set_global_exception Import as follows: ```python diff --git a/v3/docs/HTU21D.md b/v3/docs/HTU21D.md index daa29f3..07feb14 100644 --- a/v3/docs/HTU21D.md +++ b/v3/docs/HTU21D.md @@ -52,9 +52,10 @@ import as_drivers.htu21d.htu_test This provides a single class `HTU21D`. Constructor. -This takes two args, `i2c` (mandatory) and an optional `read_delay=10`. The -former must be an initialised I2C bus instance. The `read_delay` (secs) -determines how frequently the data values are updated. +This takes the following args +* `i2c` (mandatory) An initialised I2C bus instance. +* `read_delay=10`. The frequency (secs) at which data values are updated. +* `address=0x40` I2C address of the chip. Public bound values 1. `temperature` Latest value in Celcius. diff --git a/v3/docs/INTERRUPTS.md b/v3/docs/INTERRUPTS.md index 54dcd70..a2c684c 100644 --- a/v3/docs/INTERRUPTS.md +++ b/v3/docs/INTERRUPTS.md @@ -8,7 +8,7 @@ interrupts in `asyncio` applications. Writing an interrupt service routine (ISR) requires care: see the [official docs](https://docs.micropython.org/en/latest/reference/isr_rules.html). There are restrictions (detailed below) on the way an ISR can interface with -`asyncio`. Finally, on many platformasyncioupts are a limited resource. In +`asyncio`. Finally, on many platforms interrupts are a limited resource. In short interrupts are extremely useful but, if a practical alternative exists, it should be seriously considered. @@ -184,17 +184,69 @@ async def process_data(): await tsf.wait() # Process the data here before waiting for the next interrupt ``` +## 3.4 micropython.RingIO + +This is a byte-oriented circular buffer [documented here] +(https://docs.micropython.org/en/latest/library/micropython.html#micropython.RingIO), +which provides an efficient way to return data from an ISR to an `asyncio` task. +It is implemented in C so performance is high, and supports stream I/O. The +following is a usage example: +```py +import asyncio +from machine import Timer +import micropython +micropython.alloc_emergency_exception_buf(100) + +imu = SomeDevice() # Fictional hardware IMU device + +FRAMESIZE = 8 # Count, x, y, z accel +BUFSIZE = 200 # No. of records. Size allows for up to 200ms of asyncio latency. +rio = micropython.RingIO(FRAMESIZE * BUFSIZE + 1) # RingIO requires an extra byte +count = 0x4000 # Bit14 is "Start of frame" marker. Low bits are a frame counter. + +def cb(_): # Timer callback. Runs at 1KHz. + global count # Frame count + imu.get_accel_irq() # Trigger the device + rio.write(chr(count >> 8)) + rio.write(chr(count & 0xff)) + rio.write(imu.accel.ix) # Device returns bytes objects (length 2) + rio.write(imu.accel.iy) + rio.write(imu.accel.iz) + count += 1 + +async def main(nrecs): + t = Timer(freq=1_000, callback=cb) + sreader = asyncio.StreamReader(rio) + rpb = 100 # Records per block + blocksize = FRAMESIZE * rpb + with open('/sd/imudata', 'wb') as f: + swriter = asyncio.StreamWriter(f, {}) + while nrecs: + data = await sreader.readexactly(blocksize) + swriter.write(data) + await swriter.drain() + nrecs -= rpb + t.deinit() + +asyncio.run(main(1_000)) +``` +In this example data is acquired at a timer-controlled rate of 1KHz, with eight +bytes being written to the `RingIO` every tick. The `main()` task reads the data +stream and writes it out to a file. Similar code was tested on a Pyboard 1.1. -## 3.4 Thread Safe Classes +## 3.5 Other Thread Safe Classes Other classes capable of being used to interface an ISR with `asyncio` are discussed [here](https://github.com/peterhinch/micropython-async/blob/master/v3/docs/THREADING.md), -notably the `ThreadSafeQueue`. +notably the `ThreadSafeQueue`. This ring buffer allows entries to be objects +other than bytes. It supports the asynchronous iterator protocol (rather than +stream I/O) and is written in Python. # 4. Conclusion -The key take-away is that `ThreadSafeFlag` is the only official `asyncio` -construct which can safely be used in an ISR context. Unofficial "thread -safe" classes may also be used. +The `ThreadSafeFlag` and `RingIO` classes are the official `asyncio` constructs +which can safely be used in an ISR context. Unofficial "thread safe" classes may +also be used. Beware of classes such as `Queue` and `RingbufQueue` which are not +thread safe. ###### [Main tutorial](./TUTORIAL.md#contents) diff --git a/v3/docs/PRIMITIVES.md b/v3/docs/PRIMITIVES.md new file mode 100644 index 0000000..30e6e97 --- /dev/null +++ b/v3/docs/PRIMITIVES.md @@ -0,0 +1 @@ +### For historical reasons documentation for primitives may be found [here](./DRIVERS.md). diff --git a/v3/docs/THREADING.md b/v3/docs/THREADING.md index f13c343..04c6b34 100644 --- a/v3/docs/THREADING.md +++ b/v3/docs/THREADING.md @@ -340,7 +340,7 @@ instances are required. Attributes of `ThreadSafeQueue`: 1. It is of fixed capacity defined on instantiation. 2. It uses a pre-allocated buffer of user selectable type (`Queue` uses a - dynaically allocated `list`). + dynamically allocated `list`). 3. It is an asynchronous iterator allowing retrieval with `async for`. 4. It provides synchronous "put" and "get" methods. If the queue becomes full (put) or empty (get), behaviour is user definable. The method either blocks or diff --git a/v3/docs/TUTORIAL.md b/v3/docs/TUTORIAL.md index cda901f..b818d58 100644 --- a/v3/docs/TUTORIAL.md +++ b/v3/docs/TUTORIAL.md @@ -45,7 +45,8 @@ import uasyncio as asyncio 3.7 [Barrier](./TUTORIAL.md#37-barrier) 3.8 [Delay_ms](./TUTORIAL.md#38-delay_ms-class) Software retriggerable delay. 3.9 [Message](./TUTORIAL.md#39-message) - 3.10 [Synchronising to hardware](./TUTORIAL.md#310-synchronising-to-hardware) + 3.10 [Message broker](./TUTORIAL.md#310-message-broker) A publish-subscribe model of messaging and control. + 3.11 [Synchronising to hardware](./TUTORIAL.md#311-synchronising-to-hardware) Debouncing switches, pushbuttons, ESP32 touchpads and encoder knobs. Taming ADC's. 4. [Designing classes for asyncio](./TUTORIAL.md#4-designing-classes-for-asyncio) 4.1 [Awaitable classes](./TUTORIAL.md#41-awaitable-classes) @@ -589,6 +590,8 @@ following classes which are non-standard, are also in that directory: in a similar (but not identical) way to `gather`. * `Delay_ms` A useful software-retriggerable monostable, akin to a watchdog. Calls a user callback if not cancelled or regularly retriggered. + * `RingbufQueue` a MicroPython-optimised queue. + * `Broker` a means of messaging and control based on a publish/subscribe model. A further set of primitives for synchronising hardware are detailed in [section 3.9](./TUTORIAL.md#39-synchronising-to-hardware). @@ -794,7 +797,9 @@ wa = WaitAll((evt1, evt2)).wait() await wa # Both were triggered ``` -Awaiting `WaitAll` or `WaitAny` may be cancelled or subject to a timeout. +Awaiting `WaitAll` or `WaitAny` may be cancelled or subject to a timeout. These +primitives are documented in +[event baed programming](https://github.com/peterhinch/micropython-async/blob/master/v3/docs/EVENTS.md). ###### [Contents](./TUTORIAL.md#contents) @@ -1019,8 +1024,9 @@ running in another thread or on another core. It operates in a similar way to * It is self-clearing. * Only one task may wait on the flag. -Synchronous method: +Synchronous methods: * `set` Triggers the flag. Like issuing `set` then `clear` to an `Event`. + * `clear` Unconditionally clear down the flag. Asynchronous method: * `wait` Wait for the flag to be set. If the flag is already set then it @@ -1097,7 +1103,9 @@ class which allows multiple tasks to wait on it. ### 3.6.1 Querying a ThreadSafeFlag -The state of a ThreadSafeFlag may be tested as follows: +The `ThreadSafeFlag` class has no equivalent to `Event.is_set`. A synchronous +function which returns the state of a `ThreadSafeFlag` instance may be created +as follows: ```python import asyncio from select import poll, POLLIN @@ -1108,14 +1116,14 @@ async def foo(tsf): # Periodically set the ThreadSafeFlag await asyncio.sleep(1) tsf.set() - def ready(tsf, poller): - r = (tsf, POLLIN) - poller.register(*r) +def ready(tsf, poller): # Return a function which returns tsf status + r = (tsf, POLLIN) + poller.register(*r) - def is_rdy(): - return r in poller.ipoll(0) + def is_rdy(): + return r in poller.ipoll(0) # Immediate return - return is_rdy + return is_rdy async def test(): tsf = asyncio.ThreadSafeFlag() @@ -1135,9 +1143,12 @@ async def test(): asyncio.run(test()) ``` The `ready` closure returns a nonblocking function which tests the status of a -given flag. In the above example `.wait()` is not called until the flag has been +passed flag. In this example `.wait()` is not called until the flag has been set, consequently `.wait()` returns rapidly. +The `select.poll` mechanism works because `ThreadSafeFlag` is subclassed from +`io.IOBase` and has an `ioctl` method. + ###### [Contents](./TUTORIAL.md#contents) ## 3.7 Barrier @@ -1274,7 +1285,26 @@ provide an object similar to `Event` with the following differences: It may be found in the `threadsafe` directory and is documented [here](./THREADING.md#32-message). -## 3.10 Synchronising to hardware +## 3.10 Message broker + +A `Broker` is a means of communicating data and/or control within or between +modules. It is typically a single global object, and uses a publish-subscribe +model. A publication comprises a `topic` and a `message`; the latter may be any +Python object. Tasks subscribe to a `topic` via an `agent` object. Whenever a +publication, occurs all `agent` instances currently subscribed to that topic are +triggered. + +An `agent` may be an instance of various types including a function, a coroutine +or a queue. + +A benefit of this approach is that the design of publishing tasks can proceed +independently from that of the subscribers; `agent` instances can be subscribed +and unsubscribed at run time with no effect on the publisher. The publisher +neither knows or cares about the type or number of subscribing `agent`s. + +This is [documented here](https://github.com/peterhinch/micropython-async/blob/master/v3/docs/DRIVERS.md#9-message-broker). + +## 3.11 Synchronising to hardware The following hardware-related classes are documented [here](./DRIVERS.md): * `ESwitch` A debounced switch with an `Event` interface. diff --git a/v3/primitives/__init__.py b/v3/primitives/__init__.py index 0431e3c..ceaad77 100644 --- a/v3/primitives/__init__.py +++ b/v3/primitives/__init__.py @@ -1,6 +1,6 @@ # __init__.py Common functions for uasyncio primitives -# Copyright (c) 2018-2022 Peter Hinch +# Copyright (c) 2018-2024 Peter Hinch # Released under the MIT License (MIT) - see LICENSE file import asyncio @@ -53,9 +53,13 @@ def _handle_exception(loop, context): "RingbufQueue": "ringbuf_queue", "Keyboard": "sw_array", "SwArray": "sw_array", + "Broker": "broker", + "broker": "broker", + "Agent": "broker", + "RegExp": "broker", } -# Copied from uasyncio.__init__.py +# Copied from asyncio.__init__.py # Lazy loader, effectively does: # global attr # from .mod import attr diff --git a/v3/primitives/broker.py b/v3/primitives/broker.py new file mode 100644 index 0000000..73072bb --- /dev/null +++ b/v3/primitives/broker.py @@ -0,0 +1,89 @@ +# broker.py A message broker for MicroPython + +# Copyright (c) 2024-2025 Peter Hinch +# Released under the MIT License (MIT) - see LICENSE file + +# Inspired by the following +# https://www.joeltok.com/posts/2021-03-building-an-event-bus-in-python/ + +import asyncio +from primitives import Queue, RingbufQueue, type_coro +import re + + +class Agent: # ABC for user agent + pass + + +class RegExp: + def __init__(self, re_str): + self.re = re.compile(re_str) + + def matching(self, topic): + return re.match(self.re, topic) is not None + + +def _validate(a): + return ( + isinstance(a, asyncio.Event) + or isinstance(a, Queue) + or isinstance(a, RingbufQueue) + or isinstance(a, Agent) + or callable(a) + ) + + +class Broker(dict): + Verbose = True + + def subscribe(self, topic, agent, *args): + if not _validate(agent): + raise ValueError("Invalid agent:", agent) + aa = (agent, args) + if not (t := self.get(topic, False)): + self[topic] = {aa} + else: + if aa in t and Broker.Verbose: + print(f"Duplicate agent {aa} in topic {topic}.") + t.add(aa) + + def unsubscribe(self, topic, agent, *args): + if topic in self: + if (aa := (agent, args)) in self[topic]: + self[topic].remove(aa) + elif Broker.Verbose: + print(f"Unsubscribe agent {aa} from topic {topic} fail: agent not subscribed.") + if len(self[topic]) == 0: + del self[topic] + elif Broker.Verbose: + print(f"Unsubscribe topic {topic} fail: topic not subscribed.") + + def publish(self, topic, message=None): + agents = set() # Agents which are triggered by this topic + if isinstance(topic, str): # Check regexps + # Are any keys RegExp instances? + for regexp in [k for k in self.keys() if isinstance(k, RegExp)]: + if regexp.matching(topic): + agents.update(self[regexp]) # Append matching agents + agents.update(self.get(topic, [])) # Exact match + for agent, args in agents: + if isinstance(agent, asyncio.Event): + agent.set() + continue + if isinstance(agent, Agent): # User class + agent.put(topic, message, *args) # Must support .put + continue + if isinstance(agent, Queue) or isinstance(agent, RingbufQueue): + t = (topic, message, args) + try: + agent.put_nowait(t if args else t[:2]) + except Exception: # Queue discards current message. RingbufQueue discards oldest + Broker.Verbose and print(f"Message lost topic {topic} message {message}") + continue + # agent is function, method, coroutine or bound coroutine + res = agent(topic, message, *args) + if isinstance(res, type_coro): + asyncio.create_task(res) + + +broker = Broker() diff --git a/v3/primitives/encoder.py b/v3/primitives/encoder.py index 0f43b87..ef9b561 100644 --- a/v3/primitives/encoder.py +++ b/v3/primitives/encoder.py @@ -1,6 +1,6 @@ # encoder.py Asynchronous driver for incremental quadrature encoder. -# Copyright (c) 2021-2023 Peter Hinch +# Copyright (c) 2021-2024 Peter Hinch # Released under the MIT License (MIT) - see LICENSE file # For an explanation of the design please see @@ -17,19 +17,10 @@ # Raul Kompaß (@rkompass) for suggesting a bugfix here # https://forum.micropython.org/viewtopic.php?f=15&t=9929&p=66175#p66156 +# Now uses ThreadSafeFlag.clear() + import asyncio from machine import Pin -from select import poll, POLLIN - - -def ready(tsf, poller): - r = (tsf, POLLIN) - poller.register(*r) - - def is_rdy(): - return r in poller.ipoll(0) - - return is_rdy class Encoder: @@ -58,7 +49,6 @@ def __init__( if ((vmin is not None) and v < vmin) or ((vmax is not None) and v > vmax): raise ValueError("Incompatible args: must have vmin <= v <= vmax") self._tsf = asyncio.ThreadSafeFlag() - self._tsf_ready = ready(self._tsf, poll()) # Create a ready function trig = Pin.IRQ_RISING | Pin.IRQ_FALLING try: xirq = pin_x.irq(trigger=trig, handler=self._x_cb, hard=True) @@ -90,10 +80,9 @@ async def _run(self, vmin, vmax, div, mod, cb, args): plcv = pcv # Previous value after limits applied delay = self.delay while True: - if delay > 0 and self._tsf_ready(): # Ensure ThreadSafeFlag is clear - await self._tsf.wait() - await self._tsf.wait() - await asyncio.sleep_ms(delay) # Wait for motion to stop. + self._tsf.clear() + await self._tsf.wait() # Wait for an edge. A stopped encoder waits here. + await asyncio.sleep_ms(delay) # Optional rate limit for callback/trig. hv = self._v # Sample hardware (atomic read). if hv == pv: # A change happened but was negated before continue # this got scheduled. Nothing to do. @@ -115,7 +104,7 @@ async def _run(self, vmin, vmax, div, mod, cb, args): def __aiter__(self): return self - def __anext__(self): + async def __anext__(self): await self._trig.wait() self._trig.clear() return self._cv diff --git a/v3/primitives/package.json b/v3/primitives/package.json index 8f7e7a7..2adf5cf 100644 --- a/v3/primitives/package.json +++ b/v3/primitives/package.json @@ -3,6 +3,7 @@ ["primitives/__init__.py", "github:peterhinch/micropython-async/v3/primitives/__init__.py"], ["primitives/aadc.py", "github:peterhinch/micropython-async/v3/primitives/aadc.py"], ["primitives/barrier.py", "github:peterhinch/micropython-async/v3/primitives/barrier.py"], + ["primitives/broker.py", "github:peterhinch/micropython-async/v3/primitives/broker.py"], ["primitives/condition.py", "github:peterhinch/micropython-async/v3/primitives/condition.py"], ["primitives/delay_ms.py", "github:peterhinch/micropython-async/v3/primitives/delay_ms.py"], ["primitives/encoder.py", "github:peterhinch/micropython-async/v3/primitives/encoder.py"], diff --git a/v3/primitives/ringbuf_queue.py b/v3/primitives/ringbuf_queue.py index 65366d3..eaf7ad3 100644 --- a/v3/primitives/ringbuf_queue.py +++ b/v3/primitives/ringbuf_queue.py @@ -6,7 +6,9 @@ # API differs from CPython # Uses pre-allocated ring buffer: can use list or array # Asynchronous iterator allowing consumer to use async for -# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded. +# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded - +# this is not thread safe. Nor is the class as a whole TS because of its use of +# Event objects. import asyncio diff --git a/v3/primitives/tests/broker_test.py b/v3/primitives/tests/broker_test.py new file mode 100644 index 0000000..ad1357e --- /dev/null +++ b/v3/primitives/tests/broker_test.py @@ -0,0 +1,129 @@ +# broker_test.py Test various types of subscriber + +# import primitives.tests.broker_test + +import asyncio +from primitives import broker, Queue, RingbufQueue, RegExp + +# Periodically publish messages to two topics +async def test(t): + for x in range(t): + await asyncio.sleep(1) + broker.publish("foo_topic", f"dogs {x}") + broker.publish("bar_topic", f"rats {x}") + + +# Suscribe via coroutine +async def subs(topic, message): + await asyncio.sleep_ms(100) + print("coroutine", topic, message) + + +# Subscribe via function +def func(topic, message): + print("function", topic, message) + + +# Subscribe via Event + +event = asyncio.Event() + + +async def event_test(): + while True: + await event.wait() + event.clear() + print("Event triggered") + + +class TestClass: + async def fetch_data(self, topic, message, arg1, arg2): + await asyncio.sleep_ms(100) + print("bound coro", topic, message, arg1, arg2) + + def get_data(self, topic, message): + print("bound method", topic, message) + + +async def print_queue(q): + while True: + topic, message = await asyncio.wait_for(q.get(), 2) + print(topic, message) + + +async def print_ringbuf_q(q): + while True: + topic, message, args = await asyncio.wait_for(q.get(), 2) + print(topic, message, args) + + +async def main(): + Broker.Verbose = False # Suppress q full messages + tc = TestClass() + q = Queue(10) + rq = RingbufQueue(10) + print("Subscribing Event, coroutine, Queue, RingbufQueue and bound coroutine.") + broker.subscribe("foo_topic", tc.fetch_data, 1, 42) # Bound coroutine + broker.subscribe("bar_topic", subs) # Coroutine + broker.subscribe("bar_topic", event) + broker.subscribe("foo_topic", q) + broker.subscribe("bar_topic", rq, "args", "added") + + asyncio.create_task(test(30)) # Publish to topics for 30s + asyncio.create_task(event_test()) + await asyncio.sleep(5) + print() + print("Unsubscribing coroutine") + broker.unsubscribe("bar_topic", subs) + await asyncio.sleep(5) + print() + print("Unsubscribing Event") + broker.unsubscribe("bar_topic", event) + print() + print("Subscribing function") + broker.subscribe("bar_topic", func) + await asyncio.sleep(5) + print() + print("Unsubscribing function") + broker.unsubscribe("bar_topic", func) + print() + print("Unsubscribing bound coroutine") + broker.unsubscribe("foo_topic", tc.fetch_data, 1, 42) # Async method + print() + print("Subscribing method") + broker.subscribe("foo_topic", tc.get_data) # Sync method + await asyncio.sleep(5) + print() + print("Unsubscribing method") + broker.unsubscribe("foo_topic", tc.get_data) # Async method + print("Retrieving foo_topic messages from Queue") + print("Retrieving bar_topic messages from RingbufQueue") + await asyncio.gather(print_queue(q), print_ringbuf_q(rq), return_exceptions=True) + # Queues are now empty + print() + print("*** Unsubscribing queues ***") + broker.unsubscribe("foo_topic", q) + broker.unsubscribe("bar_topic", rq, "args", "added") + print() + + print("*** Testing error reports and exception ***") + print() + Broker.Verbose = True + print("*** Produce warning messages on invalid unsubscribe ***") + broker.unsubscribe("rats", "more rats") # Invalid topic + broker.unsubscribe("foo_topic", "rats") # Invalid agent + print("*** Check exception on invalid subscribe ***") + try: + broker.subscribe("foo_topic", "rubbish_agent") + print("Test FAIL") + except ValueError: + print("Test PASS") + print() + print("*** Test wildcard subscribe ***") + broker.subscribe(RegExp(".*_topic"), func) + broker.publish("FAIL", func) # No match + asyncio.create_task(test(5)) + await asyncio.sleep(10) + + +asyncio.run(main())