Skip to content

Commit 709663d

Browse files
committed
broker: Prior to adding args.
1 parent 7d0b3d3 commit 709663d

File tree

3 files changed

+21
-16
lines changed

3 files changed

+21
-16
lines changed

v3/docs/DRIVERS.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,7 +1134,9 @@ finally:
11341134

11351135
This is under development: please check for updates. See
11361136
[code](https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/broker.py).
1137-
1137+
```py
1138+
from primitives import Broker
1139+
```
11381140
The `Broker` class provides a flexible means of messaging between running tasks.
11391141
It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task
11401142
publishes to a topic. Any tasks subscribed to that topic will receive the
@@ -1153,13 +1155,13 @@ Broker methods. All are synchronous, constructor has no args:
11531155
matching `topic`.
11541156
* `unsubscribe(topic, agent)` The `agent` will stop being triggered.
11551157
* `publish(topic, message)` All `agent` instances subscribed to `topic` will be
1156-
triggered, receiving `topic` and `message` args. Returns `True` unless a `Queue`
1157-
agent has become full. A `False` value indicates that at least one message has
1158-
been lost.
1158+
triggered, receiving `topic` and `message` args. The method is not threadsafe;
1159+
it should not be called from a hard ISR or from another thread.
11591160

11601161
The `topic` arg is typically a string but may be any hashable object. A
11611162
`message` is an arbitrary Python object. An `agent` may be any of the following:
11621163
* `Queue` When a message is received receives 2-tuple `(topic, message)`.
1164+
* `RingbufQueue` When a message is received receives 2-tuple `(topic, message)`.
11631165
* `function` Called when a message is received. Gets 2 args, topic and message.
11641166
* `bound method` Called when a message is received. Gets 2 args, topic and
11651167
message.
@@ -1179,7 +1181,7 @@ import asyncio
11791181
from primitives import Broker, Queue
11801182

11811183
broker = Broker()
1182-
queue = Queue()
1184+
queue = Queue() # Or (e.g. RingbufQueue(20))
11831185
async def sender(t):
11841186
for x in range(t):
11851187
await asyncio.sleep(1)

v3/primitives/broker.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
# https://www.joeltok.com/posts/2021-03-building-an-event-bus-in-python/
88

99
import asyncio
10-
from primitives import Queue, type_coro
10+
from primitives import Queue, RingbufQueue, type_coro
1111

1212

1313
class Agent:
@@ -31,22 +31,20 @@ def unsubscribe(self, topic, agent):
3131

3232
def publish(self, topic, message):
3333
agents = self.get(topic, [])
34-
result = True
3534
for agent in agents:
3635
if isinstance(agent, asyncio.Event):
3736
agent.set()
3837
continue
3938
if isinstance(agent, Agent): # User class
4039
agent.put(topic, message) # Must support .put
4140
continue
42-
if isinstance(agent, Queue):
43-
if agent.full():
44-
result = False
45-
else:
41+
if isinstance(agent, Queue) or isinstance(agent, RingbufQueue):
42+
try:
4643
agent.put_nowait((topic, message))
44+
except Exception: # TODO
45+
pass
4746
continue
4847
# agent is function, method, coroutine or bound coroutine
4948
res = agent(topic, message)
5049
if isinstance(res, type_coro):
5150
asyncio.create_task(res)
52-
return result

v3/primitives/tests/broker_test.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# import primitives.tests.broker_test
44

55
import asyncio
6-
from primitives import Broker, Queue
6+
from primitives import Broker, Queue, RingbufQueue
77

88
broker = Broker()
99

@@ -56,11 +56,13 @@ async def print_queue(q):
5656
async def main():
5757
tc = TestClass()
5858
q = Queue(10)
59-
print("Subscribing Event, coroutine, Queue and bound coroutine.")
59+
rq = RingbufQueue(10)
60+
print("Subscribing Event, coroutine, Queue, RingbufQueue and bound coroutine.")
6061
broker.subscribe("foo_topic", tc.fetch_data) # Bound coroutine
6162
broker.subscribe("bar_topic", subs) # Coroutine
6263
broker.subscribe("bar_topic", event)
6364
broker.subscribe("foo_topic", q)
65+
broker.subscribe("bar_topic", rq)
6466

6567
asyncio.create_task(test(30)) # Publish to topics for 30s
6668
asyncio.create_task(event_test())
@@ -91,11 +93,14 @@ async def main():
9193
broker.unsubscribe("foo_topic", tc.get_data) # Async method
9294
print("Pause 5s")
9395
await asyncio.sleep(5)
94-
print("Retrieving foo_topic messages from queue")
96+
print("Retrieving foo_topic messages from Queue")
9597
try:
9698
await asyncio.wait_for(print_queue(q), 5)
9799
except asyncio.TimeoutError:
98-
print("Done")
100+
print("Timeout")
101+
print("Retrieving bar_topic messages from RingbufQueue")
102+
async for topic, message in rq:
103+
print(topic, message)
99104

100105

101106
asyncio.run(main())

0 commit comments

Comments
 (0)