|
| 1 | +# as_GPS_time.py Using GPS for precision timing and for calibrating Pyboard RTC |
| 2 | +# This is STM-specific: requires pyb module. |
| 3 | + |
| 4 | +# Current state: getcal seems to work but needs further testing (odd values ocasionally) |
| 5 | +# Other API functions need testing |
| 6 | + |
| 7 | +# Copyright (c) 2018 Peter Hinch |
| 8 | +# Released under the MIT License (MIT) - see LICENSE file |
| 9 | + |
| 10 | +import uasyncio as asyncio |
| 11 | +import math |
| 12 | +import pyb |
| 13 | +import utime |
| 14 | +import micropython |
| 15 | +import as_GPS |
| 16 | + |
| 17 | +micropython.alloc_emergency_exception_buf(100) |
| 18 | + |
| 19 | +red, green, yellow, blue = pyb.LED(1), pyb.LED(2), pyb.LED(3), pyb.LED(4) |
| 20 | +rtc = pyb.RTC() |
| 21 | + |
| 22 | +# Convenience function. Return RTC seconds since midnight as float |
| 23 | +def rtc_secs(): |
| 24 | + dt = rtc.datetime() |
| 25 | + return 3600*dt[4] + 60*dt[5] + dt[6] + (255 - dt[7])/256 |
| 26 | + |
| 27 | +# Return day of week from date. Pyboard RTC format: 1-7 for Monday through Sunday. |
| 28 | +# https://stackoverflow.com/questions/9847213/how-do-i-get-the-day-of-week-given-a-date-in-python?noredirect=1&lq=1 |
| 29 | +# Adapted for Python 3 and Pyboard RTC format. |
| 30 | +def week_day(year, month, day): |
| 31 | + offset = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334] |
| 32 | + aux = year - 1700 - (1 if month <= 2 else 0) |
| 33 | + # day_of_week for 1700/1/1 = 5, Friday |
| 34 | + day_of_week = 5 |
| 35 | + # partial sum of days betweem current date and 1700/1/1 |
| 36 | + day_of_week += (aux + (1 if month <= 2 else 0)) * 365 |
| 37 | + # leap year correction |
| 38 | + day_of_week += aux // 4 - aux // 100 + (aux + 100) // 400 |
| 39 | + # sum monthly and day offsets |
| 40 | + day_of_week += offset[month - 1] + (day - 1) |
| 41 | + day_of_week %= 7 |
| 42 | + day_of_week = day_of_week if day_of_week else 7 |
| 43 | + return day_of_week |
| 44 | + |
| 45 | +class GPS_Timer(): |
| 46 | + def __init__(self, gps, pps_pin): |
| 47 | + self.gps = gps |
| 48 | + self.secs = None # Integer time since midnight at last PPS |
| 49 | + self.acquired = None # Value of ticks_us at edge of PPS |
| 50 | + loop = asyncio.get_event_loop() |
| 51 | + loop.create_task(self._start(pps_pin)) |
| 52 | + |
| 53 | + async def _start(self, pps_pin): |
| 54 | + await self.gps.data_received(date=True) |
| 55 | + pyb.ExtInt(pps_pin, pyb.ExtInt.IRQ_RISING, pyb.Pin.PULL_NONE, self._isr) |
| 56 | + print('ISR set up', self.gps.date, self.gps.timestamp) |
| 57 | + |
| 58 | + def _isr(self, _): |
| 59 | + t = self.gps.timestamp |
| 60 | + secs = 3600*t[0] + 60*t[1] + t[2] # Time in last NMEA sentence |
| 61 | + # Could be an outage here, so PPS arrives many secs after last sentence |
| 62 | + # Is this right? Does PPS continue during outage? |
| 63 | + self.secs = secs + 1 # PPS preceeds NMEA so add 1 sec |
| 64 | + self.acquired = utime.ticks_us() |
| 65 | + blue.toggle() # TEST |
| 66 | + |
| 67 | + # Return accurate GPS time in seconds (float) since midnight |
| 68 | + def get_secs(self): |
| 69 | + print(self.gps.timestamp) |
| 70 | + t = self.secs |
| 71 | + if t != self.secs: # An interrupt has occurred |
| 72 | + t = self.secs # IRQ's are at 1Hz so this must be OK |
| 73 | + return t + utime.ticks_diff(utime.ticks_us(), self.acquired) / 1000000 |
| 74 | + |
| 75 | + # Return accurate GPS time of day (hrs <int>, mins <int>, secs<float>) |
| 76 | + def get_t_split(self): |
| 77 | + t = math.modf(self.get_secs()) |
| 78 | + m, s = divmod(int(t[1]), 60) |
| 79 | + h = int(m // 60) |
| 80 | + return h, m, s + t[0] |
| 81 | + |
| 82 | + # Return a time/date tuple suitable for setting RTC |
| 83 | + def _get_td_split(self): |
| 84 | + d, m, y = self.gps.date |
| 85 | + t = math.modf(self.get_secs()) |
| 86 | + m, s = divmod(int(t[1]), 60) |
| 87 | + h = int(m // 60) |
| 88 | + ss = int(255*(1 - t[0])) |
| 89 | + return y, m, d, week_day(y, m, d), h, m, s, ss |
| 90 | + |
| 91 | + def set_rtc(self): |
| 92 | + rtc.datetime(self._get_td_split()) |
| 93 | + |
| 94 | + # Time from GPS: integer μs since Y2K. Call after awaiting PPS: result is |
| 95 | + # time when PPS leading edge occurred |
| 96 | + def _get_gps_usecs(self): |
| 97 | + d, m, y = self.gps.date |
| 98 | + t = math.modf(self.get_secs()) |
| 99 | + mins, secs = divmod(int(t[1]), 60) |
| 100 | + hrs = int(mins // 60) |
| 101 | + print(y, m, d, t, hrs, mins, secs) |
| 102 | + tim = utime.mktime((2000 + y, m, d, hrs, mins, secs, week_day(y, m, d) - 1, 0)) |
| 103 | + return tim * 1000000 |
| 104 | + |
| 105 | + # Return no. of μs RTC leads GPS. Done by comparing times at the instant of |
| 106 | + # PPS leading edge. |
| 107 | + def delta(self): |
| 108 | + rtc_time = self._await_pps() # μs since Y2K at time of PPS |
| 109 | + gps_time = self._get_gps_usecs() # μs since Y2K at PPS |
| 110 | + return rtc_time - gps_time |
| 111 | + |
| 112 | + # Pause until PPS interrupt occurs. Then wait for an RTC subsecond change. |
| 113 | + # Read the RTC time in μs since Y2K and adjust to give the time the RTC |
| 114 | + # (notionally) would have read at the PPS leading edge. |
| 115 | + def _await_pps(self): |
| 116 | + t0 = self.acquired |
| 117 | + while self.acquired == t0: # Busy-wait on PPS interrupt |
| 118 | + pass |
| 119 | + st = rtc.datetime()[7] |
| 120 | + while rtc.datetime()[7] == st: # Wait for RTC to change |
| 121 | + pass |
| 122 | + dt = utime.ticks_diff(utime.ticks_us(), self.acquired) |
| 123 | + return 1000000 * utime.time() + ((1000000 * (255 - rtc.datetime()[7])) >> 8) - dt |
| 124 | + |
| 125 | + # Non-realtime calculation of calibration factor. times are in μs |
| 126 | + def _calculate(self, gps_start, gps_end, rtc_start, rtc_end): |
| 127 | + # Duration (μs) between PPS edges |
| 128 | + print('Calculate', gps_start, gps_end, rtc_start, rtc_end) |
| 129 | + pps_delta = (gps_end - gps_start) |
| 130 | + # Duration (μs) between PPS edges as measured by RTC and corrected |
| 131 | + rtc_delta = (rtc_end - rtc_start) |
| 132 | + ppm = (1000000 * (rtc_delta - pps_delta)) / pps_delta # parts per million |
| 133 | + return int(-ppm/0.954) |
| 134 | + |
| 135 | + # Measure difference between RTC and GPS rate and return calibration factor |
| 136 | + # Note this blocks for upto 1 sec at intervals |
| 137 | + async def getcal(self, minutes=5): |
| 138 | + if minutes < 1: |
| 139 | + raise ValueError('Minutes must be >= 1') |
| 140 | + rtc.calibration(0) # Clear existing cal |
| 141 | + # Wait for PPS, then RTC 1/256 second change |
| 142 | + # return RTC time in μs since Y2K at instant of PPS |
| 143 | + rtc_start = self._await_pps() |
| 144 | + # GPS start time in μs since Y2K: co at time of PPS edge |
| 145 | + gps_start = self._get_gps_usecs() |
| 146 | + for n in range(minutes): |
| 147 | + for _ in range(6): |
| 148 | + await asyncio.sleep(10) # TEST 60 |
| 149 | + # Get RTC time at instant of PPS |
| 150 | + rtc_end = self._await_pps() |
| 151 | + gps_end = self._get_gps_usecs() |
| 152 | + cal = self._calculate(gps_start, gps_end, rtc_start, rtc_end) |
| 153 | + print('Mins {:d} cal factor {:d}'.format(n + 1, cal)) |
| 154 | + return cal |
| 155 | + |
| 156 | + async def calibrate(self, minutes=5): |
| 157 | + print('Waiting for startup') |
| 158 | + while self.acquired is None: |
| 159 | + await asyncio.sleep(1) # Wait for startup |
| 160 | + print('Waiting {} minutes to acquire calibration factor...'.format(minutes)) |
| 161 | + cal = await self.getcal(minutes) |
| 162 | + if cal <= 512 and cal >= -511: |
| 163 | + rtc.calibration(cal) |
| 164 | + print('Pyboard RTC is calibrated. Factor is {:d}.'.format(cal)) |
| 165 | + else: |
| 166 | + print('Calibration factor {:d} is out of range.'.format(cal)) |
| 167 | + |
| 168 | +# Test script. Red LED toggles on fix, Blue on PPS interrupt. |
| 169 | +async def run_test(minutes): |
| 170 | + uart = pyb.UART(4, 9600, read_buf_len=200) |
| 171 | + sreader = asyncio.StreamReader(uart) |
| 172 | + gps = as_GPS.AS_GPS(sreader, fix_cb=lambda *_: red.toggle()) |
| 173 | + pps_pin = pyb.Pin('X3', pyb.Pin.IN) |
| 174 | + gps_tim = GPS_Timer(gps, pps_pin) |
| 175 | + await gps_tim.calibrate(minutes) |
| 176 | + |
| 177 | +def test(minutes=5): |
| 178 | + loop = asyncio.get_event_loop() |
| 179 | + loop.run_until_complete(run_test(minutes)) |
0 commit comments