Skip to content

Commit 5afbcc5

Browse files
committed
Add threadsafe/context.py
1 parent 4279c8b commit 5afbcc5

File tree

4 files changed

+83
-4
lines changed

4 files changed

+83
-4
lines changed

v3/docs/THREADING.md

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ to WiFi and issue:
2222
import mip
2323
mip.install("github:peterhinch/micropython-async/v3/threadsafe")
2424
```
25-
For non-networked targets use `mpremote` as described in
26-
[the official docs](http://docs.micropython.org/en/latest/reference/packages.html#installing-packages-with-mpremote).
25+
On any target `mpremote` may be used:
26+
```bash
27+
$ mpremote mip install github:peterhinch/micropython-async/v3/threadsafe
28+
```
2729

2830
###### [Main README](../README.md)
2931
###### [Tutorial](./TUTORIAL.md)
@@ -47,6 +49,8 @@ For non-networked targets use `mpremote` as described in
4749
3.1 [Threadsafe Event](./THREADING.md#31-threadsafe-event)
4850
3.2 [Message](./THREADING.md#32-message) A threadsafe event with data payload.
4951
4. [Taming blocking functions](./THREADING.md#4-taming-blocking-functions) Enabling uasyncio to handle blocking code.
52+
4.1 [Basic approach](./THREADING.md#41-basic-approach)
53+
4.2 [More general solution](./THREADING,md#42-more-general-solution)
5054
5. [Sharing a stream device](./THREADING.md#5-sharing-a-stream-device)
5155
6. [Glossary](./THREADING.md#6-glossary) Terminology of realtime coding.
5256

@@ -188,7 +192,7 @@ thread safe classes offered here do not yet support Unix.
188192
is only required if mutual consistency of the three values is essential.
189193
3. In the absence of a GIL some operations on built-in objects are not thread
190194
safe. For example adding or deleting items in a `dict`. This extends to global
191-
variables which are implemented as a `dict`. See [Globals](./THREADING.md#15-globals).
195+
variables because these are implemented as a `dict`. See [Globals](./THREADING.md#15-globals).
192196
4. The observations in 1.3 re user defined data structures and `uasyncio`
193197
interfacing apply.
194198
5. Code running on a core other than that running `uasyncio` may block for
@@ -643,7 +647,13 @@ again before it is accessed, the first data item will be lost.
643647

644648
Blocking functions or methods have the potential of stalling the `uasyncio`
645649
scheduler. Short of rewriting them to work properly the only way to tame them
646-
is to run them in another thread. The following is a way to achieve this.
650+
is to run them in another thread. Any function to be run in this way must
651+
conform to the guiedelines above, notably with regard to allocation and side
652+
effects.
653+
654+
## 4.1 Basic approach
655+
656+
The following is a way to "unblock" a single function or method.
647657
```python
648658
async def unblock(func, *args, **kwargs):
649659
def wrap(func, message, args, kwargs):
@@ -699,6 +709,42 @@ asyncio.run(main())
699709
```
700710
###### [Contents](./THREADING.md#contents)
701711

712+
## 4.1 More general solution
713+
714+
This provides a queueing mechanism. A task can assign a blocking function to a
715+
core even if the core is already busy. Further it allows for multiple cores or
716+
threads; these are defined as `Context` instances. Typical use:
717+
```python
718+
from threadsafe import Context
719+
720+
core1 = Context() # Has an instance of _thread, so a core on RP2
721+
722+
def rats(t, n): # Arbitrary blocking function or method
723+
time.sleep(t)
724+
return n * n
725+
726+
async def some_task():
727+
await core1.assign(rats, t=3, n=99) # rats() runs on other core
728+
```
729+
#### Context class
730+
731+
Constructor arg:
732+
* `qsize=10` Size of function queue.
733+
734+
Asynchronous method:
735+
* `assign(func, *args, **kwargs)` Accepts a synchronous function with optional
736+
args. These are placed on a queue for execution in the `Context` instance. The
737+
method pauses until execution is complete, returning the fuction's return
738+
value.
739+
740+
The `Context` class constructor spawns a thread which waits on the `Context`
741+
queue. The`assign` method accepts a fuction and creates a `Job` instance. This
742+
includes a `ThreadSafeFlag` along with the function and its args. The `Assign`
743+
method places the `Job` on the queue and waits on the `ThreadSafeFlag`.
744+
745+
The thread removes a `Job` from the queue and executes it. When complete it
746+
assigns the return value to the `Job` and sets the `ThreadSafeFlag`.
747+
702748
# 5. Sharing a stream device
703749

704750
Typical stream devices are a UART or a socket. These are typically employed to

v3/threadsafe/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"ThreadSafeEvent": "threadsafe_event",
88
"ThreadSafeQueue": "threadsafe_queue",
99
"Message": "message",
10+
"Context": "context",
1011
}
1112

1213
# Copied from uasyncio.__init__.py

v3/threadsafe/context.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# context.py: Run functions or methods on another core or in another thread
2+
3+
import uasyncio as asyncio
4+
import _thread
5+
from threadsafe import ThreadSafeQueue
6+
7+
# Object describing a job to be run on another core
8+
class Job:
9+
def __init__(self, func, args, kwargs):
10+
self.kwargs = kwargs
11+
self.args = args
12+
self.func = func
13+
self.rval = None # Return value
14+
self.done = asyncio.ThreadSafeFlag() # "done" indicator
15+
16+
def worker(q): # Runs forever on a core executing jobs as they arrive
17+
while True:
18+
job = q.get_sync(True) # Block until a Job arrives
19+
job.rval = job.func(*job.args, **job.kwargs)
20+
job.done.set()
21+
22+
class Context:
23+
def __init__(self, qsize=10):
24+
self.q = ThreadSafeQueue(qsize)
25+
_thread.start_new_thread(worker, (self.q,))
26+
27+
async def assign(self, func, *args, **kwargs):
28+
job = Job(func, args, kwargs)
29+
await self.q.put(job) # Will pause if q is full.
30+
await job.done.wait() # Pause until function has run
31+
return job.rval

v3/threadsafe/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
["threadsafe/message.py", "github:peterhinch/micropython-async/v3/threadsafe/message.py"],
55
["threadsafe/threadsafe_event.py", "github:peterhinch/micropython-async/v3/threadsafe/threadsafe_event.py"],
66
["threadsafe/threadsafe_queue.py", "github:peterhinch/micropython-async/v3/threadsafe/threadsafe_queue.py"]
7+
["threadsafe/context.py", "github:peterhinch/micropython-async/v3/threadsafe/context.py"]
78
],
89
"version": "0.1"
910
}

0 commit comments

Comments
 (0)