Skip to content

Initial logging support #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions buildhat/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(self, port):
Device._used[p] = True

@staticmethod
def _setup(device="/dev/serial0"):
def _setup(**kwargs):
if Device._instance:
return
data = os.path.join(os.path.dirname(sys.modules["buildhat"].__file__), "data/")
Expand All @@ -77,7 +77,7 @@ def _setup(device="/dev/serial0"):
vfile = open(ver)
v = int(vfile.read())
vfile.close()
Device._instance = BuildHAT(firm, sig, v, device=device)
Device._instance = BuildHAT(firm, sig, v, **kwargs)
weakref.finalize(Device._instance, Device._instance.shutdown)

def __del__(self):
Expand Down
9 changes: 5 additions & 4 deletions buildhat/hat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@
class Hat:
"""Allows enumeration of devices which are connected to the hat"""

def __init__(self, device=None):
def __init__(self, device=None, debug=False):
"""Hat

:param device: Optional string containing path to Build HAT serial device
:param debug: Optional boolean to log debug information
"""
self.led_status = -1
if not device:
Device._setup()
if device is None:
Device._setup(debug=debug)
else:
Device._setup(device)
Device._setup(device=device, debug=debug)

def get(self):
"""Get devices which are connected or disconnected
Expand Down
82 changes: 52 additions & 30 deletions buildhat/serinterface.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Build HAT handling functionality"""

import logging
import queue
import tempfile
import threading
import time
from enum import Enum
Expand Down Expand Up @@ -69,13 +71,14 @@ class BuildHAT:
RESET_GPIO_NUMBER = 4
BOOT0_GPIO_NUMBER = 22

def __init__(self, firmware, signature, version, device="/dev/serial0"):
def __init__(self, firmware, signature, version, device="/dev/serial0", debug=False):
"""Interact with Build HAT

:param firmware: Firmware file
:param signature: Signature file
:param version: Firmware version
:param device: Serial device to use
:param debug: Optional boolean to log debug information
:raises BuildHATError: Occurs if can't find HAT
"""
self.cond = Condition()
Expand All @@ -88,6 +91,10 @@ def __init__(self, firmware, signature, version, device="/dev/serial0"):
self.running = True
self.vincond = Condition()
self.vin = None
if debug:
tmp = tempfile.NamedTemporaryFile(suffix=".log", prefix="buildhat-", delete=False)
logging.basicConfig(filename=tmp.name, format='%(asctime)s %(message)s',
level=logging.INFO)

for _ in range(4):
self.connections.append(Connection())
Expand All @@ -99,16 +106,18 @@ def __init__(self, firmware, signature, version, device="/dev/serial0"):
# Check if we're in the bootloader or the firmware
self.write(b"version\r")

emptydata = 0
incdata = 0
while True:
try:
line = self.ser.readline().decode('utf-8', 'ignore')
except serial.SerialException:
pass
line = self.read()
if len(line) == 0:
# Didn't recieve any data
break
if line[:len(BuildHAT.FIRMWARE)] == BuildHAT.FIRMWARE:
# Didn't receive any data
emptydata += 1
if emptydata > 3:
break
else:
continue
if cmp(line, BuildHAT.FIRMWARE):
self.state = HatState.FIRMWARE
ver = line[len(BuildHAT.FIRMWARE):].split(' ')
if int(ver[0]) == version:
Expand All @@ -117,7 +126,7 @@ def __init__(self, firmware, signature, version, device="/dev/serial0"):
else:
self.state = HatState.NEEDNEWFIRMWARE
break
elif line[:len(BuildHAT.BOOTLOADER)] == BuildHAT.BOOTLOADER:
elif cmp(line, BuildHAT.BOOTLOADER):
self.state = HatState.BOOTLOADER
break
else:
Expand Down Expand Up @@ -184,15 +193,15 @@ def loadfirmware(self, firmware, signature):
self.getprompt()
self.write("load {} {}\r".format(len(firm), self.checksum(firm)).encode())
time.sleep(0.1)
self.write(b"\x02")
self.write(firm)
self.write(b"\x03\r")
self.write(b"\x02", replace="0x02")
self.write(firm, replace="--firmware file--")
self.write(b"\x03\r", replace="0x03")
self.getprompt()
self.write("signature {}\r".format(len(sig)).encode())
time.sleep(0.1)
self.write(b"\x02")
self.write(sig)
self.write(b"\x03\r")
self.write(b"\x02", replace="0x02")
self.write(sig, replace="--signature file--")
self.write(b"\x03\r", replace="0x03")
self.getprompt()

def getprompt(self):
Expand All @@ -201,12 +210,8 @@ def getprompt(self):
Need to decide what we will do, when no prompt
"""
while True:
line = b""
try:
line = self.ser.readline().decode('utf-8', 'ignore')
except serial.SerialException:
pass
if line[:len(BuildHAT.PROMPT)] == BuildHAT.PROMPT:
line = self.read()
if cmp(line, BuildHAT.PROMPT):
break

def checksum(self, data):
Expand All @@ -224,12 +229,33 @@ def checksum(self, data):
u = (u ^ data[i]) & 0xFFFFFFFF
return u

def write(self, data):
def write(self, data, log=True, replace=""):
"""Write data to the serial port of Build HAT

:param data: Data to write to Build HAT
:param log: Whether to log line or not
:param replace: Whether to log an alternative string
"""
self.ser.write(data)
if not self.fin and log:
if replace != "":
logging.info("> {}".format(replace))
else:
logging.info("> {}".format(data.decode('utf-8', 'ignore').strip()))

def read(self):
"""Read data from the serial port of Build HAT

:return: Line that has been read
"""
line = ""
try:
line = self.ser.readline().decode('utf-8', 'ignore').strip()
except serial.SerialException:
pass
if line != "":
logging.info("< {}".format(line))
return line

def shutdown(self):
"""Turn off the Build HAT devices"""
Expand Down Expand Up @@ -275,11 +301,7 @@ def loop(self, cond, uselist, q):
"""
count = 0
while self.running:
line = b""
try:
line = self.ser.readline().decode('utf-8', 'ignore')
except serial.SerialException:
pass
line = self.read()
if len(line) == 0:
continue
if line[0] == "P" and line[2] == ":":
Expand Down Expand Up @@ -326,7 +348,7 @@ def runit():

if line[0] == "P" and (line[2] == "C" or line[2] == "M"):
portid = int(line[1])
data = line[5:].strip().split(" ")
data = line[5:].split(" ")
newdata = []
for d in data:
if "." in d:
Expand All @@ -341,8 +363,8 @@ def runit():
with self.portcond[portid]:
self.portcond[portid].notify()

if len(line) >= 5 and line[1] == "." and line.strip().endswith(" V"):
vin = float(line.strip().split(" ")[0])
if len(line) >= 5 and line[1] == "." and line.endswith(" V"):
vin = float(line.split(" ")[0])
self.vin = vin
with self.vincond:
self.vincond.notify()