Proposal for an extension of module delay_ms #133

Closed
@wnelis

In most of my scripts running on microcontrollers, one or more tasks must be scheduled at regular intervals, independent of how long each task actually takes to run. With asyncio, a small extension to module delay_ms suffices to achieve this. The difference between the official and the extended version of module delay_ms is:

--- ./delay_ms.py	2025-01-13 10:06:27.779011912 +0100
+++ ./micropython-utils/delay_ms.py	2025-01-13 10:12:42.318156831 +0100
@@ -5,6 +5,9 @@
 # Copyright (c) 2018-2022 Peter Hinch
 # Released under the MIT License (MIT) - see LICENSE file
 
+# Added methods repeat() and restart(), allowing a timer to be used to schedule
+# a task at regular intervals.  2024 Wim Nelis
+
 import asyncio
 from utime import ticks_add, ticks_diff, ticks_ms
 from . import launch
@@ -22,6 +25,7 @@
         self._args = args
         self._durn = duration  # Default duration
         self._retn = None  # Return value of launched callable
+        self._tper =   -1  # Repeater period [ms]
         self._tend = None  # Stop time (absolute ms).
         self._busy = False
         self._trig = asyncio.ThreadSafeFlag()
@@ -52,11 +56,29 @@
     def trigger(self, duration=0):  # Update absolute end time, 0-> ctor default
         if self._mtask is None:
             raise RuntimeError("Delay_ms.deinit() has run.")
-        self._tend = ticks_add(ticks_ms(), duration if duration > 0 else self._durn)
+        self._tper = duration  if duration > 0  else self._durn
+        self._tend = ticks_add(ticks_ms(), self._tper)
         self._retn = None  # Default in case cancelled.
         self._busy = True
         self._trig.set()
 
+    def repeat(self):
+        assert not self._busy, "Can't repeat a running timer"
+        assert self._tper > 0, "Trigger not invoked yet"
+#       now= ticks_ms()  # Handle tasks running longer than _tper [ms]
+#       while ticks_diff(now, self._tend) > self._tper:
+#           self._tend = ticks_add(self._tend, self._tper)
+        self._tend = ticks_add(self._tend, self._tper)
+        self._busy = True
+        self._tout.clear()
+        self._trig.set()
+
+    def restart(self):
+        assert self._tper > 0, "Trigger not invoked yet"
+        if self._busy:
+            self.stop()
+        self.trigger(self._tper)
+
     def stop(self):
         self._ttask.cancel()
         self._ttask = self._fake
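
The deadline arithmetic used by repeat() can be tried on its own. The following is a minimal sketch, not part of the proposed patch: `next_deadline` is a hypothetical helper, and the `ticks_add` / `ticks_diff` functions are CPython stand-ins for the MicroPython ones, assuming a 30-bit wrapping millisecond counter (the real period is port-dependent).

```python
# Stand-ins for MicroPython's ticks helpers so the sketch runs on CPython.
# Assumption: a 30-bit wrapping millisecond counter; the real period is port-dependent.
TICKS_PERIOD = 1 << 30
TICKS_MAX = TICKS_PERIOD - 1
TICKS_HALF = TICKS_PERIOD // 2


def ticks_add(ticks, delta):
    """Add delta to a tick value, wrapping around at TICKS_PERIOD."""
    return (ticks + delta) & TICKS_MAX


def ticks_diff(a, b):
    """Signed difference a - b, valid while a and b are under half a period apart."""
    return ((a - b + TICKS_HALF) & TICKS_MAX) - TICKS_HALF


def next_deadline(tend, tper, now=None):
    """Hypothetical helper: next absolute deadline for a timer with period tper.

    Without `now`, this mirrors the active line in repeat(): one period is
    added to the previous deadline, so the schedule does not drift.
    With `now`, whole periods are skipped while the deadline lags more than
    one period behind, as in the commented-out loop of the patch.
    """
    if now is not None:
        while ticks_diff(now, tend) > tper:
            tend = ticks_add(tend, tper)
    return ticks_add(tend, tper)


# The job finished 300 ms after the deadline at 1000: the next deadline stays on the grid.
on_grid = next_deadline(1000, 1000)
# Re-arming relative to "now" instead would shift every later deadline by 300 ms.
drifting = ticks_add(1300, 1000)
# The job overran by several periods: skip the missed slots.
caught_up = next_deadline(1000, 1000, now=4500)
```

Because the new deadline is derived from the previous deadline rather than from the current time, the running time of the job does not accumulate into the schedule, which is the property the proposal relies on.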

Using this extension, an asynchronous task can be scheduled at regular intervals in the following way:

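atimer = delay_ms.Delay_ms()  # Timer of 1 [s] (default duration)

async def atask():
    atimer.trigger()  # Start the first period; repeat() requires a prior trigger()
    while True:
        await atimer.wait()
        atimer.repeat()  # Implicit atimer.clear()
        # <Do the job>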
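
For comparison, the same fixed-rate pattern can be sketched with standard asyncio only, without Delay_ms. This is an illustrative analogue that runs on CPython, not the proposed implementation; `periodic` and `demo_job` are hypothetical names, and the 20 ms period is chosen only to keep the demo short.

```python
import asyncio


async def periodic(job, period, count):
    """Run job() `count` times at a fixed rate and return the start times."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + period  # Plays the role of trigger()
    starts = []
    for _ in range(count):
        # Plays the role of wait(): sleep until the absolute deadline.
        await asyncio.sleep(max(0.0, deadline - loop.time()))
        # Plays the role of repeat(): re-arm from the old deadline before the job runs.
        deadline += period
        starts.append(loop.time())
        await job()
    return starts


async def demo_job():
    await asyncio.sleep(0.005)  # Hypothetical workload of about 5 ms


starts = asyncio.run(periodic(demo_job, 0.02, 5))
```

Re-arming before the job runs, as in the example above, means the time spent in the job is taken out of the next wait instead of being added to the interval.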

How about including this extension in module delay_ms?
