Closed
Description
In most of my scripts running on micro-controllers, one or more tasks are to be scheduled at regular intervals, independent of the actual running time of the task at hand. To achieve this when using asyncio, a small extension to module delay_ms
suffice. The difference between the official and the extended version of module delay_ms
is:
--- ./delay_ms.py 2025-01-13 10:06:27.779011912 +0100
+++ ./micropython-utils/delay_ms.py 2025-01-13 10:12:42.318156831 +0100
@@ -5,6 +5,9 @@
# Copyright (c) 2018-2022 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
+# Added methods repeat() and restart(), allowing a timer to be used to schedule
+# a task at regular intervals. 2024 Wim Nelis
+
import asyncio
from utime import ticks_add, ticks_diff, ticks_ms
from . import launch
@@ -22,6 +25,7 @@
self._args = args
self._durn = duration # Default duration
self._retn = None # Return value of launched callable
+ self._tper = -1 # Repeater period [ms]
self._tend = None # Stop time (absolute ms).
self._busy = False
self._trig = asyncio.ThreadSafeFlag()
@@ -52,11 +56,29 @@
def trigger(self, duration=0): # Update absolute end time, 0-> ctor default
if self._mtask is None:
raise RuntimeError("Delay_ms.deinit() has run.")
- self._tend = ticks_add(ticks_ms(), duration if duration > 0 else self._durn)
+ self._tper = duration if duration > 0 else self._durn
+ self._tend = ticks_add(ticks_ms(), self._tper)
self._retn = None # Default in case cancelled.
self._busy = True
self._trig.set()
+ def repeat(self):
+ assert not self._busy, "Can't repeat a running timer"
+ assert self._tper > 0, "Trigger not invoked yet"
+# now= ticks_ms() # Handle tasks running longer dan _tper [ms]
+# while ticks_diff(now, self._tend) > self._tper:
+# self._tend = ticks_add(self._tend, self._tper)
+ self._tend = ticks_add(self._tend, self._tper)
+ self._busy = True
+ self._tout.clear()
+ self._trig.set()
+
+ def restart(self):
+ assert self._tper > 0, "Trigger not invoked yet"
+ if self._busy:
+ self.stop()
+ self.trigger(self._tper)
+
def stop(self):
self._ttask.cancel()
self._ttask = self._fake
Using this extension, an asynchronous task can be scheduled at regular intervals in the following way:
atimer = delay_ms.Delay_ms() # Timer of 1 [s]
async def atask():
while True:
await atimer.wait()
atimer.repeat() # Implicit atimer.clear()
<Do the job>
How about including this extension in module delay_ms
?
Metadata
Metadata
Assignees
Labels
No labels