|
| 1 | +# sw_array.py A crosspoint array of pushbuttons |
| 2 | + |
| 3 | +# Copyright (c) 2023 Peter Hinch |
| 4 | +# Released under the MIT License (MIT) - see LICENSE file |
| 5 | + |
| 6 | +import asyncio |
| 7 | +from . import RingbufQueue |
| 8 | +from time import ticks_ms, ticks_diff |
| 9 | + |
| 10 | +# A crosspoint array of pushbuttons |
| 11 | +# Tuples/lists of pins. Rows are OUT, cols are IN |
| 12 | +class Keyboard(RingbufQueue): |
| 13 | + def __init__(self, rowpins, colpins, *, buffer=bytearray(10), db_delay=50): |
| 14 | + super().__init__(buffer) |
| 15 | + self.rowpins = rowpins |
| 16 | + self.colpins = colpins |
| 17 | + self._state = 0 # State of all keys as bitmap |
| 18 | + for opin in self.rowpins: # Initialise output pins |
| 19 | + opin(1) |
| 20 | + asyncio.create_task(self.scan(len(rowpins) * len(colpins), db_delay)) |
| 21 | + |
| 22 | + def __getitem__(self, scan_code): |
| 23 | + return bool(self._state & (1 << scan_code)) |
| 24 | + |
| 25 | + async def scan(self, nkeys, db_delay): |
| 26 | + while True: |
| 27 | + cur = 0 # Current bitmap of logical key states |
| 28 | + for opin in self.rowpins: |
| 29 | + opin(0) # Assert output |
| 30 | + for ipin in self.colpins: |
| 31 | + cur <<= 1 |
| 32 | + cur |= ipin() ^ 1 # Convert physical to logical |
| 33 | + opin(1) |
| 34 | + if pressed := (cur & ~self._state): # 1's are newly pressed button(s) |
| 35 | + for sc in range(nkeys): |
| 36 | + if pressed & 1: |
| 37 | + try: |
| 38 | + self.put_nowait(sc) |
| 39 | + except IndexError: # q full. Overwrite oldest |
| 40 | + pass |
| 41 | + pressed >>= 1 |
| 42 | + changed = cur ^ self._state # Any new press or release |
| 43 | + self._state = cur |
| 44 | + await asyncio.sleep_ms(db_delay if changed else 0) # Wait out bounce |
| 45 | + |
| 46 | +CLOSE = const(1) # cfg comprises the OR of these constants |
| 47 | +OPEN = const(2) |
| 48 | +LONG = const(4) |
| 49 | +DOUBLE = const(8) |
| 50 | +SUPPRESS = const(16) # Disambiguate |
| 51 | + |
| 52 | +# Entries in queue are (scan_code, event) where event is an OR of above constants |
| 53 | +# Tuples/lists of pins. Rows are OUT, cols are IN |
| 54 | +class SwArray(RingbufQueue): |
| 55 | + debounce_ms = 50 # Attributes can be varied by user |
| 56 | + long_press_ms = 1000 |
| 57 | + double_click_ms = 400 |
| 58 | + def __init__(self, rowpins, colpins, cfg, *, bufsize=10): |
| 59 | + super().__init__(bufsize) |
| 60 | + self._rowpins = rowpins |
| 61 | + self._colpins = colpins |
| 62 | + self._cfg = cfg |
| 63 | + self._state = 0 # State of all keys as bitmap |
| 64 | + self._flags = 0 # Busy bitmap |
| 65 | + self._basic = not bool(cfg & (SUPPRESS | LONG | DOUBLE)) # Basic mode |
| 66 | + self._suppress = bool(cfg & SUPPRESS) |
| 67 | + for opin in self._rowpins: # Initialise output pins |
| 68 | + opin(1) |
| 69 | + asyncio.create_task(self._scan(len(rowpins) * len(colpins))) |
| 70 | + |
| 71 | + def __getitem__(self, scan_code): |
| 72 | + return bool(self._state & (1 << scan_code)) |
| 73 | + |
| 74 | + def _put(self, sc, evt): |
| 75 | + if evt & self._cfg: # Only if user has requested it |
| 76 | + try: |
| 77 | + self.put_nowait((sc, evt)) |
| 78 | + except IndexError: # q full. Overwrite oldest |
| 79 | + pass |
| 80 | + |
| 81 | + def _timeout(self, ts, condition): |
| 82 | + t = SwArray.long_press_ms if condition == LONG else SwArray.double_click_ms |
| 83 | + return ticks_diff(ticks_ms(), ts) > t |
| 84 | + |
| 85 | + def _busy(self, sc, v): |
| 86 | + of = self._flags # Return prior state |
| 87 | + if v: |
| 88 | + self._flags |= 1 << sc |
| 89 | + else: |
| 90 | + self._flags &= ~(1 << sc) |
| 91 | + return (of >> sc) & 1 |
| 92 | + |
| 93 | + async def _finish(self, sc): # Tidy up. If necessary await a contact open |
| 94 | + while self[sc]: |
| 95 | + await asyncio.sleep_ms(0) |
| 96 | + self._put(sc, OPEN) |
| 97 | + self._busy(sc, False) |
| 98 | + |
| 99 | + # Handle long, double. Switch has closed. |
| 100 | + async def _defer(self, sc): |
| 101 | + # Wait for contact closure to be registered: let calling loop complete |
| 102 | + await asyncio.sleep_ms(0) |
| 103 | + ts = ticks_ms() |
| 104 | + if not self._suppress: |
| 105 | + self._put(sc, CLOSE) |
| 106 | + while self[sc]: # Pressed |
| 107 | + await asyncio.sleep_ms(0) |
| 108 | + if self._timeout(ts, LONG): |
| 109 | + self._put(sc, LONG) |
| 110 | + await self._finish(sc) |
| 111 | + return |
| 112 | + if not self._suppress: |
| 113 | + self._put(sc, OPEN) |
| 114 | + while not self[sc]: |
| 115 | + await asyncio.sleep_ms(0) |
| 116 | + if self._timeout(ts, DOUBLE): # No second closure |
| 117 | + self._put(sc, CLOSE) # Single press. Report CLOSE |
| 118 | + await self._finish(sc) # then OPEN |
| 119 | + return |
| 120 | + self._put(sc, DOUBLE) |
| 121 | + await self._finish(sc) |
| 122 | + |
| 123 | + async def _scan(self, nkeys): |
| 124 | + db_delay = SwArray.debounce_ms |
| 125 | + while True: |
| 126 | + cur = 0 # Current bitmap of logical key states (1 == pressed) |
| 127 | + for opin in self._rowpins: |
| 128 | + opin(0) # Assert output |
| 129 | + for ipin in self._colpins: |
| 130 | + cur <<= 1 |
| 131 | + cur |= ipin() ^ 1 # Convert physical to logical |
| 132 | + opin(1) |
| 133 | + curb = cur # Copy current bitmap |
| 134 | + if changed := (cur ^ self._state): # 1's are newly canged button(s) |
| 135 | + for sc in range(nkeys): |
| 136 | + if (changed & 1): # Current key has changed state |
| 137 | + if self._basic: # No timed behaviour |
| 138 | + self._put(sc, CLOSE if cur & 1 else OPEN) |
| 139 | + elif cur & 1: # Closed |
| 140 | + if not self._busy(sc, True): # Currently not busy |
| 141 | + asyncio.create_task(self._defer(sc)) # Q is handled asynchronously |
| 142 | + changed >>= 1 |
| 143 | + cur >>= 1 |
| 144 | + changed = curb ^ self._state # Any new press or release |
| 145 | + self._state = curb |
| 146 | + await asyncio.sleep_ms(db_delay if changed else 0) # Wait out bounce |
0 commit comments