diff --git a/test/firmware.py b/test/firmware.py new file mode 100644 index 0000000..66a1633 --- /dev/null +++ b/test/firmware.py @@ -0,0 +1,62 @@ +"""Test uploading firmware""" + +import time +import unittest +from multiprocessing import Process, Queue +from queue import Empty + +from gpiozero import DigitalOutputDevice + +RESET_GPIO_NUMBER = 4 +BOOT0_GPIO_NUMBER = 22 + + +def resethat(): + """Reset the HAT""" + reset = DigitalOutputDevice(RESET_GPIO_NUMBER) + boot0 = DigitalOutputDevice(BOOT0_GPIO_NUMBER) + boot0.off() + reset.off() + time.sleep(0.01) + reset.on() + time.sleep(0.01) + boot0.close() + reset.close() + time.sleep(0.5) + + +def reboot(exc): + """Reboot hat and load firmware + + :param exc: Queue to pass exceptions + """ + try: + from buildhat import Hat + resethat() + h = Hat(debug=True) + print(h.get()) + except Exception as e: + exc.put(e) + + +class TestFirmware(unittest.TestCase): + """Test firmware uploading functions""" + + def test_upload(self): + """Test upload firmware + + :raises exc.get: Raised if exception in subprocess + """ + for _ in range(2): + exc = Queue() + p = Process(target=reboot, args=(exc,)) + p.start() + p.join() + try: + raise exc.get(False) + except Empty: + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/test/powercycle/.gitignore b/test/powercycle/.gitignore new file mode 100644 index 0000000..6ff331c --- /dev/null +++ b/test/powercycle/.gitignore @@ -0,0 +1 @@ +hosts diff --git a/test/powercycle/control.yml b/test/powercycle/control.yml new file mode 100644 index 0000000..aa5248d --- /dev/null +++ b/test/powercycle/control.yml @@ -0,0 +1,29 @@ +- name: Run test + hosts: hosts + gather_facts: false + environment: + PYTHONPATH: /home/pi/Repositories/python-build-hat:/home/pi/.local/lib/python3.9/site-packages/ + tasks: + - name: Wait 300 seconds, but only start checking after 60 seconds + wait_for_connection: + delay: 60 + timeout: 300 + - name: Run experiment + command: python3 /home/pi/Repositories/python-build-hat/test/firmware.py + vars: + ansible_command_timeout: 60 # timeout set to double time firmware.py takes to run + - name: Find logs from experiment + find: + paths: /tmp + patterns: "buildhat-*.log" + register: log_files + - name: Copy + copy: + remote_src: yes + src: "{{item.path}}" + dest: "/home/pi/logs/" + loop: "{{log_files.files}}" + - name: Shutdown machine + command: systemctl poweroff + when: shutdown is defined + become: yes diff --git a/test/powercycle/restart.py b/test/powercycle/restart.py new file mode 100644 index 0000000..9b9d51b --- /dev/null +++ b/test/powercycle/restart.py @@ -0,0 +1,56 @@ +import json +import os +import time +import urllib.request + +from ansible import context +from ansible.cli import CLI +from ansible.executor.playbook_executor import PlaybookExecutor +from ansible.inventory.manager import InventoryManager +from ansible.module_utils.common.collections import ImmutableDict +from ansible.parsing.dataloader import DataLoader +from ansible.vars.manager import VariableManager + + +def runPlaybook(yml, evars): + loader = DataLoader() + context.CLIARGS = ImmutableDict(tags={}, timeout=300, listtags=False, listtasks=False, listhosts=False, syntax=False, connection='ssh', + module_path=None, forks=100, private_key_file=None, ssh_common_args=None, ssh_extra_args=None, + sftp_extra_args=None, scp_extra_args=None, become=False, become_method='sudo', become_user='root', + verbosity=True, check=False, start_at_task=None, extra_vars=[{**{"ansible_python_interpreter": "/usr/bin/python3"}, **evars}]) + inventory = InventoryManager(loader=loader, sources=('hosts',)) + variable_manager = VariableManager(loader=loader, inventory=inventory, version_info=CLI.version_info(gitinfo=False)) + pbex = PlaybookExecutor(playbooks=[yml], inventory=inventory, variable_manager=variable_manager, loader=loader, passwords={}) + results = pbex.run() + if results != 0: + raise RuntimeError("Playbook failed to run correctly") + +def switch(plug, on): + if on: + state = "On" + else: + state = "Off" + api_url = f"http://{plug}/cm?cmnd=Power%20{state}" + request = urllib.request.Request(api_url) + with urllib.request.urlopen(request) as response: + data = json.loads(response.read().decode("utf-8")) + if data['POWER'] != state.upper(): + raise Exception("Failed to set power", data['POWER']) + +if __name__ == "__main__": + + plug = os.getenv('TASMOTA') + if plug is None: + raise Exception("Need to set TASMOTA env variable") + + COUNT = 3 + for i in range(COUNT): + print(f"At {i}") + if i < COUNT - 1: + runPlaybook("control.yml", {"shutdown" : True}) + time.sleep(20) + switch(plug, False) + time.sleep(5) + switch(plug, True) + else: + runPlaybook("control.yml", {})