diff --git a/src/libtmux/_internal/engines/base.py b/src/libtmux/_internal/engines/base.py new file mode 100644 index 000000000..47c17f576 --- /dev/null +++ b/src/libtmux/_internal/engines/base.py @@ -0,0 +1,164 @@ +"""Engine abstractions and shared types for libtmux.""" + +from __future__ import annotations + +import dataclasses +import enum +import typing as t +from abc import ABC, abstractmethod + +from libtmux.common import tmux_cmd + + +class ExitStatus(enum.Enum): + """Exit status returned by tmux control mode commands.""" + + OK = 0 + ERROR = 1 + + +@dataclasses.dataclass +class CommandResult: + """Canonical result shape produced by engines. + + This is the internal representation used by engines. Public-facing APIs + still return :class:`libtmux.common.tmux_cmd` for compatibility; see + :func:`command_result_to_tmux_cmd`. + """ + + argv: list[str] + stdout: list[str] + stderr: list[str] + exit_status: ExitStatus + cmd_id: int | None = None + start_time: float | None = None + end_time: float | None = None + tmux_time: int | None = None + flags: int | None = None + + @property + def returncode(self) -> int: + """Return a POSIX-style return code matching tmux expectations.""" + return 0 if self.exit_status is ExitStatus.OK else 1 + + +class NotificationKind(enum.Enum): + """High-level categories for tmux control-mode notifications.""" + + PANE_OUTPUT = enum.auto() + PANE_EXTENDED_OUTPUT = enum.auto() + PANE_MODE_CHANGED = enum.auto() + WINDOW_ADD = enum.auto() + WINDOW_CLOSE = enum.auto() + WINDOW_RENAMED = enum.auto() + WINDOW_PANE_CHANGED = enum.auto() + SESSION_CHANGED = enum.auto() + SESSIONS_CHANGED = enum.auto() + SESSION_WINDOW_CHANGED = enum.auto() + PAUSE = enum.auto() + CONTINUE = enum.auto() + SUBSCRIPTION_CHANGED = enum.auto() + EXIT = enum.auto() + RAW = enum.auto() + + +@dataclasses.dataclass +class Notification: + """Parsed notification emitted by tmux control mode.""" + + kind: NotificationKind + when: float + raw: str + data: dict[str, t.Any] + + +@dataclasses.dataclass +class EngineStats: + """Light-weight diagnostics about engine state.""" + + in_flight: int + notif_queue_depth: int + dropped_notifications: int + restarts: int + last_error: str | None + last_activity: float | None + + +def command_result_to_tmux_cmd(result: CommandResult) -> tmux_cmd: + """Adapt :class:`CommandResult` into the legacy ``tmux_cmd`` wrapper.""" + proc = tmux_cmd( + cmd=result.argv, + stdout=result.stdout, + stderr=result.stderr, + returncode=result.returncode, + ) + # Preserve extra metadata for consumers that know about it. + proc.exit_status = result.exit_status # type: ignore[attr-defined] + proc.cmd_id = result.cmd_id # type: ignore[attr-defined] + proc.tmux_time = result.tmux_time # type: ignore[attr-defined] + proc.flags = result.flags # type: ignore[attr-defined] + proc.start_time = result.start_time # type: ignore[attr-defined] + proc.end_time = result.end_time # type: ignore[attr-defined] + return proc + + +class Engine(ABC): + """Abstract base class for tmux execution engines. + + Engines produce :class:`CommandResult` internally but surface ``tmux_cmd`` + to the existing libtmux public surface. Subclasses should implement + :meth:`run_result` and rely on the base :meth:`run` adapter unless they have + a strong reason to override both. + """ + + def run( + self, + cmd: str, + cmd_args: t.Sequence[str | int] | None = None, + server_args: t.Sequence[str | int] | None = None, + timeout: float | None = None, + ) -> tmux_cmd: + """Run a tmux command and return a ``tmux_cmd`` wrapper.""" + return command_result_to_tmux_cmd( + self.run_result( + cmd=cmd, + cmd_args=cmd_args, + server_args=server_args, + timeout=timeout, + ), + ) + + @abstractmethod + def run_result( + self, + cmd: str, + cmd_args: t.Sequence[str | int] | None = None, + server_args: t.Sequence[str | int] | None = None, + timeout: float | None = None, + ) -> CommandResult: + """Run a tmux command and return a :class:`CommandResult`.""" + + def iter_notifications( + self, + *, + timeout: float | None = None, + ) -> t.Iterator[Notification]: # pragma: no cover - default noop + """Yield control-mode notifications if supported by the engine.""" + if False: # keeps the function a generator for typing + yield timeout + return + + def get_stats(self) -> EngineStats: # pragma: no cover - default noop + """Return engine diagnostic stats.""" + return EngineStats( + in_flight=0, + notif_queue_depth=0, + dropped_notifications=0, + restarts=0, + last_error=None, + last_activity=None, + ) + + def close(self) -> None: # pragma: no cover - default noop + """Clean up any engine resources.""" + return None diff --git a/src/libtmux/_internal/engines/control_mode.py b/src/libtmux/_internal/engines/control_mode.py new file mode 100644 index 000000000..8bcc2501a --- /dev/null +++ b/src/libtmux/_internal/engines/control_mode.py @@ -0,0 +1,258 @@ +"""Control Mode engine for libtmux.""" + +from __future__ import annotations + +import logging +import shlex +import shutil +import subprocess +import threading +import typing as t + +from libtmux import exc +from libtmux._internal.engines.base import ( + CommandResult, + Engine, + EngineStats, + ExitStatus, + Notification, +) +from libtmux._internal.engines.control_protocol import ( + CommandContext, + ControlProtocol, +) + +logger = logging.getLogger(__name__) + + +class ControlModeEngine(Engine): + """Engine that runs tmux commands via a persistent Control Mode process.""" + + def __init__( + self, + command_timeout: float | None = 10.0, + notification_queue_size: int = 4096, + ) -> None: + self.process: subprocess.Popen[str] | None = None + self._lock = threading.Lock() + self._server_args: tuple[str | int, ...] | None = None + self.command_timeout = command_timeout + self.tmux_bin: str | None = None + self._reader_thread: threading.Thread | None = None + self._stderr_thread: threading.Thread | None = None + self._notification_queue_size = notification_queue_size + self._protocol = ControlProtocol( + notification_queue_size=notification_queue_size, + ) + self._restarts = 0 + + # Lifecycle --------------------------------------------------------- + def close(self) -> None: + """Terminate the tmux control mode process and clean up threads.""" + proc = self.process + if proc is None: + return + + try: + proc.terminate() + proc.wait(timeout=1) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + finally: + self.process = None + self._server_args = None + self._protocol.mark_dead("engine closed") + + def __del__(self) -> None: # pragma: no cover - best effort cleanup + """Ensure subprocess is terminated on GC.""" + self.close() + + # Engine API -------------------------------------------------------- + def run_result( + self, + cmd: str, + cmd_args: t.Sequence[str | int] | None = None, + server_args: t.Sequence[str | int] | None = None, + timeout: float | None = None, + ) -> CommandResult: + """Run a tmux command and return a :class:`CommandResult`.""" + incoming_server_args = tuple(server_args or ()) + effective_timeout = timeout if timeout is not None else self.command_timeout + attempts = 0 + + while True: + attempts += 1 + with self._lock: + self._ensure_process(incoming_server_args) + assert self.process is not None + full_argv: list[str] = [ + self.tmux_bin or "tmux", + *[str(x) for x in incoming_server_args], + cmd, + ] + if cmd_args: + full_argv.extend(str(a) for a in cmd_args) + + ctx = CommandContext(argv=full_argv) + self._protocol.register_command(ctx) + + command_line = shlex.join([cmd, *(str(a) for a in cmd_args or [])]) + try: + self._write_line(command_line, server_args=incoming_server_args) + except exc.ControlModeConnectionError: + if attempts >= 2: + raise + # retry the full cycle with a fresh process/context + continue + + # Wait outside the lock so multiple callers can run concurrently + if not ctx.wait(timeout=effective_timeout): + self.close() + msg = "tmux control mode command timed out" + raise exc.ControlModeTimeout(msg) + + if ctx.error is not None: + raise ctx.error + + if ctx.exit_status is None: + ctx.exit_status = ExitStatus.OK + + return self._protocol.build_result(ctx) + + def iter_notifications( + self, + *, + timeout: float | None = None, + ) -> t.Iterator[Notification]: + """Yield control-mode notifications until the stream ends.""" + while True: + notif = self._protocol.get_notification(timeout=timeout) + if notif is None: + return + if notif.kind.name == "EXIT": + return + yield notif + + def get_stats(self) -> EngineStats: + """Return diagnostic statistics for the engine.""" + return self._protocol.get_stats(restarts=self._restarts) + + # Internals --------------------------------------------------------- + def _ensure_process(self, server_args: tuple[str | int, ...]) -> None: + if self.process is None: + self._start_process(server_args) + return + + if server_args != self._server_args: + logger.warning( + ( + "Server args changed; restarting Control Mode process. " + "Old: %s, New: %s" + ), + self._server_args, + server_args, + ) + self.close() + self._start_process(server_args) + + def _start_process(self, server_args: tuple[str | int, ...]) -> None: + tmux_bin = shutil.which("tmux") + if not tmux_bin: + raise exc.TmuxCommandNotFound + + self.tmux_bin = tmux_bin + self._server_args = server_args + self._protocol = ControlProtocol( + notification_queue_size=self._notification_queue_size, + ) + + cmd = [ + tmux_bin, + *[str(a) for a in server_args], + "-C", + "new-session", + "-A", + "-s", + "libtmux_control_mode", + ] + + logger.debug("Starting Control Mode process: %s", cmd) + self.process = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=0, + errors="backslashreplace", + ) + + # The initial command (new-session) emits an output block; register + # a context so the protocol can consume it. + bootstrap_ctx = CommandContext( + argv=[ + tmux_bin, + *[str(a) for a in server_args], + "new-session", + "-A", + "-s", + "libtmux_control_mode", + ], + ) + self._protocol.register_command(bootstrap_ctx) + + # Start IO threads after registration to avoid early protocol errors. + self._reader_thread = threading.Thread( + target=self._reader, + args=(self.process,), + daemon=True, + ) + self._reader_thread.start() + + self._stderr_thread = threading.Thread( + target=self._drain_stderr, + args=(self.process,), + daemon=True, + ) + self._stderr_thread.start() + + if not bootstrap_ctx.wait(timeout=self.command_timeout): + self.close() + msg = "Control Mode bootstrap command timed out" + raise exc.ControlModeTimeout(msg) + + def _write_line( + self, + command_line: str, + *, + server_args: tuple[str | int, ...], + ) -> None: + assert self.process is not None + assert self.process.stdin is not None + + try: + self.process.stdin.write(command_line + "\n") + self.process.stdin.flush() + except BrokenPipeError: + logger.exception("Control Mode process died, restarting...") + self.close() + self._restarts += 1 + msg = "control mode process unavailable" + raise exc.ControlModeConnectionError(msg) from None + + def _reader(self, process: subprocess.Popen[str]) -> None: + assert process.stdout is not None + try: + for raw in process.stdout: + self._protocol.feed_line(raw.rstrip("\n")) + except Exception: # pragma: no cover - defensive + logger.exception("Control Mode reader thread crashed") + finally: + self._protocol.mark_dead("EOF from tmux") + + def _drain_stderr(self, process: subprocess.Popen[str]) -> None: + if process.stderr is None: + return + for err_line in process.stderr: + logger.debug("Control Mode stderr: %s", err_line.rstrip("\n")) diff --git a/src/libtmux/_internal/engines/control_protocol.py b/src/libtmux/_internal/engines/control_protocol.py new file mode 100644 index 000000000..9183b3f37 --- /dev/null +++ b/src/libtmux/_internal/engines/control_protocol.py @@ -0,0 +1,280 @@ +"""Control mode protocol parsing and bookkeeping.""" + +from __future__ import annotations + +import collections +import dataclasses +import enum +import logging +import queue +import threading +import time +import typing as t + +from libtmux import exc +from libtmux._internal.engines.base import ( + CommandResult, + EngineStats, + ExitStatus, + Notification, + NotificationKind, +) + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass +class CommandContext: + """Tracks state for a single in-flight control-mode command.""" + + argv: list[str] + cmd_id: int | None = None + tmux_time: int | None = None + flags: int | None = None + stdout: list[str] = dataclasses.field(default_factory=list) + stderr: list[str] = dataclasses.field(default_factory=list) + exit_status: ExitStatus | None = None + start_time: float | None = None + end_time: float | None = None + done: threading.Event = dataclasses.field(default_factory=threading.Event) + error: BaseException | None = None + + def signal_done(self) -> None: + """Mark the context as complete.""" + self.done.set() + + def wait(self, timeout: float | None) -> bool: + """Wait for completion; returns False on timeout.""" + return self.done.wait(timeout=timeout) + + +class ParserState(enum.Enum): + """Minimal state machine for control-mode parsing.""" + + IDLE = enum.auto() + IN_COMMAND = enum.auto() + DEAD = enum.auto() + + +def _parse_notification(line: str, parts: list[str]) -> Notification: + """Map raw notification lines into structured :class:`Notification`. + + The mapping is intentionally conservative; unknown tags fall back to RAW. + """ + tag = parts[0] + now = time.monotonic() + data: dict[str, t.Any] = {} + kind = NotificationKind.RAW + + if tag == "%output" and len(parts) >= 3: + kind = NotificationKind.PANE_OUTPUT + data = {"pane_id": parts[1], "payload": " ".join(parts[2:])} + elif tag == "%extended-output" and len(parts) >= 4: + kind = NotificationKind.PANE_EXTENDED_OUTPUT + data = { + "pane_id": parts[1], + "behind_ms": parts[2], + "payload": " ".join(parts[3:]), + } + elif tag == "%pane-mode-changed" and len(parts) >= 2: + kind = NotificationKind.PANE_MODE_CHANGED + data = {"pane_id": parts[1], "mode": parts[2:]} + elif tag == "%window-add" and len(parts) >= 2: + kind = NotificationKind.WINDOW_ADD + data = {"window_id": parts[1], "rest": parts[2:]} + elif tag == "%window-close" and len(parts) >= 2: + kind = NotificationKind.WINDOW_CLOSE + data = {"window_id": parts[1]} + elif tag == "%window-renamed" and len(parts) >= 3: + kind = NotificationKind.WINDOW_RENAMED + data = {"window_id": parts[1], "name": " ".join(parts[2:])} + elif tag == "%window-pane-changed" and len(parts) >= 3: + kind = NotificationKind.WINDOW_PANE_CHANGED + data = {"window_id": parts[1], "pane_id": parts[2]} + elif tag == "%session-changed" and len(parts) >= 2: + kind = NotificationKind.SESSION_CHANGED + data = {"session_id": parts[1]} + elif tag == "%sessions-changed": + kind = NotificationKind.SESSIONS_CHANGED + elif tag == "%session-window-changed" and len(parts) >= 3: + kind = NotificationKind.SESSION_WINDOW_CHANGED + data = {"session_id": parts[1], "window_id": parts[2]} + elif tag == "%pause" and len(parts) >= 2: + kind = NotificationKind.PAUSE + data = {"pane_id": parts[1]} + elif tag == "%continue" and len(parts) >= 2: + kind = NotificationKind.CONTINUE + data = {"pane_id": parts[1]} + elif tag == "%subscription-changed" and len(parts) >= 4: + kind = NotificationKind.SUBSCRIPTION_CHANGED + data = {"name": parts[1], "type": parts[2], "value": " ".join(parts[3:])} + elif tag == "%exit": + kind = NotificationKind.EXIT + + return Notification(kind=kind, when=now, raw=line, data=data) + + +class ControlProtocol: + """Parse the tmux control-mode stream into commands and notifications.""" + + def __init__(self, *, notification_queue_size: int = 4096) -> None: + self.state = ParserState.IDLE + self._pending: collections.deque[CommandContext] = collections.deque() + self._current: CommandContext | None = None + self._notif_queue: queue.Queue[Notification] = queue.Queue( + maxsize=notification_queue_size, + ) + self._dropped_notifications = 0 + self._last_error: str | None = None + self._last_activity: float | None = None + + # Command lifecycle ------------------------------------------------- + def register_command(self, ctx: CommandContext) -> None: + """Queue a command context awaiting %begin/%end.""" + self._pending.append(ctx) + + def feed_line(self, line: str) -> None: + """Feed a raw line from tmux into the parser.""" + self._last_activity = time.monotonic() + if self.state is ParserState.DEAD: + return + + if line.startswith("%"): + self._handle_percent_line(line) + else: + self._handle_plain_line(line) + + def _handle_percent_line(self, line: str) -> None: + parts = line.split() + tag = parts[0] + + if tag == "%begin": + self._on_begin(parts) + elif tag in ("%end", "%error"): + self._on_end_or_error(tag, parts) + else: + self._on_notification(line, parts) + + def _handle_plain_line(self, line: str) -> None: + if self.state is ParserState.IN_COMMAND and self._current: + self._current.stdout.append(line) + else: + logger.debug("Unexpected plain line outside command: %r", line) + + def _on_begin(self, parts: list[str]) -> None: + if self.state is not ParserState.IDLE: + self._protocol_error("nested %begin") + return + + try: + tmux_time = int(parts[1]) + cmd_id = int(parts[2]) + flags = int(parts[3]) if len(parts) > 3 else 0 + except (IndexError, ValueError): + self._protocol_error(f"malformed %begin: {parts}") + return + + try: + ctx = self._pending.popleft() + except IndexError: + self._protocol_error(f"no pending command for %begin id={cmd_id}") + return + + ctx.cmd_id = cmd_id + ctx.tmux_time = tmux_time + ctx.flags = flags + ctx.start_time = time.monotonic() + self._current = ctx + self.state = ParserState.IN_COMMAND + + def _on_end_or_error(self, tag: str, parts: list[str]) -> None: + if self.state is not ParserState.IN_COMMAND or self._current is None: + self._protocol_error(f"unexpected {tag}") + return + + ctx = self._current + ctx.exit_status = ExitStatus.OK if tag == "%end" else ExitStatus.ERROR + ctx.end_time = time.monotonic() + + # Copy tmux_time/flags if provided on the closing tag + try: + if len(parts) > 1: + ctx.tmux_time = int(parts[1]) + if len(parts) > 3: + ctx.flags = int(parts[3]) + except ValueError: + pass + + if ctx.exit_status is ExitStatus.ERROR and ctx.stdout and not ctx.stderr: + ctx.stderr, ctx.stdout = ctx.stdout, [] + + ctx.signal_done() + self._current = None + self.state = ParserState.IDLE + + def _on_notification(self, line: str, parts: list[str]) -> None: + notif = _parse_notification(line, parts) + try: + self._notif_queue.put_nowait(notif) + except queue.Full: + self._dropped_notifications += 1 + if self._dropped_notifications & (self._dropped_notifications - 1) == 0: + logger.warning( + "Control Mode notification queue full; dropped=%d", + self._dropped_notifications, + ) + + def mark_dead(self, reason: str) -> None: + """Mark protocol as unusable and fail pending commands.""" + self.state = ParserState.DEAD + self._last_error = reason + err = exc.ControlModeConnectionError(reason) + + if self._current: + self._current.error = err + self._current.signal_done() + self._current = None + + while self._pending: + ctx = self._pending.popleft() + ctx.error = err + ctx.signal_done() + + def _protocol_error(self, reason: str) -> None: + logger.error("Control Mode protocol error: %s", reason) + self.mark_dead(reason) + + # Accessors --------------------------------------------------------- + def get_notification(self, timeout: float | None = None) -> Notification | None: + """Return the next notification or ``None`` if none available.""" + try: + return self._notif_queue.get(timeout=timeout) + except queue.Empty: + return None + + def get_stats(self, *, restarts: int) -> EngineStats: + """Return diagnostic counters for the protocol.""" + in_flight = (1 if self._current else 0) + len(self._pending) + return EngineStats( + in_flight=in_flight, + notif_queue_depth=self._notif_queue.qsize(), + dropped_notifications=self._dropped_notifications, + restarts=restarts, + last_error=self._last_error, + last_activity=self._last_activity, + ) + + def build_result(self, ctx: CommandContext) -> CommandResult: + """Convert a completed context into a :class:`CommandResult`.""" + exit_status = ctx.exit_status or ExitStatus.OK + return CommandResult( + argv=ctx.argv, + stdout=ctx.stdout, + stderr=ctx.stderr, + exit_status=exit_status, + cmd_id=ctx.cmd_id, + start_time=ctx.start_time, + end_time=ctx.end_time, + tmux_time=ctx.tmux_time, + flags=ctx.flags, + ) diff --git a/src/libtmux/_internal/engines/subprocess_engine.py b/src/libtmux/_internal/engines/subprocess_engine.py new file mode 100644 index 000000000..3f8ce455a --- /dev/null +++ b/src/libtmux/_internal/engines/subprocess_engine.py @@ -0,0 +1,85 @@ +"""Subprocess engine for libtmux.""" + +from __future__ import annotations + +import logging +import shutil +import subprocess +import typing as t + +from libtmux import exc +from libtmux._internal.engines.base import CommandResult, Engine, ExitStatus + +logger = logging.getLogger(__name__) + + +class SubprocessEngine(Engine): + """Engine that runs tmux commands via subprocess.""" + + def run_result( + self, + cmd: str, + cmd_args: t.Sequence[str | int] | None = None, + server_args: t.Sequence[str | int] | None = None, + timeout: float | None = None, + ) -> CommandResult: + """Run a tmux command using ``subprocess.Popen``.""" + tmux_bin = shutil.which("tmux") + if not tmux_bin: + raise exc.TmuxCommandNotFound + + full_cmd: list[str | int] = [tmux_bin] + if server_args: + full_cmd += list(server_args) + full_cmd.append(cmd) + if cmd_args: + full_cmd += list(cmd_args) + + full_cmd_str = [str(c) for c in full_cmd] + + try: + process = subprocess.Popen( + full_cmd_str, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + errors="backslashreplace", + ) + stdout_str, stderr_str = process.communicate(timeout=timeout) + returncode = process.returncode + except subprocess.TimeoutExpired: + process.kill() + process.wait() + msg = "tmux subprocess timed out" + raise exc.SubprocessTimeout(msg) from None + except Exception: + logger.exception(f"Exception for {subprocess.list2cmdline(full_cmd_str)}") + raise + + stdout_split = stdout_str.split("\n") + while stdout_split and stdout_split[-1] == "": + stdout_split.pop() + + stderr_split = stderr_str.split("\n") + stderr = list(filter(None, stderr_split)) + + if "has-session" in full_cmd_str and len(stderr) and not stdout_split: + stdout = [stderr[0]] + else: + stdout = stdout_split + + logger.debug( + "self.stdout for {cmd}: {stdout}".format( + cmd=" ".join(full_cmd_str), + stdout=stdout, + ), + ) + + exit_status = ExitStatus.OK if returncode == 0 else ExitStatus.ERROR + + return CommandResult( + argv=full_cmd_str, + stdout=stdout, + stderr=stderr, + exit_status=exit_status, + ) diff --git a/src/libtmux/common.py b/src/libtmux/common.py index ac9b9b7f1..7f480d4a9 100644 --- a/src/libtmux/common.py +++ b/src/libtmux/common.py @@ -219,7 +219,21 @@ class tmux_cmd: Renamed from ``tmux`` to ``tmux_cmd``. """ - def __init__(self, *args: t.Any) -> None: + def __init__( + self, + *args: t.Any, + cmd: list[str] | None = None, + stdout: list[str] | None = None, + stderr: list[str] | None = None, + returncode: int | None = None, + ) -> None: + if cmd is not None: + self.cmd = cmd + self.stdout = stdout or [] + self.stderr = stderr or [] + self.returncode = returncode + return + tmux_bin = shutil.which("tmux") if not tmux_bin: raise exc.TmuxCommandNotFound @@ -238,7 +252,7 @@ def __init__(self, *args: t.Any) -> None: text=True, errors="backslashreplace", ) - stdout, stderr = self.process.communicate() + stdout_str, stderr_str = self.process.communicate() returncode = self.process.returncode except Exception: logger.exception(f"Exception for {subprocess.list2cmdline(cmd)}") @@ -246,12 +260,12 @@ def __init__(self, *args: t.Any) -> None: self.returncode = returncode - stdout_split = stdout.split("\n") + stdout_split = stdout_str.split("\n") # remove trailing newlines from stdout while stdout_split and stdout_split[-1] == "": stdout_split.pop() - stderr_split = stderr.split("\n") + stderr_split = stderr_str.split("\n") self.stderr = list(filter(None, stderr_split)) # filter empty values if "has-session" in cmd and len(self.stderr) and not stdout_split: diff --git a/src/libtmux/exc.py b/src/libtmux/exc.py index 7777403f3..f8b537c91 100644 --- a/src/libtmux/exc.py +++ b/src/libtmux/exc.py @@ -92,6 +92,22 @@ class WaitTimeout(LibTmuxException): """Function timed out without meeting condition.""" +class ControlModeTimeout(LibTmuxException): + """tmux control mode command did not return within the timeout.""" + + +class ControlModeProtocolError(LibTmuxException): + """Protocol-level error while parsing control mode stream.""" + + +class ControlModeConnectionError(LibTmuxException): + """Control mode connection was lost unexpectedly.""" + + +class SubprocessTimeout(LibTmuxException): + """tmux subprocess exceeded the allowed timeout.""" + + class VariableUnpackingError(LibTmuxException): """Error unpacking variable.""" diff --git a/src/libtmux/server.py b/src/libtmux/server.py index 17b290c34..156aec494 100644 --- a/src/libtmux/server.py +++ b/src/libtmux/server.py @@ -16,6 +16,7 @@ import warnings from libtmux import exc, formats +from libtmux._internal.engines.subprocess_engine import SubprocessEngine from libtmux._internal.query_list import QueryList from libtmux.common import tmux_cmd from libtmux.neo import fetch_objs @@ -38,6 +39,7 @@ from typing_extensions import Self + from libtmux._internal.engines.base import Engine from libtmux._internal.types import StrPath DashLiteral: TypeAlias = t.Literal["-"] @@ -126,12 +128,17 @@ def __init__( colors: int | None = None, on_init: t.Callable[[Server], None] | None = None, socket_name_factory: t.Callable[[], str] | None = None, + engine: Engine | None = None, **kwargs: t.Any, ) -> None: EnvironmentMixin.__init__(self, "-g") self._windows: list[WindowDict] = [] self._panes: list[PaneDict] = [] + if engine is None: + engine = SubprocessEngine() + self.engine = engine + if socket_path is not None: self.socket_path = socket_path elif socket_name is not None: @@ -214,15 +221,20 @@ def raise_if_dead(self) -> None: if tmux_bin is None: raise exc.TmuxCommandNotFound - cmd_args: list[str] = ["list-sessions"] + server_args: list[str] = [] if self.socket_name: - cmd_args.insert(0, f"-L{self.socket_name}") + server_args.append(f"-L{self.socket_name}") if self.socket_path: - cmd_args.insert(0, f"-S{self.socket_path}") + server_args.append(f"-S{self.socket_path}") if self.config_file: - cmd_args.insert(0, f"-f{self.config_file}") + server_args.append(f"-f{self.config_file}") - subprocess.check_call([tmux_bin, *cmd_args]) + proc = self.engine.run("list-sessions", server_args=server_args) + if proc.returncode is not None and proc.returncode != 0: + raise subprocess.CalledProcessError( + returncode=proc.returncode, + cmd=[tmux_bin, *server_args, "list-sessions"], + ) # # Command @@ -280,25 +292,24 @@ def cmd( Renamed from ``.tmux`` to ``.cmd``. """ - svr_args: list[str | int] = [cmd] - cmd_args: list[str | int] = [] + server_args: list[str | int] = [] if self.socket_name: - svr_args.insert(0, f"-L{self.socket_name}") + server_args.append(f"-L{self.socket_name}") if self.socket_path: - svr_args.insert(0, f"-S{self.socket_path}") + server_args.append(f"-S{self.socket_path}") if self.config_file: - svr_args.insert(0, f"-f{self.config_file}") + server_args.append(f"-f{self.config_file}") if self.colors: if self.colors == 256: - svr_args.insert(0, "-2") + server_args.append("-2") elif self.colors == 88: - svr_args.insert(0, "-8") + server_args.append("-8") else: raise exc.UnknownColorOption - cmd_args = ["-t", str(target), *args] if target is not None else [*args] + cmd_args = ["-t", str(target), *args] if target is not None else list(args) - return tmux_cmd(*svr_args, *cmd_args) + return self.engine.run(cmd, cmd_args=cmd_args, server_args=server_args) @property def attached_sessions(self) -> list[Session]: diff --git a/tests/test_control_mode_engine.py b/tests/test_control_mode_engine.py new file mode 100644 index 000000000..f21d54857 --- /dev/null +++ b/tests/test_control_mode_engine.py @@ -0,0 +1,108 @@ +"""Tests for ControlModeEngine.""" + +from __future__ import annotations + +import io +import pathlib +import subprocess +import time +import typing as t + +import pytest + +from libtmux import exc +from libtmux._internal.engines.base import ExitStatus +from libtmux._internal.engines.control_mode import ControlModeEngine +from libtmux.server import Server + + +def test_control_mode_engine_basic(tmp_path: pathlib.Path) -> None: + """Test basic functionality of ControlModeEngine.""" + socket_path = tmp_path / "tmux-control-mode-test" + engine = ControlModeEngine() + + # Server should auto-start engine on first cmd + server = Server(socket_path=socket_path, engine=engine) + + # kill server if exists (cleanup from previous runs if any) + if server.is_alive(): + server.kill() + + # new session + session = server.new_session(session_name="test_sess", kill_session=True) + assert session.name == "test_sess" + + # check engine process is running + assert engine.process is not None + assert engine.process.poll() is None + + # list sessions + # ControlModeEngine creates a bootstrap session "libtmux_control_mode", so we + # expect 2 sessions + sessions = server.sessions + assert len(sessions) >= 1 + session_names = [s.name for s in sessions] + assert "test_sess" in session_names + assert "libtmux_control_mode" in session_names + + # run a command that returns output + output_cmd = server.cmd("display-message", "-p", "hello") + assert output_cmd.stdout == ["hello"] + assert getattr(output_cmd, "exit_status", ExitStatus.OK) in ( + ExitStatus.OK, + 0, + ) + + # cleanup + server.kill() + # Engine process should terminate eventually (ControlModeEngine.close is called + # manually or via weakref/del) + # Server.kill() kills the tmux SERVER. The control mode client process should + # exit as a result. + + engine.process.wait(timeout=2) + assert engine.process.poll() is not None + + +def test_control_mode_timeout(monkeypatch: pytest.MonkeyPatch) -> None: + """ControlModeEngine should surface timeouts and clean up the process.""" + + class BlockingStdout: + def __iter__(self) -> "BlockingStdout": + return self + + def __next__(self) -> str: # pragma: no cover - simple block + time.sleep(0.05) + raise StopIteration + + class FakeProcess: + def __init__(self) -> None: + self.stdin = io.StringIO() + self.stdout = BlockingStdout() + self.stderr = None + self._terminated = False + + def terminate(self) -> None: # pragma: no cover - simple stub + self._terminated = True + + def kill(self) -> None: # pragma: no cover - simple stub + self._terminated = True + + def wait(self, timeout: float | None = None) -> None: # pragma: no cover + return None + + engine = ControlModeEngine(command_timeout=0.01) + + fake_process = FakeProcess() + + def fake_start(server_args: t.Sequence[str | int] | None) -> None: + engine.tmux_bin = "tmux" + engine._server_args = tuple(server_args or ()) + engine.process = t.cast(subprocess.Popen[str], fake_process) + + monkeypatch.setattr(engine, "_start_process", fake_start) + + with pytest.raises(exc.ControlModeTimeout): + engine.run("list-sessions") + + assert engine.process is None diff --git a/tests/test_engine_protocol.py b/tests/test_engine_protocol.py new file mode 100644 index 000000000..81d14cec3 --- /dev/null +++ b/tests/test_engine_protocol.py @@ -0,0 +1,61 @@ +"""Unit tests for engine protocol and wrappers.""" + +from __future__ import annotations + +from libtmux._internal.engines.base import ( + CommandResult, + ExitStatus, + NotificationKind, + command_result_to_tmux_cmd, +) +from libtmux._internal.engines.control_protocol import CommandContext, ControlProtocol + + +def test_command_result_wraps_tmux_cmd() -> None: + """CommandResult should adapt cleanly into tmux_cmd wrapper.""" + result = CommandResult( + argv=["tmux", "-V"], + stdout=["tmux 3.4"], + stderr=[], + exit_status=ExitStatus.OK, + cmd_id=7, + ) + + wrapped = command_result_to_tmux_cmd(result) + + assert wrapped.stdout == ["tmux 3.4"] + assert wrapped.returncode == 0 + assert getattr(wrapped, "cmd_id", None) == 7 + + +def test_control_protocol_parses_begin_end() -> None: + """Parser should map %begin/%end into a completed context.""" + proto = ControlProtocol() + ctx = CommandContext(argv=["tmux", "list-sessions"]) + proto.register_command(ctx) + + proto.feed_line("%begin 1700000000 10 0") + proto.feed_line("session-one") + proto.feed_line("%end 1700000001 10 0") + + assert ctx.done.wait(timeout=0.05) + + result = proto.build_result(ctx) + assert result.stdout == ["session-one"] + assert result.exit_status is ExitStatus.OK + assert result.cmd_id == 10 + + +def test_control_protocol_notifications() -> None: + """Notifications should enqueue and track drop counts when bounded.""" + proto = ControlProtocol(notification_queue_size=1) + proto.feed_line("%sessions-changed") + + notif = proto.get_notification(timeout=0.05) + assert notif is not None + assert notif.kind is NotificationKind.SESSIONS_CHANGED + + # queue is bounded; pushing another should increment drop counter when full + proto.feed_line("%sessions-changed") + proto.feed_line("%sessions-changed") + assert proto.get_stats(restarts=0).dropped_notifications >= 1