Skip to content

Commit b08304f

Browse files
committed
I2C: Add flow control. Reduce allocation.
1 parent 4ecaa13 commit b08304f

File tree

6 files changed

+106
-45
lines changed

6 files changed

+106
-45
lines changed

i2c/README.md

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ The `Initiator` implements a timeout enabling it to detect failure of the other
2222
end of the interface (the `Responder`). There is optional provision to reset
2323
the `Responder` in this event.
2424

25+
## Changes
26+
27+
V0.15 RAM allocation reduced and flow control implemented.
28+
V0.1 Initial release.
29+
2530
###### [Main README](../README.md)
2631

2732
# Contents
@@ -36,7 +41,7 @@ the `Responder` in this event.
3641
4.3 [Responder class](./README.md#43-responder-class)
3742
5. [Limitations](./README.md#5-limitations)
3843
5.1 [Blocking](./README.md#51-blocking)
39-
5.2 [Buffering](./README.md#52-buffering)
44+
5.2 [Buffering and RAM usage](./README.md#52-buffering-and-ram-usage)
4045
5.3 [Responder crash detection](./README.md#53-responder-crash-detection)
4146

4247
# 1. Files
@@ -184,6 +189,8 @@ Class variables:
184189
1. `timeout=1000` Timeout (in ms) before `Initiator` assumes `Responder` has
185190
failed.
186191
2. `t_poll=100` Interval (ms) for `Initiator` polling `Responder`.
192+
3. `rxbufsize=200` Size of receive buffer. This should exceed the maximum
193+
message length.
187194

188195
Class variables should be set before instantiating `Initiator` or `Responder`.
189196
See [Section 4.4](./README.md#44-configuration).
@@ -230,10 +237,12 @@ Constructor args:
230237

231238
`Pin` instances passed to the constructor must be instantiated by `machine`.
232239

233-
Class variable:
240+
Class variables:
234241
1. `addr=0x12` Address of I2C slave. This should be set before instantiating
235242
`Initiator` or `Responder`. If the default address (0x12) is to be overriden,
236243
`Initiator` application code must instantiate the I2C accordingly.
244+
2. `rxbufsize` Size of receive buffer. This should exceed the maximum message
245+
length.
237246

238247
###### [Contents](./README.md#contents)
239248

@@ -270,16 +279,19 @@ but involves explicit delays. I took the view that a 2-wire solution is easier
270279
should anyone want to port the `Responder` to a platform such as the Raspberry
271280
Pi. The design has no timing constraints and uses normal I/O pins.
272281

273-
## 5.2 Buffering
282+
## 5.2 Buffering and RAM usage
274283

275-
The protocol does not implement flow control, incoming data being buffered
276-
until read. To avoid the risk of memory errors a coroutine should read incoming
277-
data as it arrives. Since this is the normal mode of using such an interface I
278-
see little merit in increasing the complexity of the code with flow control.
284+
The protocol implements flow control: the `StreamWriter` at one end of the link
285+
will pause until the last string transmitted has been read by the corresponding
286+
`StreamReader`.
279287

280288
Outgoing data is unbuffered. `StreamWriter.awrite` will pause until pending
281289
data has been transmitted.
282290

291+
Efforts are under way to remove RAM allocation by the `Responder`. This would
292+
enable hard interrupts to be used, further reducing blocking. With this aim
293+
incoming data is buffered in a pre-allocated bytearray.
294+
283295
## 5.3 Responder crash detection
284296

285297
The `Responder` protocol executes in a soft interrupt context. This means that

i2c/asi2c.py

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import uasyncio as asyncio
2626
import machine
2727
import utime
28-
from micropython import const
28+
from micropython import const, schedule
2929
import io
3030

3131
_MP_STREAM_POLL_RD = const(1)
@@ -39,7 +39,7 @@
3939

4040
# Base class provides user interface and send/receive object buffers
4141
class Channel(io.IOBase):
42-
def __init__(self, i2c, own, rem, verbose):
42+
def __init__(self, i2c, own, rem, verbose, rxbufsize):
4343
self.verbose = verbose
4444
self.synchronised = False
4545
# Hardware
@@ -49,9 +49,12 @@ def __init__(self, i2c, own, rem, verbose):
4949
own.init(mode=machine.Pin.OUT, value=1)
5050
rem.init(mode=machine.Pin.IN, pull=machine.Pin.PULL_UP)
5151
# I/O
52-
self.txbyt = b''
52+
self.txbyt = b'' # Data to send
53+
self.txsiz = bytearray(2) # Size of .txbyt encoded as 2 bytes
5354
self.rxbyt = b''
54-
self.last_tx = 0 # Size of last buffer sent
55+
self.rxbuf = bytearray(rxbufsize)
56+
self.rx_mv = memoryview(self.rxbuf)
57+
self.cantx = True # Remote can accept data
5558

5659
async def _sync(self):
5760
self.verbose and print('Synchronising')
@@ -67,18 +70,14 @@ def waitfor(self, val): # Initiator overrides
6770
while not self.rem() == val:
6871
pass
6972

70-
# Concatenate incoming bytes instance.
73+
# Get incoming bytes instance from memoryview.
7174
def _handle_rxd(self, msg):
72-
self.rxbyt = b''.join((self.rxbyt, msg))
73-
74-
# Get bytes if present. Return bytes, len as 2 bytes, len
75-
def _get_tx_data(self):
76-
d = self.txbyt
77-
n = len(d)
78-
return d, n.to_bytes(2, 'little'), n
75+
self.rxbyt = bytes(msg)
7976

8077
def _txdone(self):
8178
self.txbyt = b''
79+
self.txsiz[0] = 0
80+
self.txsiz[1] = 0
8281

8382
# Stream interface
8483

@@ -91,7 +90,7 @@ def ioctl(self, req, arg):
9190
if self.rxbyt:
9291
ret |= _MP_STREAM_POLL_RD
9392
if arg & _MP_STREAM_POLL_WR:
94-
if not self.txbyt:
93+
if (not self.txbyt) and self.cantx:
9594
ret |= _MP_STREAM_POLL_WR
9695
return ret
9796

@@ -110,13 +109,23 @@ def read(self, n):
110109
self.rxbyt = self.rxbyt[n:]
111110
return t.decode()
112111

112+
# Set .txbyt to the required data. Return its size. So awrite returns
113+
# with transmission occurring in tha background.
113114
def write(self, buf, off, sz):
114115
if self.synchronised:
115-
if self.txbyt: # Waiting for existing data to go out
116-
return 0
117-
d = buf[off : off + sz]
116+
if self.txbyt: # Initial call from awrite
117+
return 0 # Waiting for existing data to go out
118+
# If awrite is called without off or sz args, avoid allocation
119+
if off == 0 and sz == len(buf):
120+
d = buf
121+
else:
122+
d = buf[off : off + sz]
123+
d = d.encode()
124+
l = len(d)
118125
self.txbyt = d
119-
return len(d)
126+
self.txsiz[0] = l & 0xff
127+
self.txsiz[1] = l >> 8
128+
return l
120129
return 0
121130

122131
# User interface
@@ -136,8 +145,9 @@ def close(self):
136145

137146
class Responder(Channel):
138147
addr = 0x12
148+
rxbufsize = 200
139149
def __init__(self, i2c, pin, pinack, verbose=True):
140-
super().__init__(i2c, pinack, pin, verbose)
150+
super().__init__(i2c, pinack, pin, verbose, self.rxbufsize)
141151
loop = asyncio.get_event_loop()
142152
loop.create_task(self._run())
143153

@@ -147,7 +157,7 @@ async def _run(self):
147157

148158
# Request was received: immediately read payload size, then payload
149159
# On Pyboard blocks for 380μs to 1.2ms for small amounts of data
150-
def _handler(self, _, sn=bytearray(2)):
160+
def _handler(self, _, sn=bytearray(2), txnull=bytearray(2)):
151161
# tstart = utime.ticks_us() # TEST
152162
addr = Responder.addr
153163
self.rem.irq(handler = None, trigger = machine.Pin.IRQ_RISING)
@@ -156,28 +166,35 @@ def _handler(self, _, sn=bytearray(2)):
156166
self.own(1)
157167
self.waitfor(0)
158168
self.own(0)
159-
n = int.from_bytes(sn, 'little') # no of bytes to receive
169+
n = sn[0] + ((sn[1] & 0x7f) << 8) # no of bytes to receive
170+
self.cantx = not bool(sn[1] & 0x80) # Can Initiator accept a payload?
160171
if n:
161172
self.waitfor(1)
162173
utime.sleep_us(_DELAY)
163-
data = self.i2c.readfrom(addr, n)
174+
mv = memoryview(self.rx_mv[0 : n]) # allocates
175+
self.i2c.readfrom_into(addr, mv)
164176
self.own(1)
165177
self.waitfor(0)
166178
self.own(0)
167-
self._handle_rxd(data)
179+
schedule(self._handle_rxd, mv) # Postpone allocation
168180

169-
s, nb, n = self._get_tx_data()
170181
self.own(1) # Request to send
171182
self.waitfor(1)
172183
utime.sleep_us(_DELAY)
173-
self.i2c.writeto(addr, nb)
184+
dtx = self.txbyt != b'' and self.cantx # Data to send
185+
siz = self.txsiz if dtx else txnull
186+
if n or self.rxbyt: # test n because .rxbyt may not be populated yet
187+
siz[1] |= 0x80 # Hold off Initiator TX
188+
else:
189+
siz[1] &= 0x7f
190+
self.i2c.writeto(addr, siz)
174191
self.own(0)
175192
self.waitfor(0)
176-
if n:
193+
if dtx:
177194
self.own(1)
178195
self.waitfor(1)
179196
utime.sleep_us(_DELAY)
180-
self.i2c.writeto(addr, s)
197+
self.i2c.writeto(addr, self.txbyt)
181198
self.own(0)
182199
self.waitfor(0)
183200
self._txdone() # Invalidate source

i2c/asi2c_i.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import machine
2828
import utime
2929
import gc
30+
from micropython import schedule
3031
from asi2c import Channel
3132

3233
# The initiator is an I2C slave. It runs on a Pyboard. I2C uses pyb for slave
@@ -36,9 +37,10 @@
3637
class Initiator(Channel):
3738
timeout = 1000 # ms Timeout to detect slave down
3839
t_poll = 100 # ms between Initiator polling Responder
40+
rxbufsize = 200
3941

4042
def __init__(self, i2c, pin, pinack, reset=None, verbose=True):
41-
super().__init__(i2c, pin, pinack, verbose)
43+
super().__init__(i2c, pin, pinack, verbose, self.rxbufsize)
4244
self.reset = reset
4345
if reset is not None:
4446
reset[0].init(mode=machine.Pin.OUT, value = not(reset[1]))
@@ -73,11 +75,12 @@ async def _run(self):
7375
self.rxbyt = b''
7476
await self._sync()
7577
await asyncio.sleep(1) # Ensure Responder is ready
78+
rxbusy = False
7679
while True:
7780
gc.collect()
7881
try:
7982
tstart = utime.ticks_us()
80-
self._sendrx()
83+
rxbusy = self._sendrx(rxbusy)
8184
t = utime.ticks_diff(utime.ticks_us(), tstart)
8285
except OSError:
8386
break
@@ -90,18 +93,26 @@ async def _run(self):
9093
raise OSError('Responder fail.')
9194

9295
# Send payload length (may be 0) then payload (if any)
93-
def _sendrx(self, sn=bytearray(2)):
96+
def _sendrx(self, rxbusy, sn=bytearray(2), txnull=bytearray(2)):
9497
to = Initiator.timeout
95-
s, nb, n = self._get_tx_data()
96-
self.own(1) # Triggers interrupt on responder
97-
self.i2c.send(nb, timeout=to) # Must start before RX begins, but blocks until then
98+
siz = self.txsiz if self.cantx else txnull
99+
# rxbusy handles the (unlikely) case where last call received data but
100+
# schedule() has not yet processed it
101+
if self.rxbyt or rxbusy:
102+
siz[1] |= 0x80 # Hold off further received data
103+
else:
104+
siz[1] &= 0x7f
105+
# CRITICAL TIMING. Trigger interrupt on responder immediately before
106+
# send. Send must start before RX begins. Fast responders may need to
107+
# do a short blocking wait to guarantee this.
108+
self.own(1) # Trigger interrupt.
109+
self.i2c.send(siz, timeout=to) # Blocks until RX complete.
98110
self.waitfor(1)
99111
self.own(0)
100112
self.waitfor(0)
101-
if n:
102-
# start timer: timer CB sets self.own which causes RX
113+
if self.txbyt and self.cantx:
103114
self.own(1)
104-
self.i2c.send(s, timeout=to)
115+
self.i2c.send(self.txbyt, timeout=to)
105116
self.waitfor(1)
106117
self.own(0)
107118
self.waitfor(0)
@@ -112,11 +123,15 @@ def _sendrx(self, sn=bytearray(2)):
112123
self.i2c.recv(sn, timeout=to)
113124
self.waitfor(0)
114125
self.own(0)
115-
n = int.from_bytes(sn, 'little') # no of bytes to receive
126+
n = sn[0] + ((sn[1] & 0x7f) << 8) # no of bytes to receive
127+
self.cantx = not bool(sn[1] & 0x80)
116128
if n:
117129
self.waitfor(1) # Wait for responder to request send
118130
self.own(1) # Acknowledge
119-
data = self.i2c.recv(n, timeout=to)
131+
mv = memoryview(self.rx_mv[0 : n])
132+
self.i2c.recv(mv, timeout=to)
120133
self.waitfor(0)
121134
self.own(0)
122-
self._handle_rxd(data)
135+
schedule(self._handle_rxd, mv) # Postpone allocation
136+
return True
137+
return False

i2c/i2c_esp.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@
4242

4343
async def receiver():
4444
sreader = asyncio.StreamReader(chan)
45+
await chan.ready()
46+
print('started')
47+
for _ in range(5): # Test flow control
48+
res = await sreader.readline()
49+
print('Received', ujson.loads(res))
50+
await asyncio.sleep(4)
4551
while True:
4652
res = await sreader.readline()
4753
print('Received', ujson.loads(res))

i2c/i2c_init.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@
4444

4545
async def receiver():
4646
sreader = asyncio.StreamReader(chan)
47+
for _ in range(5): # Test flow control
48+
res = await sreader.readline()
49+
print('Received', ujson.loads(res))
50+
await asyncio.sleep(4)
4751
while True:
4852
res = await sreader.readline()
4953
print('Received', ujson.loads(res))
@@ -63,6 +67,7 @@ async def test(loop):
6367
loop.create_task(receiver())
6468
loop.create_task(sender())
6569
while True:
70+
await chan.ready()
6671
await asyncio.sleep(10)
6772
print('Blocking time {:d}μs max. {:d}μs mean.'.format(
6873
chan.block_max, int(chan.block_sum/chan.block_cnt)))

i2c/i2c_resp.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@
4141

4242
async def receiver():
4343
sreader = asyncio.StreamReader(chan)
44+
await chan.ready()
45+
print('started')
46+
for _ in range(5): # Test flow control
47+
res = await sreader.readline()
48+
print('Received', ujson.loads(res))
49+
await asyncio.sleep(4)
4450
while True:
4551
res = await sreader.readline()
4652
print('Received', ujson.loads(res))

0 commit comments

Comments
 (0)