Skip to content

Commit eba8974

Browse files
pi-anljimmo
authored andcommitted
aioble/server.py: Maintain write order for captured characteristics.
This replaced the per-characteristic queues with a single shared queue, which means that the characteristics will return from `written()` in the exact order that the original writes arrived, even if the writes are occuring across multiple different characteristics. This work was funded by Planet Innovation. Signed-off-by: Jim Mussared <[email protected]>
1 parent 0c5880d commit eba8974

File tree

3 files changed

+233
-28
lines changed

3 files changed

+233
-28
lines changed

micropython/bluetooth/aioble/aioble/server.py

Lines changed: 78 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ def _server_irq(event, data):
6060
def _server_shutdown():
6161
global _registered_characteristics
6262
_registered_characteristics = {}
63+
if hasattr(BaseCharacteristic, "_capture_task"):
64+
BaseCharacteristic._capture_task.cancel()
65+
del BaseCharacteristic._capture_queue
66+
del BaseCharacteristic._capture_write_event
67+
del BaseCharacteristic._capture_consumed_event
68+
del BaseCharacteristic._capture_task
6369

6470

6571
register_irq_handler(_server_irq, _server_shutdown)
@@ -97,6 +103,42 @@ def write(self, data, send_update=False):
97103
else:
98104
ble.gatts_write(self._value_handle, data, send_update)
99105

106+
# When the a capture-enabled characteristic is created, create the
107+
# necessary events (if not already created).
108+
@staticmethod
109+
def _init_capture():
110+
if hasattr(BaseCharacteristic, "_capture_queue"):
111+
return
112+
113+
BaseCharacteristic._capture_queue = deque((), _WRITE_CAPTURE_QUEUE_LIMIT)
114+
BaseCharacteristic._capture_write_event = asyncio.ThreadSafeFlag()
115+
BaseCharacteristic._capture_consumed_event = asyncio.ThreadSafeFlag()
116+
BaseCharacteristic._capture_task = asyncio.create_task(
117+
BaseCharacteristic._run_capture_task()
118+
)
119+
120+
# Monitor the shared queue for incoming characteristic writes and forward
121+
# them sequentially to the individual characteristic events.
122+
@staticmethod
123+
async def _run_capture_task():
124+
write = BaseCharacteristic._capture_write_event
125+
consumed = BaseCharacteristic._capture_consumed_event
126+
q = BaseCharacteristic._capture_queue
127+
128+
while True:
129+
if len(q):
130+
conn, data, characteristic = q.popleft()
131+
# Let the characteristic waiting in `written()` know that it
132+
# can proceed.
133+
characteristic._write_data = (conn, data)
134+
characteristic._write_event.set()
135+
# Wait for the characteristic to complete `written()` before
136+
# continuing.
137+
await consumed.wait()
138+
139+
if not len(q):
140+
await write.wait()
141+
100142
# Wait for a write on this characteristic. Returns the connection that did
101143
# the write, or a tuple of (connection, value) if capture is enabled for
102144
# this characteristics.
@@ -105,17 +147,27 @@ async def written(self, timeout_ms=None):
105147
# Not a writable characteristic.
106148
return
107149

108-
# If the queue is empty, then we need to wait. However, if the queue
109-
# has a single item, we also need to do a no-op wait in order to
110-
# clear the event flag (because the queue will become empty and
111-
# therefore the event should be cleared).
112-
if len(self._write_queue) <= 1:
113-
with DeviceTimeout(None, timeout_ms):
114-
await self._write_event.wait()
150+
# If no write has been seen then we need to wait. If the event has
151+
# already been set this will clear the event and continue
152+
# immediately. In regular mode, this is set by the write IRQ
153+
# directly (in _remote_write). In capture mode, this is set when it's
154+
# our turn by _capture_task.
155+
with DeviceTimeout(None, timeout_ms):
156+
await self._write_event.wait()
157+
158+
# Return the write data and clear the stored copy.
159+
# In default usage this will be just the connection handle.
160+
# In capture mode this will be a tuple of (connection_handle, received_data)
161+
data = self._write_data
162+
self._write_data = None
115163

116-
# Either we started > 1 item, or the wait completed successfully, return
117-
# the front of the queue.
118-
return self._write_queue.popleft()
164+
if self.flags & _FLAG_WRITE_CAPTURE:
165+
# Notify the shared queue monitor that the event has been consumed
166+
# by the caller to `written()` and another characteristic can now
167+
# proceed.
168+
BaseCharacteristic._capture_consumed_event.set()
169+
170+
return data
119171

120172
def on_read(self, connection):
121173
return 0
@@ -124,27 +176,20 @@ def _remote_write(conn_handle, value_handle):
124176
if characteristic := _registered_characteristics.get(value_handle, None):
125177
# If we've gone from empty to one item, then wake something
126178
# blocking on `await char.written()`.
127-
wake = len(characteristic._write_queue) == 0
128179

129180
conn = DeviceConnection._connected.get(conn_handle, None)
130-
q = characteristic._write_queue
131181

132182
if characteristic.flags & _FLAG_WRITE_CAPTURE:
133-
# For capture, we append both the connection and the written
134-
# value to the queue. The deque will enforce the max queue len.
183+
# For capture, we append the connection and the written value
184+
# value to the shared queue along with the matching characteristic object.
185+
# The deque will enforce the max queue len.
135186
data = characteristic.read()
136-
q.append((conn, data))
187+
BaseCharacteristic._capture_queue.append((conn, data, characteristic))
188+
BaseCharacteristic._capture_write_event.set()
137189
else:
138-
# Use the queue as a single slot -- it has max length of 1,
139-
# so if there's an existing item it will be replaced.
140-
q.append(conn)
141-
142-
if wake:
143-
# Queue is now non-empty. If something is waiting, it will be
144-
# worken. If something isn't waiting right now, then a future
145-
# caller to `await char.written()` will see the queue is
146-
# non-empty, and wait on the event if it's going to empty the
147-
# queue.
190+
# Store the write connection handle to be later used to retrieve the data
191+
# then set event to handle in written() task.
192+
characteristic._write_data = conn
148193
characteristic._write_event.set()
149194

150195
def _remote_read(conn_handle, value_handle):
@@ -178,10 +223,15 @@ def __init__(
178223
if capture:
179224
# Capture means that we keep track of all writes, and capture
180225
# their values (and connection) in a queue. Otherwise we just
181-
# track the most recent connection.
226+
# track the connection of the most recent write.
182227
flags |= _FLAG_WRITE_CAPTURE
228+
BaseCharacteristic._init_capture()
229+
230+
# Set when this characteristic has a value waiting in self._write_data.
183231
self._write_event = asyncio.ThreadSafeFlag()
184-
self._write_queue = deque((), _WRITE_CAPTURE_QUEUE_LIMIT if capture else 1)
232+
# The connection of the most recent write, or a tuple of
233+
# (connection, data) if capture is enabled.
234+
self._write_data = None
185235
if notify:
186236
flags |= _FLAG_NOTIFY
187237
if indicate:
@@ -263,7 +313,7 @@ def __init__(self, characteristic, uuid, read=False, write=False, initial=None):
263313
flags |= _FLAG_DESC_READ
264314
if write:
265315
self._write_event = asyncio.ThreadSafeFlag()
266-
self._write_queue = deque((), 1)
316+
self._write_data = None
267317
flags |= _FLAG_DESC_WRITE
268318

269319
self.uuid = uuid
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# Test characteristic write capture preserves order across characteristics.
2+
3+
import sys
4+
5+
sys.path.append("")
6+
7+
from micropython import const
8+
import time, machine
9+
10+
import uasyncio as asyncio
11+
import aioble
12+
import bluetooth
13+
14+
TIMEOUT_MS = 5000
15+
16+
# Without the write ordering (via the shared queue) in server.py, this test
17+
# passes with delay of 1, fails some at 5, fails more at 50
18+
DUMMY_DELAY = 50
19+
20+
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
21+
CHAR_FIRST_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
22+
CHAR_SECOND_UUID = bluetooth.UUID("00000000-1111-2222-3333-555555555555")
23+
24+
# Acting in peripheral role.
25+
async def instance0_task():
26+
service = aioble.Service(SERVICE_UUID)
27+
characteristic_first = aioble.Characteristic(
28+
service,
29+
CHAR_FIRST_UUID,
30+
write=True,
31+
capture=True,
32+
)
33+
# Second characteristic enabled write capture.
34+
characteristic_second = aioble.Characteristic(
35+
service,
36+
CHAR_SECOND_UUID,
37+
write=True,
38+
capture=True,
39+
)
40+
aioble.register_services(service)
41+
42+
# Register characteristic.written() handlers as asyncio background tasks.
43+
# The order of these is important!
44+
asyncio.create_task(task_written(characteristic_second, "second"))
45+
asyncio.create_task(task_written(characteristic_first, "first"))
46+
47+
# This dummy task simulates background processing on a real system that
48+
# can block the asyncio loop for brief periods of time
49+
asyncio.create_task(task_dummy())
50+
51+
multitest.globals(BDADDR=aioble.config("mac"))
52+
multitest.next()
53+
54+
# Wait for central to connect to us.
55+
print("advertise")
56+
async with await aioble.advertise(
57+
20_000, adv_data=b"\x02\x01\x06\x04\xffMPY", timeout_ms=TIMEOUT_MS
58+
) as connection:
59+
print("connected")
60+
61+
await connection.disconnected()
62+
63+
64+
async def task_written(chr, label):
65+
while True:
66+
await chr.written()
67+
data = chr.read().decode()
68+
print(f"written: {label} {data}")
69+
70+
71+
async def task_dummy():
72+
while True:
73+
time.sleep_ms(DUMMY_DELAY)
74+
await asyncio.sleep_ms(5)
75+
76+
77+
def instance0():
78+
try:
79+
asyncio.run(instance0_task())
80+
finally:
81+
aioble.stop()
82+
83+
84+
# Acting in central role.
85+
async def instance1_task():
86+
multitest.next()
87+
88+
# Connect to peripheral and then disconnect.
89+
print("connect")
90+
device = aioble.Device(*BDADDR)
91+
async with await device.connect(timeout_ms=TIMEOUT_MS) as connection:
92+
# Discover characteristics.
93+
service = await connection.service(SERVICE_UUID)
94+
print("service", service.uuid)
95+
characteristic_first = await service.characteristic(CHAR_FIRST_UUID)
96+
characteristic_second = await service.characteristic(CHAR_SECOND_UUID)
97+
print("characteristic", characteristic_first.uuid, characteristic_second.uuid)
98+
99+
for i in range(5):
100+
print(f"write c{i}")
101+
await characteristic_first.write("c" + str(i), timeout_ms=TIMEOUT_MS)
102+
await characteristic_second.write("c" + str(i), timeout_ms=TIMEOUT_MS)
103+
104+
await asyncio.sleep_ms(300)
105+
106+
for i in range(5):
107+
print(f"write r{i}")
108+
await characteristic_second.write("r" + str(i), timeout_ms=TIMEOUT_MS)
109+
await characteristic_first.write("r" + str(i), timeout_ms=TIMEOUT_MS)
110+
111+
await asyncio.sleep_ms(300)
112+
113+
114+
def instance1():
115+
try:
116+
asyncio.run(instance1_task())
117+
finally:
118+
aioble.stop()
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
--- instance0 ---
2+
advertise
3+
connected
4+
written: first c0
5+
written: second c0
6+
written: first c1
7+
written: second c1
8+
written: first c2
9+
written: second c2
10+
written: first c3
11+
written: second c3
12+
written: first c4
13+
written: second c4
14+
written: second r0
15+
written: first r0
16+
written: second r1
17+
written: first r1
18+
written: second r2
19+
written: first r2
20+
written: second r3
21+
written: first r3
22+
written: second r4
23+
written: first r4
24+
--- instance1 ---
25+
connect
26+
service UUID('a5a5a5a5-ffff-9999-1111-5a5a5a5a5a5a')
27+
characteristic UUID('00000000-1111-2222-3333-444444444444') UUID('00000000-1111-2222-3333-555555555555')
28+
write c0
29+
write c1
30+
write c2
31+
write c3
32+
write c4
33+
write r0
34+
write r1
35+
write r2
36+
write r3
37+
write r4

0 commit comments

Comments
 (0)