Skip to content

Commit 7362966

Browse files
committed
connections: pipe stdin to running program
Issue: #17
1 parent fb1fbaf commit 7362966

File tree

1 file changed

+107
-2
lines changed

1 file changed

+107
-2
lines changed

pybricksdev/connections.py

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33

44
import asyncio
55
import base64
6+
import io
67
import json
78
import logging
89
import os
910
import random
1011
import struct
12+
import sys
1113

1214
import asyncssh
1315
import semver
@@ -604,8 +606,13 @@ def __init__(self):
604606
# used to notify when the user program has ended
605607
self.user_program_stopped = asyncio.Event()
606608

609+
# after a hub is connected, this will contain the kind of hub
607610
self.hub_kind: HubKind
611+
# after a hub is connected, this will contain the hub variant
608612
self.hub_variant: int
613+
# buffer to hold hub stdout data received from the hub while local
614+
# stdout is busy
615+
self._buffered_stdout: io.BytesIO = io.BytesIO()
609616

610617
# File handle for logging
611618
self.log_file = None
@@ -668,6 +675,16 @@ def nus_handler(self, sender, data):
668675
logger.debug(f"Correct checksum: {checksum}")
669676
return
670677

678+
if self.loading:
679+
# avoid echoing while progress bar is showing
680+
self._buffered_stdout.write(data)
681+
else:
682+
sys.stdout.buffer.write(data)
683+
sys.stdout.buffer.flush()
684+
return
685+
686+
# FIXME: make attaching data handler optional
687+
671688
# Store incoming data
672689
self.stream_buf += data
673690
logger.debug("NUS DATA: {0}".format(data))
@@ -702,6 +719,10 @@ def pybricks_service_handler(self, _: int, data: bytes) -> None:
702719
if self.program_running != program_running_now:
703720
logger.info(f"Program running: {program_running_now}")
704721
self.program_running = program_running_now
722+
# we can receive stdio data from the hub before the download
723+
# is "done", so it was buffered and we output it now
724+
sys.stdout.buffer.write(self._buffered_stdout.read())
725+
sys.stdout.buffer.flush()
705726
if not program_running_now:
706727
self.user_program_stopped.set()
707728

@@ -813,5 +834,89 @@ async def run(self, py_path, wait=True, print_output=True):
813834
self.loading = False
814835

815836
if wait:
816-
await self.user_program_stopped.wait()
817-
await asyncio.sleep(0.3)
837+
loop = asyncio.get_running_loop()
838+
839+
# parallel task: read from stdin and send it to the hub
840+
async def pipe_stdin():
841+
try:
842+
reader = asyncio.StreamReader()
843+
protocol = asyncio.StreamReaderProtocol(reader)
844+
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
845+
846+
# BOOST Move hub has limited MTU of 23 bytes
847+
chunk_size = 20 if self.hub_kind == HubKind.BOOST else 100
848+
849+
while True:
850+
data = await reader.read(chunk_size)
851+
852+
if not data: # EOF
853+
break
854+
855+
await self.client.write_gatt_char(NUS_RX_UUID, data)
856+
except asyncio.CancelledError:
857+
pass
858+
859+
# parallel task: wait for the hub to tell us that the program is done
860+
async def wait_for_program_end():
861+
await self.user_program_stopped.wait()
862+
863+
# HACK: There may still be buffered stdout from the user
864+
# program that hasn't been received yet. Hopefully, this
865+
# is long enough to wait for all of it.
866+
await asyncio.sleep(0.3)
867+
868+
# HACK: handle dropping to REPL after Ctrl-C
869+
# needed since user program flag is unset then set again
870+
if self.program_running:
871+
self.user_program_stopped.clear()
872+
await self.user_program_stopped.wait()
873+
874+
# combine the parallel tasks
875+
def pipe_and_wait():
876+
pipe_task = loop.create_task(pipe_stdin())
877+
wait_task = loop.create_task(wait_for_program_end())
878+
879+
# pipe_stdin() will run until EOF, which may be never, so we
880+
# have to cancel to prevent waiting forever
881+
wait_task.add_done_callback(lambda _: pipe_task.cancel())
882+
883+
return asyncio.gather(pipe_task, wait_task)
884+
885+
fd = sys.stdin.fileno()
886+
if os.isatty(fd):
887+
from termios import (
888+
tcgetattr,
889+
tcsetattr,
890+
TCSANOW,
891+
ECHO,
892+
ICANON,
893+
ICRNL,
894+
INLCR,
895+
VINTR,
896+
VMIN,
897+
VTIME,
898+
)
899+
from tty import LFLAG, IFLAG, CC
900+
901+
new_mode = save_mode = tcgetattr(fd)
902+
try:
903+
new_mode = tcgetattr(fd)
904+
# Disable echo and canonical input (don't wait for newline, pass EOF, etc.)
905+
new_mode[LFLAG] = new_mode[LFLAG] & ~(ECHO | ICANON)
906+
# Change the line endings from \n to \r as required by MicroPython's readline
907+
new_mode[IFLAG] = new_mode[IFLAG] & ~(ICRNL) | (INLCR)
908+
# Change Ctrl-C to Ctrl-X so that Ctrl-C gets passed to the hub
909+
new_mode[CC][VINTR] = 24
910+
# read at least one byte at a time, no timeout
911+
new_mode[CC][VMIN] = 1
912+
new_mode[CC][VTIME] = 0
913+
914+
tcsetattr(fd, TCSANOW, new_mode)
915+
916+
await pipe_and_wait()
917+
finally:
918+
# restore the original TTY settings
919+
tcsetattr(fd, TCSANOW, save_mode)
920+
921+
else:
922+
await pipe_and_wait()

0 commit comments

Comments
 (0)