| 
 | 1 | +# This file is part of the MicroPython project, http://micropython.org/  | 
 | 2 | +#  | 
 | 3 | +# The MIT License (MIT)  | 
 | 4 | +#  | 
 | 5 | +# Copyright (c) 2022 Ibrahim Abdelkader <[email protected]>  | 
 | 6 | +#  | 
 | 7 | +# Permission is hereby granted, free of charge, to any person obtaining a copy  | 
 | 8 | +# of this software and associated documentation files (the "Software"), to deal  | 
 | 9 | +# in the Software without restriction, including without limitation the rights  | 
 | 10 | +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell  | 
 | 11 | +# copies of the Software, and to permit persons to whom the Software is  | 
 | 12 | +# furnished to do so, subject to the following conditions:  | 
 | 13 | +#  | 
 | 14 | +# The above copyright notice and this permission notice shall be included in  | 
 | 15 | +# all copies or substantial portions of the Software.  | 
 | 16 | +#  | 
 | 17 | +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR  | 
 | 18 | +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  | 
 | 19 | +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE  | 
 | 20 | +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER  | 
 | 21 | +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,  | 
 | 22 | +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN  | 
 | 23 | +# THE SOFTWARE.  | 
 | 24 | +#  | 
 | 25 | +# A minimal esptool implementation to communicate with ESP32 ROM bootloader.  | 
 | 26 | +# Note this tool does Not support advanced features, other ESP chips or stub loading.  | 
 | 27 | +# This is only meant to be used for updating the U-blox Nina module firmware.  | 
 | 28 | + | 
 | 29 | +import os  | 
 | 30 | +import struct  | 
 | 31 | +from machine import Pin  | 
 | 32 | +from machine import UART  | 
 | 33 | +from micropython import const  | 
 | 34 | +from time import sleep  | 
 | 35 | + | 
 | 36 | +_CMD_SYNC = const(0x08)  | 
 | 37 | +_CMD_CHANGE_BAUDRATE = const(0x0F)  | 
 | 38 | + | 
 | 39 | +_CMD_ESP_READ_REG = const(0x0A)  | 
 | 40 | +_CMD_ESP_WRITE_REG = const(0x09)  | 
 | 41 | + | 
 | 42 | +_CMD_SPI_ATTACH = const(0x0D)  | 
 | 43 | +_CMD_SPI_FLASH_MD5 = const(0x13)  | 
 | 44 | +_CMD_SPI_FLASH_PARAMS = const(0x0B)  | 
 | 45 | +_CMD_SPI_FLASH_BEGIN = const(0x02)  | 
 | 46 | +_CMD_SPI_FLASH_DATA = const(0x03)  | 
 | 47 | +_CMD_SPI_FLASH_END = const(0x04)  | 
 | 48 | + | 
 | 49 | +_FLASH_ID = const(0)  | 
 | 50 | +_FLASH_REG_BASE = const(0x60002000)  | 
 | 51 | +_FLASH_BLOCK_SIZE = const(64 * 1024)  | 
 | 52 | +_FLASH_SECTOR_SIZE = const(4 * 1024)  | 
 | 53 | +_FLASH_PAGE_SIZE = const(256)  | 
 | 54 | + | 
 | 55 | +_ESP_ERRORS = {  | 
 | 56 | +    0x05: "Received message is invalid",  | 
 | 57 | +    0x06: "Failed to act on received message",  | 
 | 58 | +    0x07: "Invalid CRC in message",  | 
 | 59 | +    0x08: "Flash write error",  | 
 | 60 | +    0x09: "Flash read error",  | 
 | 61 | +    0x0A: "Flash read length error",  | 
 | 62 | +    0x0B: "Deflate error",  | 
 | 63 | +}  | 
 | 64 | + | 
 | 65 | + | 
 | 66 | +class ESPFlash:  | 
 | 67 | +    def __init__(  | 
 | 68 | +        self, reset=3, gpio0=2, uart_id=1, uart_tx=Pin(8), uart_rx=Pin(9), log_enabled=False  | 
 | 69 | +    ):  | 
 | 70 | +        self.uart_id = uart_id  | 
 | 71 | +        self.uart_tx = uart_tx  | 
 | 72 | +        self.uart_rx = uart_rx  | 
 | 73 | +        self.uart_buf = 4096  | 
 | 74 | +        self.uart_baudrate = 115200  | 
 | 75 | +        self.log = log_enabled  | 
 | 76 | +        self.reset_pin = Pin(reset, Pin.OUT)  | 
 | 77 | +        self.gpio0_pin = Pin(gpio0, Pin.OUT)  | 
 | 78 | +        self.set_baudrate(self.uart_baudrate)  | 
 | 79 | + | 
 | 80 | +    def _log(self, data, out=True):  | 
 | 81 | +        if self.log:  | 
 | 82 | +            size = len(data)  | 
 | 83 | +            print(  | 
 | 84 | +                f"out({size}) => " if out else f"in({size})  <= ",  | 
 | 85 | +                "".join("%.2x" % (i) for i in data[0:10]),  | 
 | 86 | +            )  | 
 | 87 | + | 
 | 88 | +    def set_baudrate(self, baudrate, timeout=350):  | 
 | 89 | +        if baudrate != self.uart_baudrate:  | 
 | 90 | +            print(f"Changing baudrate => {baudrate}")  | 
 | 91 | +            self.uart_drain()  | 
 | 92 | +            self.command(_CMD_CHANGE_BAUDRATE, struct.pack("<II", baudrate, 0))  | 
 | 93 | +            self.uart_baudrate = baudrate  | 
 | 94 | +        self.uart = UART(  | 
 | 95 | +            self.uart_id,  | 
 | 96 | +            baudrate,  | 
 | 97 | +            tx=self.uart_tx,  | 
 | 98 | +            rx=self.uart_rx,  | 
 | 99 | +            rxbuf=self.uart_buf,  | 
 | 100 | +            txbuf=self.uart_buf,  | 
 | 101 | +            timeout=timeout,  | 
 | 102 | +        )  | 
 | 103 | +        self.uart_drain()  | 
 | 104 | + | 
 | 105 | +    def uart_drain(self):  | 
 | 106 | +        while self.uart.read(1) is not None:  | 
 | 107 | +            pass  | 
 | 108 | + | 
 | 109 | +    def write_slip(self, pkt):  | 
 | 110 | +        pkt = pkt.replace(b"\xDB", b"\xdb\xdd").replace(b"\xc0", b"\xdb\xdc")  | 
 | 111 | +        self.uart.write(b"\xC0" + pkt + b"\xC0")  | 
 | 112 | +        self._log(pkt)  | 
 | 113 | + | 
 | 114 | +    def read_slip(self):  | 
 | 115 | +        pkt = None  | 
 | 116 | +        # Find the packet start.  | 
 | 117 | +        if self.uart.read(1) == b"\xC0":  | 
 | 118 | +            pkt = bytearray()  | 
 | 119 | +            while True:  | 
 | 120 | +                b = self.uart.read(1)  | 
 | 121 | +                if b is None or b == b"\xC0":  | 
 | 122 | +                    break  | 
 | 123 | +                pkt += b  | 
 | 124 | +            pkt = pkt.replace(b"\xDB\xDD", b"\xDB").replace(b"\xDB\xDC", b"\xC0")  | 
 | 125 | +            self._log(b"\xC0" + pkt + b"\xC0", False)  | 
 | 126 | +        return pkt  | 
 | 127 | + | 
 | 128 | +    def esperror(self, err):  | 
 | 129 | +        if err in _ESP_ERRORS:  | 
 | 130 | +            return _ESP_ERRORS[err]  | 
 | 131 | +        return "Unknown error"  | 
 | 132 | + | 
 | 133 | +    def checksum(self, data):  | 
 | 134 | +        checksum = 0xEF  | 
 | 135 | +        for i in data:  | 
 | 136 | +            checksum ^= i  | 
 | 137 | +        return checksum  | 
 | 138 | + | 
 | 139 | +    def command(self, cmd, payload=b"", checksum=0):  | 
 | 140 | +        self.write_slip(struct.pack(b"<BBHI", 0, cmd, len(payload), checksum) + payload)  | 
 | 141 | +        for i in range(10):  | 
 | 142 | +            pkt = self.read_slip()  | 
 | 143 | +            if pkt is not None and len(pkt) >= 8:  | 
 | 144 | +                (flag, _cmd, size, val) = struct.unpack("<BBHI", pkt[:8])  | 
 | 145 | +                if flag == 1 and cmd == _cmd:  | 
 | 146 | +                    status = list(pkt[-4:])  | 
 | 147 | +                    if status[0] == 1:  | 
 | 148 | +                        raise Exception(f"Command {cmd} failed {self.esperror(status[1])}")  | 
 | 149 | +                    return val, pkt[8:]  | 
 | 150 | +        raise Exception(f"Failed to read response to command {cmd}.")  | 
 | 151 | + | 
 | 152 | +    def bootloader(self, retry=6):  | 
 | 153 | +        for i in range(retry):  | 
 | 154 | +            self.gpio0_pin(1)  | 
 | 155 | +            self.reset_pin(0)  | 
 | 156 | +            sleep(0.1)  | 
 | 157 | +            self.gpio0_pin(0)  | 
 | 158 | +            self.reset_pin(1)  | 
 | 159 | +            sleep(0.1)  | 
 | 160 | +            self.gpio0_pin(1)  | 
 | 161 | + | 
 | 162 | +            if "POWERON_RESET" not in self.uart.read():  | 
 | 163 | +                continue  | 
 | 164 | + | 
 | 165 | +            for i in range(10):  | 
 | 166 | +                self.uart_drain()  | 
 | 167 | +                try:  | 
 | 168 | +                    # 36 bytes: 0x07 0x07 0x12 0x20, followed by 32 x 0x55  | 
 | 169 | +                    self.command(_CMD_SYNC, b"\x07\x07\x12\x20" + 32 * b"\x55")  | 
 | 170 | +                    self.uart_drain()  | 
 | 171 | +                    return True  | 
 | 172 | +                except Exception as e:  | 
 | 173 | +                    print(e)  | 
 | 174 | + | 
 | 175 | +        raise Exception("Failed to enter download mode!")  | 
 | 176 | + | 
 | 177 | +    def _read_reg(self, addr):  | 
 | 178 | +        v, d = self.command(_CMD_ESP_READ_REG, struct.pack("<I", _FLASH_REG_BASE + addr))  | 
 | 179 | +        if d[0] != 0:  | 
 | 180 | +            raise Exception("Command ESP_READ_REG failed.")  | 
 | 181 | +        return v  | 
 | 182 | + | 
 | 183 | +    def _write_reg(self, addr, data, mask=0xFFFFFFFF, delay=0):  | 
 | 184 | +        v, d = self.command(  | 
 | 185 | +            _CMD_ESP_WRITE_REG, struct.pack("<IIII", _FLASH_REG_BASE + addr, data, mask, delay)  | 
 | 186 | +        )  | 
 | 187 | +        if d[0] != 0:  | 
 | 188 | +            raise Exception("Command ESP_WRITE_REG failed.")  | 
 | 189 | + | 
 | 190 | +    def _poll_reg(self, addr, flag, retry=10, delay=0.050):  | 
 | 191 | +        for i in range(0, retry):  | 
 | 192 | +            reg = self._read_reg(addr)  | 
 | 193 | +            if (reg & flag) == 0:  | 
 | 194 | +                break  | 
 | 195 | +            sleep(delay)  | 
 | 196 | +        else:  | 
 | 197 | +            raise Exception(f"Register poll timeout. Addr: 0x{addr:02X} Flag: 0x{flag:02X}.")  | 
 | 198 | + | 
 | 199 | +    def flash_read_size(self):  | 
 | 200 | +        SPI_REG_CMD = 0x00  | 
 | 201 | +        SPI_USR_FLAG = 1 << 18  | 
 | 202 | +        SPI_REG_USR = 0x1C  | 
 | 203 | +        SPI_REG_USR2 = 0x24  | 
 | 204 | +        SPI_REG_W0 = 0x80  | 
 | 205 | +        SPI_REG_DLEN = 0x2C  | 
 | 206 | + | 
 | 207 | +        # Command bit len | command  | 
 | 208 | +        SPI_RDID_CMD = ((8 - 1) << 28) | 0x9F  | 
 | 209 | +        SPI_RDID_LEN = 24 - 1  | 
 | 210 | + | 
 | 211 | +        # Save USR and USR2 registers  | 
 | 212 | +        reg_usr = self._read_reg(SPI_REG_USR)  | 
 | 213 | +        reg_usr2 = self._read_reg(SPI_REG_USR2)  | 
 | 214 | + | 
 | 215 | +        # Enable command phase and read phase.  | 
 | 216 | +        self._write_reg(SPI_REG_USR, (1 << 31) | (1 << 28))  | 
 | 217 | + | 
 | 218 | +        # Configure command.  | 
 | 219 | +        self._write_reg(SPI_REG_DLEN, SPI_RDID_LEN)  | 
 | 220 | +        self._write_reg(SPI_REG_USR2, SPI_RDID_CMD)  | 
 | 221 | + | 
 | 222 | +        self._write_reg(SPI_REG_W0, 0)  | 
 | 223 | +        # Trigger SPI operation.  | 
 | 224 | +        self._write_reg(SPI_REG_CMD, SPI_USR_FLAG)  | 
 | 225 | + | 
 | 226 | +        # Poll CMD_USER flag.  | 
 | 227 | +        self._poll_reg(SPI_REG_CMD, SPI_USR_FLAG)  | 
 | 228 | + | 
 | 229 | +        # Restore USR and USR2 registers  | 
 | 230 | +        self._write_reg(SPI_REG_USR, reg_usr)  | 
 | 231 | +        self._write_reg(SPI_REG_USR2, reg_usr2)  | 
 | 232 | + | 
 | 233 | +        flash_bits = int(self._read_reg(SPI_REG_W0)) >> 16  | 
 | 234 | +        if flash_bits < 0x12 or flash_bits > 0x19:  | 
 | 235 | +            raise Exception(f"Unexpected flash size bits: 0x{flash_bits:02X}.")  | 
 | 236 | + | 
 | 237 | +        flash_size = 2**flash_bits  | 
 | 238 | +        print(f"Flash size {flash_size/1024/1024} MBytes")  | 
 | 239 | +        return flash_size  | 
 | 240 | + | 
 | 241 | +    def flash_attach(self):  | 
 | 242 | +        self.command(_CMD_SPI_ATTACH, struct.pack("<II", 0, 0))  | 
 | 243 | +        print(f"Flash attached")  | 
 | 244 | + | 
 | 245 | +    def flash_config(self, flash_size=2 * 1024 * 1024):  | 
 | 246 | +        self.command(  | 
 | 247 | +            _CMD_SPI_FLASH_PARAMS,  | 
 | 248 | +            struct.pack(  | 
 | 249 | +                "<IIIIII",  | 
 | 250 | +                _FLASH_ID,  | 
 | 251 | +                flash_size,  | 
 | 252 | +                _FLASH_BLOCK_SIZE,  | 
 | 253 | +                _FLASH_SECTOR_SIZE,  | 
 | 254 | +                _FLASH_PAGE_SIZE,  | 
 | 255 | +                0xFFFF,  | 
 | 256 | +            ),  | 
 | 257 | +        )  | 
 | 258 | + | 
 | 259 | +    def flash_write_file(self, path, blksize=0x1000):  | 
 | 260 | +        size = os.stat(path)[6]  | 
 | 261 | +        total_blocks = (size + blksize - 1) // blksize  | 
 | 262 | +        erase_blocks = 1  | 
 | 263 | +        print(f"Flash write size: {size} total_blocks: {total_blocks} block size: {blksize}")  | 
 | 264 | +        with open(path, "rb") as f:  | 
 | 265 | +            seq = 0  | 
 | 266 | +            subseq = 0  | 
 | 267 | +            for i in range(total_blocks):  | 
 | 268 | +                buf = f.read(blksize)  | 
 | 269 | +                if len(buf) < blksize:  | 
 | 270 | +                    # The last data block should be padded to the block size with 0xFF bytes.  | 
 | 271 | +                    buf += b"\xFF" * (blksize - len(buf))  | 
 | 272 | +                checksum = self.checksum(buf)  | 
 | 273 | +                if seq % erase_blocks == 0:  | 
 | 274 | +                    # print(f"Erasing {seq} -> {seq+erase_blocks}...")  | 
 | 275 | +                    self.command(  | 
 | 276 | +                        _CMD_SPI_FLASH_BEGIN,  | 
 | 277 | +                        struct.pack(  | 
 | 278 | +                            "<IIII", erase_blocks * blksize, erase_blocks, blksize, seq * blksize  | 
 | 279 | +                        ),  | 
 | 280 | +                    )  | 
 | 281 | +                print(f"Writing sequence number {seq}/{total_blocks}...")  | 
 | 282 | +                self.command(  | 
 | 283 | +                    _CMD_SPI_FLASH_DATA,  | 
 | 284 | +                    struct.pack("<IIII", len(buf), seq % erase_blocks, 0, 0) + buf,  | 
 | 285 | +                    checksum,  | 
 | 286 | +                )  | 
 | 287 | +                seq += 1  | 
 | 288 | + | 
 | 289 | +        print("Flash write finished")  | 
 | 290 | + | 
 | 291 | +    def flash_verify_file(self, path, md5sum, offset=0):  | 
 | 292 | +        size = os.stat(path)[6]  | 
 | 293 | +        val, data = self.command(_CMD_SPI_FLASH_MD5, struct.pack("<IIII", offset, size, 0, 0))  | 
 | 294 | +        print(f"Flash verify file MD5 {md5sum}")  | 
 | 295 | +        print(f"Flash verify flash MD5 {bytes(data[0:32])}")  | 
 | 296 | +        if md5sum == data[0:32]:  | 
 | 297 | +            print("Firmware write verified")  | 
 | 298 | +        else:  | 
 | 299 | +            raise Exception(f"Firmware verification failed")  | 
 | 300 | + | 
 | 301 | +    def reboot(self):  | 
 | 302 | +        payload = struct.pack("<I", 0)  | 
 | 303 | +        self.write_slip(struct.pack(b"<BBHI", 0, _CMD_SPI_FLASH_END, len(payload), 0) + payload)  | 
0 commit comments