Skip to content

aioble: Fix notified/indicated event waiting. #459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion micropython/bluetooth/aioble/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ aioble

This library provides an object-oriented, asyncio-based wrapper for MicroPython's [ubluetooth](https://docs.micropython.org/en/latest/library/ubluetooth.html) API.

**Note**: aioble requires MicroPython v1.15 or higher.
**Note**: aioble requires MicroPython v1.17 or higher.

Features
--------
Expand Down
138 changes: 91 additions & 47 deletions micropython/bluetooth/aioble/aioble/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# MIT license; Copyright (c) 2021 Jim Mussared

from micropython import const
from collections import deque
import uasyncio as asyncio
import struct

Expand All @@ -27,6 +28,12 @@
_CCCD_NOTIFY = const(1)
_CCCD_INDICATE = const(2)

_FLAG_READ = const(0x0002)
_FLAG_WRITE_NO_RESPONSE = const(0x0004)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)

# Forward IRQs directly to static methods on the type that handles them and
# knows how to map handles to instances. Note: We copy all uuid and data
# params here for safety, but a future optimisation might be able to avoid
Expand Down Expand Up @@ -202,8 +209,13 @@ def _find(conn_handle, value_handle):
# value handle for the done event.
return None

def _check(self, flag):
if not (self.properties & flag):
raise ValueError("Unsupported")

# Issue a read to the characteristic.
async def read(self, timeout_ms=1000):
self._check(_FLAG_READ)
# Make sure this conn_handle/value_handle is known.
self._register_with_connection()
# This will be set by the done IRQ.
Expand Down Expand Up @@ -235,10 +247,15 @@ def _read_done(conn_handle, value_handle, status):
characteristic._read_event.set()

async def write(self, data, response=False, timeout_ms=1000):
# TODO: default response to True if properties includes WRITE and is char.
# Something like:
# if response is None and self.properties & _FLAGS_WRITE:
# response = True
self._check(_FLAG_WRITE | _FLAG_WRITE_NO_RESPONSE)

# If we only support write-with-response, then force sensible default.
if (
response is None
and (self.properties & _FLAGS_WRITE)
and not (self.properties & _FLAG_WRITE_NO_RESPONSE)
):
response = True

if response:
# Same as read.
Expand Down Expand Up @@ -281,28 +298,32 @@ def __init__(self, service, def_handle, value_handle, properties, uuid):
# Allows comparison to a known uuid.
self.uuid = uuid

# Fired for each read result and read done IRQ.
self._read_event = None
self._read_data = None
# Used to indicate that the read is complete.
self._read_status = None

# Fired for the write done IRQ.
self._write_event = None
# Used to indicate that the write is complete.
self._write_status = None
if properties & _FLAG_READ:
# Fired for each read result and read done IRQ.
self._read_event = None
self._read_data = None
# Used to indicate that the read is complete.
self._read_status = None

if (properties & _FLAG_WRITE) or (properties & _FLAG_WRITE_NO_RESPONSE):
# Fired for the write done IRQ.
self._write_event = None
# Used to indicate that the write is complete.
self._write_status = None

# Fired when a notification arrives.
self._notify_event = None
# Data for the most recent notification.
self._notify_data = None
# Same for indications.
self._indicate_event = None
self._indicate_data = None
if properties & _FLAG_NOTIFY:
# Fired when a notification arrives.
self._notify_event = asyncio.ThreadSafeFlag()
# Data for the most recent notification.
self._notify_queue = deque((), 1)
if properties & _FLAG_INDICATE:
# Same for indications.
self._indicate_event = asyncio.ThreadSafeFlag()
self._indicate_queue = deque((), 1)

def __str__(self):
return "Characteristic: {} {} {} {}".format(
self._def_handle, self._value_handle, self._properties, self.uuid
self._def_handle, self._value_handle, self.properties, self.uuid
)

def _connection(self):
Expand Down Expand Up @@ -334,45 +355,65 @@ def _start_discovery(service, uuid=None):
uuid,
)

# Helper for notified() and indicated().
async def _notified_indicated(self, queue, event, timeout_ms):
# Ensure that events for this connection can route to this characteristic.
self._register_with_connection()

# If the queue is empty, then we need to wait. However, if the queue
# has a single item, we also need to do a no-op wait in order to
# clear the event flag (because the queue will become empty and
# therefore the event should be cleared).
if len(queue) <= 1:
with self._connection().timeout(timeout_ms):
await event.wait()

# Either we started > 1 item, or the wait completed successfully, return
# the front of the queue.
return queue.popleft()

# Wait for the next notification.
# Will return immediately if a notification has already been received.
async def notified(self, timeout_ms=None):
self._register_with_connection()
data = self._notify_data
if data is None:
self._notify_event = self._notify_event or asyncio.ThreadSafeFlag()
with self._connection().timeout(timeout_ms):
await self._notify_event.wait()
data = self._notify_data
self._notify_data = None
return data
self._check(_FLAG_NOTIFY)
return await self._notified_indicated(self._notify_queue, self._notify_event, timeout_ms)

def _on_notify_indicate(self, queue, event, data):
# If we've gone from empty to one item, then wake something
# blocking on `await char.notified()` (or `await char.indicated()`).
wake = len(queue) == 0
# Append the data. By default this is a deque with max-length==1, so it
# replaces. But if capture is enabled then it will append.
queue.append(data)
if wake:
# Queue is now non-empty. If something is waiting, it will be
# worken. If something isn't waiting right now, then a future
# caller to `await char.written()` will see the queue is
# non-empty, and wait on the event if it's going to empty the
# queue.
event.set()

# Map an incoming notify IRQ to a registered characteristic.
def _on_notify(conn_handle, value_handle, notify_data):
if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
characteristic._notify_data = notify_data
if characteristic._notify_event:
characteristic._notify_event.set()
characteristic._on_notify_indicate(
characteristic._notify_queue, characteristic._notify_event, notify_data
)

# Wait for the next indication.
# Will return immediately if an indication has already been received.
async def indicated(self, timeout_ms=None):
self._register_with_connection()
data = self._indicate_data
if data is None:
self._indicate_event = self._indicate_event or asyncio.ThreadSafeFlag()
with self._connection().timeout(timeout_ms):
await self._indicate_event.wait()
data = self._indicate_data
self._indicate_data = None
return data
self._check(_FLAG_INDICATE)
return await self._notified_indicated(
self._indicate_queue, self._indicate_event, timeout_ms
)

# Map an incoming indicate IRQ to a registered characteristic.
def _on_indicate(conn_handle, value_handle, indicate_data):
if characteristic := ClientCharacteristic._find(conn_handle, value_handle):
characteristic._indicate_data = indicate_data
if characteristic._indicate_event:
characteristic._indicate_event.set()
characteristic._on_notify_indicate(
characteristic._indicate_queue, characteristic._indicate_event, indicate_data
)

# Write to the Client Characteristic Configuration to subscribe to
# notify/indications for this characteristic.
Expand All @@ -399,9 +440,12 @@ def __init__(self, characteristic, dsc_handle, uuid):
# Used for read/write.
self._value_handle = dsc_handle

# Default flags
self.properties = _FLAG_READ | _FLAG_WRITE_NO_RESPONSE

def __str__(self):
return "Descriptor: {} {} {} {}".format(
self._def_handle, self._value_handle, self._properties, self.uuid
self._def_handle, self._value_handle, self.properties, self.uuid
)

def _connection(self):
Expand Down
6 changes: 3 additions & 3 deletions micropython/bluetooth/aioble/aioble/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ def read(self):
else:
return ble.gatts_read(self._value_handle)

# Write value to local db.
def write(self, data):
# Write value to local db, and optionally notify/indicate subscribers.
def write(self, data, send_update=False):
if self._value_handle is None:
self._initial = data
else:
ble.gatts_write(self._value_handle, data)
ble.gatts_write(self._value_handle, data, send_update)

# Wait for a write on this characteristic. Returns the connection that did
# the write, or a tuple of (connection, value) if capture is enabled for
Expand Down
148 changes: 148 additions & 0 deletions micropython/bluetooth/aioble/multitests/ble_notify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Test notification-specific behavior.

import sys

sys.path.append("")

from micropython import const
import time, machine

import uasyncio as asyncio
import aioble
import bluetooth

TIMEOUT_MS = 5000

SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")


# Acting in peripheral role.
async def instance0_task():
service = aioble.Service(SERVICE_UUID)
characteristic = aioble.Characteristic(service, CHAR_UUID, read=True, notify=True)
aioble.register_services(service)

multitest.globals(BDADDR=aioble.config("mac"))
multitest.next()

# Wait for central to connect to us.
print("advertise")
connection = await aioble.advertise(
20_000, adv_data=b"\x02\x01\x06\x04\xffMPY", timeout_ms=TIMEOUT_MS
)
print("connected")

# Send a subscribed-write (but client isn't subscribed, won't send anything).
multitest.wait("discovery")
await asyncio.sleep_ms(100)
characteristic.write("before-subscribe", send_update=True)

# Send a subscribed-write (now client is subscribed, client should get notified).
multitest.wait("subscribed")
await asyncio.sleep_ms(100)
characteristic.write("after-subscribe", send_update=True)

# Send a subscribed-write (now client is unsubscribed, won't send anything).
multitest.wait("unsubscribed")
await asyncio.sleep_ms(100)
characteristic.write("after-unsubscribe", send_update=True)

# Send 5 direct notifications.
multitest.wait("start-direct")
for i in range(5):
# Send 1 notification each time, except for 3 quick notifications the third time.
# The client should only see the last one.
for j in range(3 if i == 2 else 1):
if j > 0:
await asyncio.sleep_ms(100)
msg = "direct-{}-{}".format(i, j)
print("notify", msg)
characteristic.notify(connection, msg)

# Tell client to wait for notification.
multitest.broadcast("notified")
# Wait until client is ready for next notification.
multitest.wait("next")

# Wait for the central to disconnect.
await connection.disconnected(timeout_ms=TIMEOUT_MS)
print("disconnected")


def instance0():
try:
asyncio.run(instance0_task())
finally:
aioble.stop()


# Acting in central role.
async def instance1_task():
multitest.next()

# Connect to peripheral and then disconnect.
print("connect")
device = aioble.Device(*BDADDR)
connection = await device.connect(timeout_ms=TIMEOUT_MS)

# Discover characteristics.
service = await connection.service(SERVICE_UUID)
print("service", service.uuid)
characteristic = await service.characteristic(CHAR_UUID)
print("characteristic", characteristic.uuid)

# Expect to not receive a notification (not subscribed).
multitest.broadcast("discovery")
try:
await characteristic.notified(timeout_ms=500)
print("fail")
return
except asyncio.TimeoutError:
print("no notification")

# Subscribe and expect a notification.
await characteristic.subscribe(notify=True)
multitest.broadcast("subscribed")
value = await characteristic.notified()
print("notified", value)

# Unsubscribe, and expect not to receive a notification.
await characteristic.subscribe(notify=False)
multitest.broadcast("unsubscribed")
try:
await characteristic.notified(timeout_ms=500)
print("fail")
return
except asyncio.TimeoutError:
print("no notification")

# Receive 5 notifications.
multitest.broadcast("start-direct")
for i in range(5):
multitest.wait("notified")
await asyncio.sleep_ms(200)
value = await characteristic.notified()
print("notified", value)

# Expect that after receiving a notification we don't get another one
# until we broadcast to the server.
try:
value = await characteristic.notified(timeout_ms=100)
print("unexpected notify", value)
except asyncio.TimeoutError:
pass

multitest.broadcast("next")

# Disconnect from peripheral.
print("disconnect")
await connection.disconnect(timeout_ms=TIMEOUT_MS)
print("disconnected")


def instance1():
try:
asyncio.run(instance1_task())
finally:
aioble.stop()
Loading