@@ -14,36 +14,43 @@ def _quit(s):
14
14
print ("Monitor " + s )
15
15
exit (0 )
16
16
17
- _write = lambda _ : _quit ("must run set_device" )
18
- _dummy = lambda : None # If UART do nothing.
17
+
18
+ _write = lambda _ : _quit ("must run set_device" )
19
+ _ifrst = lambda : None # Reset interface. If UART do nothing.
19
20
20
21
# For UART pass initialised UART. Baudrate must be 1_000_000.
21
22
# For SPI pass initialised instance SPI. Can be any baudrate, but
22
23
# must be default in other respects.
23
24
def set_device (dev , cspin = None ):
24
25
global _write
25
- global _dummy
26
+ global _ifrst
26
27
if isinstance (dev , UART ) and cspin is None : # UART
27
28
_write = dev .write
28
29
elif isinstance (dev , SPI ) and isinstance (cspin , Pin ):
29
30
cspin (1 )
31
+
30
32
def spiwrite (data ):
31
33
cspin (0 )
32
34
dev .write (data )
33
35
cspin (1 )
36
+
34
37
_write = spiwrite
38
+
35
39
def clear_sm (): # Set Pico SM to its initial state
36
40
cspin (1 )
37
41
dev .write (b"\0 " ) # SM is now waiting for CS low.
38
- _dummy = clear_sm
42
+
43
+ _ifrst = clear_sm
39
44
else :
40
45
_quit ("set_device: invalid args." )
41
46
47
+
42
48
# Justification for validation even when decorating a method
43
49
# /mnt/qnap2/data/Projects/Python/AssortedTechniques/decorators
44
50
_available = set (range (0 , 22 )) # Valid idents are 0..21
45
51
_reserved = set () # Idents reserved for synchronous monitoring
46
52
53
+
47
54
def _validate (ident , num = 1 ):
48
55
if ident >= 0 and ident + num < 22 :
49
56
try :
@@ -54,83 +61,90 @@ def _validate(ident, num=1):
54
61
else :
55
62
_quit ("error - ident {:02d} out of range." .format (ident ))
56
63
64
+
57
65
# Reserve ID's to be used for synchronous monitoring
58
66
def reserve (* ids ):
59
67
for ident in ids :
60
68
_validate (ident )
61
69
_reserved .add (ident )
62
70
71
+
63
72
# Check whether a synchronous ident was reserved
64
73
def _check (ident ):
65
74
if ident not in _reserved :
66
75
_quit ("error: synchronous ident {:02d} was not reserved." .format (ident ))
67
76
77
+
68
78
# asynchronous monitor
69
- def asyn (n , max_instances = 1 ):
79
+ def asyn (n , max_instances = 1 , verbose = True ):
70
80
def decorator (coro ):
71
- # This code runs before asyncio.run()
72
81
_validate (n , max_instances )
73
82
instance = 0
83
+
74
84
async def wrapped_coro (* args , ** kwargs ):
75
- # realtime
76
85
nonlocal instance
77
86
d = 0x40 + n + min (instance , max_instances - 1 )
78
87
v = int .to_bytes (d , 1 , "big" )
79
88
instance += 1
80
- if instance > max_instances : # Warning only
81
- print ("Monitor {:02d} max_instances reached ." .format (n ))
89
+ if verbose and instance > max_instances : # Warning only.
90
+ print ("Monitor ident: {:02d} instances: {} ." .format (n , instance ))
82
91
_write (v )
83
92
try :
84
93
res = await coro (* args , ** kwargs )
85
94
except asyncio .CancelledError :
86
- raise
95
+ raise # Other exceptions produce traceback.
87
96
finally :
88
97
d |= 0x20
89
98
v = int .to_bytes (d , 1 , "big" )
90
99
_write (v )
91
100
instance -= 1
92
101
return res
102
+
93
103
return wrapped_coro
104
+
94
105
return decorator
95
106
107
+
96
108
# If SPI, clears the state machine in case prior test resulted in the DUT
97
109
# crashing. It does this by sending a byte with CS\ False (high).
98
110
def init ():
99
- _dummy () # Does nothing if UART
111
+ _ifrst () # Reset interface. Does nothing if UART.
100
112
_write (b"z" ) # Clear Pico's instance counters etc.
101
113
114
+
102
115
# Optionally run this to show up periods of blocking behaviour
103
- async def hog_detect (i = 1 , s = (b"\x40 " , b"\x60 " )):
116
+ async def hog_detect (s = (b"\x40 " , b"\x60 " )):
104
117
while True :
105
- _write (s [(i := i ^ 1 )])
106
- await asyncio .sleep_ms (0 )
118
+ for v in s :
119
+ _write (v )
120
+ await asyncio .sleep_ms (0 )
121
+
107
122
108
123
# Monitor a synchronous function definition
109
124
def sync (n ):
110
125
def decorator (func ):
111
126
_validate (n )
112
- dstart = 0x40 + n
113
- vstart = int .to_bytes (dstart , 1 , "big" )
114
- dend = 0x60 + n
115
- vend = int .to_bytes (dend , 1 , "big" )
127
+ vstart = int .to_bytes (0x40 + n , 1 , "big" )
128
+ vend = int .to_bytes (0x60 + n , 1 , "big" )
129
+
116
130
def wrapped_func (* args , ** kwargs ):
117
131
_write (vstart )
118
132
res = func (* args , ** kwargs )
119
133
_write (vend )
120
134
return res
135
+
121
136
return wrapped_func
137
+
122
138
return decorator
123
139
140
+
124
141
# Runtime monitoring: can't validate because code may be looping.
125
142
# Monitor a synchronous function call
126
143
class mon_call :
127
144
def __init__ (self , n ):
128
145
_check (n )
129
- self .n = n
130
- self .dstart = 0x40 + n
131
- self .vstart = int .to_bytes (self .dstart , 1 , "big" )
132
- self .dend = 0x60 + n
133
- self .vend = int .to_bytes (self .dend , 1 , "big" )
146
+ self .vstart = int .to_bytes (0x40 + n , 1 , "big" )
147
+ self .vend = int .to_bytes (0x60 + n , 1 , "big" )
134
148
135
149
def __enter__ (self ):
136
150
_write (self .vstart )
@@ -140,6 +154,7 @@ def __exit__(self, type, value, traceback):
140
154
_write (self .vend )
141
155
return False # Don't silence exceptions
142
156
157
+
143
158
# Cause pico ident n to produce a brief (~80μs) pulse
144
159
def trigger (n ):
145
160
_check (n )
0 commit comments