|
2 | 2 | import machine
|
3 | 3 | from machine import Pin, I2C
|
4 | 4 | from mpu6886 import MPU6886
|
| 5 | +import bluetooth |
5 | 6 |
|
6 | 7 | import time
|
7 | 8 | import struct
|
@@ -30,6 +31,79 @@ def copy_array_into(source, target):
|
30 | 31 | for i in range(len(target)):
|
31 | 32 | target[i] = source[i]
|
32 | 33 |
|
| 34 | +def clamp(value, lower, upper) -> float: |
| 35 | + v = value |
| 36 | + v = min(v, upper) |
| 37 | + v = max(v, lower) |
| 38 | + return v |
| 39 | + |
| 40 | +def manufacturer_specific_advertisement(data : bytearray, manufacturer=[0xca, 0xab], limited_disc=False, br_edr=False): |
| 41 | + _ADV_TYPE_FLAGS = const(0x01) |
| 42 | + _ADV_TYPE_CUSTOMDATA = const(0xff) |
| 43 | + _ADV_MAX_PAYLOAD = const(31) |
| 44 | + |
| 45 | + payload = bytearray() |
| 46 | + |
| 47 | + # Advertising payloads are repeated packets of the following form: |
| 48 | + # 1 byte data length (N + 1) |
| 49 | + # 1 byte type (see constants below) |
| 50 | + # N bytes type-specific data |
| 51 | + def _append(adv_type, value): |
| 52 | + nonlocal payload |
| 53 | + payload += struct.pack("BB", len(value) + 1, adv_type) + value |
| 54 | + |
| 55 | + # Flags |
| 56 | + _append( |
| 57 | + _ADV_TYPE_FLAGS, |
| 58 | + struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)), |
| 59 | + ) |
| 60 | + |
| 61 | + # Specify manufacturer-specific data |
| 62 | + manufacturer_id = bytearray(manufacturer) |
| 63 | + _append(_ADV_TYPE_CUSTOMDATA, (manufacturer_id + data)) |
| 64 | + |
| 65 | + if len(payload) > _ADV_MAX_PAYLOAD: |
| 66 | + raise ValueError("advertising payload too large") |
| 67 | + |
| 68 | + return payload |
| 69 | + |
| 70 | +def send_bluetooth_le(sequence, probabilities, |
| 71 | + advertisements=4, |
| 72 | + advertise_interval_ms=50, |
| 73 | + format=0xAA, |
| 74 | + version=0x01): |
| 75 | + """ |
| 76 | + Send data as BLE advertisements |
| 77 | + Delivery of advertisements are not guaranteed. So we repeat N times to have a decent chance |
| 78 | + """ |
| 79 | + |
| 80 | + # Start BLE |
| 81 | + ble = bluetooth.BLE() |
| 82 | + ble.active(True) |
| 83 | + mac = ble.config('mac') |
| 84 | + |
| 85 | + # Encode data as BLE advertisement. Max 29 bytes |
| 86 | + data = bytearray() |
| 87 | + data += struct.pack('B', format) |
| 88 | + data += struct.pack('B', version) |
| 89 | + data += struct.pack('>H', sequence) |
| 90 | + |
| 91 | + for p in probabilities: |
| 92 | + q = int(clamp(p*255, 0, 255)) |
| 93 | + data += struct.pack('B', q) |
| 94 | + |
| 95 | + payload = manufacturer_specific_advertisement(data) |
| 96 | + |
| 97 | + print('ble-advertise', mac[1], data) |
| 98 | + |
| 99 | + # send and wait until N advertisements are sent |
| 100 | + advertise_us = int(1000*advertise_interval_ms) |
| 101 | + ble.gap_advertise(advertise_us, adv_data=payload, connectable=False) |
| 102 | + time.sleep_ms(advertisements*advertise_interval_ms) |
| 103 | + |
| 104 | + # Turn of BLE |
| 105 | + ble.active(False) |
| 106 | + |
33 | 107 | def main():
|
34 | 108 |
|
35 | 109 | dataset = 'har_exercise_1'
|
@@ -75,6 +149,8 @@ def main():
|
75 | 149 | features = array.array(features_typecode, (0 for _ in range(n_features)))
|
76 | 150 | out = array.array('f', range(model.outputs()))
|
77 | 151 |
|
| 152 | + prediction_no = 0 |
| 153 | + |
78 | 154 | while True:
|
79 | 155 |
|
80 | 156 | count = mpu.get_fifo_count()
|
@@ -103,9 +179,13 @@ def main():
|
103 | 179 | activity = class_index_to_name[result]
|
104 | 180 |
|
105 | 181 | d = time.ticks_diff(time.ticks_ms(), start)
|
106 |
| - print('class', activity, d) |
| 182 | + print('classify', d, activity, out) |
| 183 | + |
| 184 | + send_bluetooth_le(prediction_no, out) |
| 185 | + prediction_no += 1 |
107 | 186 |
|
108 |
| - machine.lightsleep(100) |
| 187 | + time.sleep_ms(100) |
| 188 | + #machine.lightsleep(100) |
109 | 189 |
|
110 | 190 |
|
111 | 191 | if __name__ == '__main__':
|
|
0 commit comments