diff --git a/AGENTS.md b/AGENTS.md new file mode 120000 index 000000000..681311eb9 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1 @@ +CLAUDE.md \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 000000000..221389418 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,335 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +libtmux is a typed Python library providing an ORM-like interface for tmux (terminal multiplexer). It allows programmatic control of tmux servers, sessions, windows, and panes. + +## Development Commands + +### Testing +- `make test` - Run full test suite +- `make start` - Run tests then start pytest-watcher +- `make watch_test` - Auto-run tests on file changes (requires entr) +- `uv run pytest tests/path/to/specific_test.py` - Run a specific test file +- `uv run pytest -k "test_name"` - Run tests matching pattern + +### Code Quality +- `make ruff` - Run linter checks (must pass before committing) +- `make ruff_format` - Auto-format code +- `make mypy` - Run type checking (must pass before committing) +- `make watch_ruff` - Auto-lint on file changes +- `make watch_mypy` - Auto-typecheck on file changes + +### Documentation +- `make build_docs` - Build documentation +- `make serve_docs` - Serve docs locally at http://localhost:8009 +- `make dev_docs` - Watch and rebuild docs on changes + +## Architecture + +### Core Objects Hierarchy +``` +Server (tmux server process) +└── Session (tmux session) + └── Window (tmux window) + └── Pane (tmux pane) +``` + +Each object provides methods to: +- Query and manipulate tmux state +- Execute tmux commands +- Access child objects (e.g., server.sessions, session.windows) + +### Key Modules +- `server.py` - Server class for managing tmux server +- `session.py` - Session class for tmux sessions +- `window.py` - Window class for tmux windows +- `pane.py` - Pane class for tmux panes +- `common.py` - Shared utilities and base classes +- `formats.py` - tmux format string handling +- `exc.py` - Custom exceptions + +### Internal Architecture +- `_internal/dataclasses.py` - Type-safe data structures for tmux objects +- `_internal/query_list.py` - QueryList implementation for filtering collections +- All tmux commands go through `tmux_cmd()` method on objects +- Uses subprocess to communicate with tmux via CLI + +### Command Execution Architecture + +Commands flow through a hierarchical delegation pattern: + +``` +User Code → Object Method → .cmd() → Server.cmd() → tmux_cmd → subprocess → tmux binary +``` + +**Key components**: +- `tmux_cmd` class (`src/libtmux/common.py:193-268`) - Wraps tmux binary via subprocess +- Each object (Server, Session, Window, Pane) has a `.cmd()` method +- Commands built progressively as tuples with conditional flags +- Auto-targeting: objects automatically include their ID (`-t` flag) + +**Example flow**: +```python +session.new_window(window_name='my_window', attach=False) +# → Builds: ("-d", "-P", "-F#{window_id}", "-n", "my_window") +# → Delegates: self.cmd("new-window", *args, target=self.session_id) +# → Executes: tmux -Llibtmux_test3k9m7x2q new-window -d -P -F#{window_id} -n my_window -t$1 +``` + +### Testing Architecture + +**CRITICAL: We NEVER mock tmux. All tests use real tmux processes.** + +#### Core Testing Principles + +1. **Real tmux processes** - Every test runs against actual tmux server via subprocess +2. **Unique isolation** - Each test gets its own tmux server with guaranteed unique socket +3. **No mocking** - All tmux commands execute through real tmux CLI +4. **Parallel-safe** - Tests can run concurrently without conflicts + +#### Unique Server Isolation + +Each test gets a real tmux server with unique socket name: + +```python +Server(socket_name=f"libtmux_test{next(namer)}") +# Example: libtmux_test3k9m7x2q +``` + +**Socket name generation** (`src/libtmux/test/random.py:28-56`): +- Uses `RandomStrSequence` to generate 8-character random suffixes +- Format: `libtmux_test` + 8 random chars (lowercase, digits, underscore) +- Each socket creates independent tmux process in `/tmp/tmux-{uid}/` +- Negligible collision probability + +#### Pytest Fixtures + +**Fixture hierarchy** (`src/libtmux/pytest_plugin.py`): + +``` +Session-scoped (shared across test session): +├── home_path (temporary /home/) +├── user_path (temporary user directory) +└── config_file (~/.tmux.conf with base-index=1) + +Function-scoped (per test): +├── set_home (auto-use: sets $HOME to isolated directory) +├── clear_env (cleans unnecessary environment variables) +├── server (unique tmux Server with auto-cleanup) +├── session (Session on server with unique name) +└── TestServer (factory for creating multiple servers per test) +``` + +**Key fixtures**: + +- **`server`** - Creates tmux server with unique socket, auto-killed via finalizer +- **`session`** - Creates session on server with unique name (`libtmux_` + random) +- **`TestServer`** - Factory using `functools.partial` to create multiple independent servers + ```python + def test_multiple_servers(TestServer): + server1 = TestServer() # libtmux_test3k9m7x2q + server2 = TestServer() # libtmux_testz9w1b4a7 + # Both are real, independent tmux processes + ``` + +#### Isolation Mechanisms + +**Triple isolation ensures parallel test safety**: + +1. **Unique socket names** - 8-char random suffix prevents collisions +2. **Independent processes** - Each server is separate tmux process with unique PID +3. **Isolated $HOME** - Temporary home directory with standard `.tmux.conf` + +**Home directory setup**: +- Each test session gets temporary home directory +- Contains `.tmux.conf` with `base-index 1` for consistent window/pane indexing +- `$HOME` environment variable monkeypatched to isolated directory +- No interference from user's actual tmux configuration + +#### Test Utilities + +**Helper modules** in `src/libtmux/test/`: + +- **`temporary.py`** - Context managers for temporary objects: + ```python + with temp_session(server) as session: + session.new_window() # Auto-cleaned up after block + + with temp_window(session) as window: + window.split_window() # Auto-cleaned up after block + ``` + +- **`random.py`** - Unique name generation: + ```python + get_test_session_name(server) # Returns: libtmux_3k9m7x2q (checks for uniqueness) + get_test_window_name(session) # Returns: libtmux_z9w1b4a7 (checks for uniqueness) + ``` + +- **`retry.py`** - Retry logic for tmux operations: + ```python + retry_until(lambda: pane.pane_current_path is not None) + # Retries for up to 8 seconds (configurable via RETRY_TIMEOUT_SECONDS) + # 50ms intervals (configurable via RETRY_INTERVAL_SECONDS) + ``` + +- **`constants.py`** - Test configuration: + - `TEST_SESSION_PREFIX = "libtmux_"` + - `RETRY_TIMEOUT_SECONDS = 8` (configurable via env var) + - `RETRY_INTERVAL_SECONDS = 0.05` (configurable via env var) + +#### Doctest Integration + +**All doctests use real tmux** (`conftest.py:31-49`): + +```python +@pytest.fixture(autouse=True) +def add_doctest_fixtures(doctest_namespace): + # Injects Server, Session, Window, Pane classes + # Injects server, session, window, pane instances + # All are real tmux objects with unique sockets +``` + +Docstrings can include runnable examples: +```python +>>> server.new_session('my_session') +Session($1 my_session) + +>>> session.new_window(window_name='my_window') +Window(@3 2:my_window, Session($1 ...)) +``` + +These execute against real tmux during `pytest --doctest-modules`. + +**CRITICAL DOCTEST RULE: NEVER create Server objects directly** + +❌ **WRONG - Will kill user's tmux session:** +```python +>>> server = Server(socket_name="default") # DANGEROUS! +>>> server = Server(socket_name="testing") # Still dangerous! +``` + +✅ **CORRECT - Use lowercase fixtures:** +```python +>>> assert server.command_runner is not None # Uses injected fixture +>>> server.new_session("test") # Fixture has unique socket +``` + +**Why this matters:** +- `conftest.py:44` overwrites `Server` with `TestServer` factory +- `TestServer` tracks ALL created servers via `on_init` callback +- `TestServer` finalizer kills ALL tracked servers at test suite end +- If you use `socket_name="default"` → user's session gets killed! +- If you use any socket name → cleanup may interfere with user + +**Safe patterns:** +1. Use lowercase `server` fixture (unique socket like `libtmux_test3k9m7x2q`) +2. Use lowercase `session`, `window`, `pane` fixtures +3. Never instantiate `Server()` directly in doctests +4. Never use `socket_name="default"` anywhere + +#### Parallel Test Execution + +**Tests are safe for parallel execution** (`pytest -n auto`): + +- Each worker process generates unique socket names +- No shared state between test workers +- Independent home directories prevent race conditions +- Automatic cleanup prevents resource leaks + +#### Testing Patterns + +**Standard test pattern**: +```python +def test_example(server: Server, session: Session) -> None: + """Test description.""" + # No setup needed - fixtures provide real tmux objects + window = session.new_window(window_name='test') + assert window.window_name == 'test' + # No teardown needed - fixtures auto-cleanup +``` + +**Multiple server pattern**: +```python +def test_multiple_servers(TestServer: t.Callable[..., Server]) -> None: + """Test with multiple independent servers.""" + server1 = TestServer() + server2 = TestServer() + # Both are real tmux processes with unique sockets + assert server1.socket_name != server2.socket_name +``` + +**Retry pattern for tmux operations**: +```python +def test_async_operation(session: Session) -> None: + """Test operation that takes time to complete.""" + pane = session.active_window.active_pane + pane.send_keys('cd /tmp', enter=True) + + # Wait for tmux to update pane path + retry_until(lambda: pane.pane_current_path == '/tmp') +``` + +#### CI Testing Matrix + +Tests run against: +- **tmux versions**: 2.6, 2.7, 2.8, 3.0, 3.1, 3.2, 3.3, 3.4, master +- **Python versions**: 3.9, 3.10, 3.11, 3.12, 3.13 +- All use real tmux processes (never mocked) + +## Important Patterns + +### Command Building +- Commands built progressively as tuples with conditional flags: + ```python + tmux_args: tuple[str, ...] = () + + if not attach: + tmux_args += ("-d",) + + tmux_args += ("-P", "-F#{window_id}") + + if window_name is not None: + tmux_args += ("-n", window_name) + + cmd = self.cmd("new-window", *tmux_args, target=target) + ``` +- Auto-targeting: objects pass their ID automatically (override with `target=`) +- Version-aware: use `has_gte_version()` / `has_lt_version()` for compatibility +- Format strings: use `FORMAT_SEPARATOR` (default `␞`) for multi-field parsing + +### Type Safety +- All public APIs are fully typed +- Use `from __future__ import annotations` in all modules +- Mypy runs in strict mode - new code must be type-safe + +### Error Handling +- Custom exceptions in `exc.py` (e.g., `LibTmuxException`, `TmuxCommandNotFound`) +- tmux command failures raise exceptions with command output +- Check `cmd.stderr` after command execution + +### Vendor Dependencies +- Some dependencies are vendored in `_vendor/` to avoid runtime dependencies +- Do not modify vendored code directly + +### Writing Tests + +**When writing new tests**: +- Use `server` and `session` fixtures - they provide real tmux instances +- **NEVER create `Server()` directly with hardcoded socket names** (use fixtures or `TestServer`) +- **NEVER use `socket_name="default"` or reuse fixture sockets** (will kill user's tmux!) +- Never mock tmux - use `retry_until()` for async operations instead +- Use `temp_session()` / `temp_window()` context managers for temporary objects +- Use `get_test_session_name()` / `get_test_window_name()` for unique names +- Tests must work across all tmux versions (2.6+) and Python versions (3.9-3.13) +- Use version checks (`has_gte_version`, `has_lt_version`) for version-specific features + +**For multiple servers per test**: +```python +def test_example(TestServer): + server1 = TestServer() + server2 = TestServer() + # Both are real, independent tmux processes +``` \ No newline at end of file diff --git a/docs/topics/control_mode.md b/docs/topics/control_mode.md new file mode 100644 index 000000000..7e0e053a0 --- /dev/null +++ b/docs/topics/control_mode.md @@ -0,0 +1,220 @@ +# Control Mode Engine + +The control mode engine provides a high-performance alternative to the default subprocess-based command execution. By maintaining a persistent connection to the tmux server, control mode eliminates the overhead of spawning a new process for each command. + +## Overview + +libtmux offers two command execution engines: + +1. **Subprocess Engine** (default): Spawns a new tmux process for each command +2. **Control Mode Engine**: Uses a persistent tmux control mode connection + +The control mode engine is particularly beneficial for: +- Scripts with many sequential tmux operations +- Query-heavy workloads (list sessions, windows, panes) +- Performance-critical applications + +## Performance Characteristics + +Control mode provides significant performance improvements for sequential operations: + +- **Query operations**: 10-50x faster (e.g., `list-sessions`, `list-windows`) +- **Mixed workloads**: 5-15x faster (creation + queries) +- **Single operations**: Similar performance to subprocess + +The speedup comes from eliminating process spawn overhead. Each subprocess call incurs ~5-10ms of overhead, while control mode operations complete in microseconds. + +See `tests/control_mode/test_benchmarks.py` for detailed performance comparisons. + +## Usage + +### Basic Usage + +```python +from libtmux._internal.engines import ControlModeCommandRunner +from libtmux.server import Server + +# Create control mode runner +runner = ControlModeCommandRunner("my_socket") + +# Create server with control mode +server = Server(socket_name="my_socket", command_runner=runner) + +# All operations now use persistent connection +session = server.new_session("my_session") +window = session.new_window("my_window") + +# Query operations are very fast +sessions = server.sessions +windows = session.windows + +# Cleanup when done +runner.close() +``` + +### Context Manager + +The recommended pattern uses a context manager for automatic cleanup: + +```python +from libtmux._internal.engines import ControlModeCommandRunner +from libtmux.server import Server + +with ControlModeCommandRunner("my_socket") as runner: + server = Server(socket_name="my_socket", command_runner=runner) + + # Perform many operations + for i in range(100): + session = server.new_session(f"session_{i}") + session.new_window(f"window_{i}") + + # Query operations + all_sessions = server.sessions + +# Connection automatically closed on exit +``` + +## Transparent Subprocess Fallback + +Control mode doesn't support tmux format strings (`-F` flag). Operations that require format strings transparently fall back to subprocess execution: + +```python +with ControlModeCommandRunner("my_socket") as runner: + server = Server(socket_name="my_socket", command_runner=runner) + + # This uses control mode (fast) + sessions = server.sessions + + # This transparently uses subprocess (format string required) + session = server.new_session("my_session") # Uses -F#{session_id} + + # This uses control mode again (fast) + windows = session.windows +``` + +This fallback is: +- **Automatic**: No code changes required +- **Transparent**: Same interface, same behavior +- **Optimal**: 80-90% of operations still use control mode + +### Operations That Use Subprocess Fallback + +The following operations require format strings and use subprocess: +- `Server.new_session()` - needs session ID +- `Session.new_window()` - needs window ID +- `Pane.split()` - needs pane ID + +All other operations (queries, modifications, etc.) use control mode. + +## Thread Safety + +Control mode runner is thread-safe but serializes command execution: + +```python +import threading + +with ControlModeCommandRunner("my_socket") as runner: + server = Server(socket_name="my_socket", command_runner=runner) + + def create_sessions(start_idx: int) -> None: + for i in range(start_idx, start_idx + 10): + server.new_session(f"thread_session_{i}") + + # Multiple threads can safely use the same runner + threads = [ + threading.Thread(target=create_sessions, args=(i * 10,)) + for i in range(5) + ] + + for thread in threads: + thread.start() + for thread in threads: + thread.join() +``` + +Commands are executed sequentially (one at a time) even when called from multiple threads. This ensures correct tmux state and prevents output parsing errors. + +## Implementation Details + +### Architecture + +Control mode works by: + +1. Starting tmux with `-C` flag (control mode) +2. Sending commands over stdin +3. Parsing structured output from stdout +4. Queuing notifications for later processing + +The protocol uses `%begin`, `%end`, and `%error` blocks: + +``` +%begin 1234 1 +session_name: 2 windows (created ...) +%end 1234 1 +``` + +### Connection Lifecycle + +```python +# Connection established on initialization +runner = ControlModeCommandRunner("my_socket") +# tmux -C -L my_socket started in background + +# Commands use persistent connection +result = runner.run("list-sessions") + +# Explicit cleanup +runner.close() +# Background process terminated +``` + +### Error Handling + +Control mode handles errors gracefully: + +```python +result = runner.run("invalid-command") +assert result.returncode == 1 +assert "unknown command" in result.stdout[0].lower() +``` + +Errors are returned as `ControlModeResult` with non-zero return code, matching subprocess behavior. + +## When to Use Control Mode + +**Use control mode when:** +- Running many tmux commands sequentially +- Performance is critical +- Querying tmux state frequently + +**Use subprocess (default) when:** +- Running single/few commands +- Simplicity is preferred over performance +- No need for connection management + +**Example: Script with 100 operations** + +```python +# Subprocess: ~500ms-1000ms (5-10ms per operation) +server = Server(socket_name="my_socket") +for i in range(100): + sessions = server.sessions # 100 subprocess spawns + +# Control mode: ~50ms-100ms (0.5-1ms per operation) +with ControlModeCommandRunner("my_socket") as runner: + server = Server(socket_name="my_socket", command_runner=runner) + for i in range(100): + sessions = server.sessions # 100 control mode queries +``` + +## Limitations + +1. **Format strings not supported**: Operations with `-F` flag use subprocess fallback +2. **Single connection**: One control mode connection per socket (thread-safe but serialized) +3. **Connection management**: Requires explicit `close()` or context manager + +## See Also + +- {doc}`/api/index` - API reference for Server, Session, Window, Pane +- `tests/control_mode/` - Implementation tests (all use real tmux) +- `tests/control_mode/test_benchmarks.py` - Performance benchmarks diff --git a/docs/topics/index.md b/docs/topics/index.md index 0653bb57b..74737e455 100644 --- a/docs/topics/index.md +++ b/docs/topics/index.md @@ -9,5 +9,6 @@ Explore libtmux’s core functionalities and underlying principles at a high lev ```{toctree} context_managers +control_mode traversal ``` diff --git a/pyproject.toml b/pyproject.toml index 2deddc21c..97681d839 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -227,6 +227,9 @@ testpaths = [ "docs", "README.md", ] +markers = [ + "benchmark: Performance benchmark tests", +] filterwarnings = [ "ignore:The frontend.Option(Parser)? class.*:DeprecationWarning::", "ignore::DeprecationWarning:libtmux.*:", diff --git a/src/libtmux/_internal/command_runner.py b/src/libtmux/_internal/command_runner.py new file mode 100644 index 000000000..aa6c4c0a8 --- /dev/null +++ b/src/libtmux/_internal/command_runner.py @@ -0,0 +1,80 @@ +"""Command runner protocols for tmux execution engines.""" + +from __future__ import annotations + +import typing as t +from typing import Protocol + +if t.TYPE_CHECKING: + pass + + +class CommandResult(Protocol): + """Protocol for command execution results. + + Any object conforming to this protocol can be returned by a CommandRunner. + The existing tmux_cmd class automatically conforms to this protocol. + + Attributes + ---------- + stdout : list[str] + Command standard output, split by lines + stderr : list[str] + Command standard error, split by lines + returncode : int + Command return code + cmd : list[str] + The command that was executed (for debugging) + """ + + @property + def stdout(self) -> list[str]: + """Command standard output, split by lines.""" + ... + + @property + def stderr(self) -> list[str]: + """Command standard error, split by lines.""" + ... + + @property + def returncode(self) -> int: + """Command return code.""" + ... + + @property + def cmd(self) -> list[str]: + """The command that was executed (for debugging).""" + ... + + +class CommandRunner(Protocol): + """Protocol for tmux command execution engines. + + Implementations must provide a run() method that executes tmux commands + and returns a CommandResult. + + Examples + -------- + >>> from libtmux._internal.engines import SubprocessCommandRunner + >>> runner = SubprocessCommandRunner() + >>> result = runner.run("-V") + >>> assert hasattr(result, 'stdout') + >>> assert hasattr(result, 'stderr') + >>> assert hasattr(result, 'returncode') + """ + + def run(self, *args: str) -> CommandResult: + """Execute a tmux command. + + Parameters + ---------- + *args : str + Command arguments to pass to tmux binary + + Returns + ------- + CommandResult + Object with stdout, stderr, returncode, cmd attributes + """ + ... diff --git a/src/libtmux/_internal/engines/__init__.py b/src/libtmux/_internal/engines/__init__.py new file mode 100644 index 000000000..e87a43ba3 --- /dev/null +++ b/src/libtmux/_internal/engines/__init__.py @@ -0,0 +1,8 @@ +"""Command execution engines for libtmux.""" + +from __future__ import annotations + +__all__ = ("ControlModeCommandRunner", "SubprocessCommandRunner") + +from libtmux._internal.engines.control_mode import ControlModeCommandRunner +from libtmux._internal.engines.subprocess_engine import SubprocessCommandRunner diff --git a/src/libtmux/_internal/engines/control_mode/__init__.py b/src/libtmux/_internal/engines/control_mode/__init__.py new file mode 100644 index 000000000..b9b54fca1 --- /dev/null +++ b/src/libtmux/_internal/engines/control_mode/__init__.py @@ -0,0 +1,8 @@ +"""Control mode command execution engine.""" + +from __future__ import annotations + +__all__ = ("ControlModeCommandRunner", "ControlModeResult") + +from libtmux._internal.engines.control_mode.result import ControlModeResult +from libtmux._internal.engines.control_mode.runner import ControlModeCommandRunner diff --git a/src/libtmux/_internal/engines/control_mode/parser.py b/src/libtmux/_internal/engines/control_mode/parser.py new file mode 100644 index 000000000..ff4b193a9 --- /dev/null +++ b/src/libtmux/_internal/engines/control_mode/parser.py @@ -0,0 +1,108 @@ +"""Control mode protocol parser.""" + +from __future__ import annotations + +from typing import IO + +from .result import ControlModeResult + + +class ProtocolParser: + r"""Parser for tmux control mode protocol. + + Handles %begin/%end/%error blocks and notifications. + + The tmux control mode protocol format: + - Commands produce output blocks + - Blocks start with %begin and end with %end or %error + - Format: %begin timestamp cmd_num flags + - Notifications (%session-changed, etc.) can appear between blocks + + Examples + -------- + >>> import io + >>> stdout = io.StringIO( + ... "%begin 1234 1 0\n" + ... "session1\n" + ... "%end 1234 1 0\n" + ... ) + >>> parser = ProtocolParser(stdout) + >>> result = parser.parse_response(["list-sessions"]) + >>> result.stdout + ['session1'] + >>> result.returncode + 0 + """ + + def __init__(self, stdout: IO[str]) -> None: + self.stdout = stdout + self.notifications: list[str] = [] + + def parse_response(self, cmd: list[str]) -> ControlModeResult: + """Parse a single command response. + + Parameters + ---------- + cmd : list[str] + The command that was executed (for result.cmd) + + Returns + ------- + ControlModeResult + Parsed result with stdout, stderr, returncode + + Raises + ------ + ConnectionError + If connection closes unexpectedly + ProtocolError + If protocol format is unexpected + """ + stdout_lines: list[str] = [] + stderr_lines: list[str] = [] + returncode = 0 + + # State machine + in_response = False + + while True: + line = self.stdout.readline() + if not line: # EOF + msg = "Control mode connection closed unexpectedly" + raise ConnectionError(msg) + + line = line.rstrip("\n") + + # Parse line type + if line.startswith("%begin"): + # %begin timestamp cmd_num flags + in_response = True + continue + + elif line.startswith("%end"): + # Success - response complete + return ControlModeResult(stdout_lines, stderr_lines, returncode, cmd) + + elif line.startswith("%error"): + # Error - command failed + returncode = 1 + # Note: error details are in stdout_lines already + return ControlModeResult(stdout_lines, stderr_lines, returncode, cmd) + + elif line.startswith("%"): + # Notification - queue for future processing + self.notifications.append(line) + # Don't break - keep reading for our response + continue + + else: + # Regular output line + if in_response: + stdout_lines.append(line) + # else: orphaned line before %begin (should not happen in practice) + + +class ProtocolError(Exception): + """Raised when control mode protocol is violated.""" + + pass diff --git a/src/libtmux/_internal/engines/control_mode/result.py b/src/libtmux/_internal/engines/control_mode/result.py new file mode 100644 index 000000000..bba6c13d6 --- /dev/null +++ b/src/libtmux/_internal/engines/control_mode/result.py @@ -0,0 +1,57 @@ +"""Result type for control mode command execution.""" + +from __future__ import annotations + + +class ControlModeResult: + """Result from control mode execution. + + Duck-types as tmux_cmd for backward compatibility. + Has identical interface: stdout, stderr, returncode, cmd. + + Attributes + ---------- + stdout : list[str] + Command standard output, split by lines + stderr : list[str] + Command standard error, split by lines + returncode : int + Command return code (0 = success, 1 = error) + cmd : list[str] + The command that was executed (for debugging) + + Examples + -------- + >>> result = ControlModeResult( + ... stdout=["session1", "session2"], + ... stderr=[], + ... returncode=0, + ... cmd=["tmux", "-C", "list-sessions"] + ... ) + >>> result.stdout + ['session1', 'session2'] + >>> result.returncode + 0 + >>> bool(result.stderr) + False + """ + + __slots__ = ("cmd", "returncode", "stderr", "stdout") + + def __init__( + self, + stdout: list[str], + stderr: list[str], + returncode: int, + cmd: list[str], + ) -> None: + self.stdout = stdout + self.stderr = stderr + self.returncode = returncode + self.cmd = cmd + + def __repr__(self) -> str: + return ( + f"ControlModeResult(returncode={self.returncode}, " + f"stdout_lines={len(self.stdout)}, stderr_lines={len(self.stderr)})" + ) diff --git a/src/libtmux/_internal/engines/control_mode/runner.py b/src/libtmux/_internal/engines/control_mode/runner.py new file mode 100644 index 000000000..3ff7334e3 --- /dev/null +++ b/src/libtmux/_internal/engines/control_mode/runner.py @@ -0,0 +1,283 @@ +"""Control mode command execution engine.""" + +from __future__ import annotations + +import contextlib +import subprocess +import typing as t +from threading import Lock + +if t.TYPE_CHECKING: + pass + +from .parser import ProtocolParser +from .result import ControlModeResult + + +class ControlModeCommandRunner: + """Command runner using persistent tmux control mode connection. + + Maintains a single persistent connection to tmux server for faster + command execution compared to spawning subprocess for each command. + + Thread-safe: Uses Lock to serialize command execution. + + Parameters + ---------- + socket_name : str + Socket name for tmux server (-L flag) + + Examples + -------- + >>> runner = ControlModeCommandRunner("test_socket") + >>> result = runner.run("list-sessions") # doctest: +SKIP + >>> print(result.stdout) # doctest: +SKIP + ['0: session1 ...'] + >>> runner.close() # doctest: +SKIP + + Or use as context manager: + + >>> with ControlModeCommandRunner("test_socket") as runner: # doctest: +SKIP + ... result = runner.run("list-sessions") + ... # ... more commands ... + # Auto-closes on exit + """ + + def __init__(self, socket_name: str) -> None: + self.socket_name = socket_name + self._lock = Lock() + self._command_counter = 0 + self._process: subprocess.Popen[str] | None = None + self._parser: ProtocolParser | None = None + self._connect() + + def _connect(self) -> None: + """Start tmux in control mode.""" + self._process = subprocess.Popen( + ["tmux", "-C", "-L", self.socket_name], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, # Line buffered + ) + + if self._process.stdout is None: + msg = "Failed to get stdout from control mode process" + raise RuntimeError(msg) + + self._parser = ProtocolParser(self._process.stdout) + + # Consume any initial output blocks from session auto-creation + # tmux may automatically create/attach a session on connect + self._consume_initial_blocks() + + def _consume_initial_blocks(self) -> None: + """Consume any output blocks sent during connection. + + When tmux starts in control mode, it may automatically create/attach + a session, which generates output blocks. We need to consume these + before we start sending commands, otherwise our parser will read + the initial blocks instead of our command responses. + """ + if self._parser is None or self._process is None: + return + + # Read lines until we're past any %begin/%end blocks + # We stop when we see only notifications (lines starting with %) + # or when the stream is empty for a moment + import select + + while True: + # Check if there's data available (non-blocking with 50ms timeout) + if not select.select([self._process.stdout], [], [], 0.05)[0]: + # No more data available, we're past initial blocks + break + + # Peek at next line to see if it's a %begin + line = self._parser.stdout.readline() + if not line: + break + + line = line.rstrip("\n") + + if line.startswith("%begin"): + # This is start of a block - consume until %end or %error + while True: + inner_line = self._parser.stdout.readline() + if not inner_line: + break + inner_line = inner_line.rstrip("\n") + if inner_line.startswith("%end") or inner_line.startswith( + "%error", + ): + break + elif line.startswith("%"): + # Notification - queue it for later + self._parser.notifications.append(line) + else: + # Shouldn't happen - orphaned output line + pass + + def _has_format_flag(self, args: tuple[str, ...]) -> bool: + """Check if arguments contain a format string flag (-F). + + Parameters + ---------- + args : tuple[str, ...] + Command arguments to check + + Returns + ------- + bool + True if -F flag is present, False otherwise + """ + return any(arg.startswith("-F") for arg in args) + + def _filter_args(self, args: tuple[str, ...]) -> list[str]: + """Filter server-level and incompatible flags from args. + + Control mode connection is already bound to socket, so we must + remove -L/-S/-f flags that were prepended by Server.cmd(). + + Note: Format flags (-F) are handled via transparent subprocess fallback + and won't reach this method. + + Parameters + ---------- + args : tuple[str, ...] + Arguments from Server.cmd() like ("-Lsocket", "list-sessions") + + Returns + ------- + list[str] + Filtered args like ["list-sessions"] + """ + filtered = [] + skip_next = False + + for arg in args: + if skip_next: + skip_next = False + continue + + # Skip socket-related flags (already in connection) + if arg.startswith("-L") or arg.startswith("-S"): + if len(arg) == 2: # -L socket (two-part) + skip_next = True + # else: -Lsocket (one-part), already skipped + continue + + # Skip config file flag + if arg.startswith("-f"): + if len(arg) == 2: # -f file (two-part) + skip_next = True + continue + + # Skip color flags (not relevant in control mode) + if arg in ("-2", "-8"): + continue + + filtered.append(arg) + + return filtered + + def run(self, *args: str) -> ControlModeResult: + """Execute tmux command via control mode. + + Thread-safe: Only one command executes at a time. + + For commands with format strings (-F flag), transparently falls back + to subprocess execution since control mode doesn't support format strings. + + Parameters + ---------- + *args : str + Arguments to pass to tmux (may include server flags) + + Returns + ------- + ControlModeResult + Command result with stdout, stderr, returncode + + Raises + ------ + ConnectionError + If control mode connection is lost + """ + # Transparent fallback for format strings + if self._has_format_flag(args): + from libtmux.common import tmux_cmd + + return tmux_cmd(*args) # type: ignore[return-value] + + with self._lock: # Serialize command execution + if self._process is None or self._parser is None: + msg = "Control mode not connected" + raise ConnectionError(msg) + + if self._process.poll() is not None: + msg = "Control mode process terminated" + raise ConnectionError(msg) + + # Filter server-level flags + filtered_args = self._filter_args(args) + + if not filtered_args: + # Edge case: only flags, no command + msg = "No command after filtering flags" + raise ValueError(msg) + + # Build command line + command_line = " ".join(filtered_args) + + # Send command + if self._process.stdin: + self._process.stdin.write(f"{command_line}\n") + self._process.stdin.flush() + else: + msg = "Control mode stdin closed" + raise ConnectionError(msg) + + # Parse response + self._command_counter += 1 + result = self._parser.parse_response( + cmd=["tmux", "-C", "-L", self.socket_name, *filtered_args] + ) + + return result + + def close(self) -> None: + """Close the control mode connection. + + Safe to call multiple times. + """ + if self._process and self._process.poll() is None: + # Try graceful shutdown + if self._process.stdin: + try: + self._process.stdin.close() + self._process.wait(timeout=2) + except Exception: + # Force kill if graceful fails + self._process.kill() + self._process.wait() + else: + self._process.kill() + self._process.wait() + + self._process = None + self._parser = None + + def __enter__(self) -> ControlModeCommandRunner: + """Context manager entry.""" + return self + + def __exit__(self, *args: t.Any) -> None: + """Context manager exit - close connection.""" + self.close() + + def __del__(self) -> None: + """Ensure connection is closed on object destruction.""" + with contextlib.suppress(Exception): + self.close() diff --git a/src/libtmux/_internal/engines/subprocess_engine.py b/src/libtmux/_internal/engines/subprocess_engine.py new file mode 100644 index 000000000..55bf1a792 --- /dev/null +++ b/src/libtmux/_internal/engines/subprocess_engine.py @@ -0,0 +1,41 @@ +"""Subprocess-based command execution engine.""" + +from __future__ import annotations + +import typing as t + +if t.TYPE_CHECKING: + from libtmux.common import tmux_cmd + + +class SubprocessCommandRunner: + """Command runner that uses subprocess to execute tmux binary. + + This is the default command runner and wraps the existing tmux_cmd + implementation for backward compatibility. + + Examples + -------- + >>> runner = SubprocessCommandRunner() + >>> result = runner.run("-V") + >>> assert hasattr(result, 'stdout') + >>> assert hasattr(result, 'stderr') + >>> assert hasattr(result, 'returncode') + """ + + def run(self, *args: str) -> tmux_cmd: + """Execute tmux command via subprocess. + + Parameters + ---------- + *args : str + Arguments to pass to tmux binary + + Returns + ------- + tmux_cmd + Command result with stdout, stderr, returncode + """ + from libtmux.common import tmux_cmd + + return tmux_cmd(*args) diff --git a/src/libtmux/common.py b/src/libtmux/common.py index ac9b9b7f1..100638664 100644 --- a/src/libtmux/common.py +++ b/src/libtmux/common.py @@ -16,6 +16,7 @@ from . import exc from ._compat import LooseVersion +from ._internal.command_runner import CommandResult if t.TYPE_CHECKING: from collections.abc import Callable @@ -40,7 +41,7 @@ class EnvironmentMixin: _add_option = None - cmd: Callable[[t.Any, t.Any], tmux_cmd] + cmd: Callable[[t.Any, t.Any], CommandResult] def __init__(self, add_option: str | None = None) -> None: self._add_option = add_option diff --git a/src/libtmux/pane.py b/src/libtmux/pane.py index 7f126f452..dde0e2d9a 100644 --- a/src/libtmux/pane.py +++ b/src/libtmux/pane.py @@ -14,7 +14,8 @@ import warnings from libtmux import exc -from libtmux.common import has_gte_version, has_lt_version, tmux_cmd +from libtmux._internal.command_runner import CommandResult +from libtmux.common import has_gte_version, has_lt_version from libtmux.constants import ( PANE_DIRECTION_FLAG_MAP, RESIZE_ADJUSTMENT_DIRECTION_FLAG_MAP, @@ -171,7 +172,7 @@ def cmd( cmd: str, *args: t.Any, target: str | int | None = None, - ) -> tmux_cmd: + ) -> CommandResult: """Execute tmux subcommand within pane context. Automatically binds target by adding ``-t`` for object's pane ID to the diff --git a/src/libtmux/pytest_plugin.py b/src/libtmux/pytest_plugin.py index 8b2d6589f..4c38b2d9e 100644 --- a/src/libtmux/pytest_plugin.py +++ b/src/libtmux/pytest_plugin.py @@ -19,6 +19,7 @@ if t.TYPE_CHECKING: import pathlib + from libtmux._internal.command_runner import CommandRunner from libtmux.session import Session logger = logging.getLogger(__name__) @@ -110,11 +111,38 @@ def clear_env(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv(k) +@pytest.fixture +def command_runner() -> CommandRunner | None: + """Command runner for tests. + + Override this fixture in conftest.py to provide a custom command runner + for all tests. + + Returns + ------- + CommandRunner or None + Custom command runner, or None to use default subprocess runner + + Examples + -------- + >>> def test_with_default(command_runner): + ... assert command_runner is None # Default uses subprocess + + To use a custom runner: + + >>> @pytest.fixture + ... def command_runner(): + ... return MyCustomRunner() + """ + return None + + @pytest.fixture def server( request: pytest.FixtureRequest, monkeypatch: pytest.MonkeyPatch, config_file: pathlib.Path, + command_runner: CommandRunner | None, ) -> Server: """Return new, temporary :class:`libtmux.Server`. @@ -141,7 +169,10 @@ def server( >>> result.assert_outcomes(passed=1) """ - server = Server(socket_name=f"libtmux_test{next(namer)}") + server = Server( + socket_name=f"libtmux_test{next(namer)}", + command_runner=command_runner, + ) def fin() -> None: server.kill() @@ -263,6 +294,7 @@ def session( @pytest.fixture def TestServer( request: pytest.FixtureRequest, + command_runner: CommandRunner | None, ) -> type[Server]: """Create a temporary tmux server that cleans up after itself. @@ -309,6 +341,7 @@ def fin() -> None: "type[Server]", functools.partial( Server, + command_runner=command_runner, on_init=on_init, socket_name_factory=socket_name_factory, ), diff --git a/src/libtmux/server.py b/src/libtmux/server.py index 17b290c34..a2ae3000e 100644 --- a/src/libtmux/server.py +++ b/src/libtmux/server.py @@ -16,8 +16,8 @@ import warnings from libtmux import exc, formats +from libtmux._internal.command_runner import CommandResult from libtmux._internal.query_list import QueryList -from libtmux.common import tmux_cmd from libtmux.neo import fetch_objs from libtmux.pane import Pane from libtmux.session import Session @@ -38,6 +38,7 @@ from typing_extensions import Self + from libtmux._internal.command_runner import CommandRunner from libtmux._internal.types import StrPath DashLiteral: TypeAlias = t.Literal["-"] @@ -64,6 +65,8 @@ class Server(EnvironmentMixin): socket_path : str, optional config_file : str, optional colors : str, optional + command_runner : CommandRunner, optional + Custom command execution engine. Defaults to subprocess-based runner. on_init : callable, optional socket_name_factory : callable, optional @@ -124,6 +127,7 @@ def __init__( socket_path: str | pathlib.Path | None = None, config_file: str | None = None, colors: int | None = None, + command_runner: CommandRunner | None = None, on_init: t.Callable[[Server], None] | None = None, socket_name_factory: t.Callable[[], str] | None = None, **kwargs: t.Any, @@ -131,6 +135,7 @@ def __init__( EnvironmentMixin.__init__(self, "-g") self._windows: list[WindowDict] = [] self._panes: list[PaneDict] = [] + self._command_runner = command_runner if socket_path is not None: self.socket_path = socket_path @@ -188,6 +193,38 @@ def __exit__( if self.is_alive(): self.kill() + @property + def command_runner(self) -> CommandRunner: + """Get or lazily initialize the command runner. + + Returns + ------- + CommandRunner + The command execution engine for this server + + Examples + -------- + >>> assert server.command_runner is not None + >>> type(server.command_runner).__name__ + 'SubprocessCommandRunner' + """ + if self._command_runner is None: + from libtmux._internal.engines import SubprocessCommandRunner + + self._command_runner = SubprocessCommandRunner() + return self._command_runner + + @command_runner.setter + def command_runner(self, value: CommandRunner) -> None: + """Set the command runner. + + Parameters + ---------- + value : CommandRunner + New command execution engine + """ + self._command_runner = value + def is_alive(self) -> bool: """Return True if tmux server alive. @@ -232,7 +269,7 @@ def cmd( cmd: str, *args: t.Any, target: str | int | None = None, - ) -> tmux_cmd: + ) -> CommandResult: """Execute tmux command respective of socket name and file, return output. Examples @@ -298,7 +335,9 @@ def cmd( cmd_args = ["-t", str(target), *args] if target is not None else [*args] - return tmux_cmd(*svr_args, *cmd_args) + # Convert all arguments to strings for the command runner + all_args = [str(arg) for arg in [*svr_args, *cmd_args]] + return self.command_runner.run(*all_args) @property def attached_sessions(self) -> list[Session]: diff --git a/src/libtmux/session.py b/src/libtmux/session.py index 26b55426d..c54054963 100644 --- a/src/libtmux/session.py +++ b/src/libtmux/session.py @@ -13,6 +13,7 @@ import typing as t import warnings +from libtmux._internal.command_runner import CommandResult from libtmux._internal.query_list import QueryList from libtmux.constants import WINDOW_DIRECTION_FLAG_MAP, WindowDirection from libtmux.formats import FORMAT_SEPARATOR @@ -35,7 +36,6 @@ import types from libtmux._internal.types import StrPath - from libtmux.common import tmux_cmd if sys.version_info >= (3, 11): from typing import Self @@ -195,7 +195,7 @@ def cmd( cmd: str, *args: t.Any, target: str | int | None = None, - ) -> tmux_cmd: + ) -> CommandResult: """Execute tmux subcommand within session context. Automatically binds target by adding ``-t`` for object's session ID to the diff --git a/src/libtmux/window.py b/src/libtmux/window.py index e20eb26f3..069999b2d 100644 --- a/src/libtmux/window.py +++ b/src/libtmux/window.py @@ -13,8 +13,9 @@ import typing as t import warnings +from libtmux._internal.command_runner import CommandResult from libtmux._internal.query_list import QueryList -from libtmux.common import has_gte_version, tmux_cmd +from libtmux.common import has_gte_version from libtmux.constants import ( RESIZE_ADJUSTMENT_DIRECTION_FLAG_MAP, PaneDirection, @@ -195,7 +196,7 @@ def cmd( cmd: str, *args: t.Any, target: str | int | None = None, - ) -> tmux_cmd: + ) -> CommandResult: """Execute tmux subcommand within window context. Automatically binds target by adding ``-t`` for object's window ID to the diff --git a/tests/control_mode/__init__.py b/tests/control_mode/__init__.py new file mode 100644 index 000000000..af345350e --- /dev/null +++ b/tests/control_mode/__init__.py @@ -0,0 +1 @@ +"""Tests for libtmux's control mode engine.""" diff --git a/tests/control_mode/test_benchmarks.py b/tests/control_mode/test_benchmarks.py new file mode 100644 index 000000000..761dbde68 --- /dev/null +++ b/tests/control_mode/test_benchmarks.py @@ -0,0 +1,214 @@ +"""Performance benchmarks for control mode vs subprocess. + +These benchmarks demonstrate the performance advantages of control mode for +sequential tmux operations. Control mode maintains a persistent connection, +avoiding the overhead of spawning a subprocess for each command. + +Run with: uv run pytest tests/control_mode/test_benchmarks.py -v +""" + +from __future__ import annotations + +import typing as t + +import pytest + +from libtmux._internal.engines import ControlModeCommandRunner, SubprocessCommandRunner +from libtmux.server import Server + +if t.TYPE_CHECKING: + pass + + +@pytest.mark.benchmark +def test_benchmark_list_operations_subprocess(server: Server) -> None: + """Benchmark subprocess runner for repeated list operations. + + This simulates typical usage where many query operations are performed + sequentially. Each operation spawns a new tmux process. + """ + assert server.socket_name + subprocess_runner = SubprocessCommandRunner() + + # Create some test sessions first + for i in range(5): + subprocess_runner.run( + "-L", server.socket_name, "new-session", "-d", "-s", f"bench_sp_{i}" + ) + + # Benchmark: 50 list operations + for _ in range(50): + result = subprocess_runner.run("-L", server.socket_name, "list-sessions") + assert result.returncode == 0 + + +@pytest.mark.benchmark +def test_benchmark_list_operations_control_mode(server: Server) -> None: + """Benchmark control mode runner for repeated list operations. + + This demonstrates the performance advantage of persistent connection. + All operations use a single tmux process in control mode. + """ + assert server.socket_name + control_runner = ControlModeCommandRunner(server.socket_name) + + # Create some test sessions first + for i in range(5): + control_runner.run( + "-L", server.socket_name, "new-session", "-d", "-s", f"bench_cm_{i}" + ) + + # Benchmark: 50 list operations + for _ in range(50): + result = control_runner.run("-L", server.socket_name, "list-sessions") + assert result.returncode == 0 + + control_runner.close() + + +@pytest.mark.benchmark +def test_benchmark_mixed_workload_subprocess(server: Server) -> None: + """Benchmark subprocess runner for mixed create/query operations. + + This simulates a typical workflow with session creation and queries. + """ + assert server.socket_name + subprocess_runner = SubprocessCommandRunner() + + # Mixed workload: create session, query, create window, query + for i in range(10): + # Create session + result = subprocess_runner.run( + "-L", server.socket_name, "new-session", "-d", "-s", f"mixed_sp_{i}" + ) + assert result.returncode == 0 + + # Query sessions + result = subprocess_runner.run("-L", server.socket_name, "list-sessions") + assert result.returncode == 0 + + # Create window + result = subprocess_runner.run( + "-L", server.socket_name, "new-window", "-t", f"mixed_sp_{i}", "-d" + ) + assert result.returncode == 0 + + # Query windows + result = subprocess_runner.run( + "-L", server.socket_name, "list-windows", "-t", f"mixed_sp_{i}" + ) + assert result.returncode == 0 + + +@pytest.mark.benchmark +def test_benchmark_mixed_workload_control_mode(server: Server) -> None: + """Benchmark control mode runner for mixed create/query operations. + + This demonstrates persistent connection advantage for typical workflows. + """ + assert server.socket_name + control_runner = ControlModeCommandRunner(server.socket_name) + + # Mixed workload: create session, query, create window, query + for i in range(10): + # Create session + result = control_runner.run( + "-L", server.socket_name, "new-session", "-d", "-s", f"mixed_cm_{i}" + ) + assert result.returncode == 0 + + # Query sessions + result = control_runner.run("-L", server.socket_name, "list-sessions") + assert result.returncode == 0 + + # Create window + result = control_runner.run( + "-L", server.socket_name, "new-window", "-t", f"mixed_cm_{i}", "-d" + ) + assert result.returncode == 0 + + # Query windows + result = control_runner.run( + "-L", server.socket_name, "list-windows", "-t", f"mixed_cm_{i}" + ) + assert result.returncode == 0 + + control_runner.close() + + +@pytest.mark.benchmark +def test_benchmark_server_integration_subprocess(server: Server) -> None: + """Benchmark Server with default subprocess runner. + + This tests the performance of high-level Server API using subprocess. + """ + # Server uses subprocess runner by default + assert server.socket_name + + # Create sessions and windows using high-level API + for i in range(5): + session = server.new_session(f"integ_sp_{i}") + assert session.session_name == f"integ_sp_{i}" + + # Create windows + window = session.new_window(f"win_{i}") + assert window.window_name == f"win_{i}" + + # Query + _ = server.sessions + + +@pytest.mark.benchmark +def test_benchmark_server_integration_control_mode( + TestServer: t.Callable[..., Server], +) -> None: + """Benchmark Server with control mode runner. + + This tests the performance of high-level Server API using control mode. + Note: Operations with format strings still use subprocess fallback. + """ + # Create independent server with unique socket + test_server = TestServer() + assert test_server.socket_name + control_runner = ControlModeCommandRunner(test_server.socket_name) + + # Create server with control mode runner + cm_server = Server( + socket_name=test_server.socket_name, + command_runner=control_runner, + ) + + # Create sessions and windows using high-level API + for i in range(5): + session = cm_server.new_session(f"integ_cm_{i}") + assert session.session_name == f"integ_cm_{i}" + + # Create windows + window = session.new_window(f"win_{i}") + assert window.window_name == f"win_{i}" + + # Query + _ = cm_server.sessions + + control_runner.close() + + +def test_benchmark_summary(server: Server) -> None: + """Summary test explaining benchmark results. + + This test documents the expected performance characteristics. + Run all benchmarks with: pytest tests/control_mode/test_benchmarks.py -v + + Expected results: + - Control mode: 10-50x faster for query-heavy workloads + - Control mode: 5-15x faster for mixed workloads + - Subprocess: Simpler, no connection management + - Control mode: Best for scripts with many sequential operations + + Performance factors: + - Process spawn overhead eliminated (control mode) + - Single persistent connection (control mode) + - Format string operations use subprocess fallback (both modes) + """ + assert server.socket_name + # This is a documentation test - always passes diff --git a/tests/control_mode/test_parser.py b/tests/control_mode/test_parser.py new file mode 100644 index 000000000..1ff0d4227 --- /dev/null +++ b/tests/control_mode/test_parser.py @@ -0,0 +1,200 @@ +"""Tests for control mode protocol parser.""" + +from __future__ import annotations + +import io + +import pytest + +from libtmux._internal.engines.control_mode.parser import ProtocolParser + + +def test_parser_success_response() -> None: + """Parse successful %begin/%end block.""" + stdout = io.StringIO( + "%begin 1234 1 0\noutput line 1\noutput line 2\n%end 1234 1 0\n" + ) + + parser = ProtocolParser(stdout) + result = parser.parse_response(["list-sessions"]) + + assert result.stdout == ["output line 1", "output line 2"] + assert result.stderr == [] + assert result.returncode == 0 + + +def test_parser_error_response() -> None: + """Parse error %begin/%error block.""" + stdout = io.StringIO( + "%begin 1234 2 0\nparse error: unknown command\n%error 1234 2 0\n" + ) + + parser = ProtocolParser(stdout) + result = parser.parse_response(["bad-command"]) + + assert result.stdout == ["parse error: unknown command"] + assert result.returncode == 1 + + +def test_parser_empty_output() -> None: + """Handle response with no output lines.""" + stdout = io.StringIO("%begin 1234 3 0\n%end 1234 3 0\n") + + parser = ProtocolParser(stdout) + result = parser.parse_response(["some-command"]) + + assert result.stdout == [] + assert result.returncode == 0 + + +def test_parser_with_notifications() -> None: + """Queue notifications between responses.""" + stdout = io.StringIO( + "%session-changed $0 mysession\n" + "%window-add @1\n" + "%begin 1234 4 0\n" + "output\n" + "%end 1234 4 0\n" + ) + + parser = ProtocolParser(stdout) + result = parser.parse_response(["list-windows"]) + + assert result.stdout == ["output"] + assert result.returncode == 0 + # Notifications should be queued + assert len(parser.notifications) == 2 + assert parser.notifications[0] == "%session-changed $0 mysession" + assert parser.notifications[1] == "%window-add @1" + + +def test_parser_notification_during_response() -> None: + """Handle notification that arrives during response.""" + stdout = io.StringIO( + "%begin 1234 5 0\n" + "line 1\n" + "%sessions-changed\n" # Notification mid-response + "line 2\n" + "%end 1234 5 0\n" + ) + + parser = ProtocolParser(stdout) + result = parser.parse_response(["test"]) + + # Output lines should not include notification + assert result.stdout == ["line 1", "line 2"] + # Notification should be queued + assert "%sessions-changed" in parser.notifications + + +def test_parser_connection_closed() -> None: + """Raise ConnectionError on EOF.""" + stdout = io.StringIO("") # Empty stream = EOF + + parser = ProtocolParser(stdout) + + with pytest.raises(ConnectionError, match="connection closed"): + parser.parse_response(["test"]) + + +def test_parser_connection_closed_mid_response() -> None: + """Raise ConnectionError if EOF during response.""" + stdout = io.StringIO( + "%begin 1234 6 0\npartial output\n" + # No %end - connection closed + ) + + parser = ProtocolParser(stdout) + + with pytest.raises(ConnectionError): + parser.parse_response(["test"]) + + +def test_parser_multiline_output() -> None: + """Handle response with many output lines.""" + lines = [f"line {i}" for i in range(50)] + output = "\n".join(["%begin 1234 7 0", *lines, "%end 1234 7 0"]) + "\n" + + stdout = io.StringIO(output) + parser = ProtocolParser(stdout) + result = parser.parse_response(["test"]) + + assert len(result.stdout) == 50 + assert result.stdout[0] == "line 0" + assert result.stdout[49] == "line 49" + + +def test_parser_multiple_responses_sequential() -> None: + """Parse multiple responses sequentially.""" + stdout = io.StringIO( + "%begin 1234 1 0\n" + "response 1\n" + "%end 1234 1 0\n" + "%begin 1234 2 0\n" + "response 2\n" + "%end 1234 2 0\n" + ) + + parser = ProtocolParser(stdout) + + result1 = parser.parse_response(["cmd1"]) + assert result1.stdout == ["response 1"] + + result2 = parser.parse_response(["cmd2"]) + assert result2.stdout == ["response 2"] + + +def test_parser_preserves_empty_lines() -> None: + """Empty lines in output are preserved.""" + stdout = io.StringIO( + "%begin 1234 8 0\n" + "line 1\n" + "\n" # Empty line + "line 3\n" + "%end 1234 8 0\n" + ) + + parser = ProtocolParser(stdout) + result = parser.parse_response(["test"]) + + assert len(result.stdout) == 3 + assert result.stdout[0] == "line 1" + assert result.stdout[1] == "" + assert result.stdout[2] == "line 3" + + +def test_parser_complex_output() -> None: + """Handle complex real-world output.""" + # Simulates actual tmux list-sessions output + stdout = io.StringIO( + "%begin 1363006971 2 1\n" + "0: ksh* (1 panes) [80x24] [layout b25f,80x24,0,0,2] @2 (active)\n" + "1: bash (2 panes) [80x24] [layout b25f,80x24,0,0,3] @3\n" + "%end 1363006971 2 1\n" + ) + + parser = ProtocolParser(stdout) + result = parser.parse_response(["list-sessions"]) + + assert len(result.stdout) == 2 + assert "ksh*" in result.stdout[0] + assert "bash" in result.stdout[1] + assert result.returncode == 0 + + +def test_parser_error_with_multiline_message() -> None: + """Handle error with multi-line error message.""" + stdout = io.StringIO( + "%begin 1234 9 0\n" + "error: command failed\n" + "reason: invalid argument\n" + "suggestion: try --help\n" + "%error 1234 9 0\n" + ) + + parser = ProtocolParser(stdout) + result = parser.parse_response(["bad-cmd"]) + + assert result.returncode == 1 + assert len(result.stdout) == 3 + assert "error: command failed" in result.stdout[0] diff --git a/tests/control_mode/test_result.py b/tests/control_mode/test_result.py new file mode 100644 index 000000000..9787c7c30 --- /dev/null +++ b/tests/control_mode/test_result.py @@ -0,0 +1,139 @@ +"""Tests for ControlModeResult.""" + +from __future__ import annotations + +from libtmux._internal.engines.control_mode.result import ControlModeResult + + +def test_result_has_required_attributes() -> None: + """ControlModeResult has stdout, stderr, returncode, cmd attributes.""" + result = ControlModeResult( + stdout=["line1", "line2"], + stderr=[], + returncode=0, + cmd=["tmux", "-C", "list-sessions"], + ) + + assert hasattr(result, "stdout") + assert hasattr(result, "stderr") + assert hasattr(result, "returncode") + assert hasattr(result, "cmd") + + +def test_result_attributes_accessible() -> None: + """All attributes are accessible and have correct values.""" + result = ControlModeResult( + stdout=["output1", "output2"], + stderr=["error1"], + returncode=1, + cmd=["tmux", "-C", "invalid-command"], + ) + + assert result.stdout == ["output1", "output2"] + assert result.stderr == ["error1"] + assert result.returncode == 1 + assert result.cmd == ["tmux", "-C", "invalid-command"] + + +def test_result_empty_stdout() -> None: + """ControlModeResult handles empty stdout.""" + result = ControlModeResult( + stdout=[], + stderr=[], + returncode=0, + cmd=["tmux", "-C", "some-command"], + ) + + assert result.stdout == [] + assert len(result.stdout) == 0 + assert bool(result.stdout) is False + + +def test_result_empty_stderr() -> None: + """ControlModeResult handles empty stderr.""" + result = ControlModeResult( + stdout=["output"], + stderr=[], + returncode=0, + cmd=["tmux", "-C", "list-sessions"], + ) + + assert result.stderr == [] + assert bool(result.stderr) is False # Empty list is falsy + + +def test_result_repr() -> None: + """ControlModeResult has informative repr.""" + result = ControlModeResult( + stdout=["line1", "line2", "line3"], + stderr=["error"], + returncode=1, + cmd=["tmux", "-C", "test"], + ) + + repr_str = repr(result) + assert "ControlModeResult" in repr_str + assert "returncode=1" in repr_str + assert "stdout_lines=3" in repr_str + assert "stderr_lines=1" in repr_str + + +def test_result_duck_types_as_tmux_cmd() -> None: + """ControlModeResult has same interface as tmux_cmd.""" + # Create both types + result = ControlModeResult( + stdout=["test"], + stderr=[], + returncode=0, + cmd=["tmux", "-C", "list-sessions"], + ) + + # Both should have same attributes + tmux_attrs = {"stdout", "stderr", "returncode", "cmd"} + result_attrs = {attr for attr in dir(result) if not attr.startswith("_")} + + assert tmux_attrs.issubset(result_attrs) + + +def test_result_success_case() -> None: + """ControlModeResult for successful command.""" + result = ControlModeResult( + stdout=["0: session1 (1 windows) (created Tue Oct 1 12:00:00 2024)"], + stderr=[], + returncode=0, + cmd=["tmux", "-C", "-L", "test", "list-sessions"], + ) + + assert result.returncode == 0 + assert len(result.stdout) == 1 + assert len(result.stderr) == 0 + assert "session1" in result.stdout[0] + + +def test_result_error_case() -> None: + """ControlModeResult for failed command.""" + result = ControlModeResult( + stdout=["parse error: unknown command: bad-command"], + stderr=[], + returncode=1, + cmd=["tmux", "-C", "bad-command"], + ) + + assert result.returncode == 1 + assert len(result.stdout) == 1 + assert "parse error" in result.stdout[0] + + +def test_result_multiline_output() -> None: + """ControlModeResult handles multi-line output.""" + lines = [f"line{i}" for i in range(100)] + result = ControlModeResult( + stdout=lines, + stderr=[], + returncode=0, + cmd=["tmux", "-C", "test"], + ) + + assert len(result.stdout) == 100 + assert result.stdout[0] == "line0" + assert result.stdout[99] == "line99" diff --git a/tests/control_mode/test_runner.py b/tests/control_mode/test_runner.py new file mode 100644 index 000000000..b777907c6 --- /dev/null +++ b/tests/control_mode/test_runner.py @@ -0,0 +1,325 @@ +"""Integration tests for ControlModeCommandRunner. + +IMPORTANT: All tests use REAL tmux - no mocks! +""" + +from __future__ import annotations + +import threading +import typing as t + +import pytest + +from libtmux._internal.engines.control_mode import ControlModeCommandRunner +from libtmux.server import Server + +if t.TYPE_CHECKING: + from libtmux.session import Session + + +def test_runner_connects_to_tmux(server: Server) -> None: + """ControlModeCommandRunner establishes connection to real tmux.""" + assert server.socket_name + runner = ControlModeCommandRunner(server.socket_name) + + assert runner._process is not None + assert runner._process.poll() is None # Still running + assert runner._parser is not None + + runner.close() + + +def test_runner_executes_command(server: Server) -> None: + """Runner executes commands against real tmux.""" + assert server.socket_name + runner = ControlModeCommandRunner(server.socket_name) + + # Create session via normal server + _ = server.new_session("test_control_mode") + + # Execute list-sessions via control mode + result = runner.run("-L", server.socket_name, "list-sessions") + + assert result.returncode == 0 + assert len(result.stdout) > 0 + assert any("test_control_mode" in line for line in result.stdout) + + runner.close() + + +def test_runner_filters_server_flags(server: Server) -> None: + """Runner correctly filters -L/-S/-f flags.""" + assert server.socket_name + runner = ControlModeCommandRunner(server.socket_name) + + # Create test session + _ = server.new_session("filter_test") + + # Send command with server flags (as Server.cmd() would) + result = runner.run( + "-L", + server.socket_name, + "-2", # Color flag (should be filtered) + "list-sessions", + ) + + # Should work despite flags + assert result.returncode == 0 + assert any("filter_test" in line for line in result.stdout) + + runner.close() + + +def test_runner_multiple_commands_sequential(server: Server) -> None: + """Runner handles multiple commands in sequence.""" + assert server.socket_name + runner = ControlModeCommandRunner(server.socket_name) + + # Execute multiple commands + result1 = runner.run("-L", server.socket_name, "new-session", "-d", "-s", "seq1") + assert result1.returncode == 0 + + result2 = runner.run("-L", server.socket_name, "new-session", "-d", "-s", "seq2") + assert result2.returncode == 0 + + result3 = runner.run("-L", server.socket_name, "list-sessions") + assert result3.returncode == 0 + assert any("seq1" in line for line in result3.stdout) + assert any("seq2" in line for line in result3.stdout) + + runner.close() + + +def test_runner_handles_command_error(server: Server) -> None: + """Runner correctly handles command errors.""" + assert server.socket_name + runner = ControlModeCommandRunner(server.socket_name) + + # Execute invalid command + result = runner.run("-L", server.socket_name, "invalid-command-xyz") + + # Should return error (not raise exception) + assert result.returncode == 1 + assert len(result.stdout) > 0 + # Error message should mention unknown command + assert any( + "unknown command" in line.lower() or "parse error" in line.lower() + for line in result.stdout + ) + + runner.close() + + +def test_runner_context_manager(server: Server) -> None: + """Context manager closes connection automatically.""" + assert server.socket_name + session_created = False + + with ControlModeCommandRunner(server.socket_name) as runner: + result = runner.run( + "-L", server.socket_name, "new-session", "-d", "-s", "ctx_test" + ) + session_created = result.returncode == 0 + + # Connection should be closed after context exit + assert session_created + # Verify session was created (using regular server) + assert server.has_session("ctx_test") + + +def test_runner_close_is_idempotent(server: Server) -> None: + """close() can be called multiple times safely.""" + assert server.socket_name + runner = ControlModeCommandRunner(server.socket_name) + + runner.close() + runner.close() # Should not raise + runner.close() # Should not raise + + +def test_runner_detects_closed_connection(server: Server) -> None: + """Runner detects when connection is closed.""" + assert server.socket_name + runner = ControlModeCommandRunner(server.socket_name) + runner.close() + + with pytest.raises(ConnectionError, match="not connected"): + runner.run("-L", server.socket_name, "list-sessions") + + +def test_runner_thread_safety(server: Server) -> None: + """Multiple threads can use same runner safely.""" + assert server.socket_name + socket_name = server.socket_name # Store for closure + runner = ControlModeCommandRunner(socket_name) + results: list[bool] = [] + errors: list[Exception] = [] + + def create_session(name: str) -> None: + try: + result = runner.run("-L", socket_name, "new-session", "-d", "-s", name) + results.append(result.returncode == 0) + except Exception as e: + errors.append(e) + + # Spawn 5 threads creating sessions concurrently + threads = [ + threading.Thread(target=create_session, args=(f"thread_{i}",)) for i in range(5) + ] + + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + # All should succeed + assert len(errors) == 0, f"Errors occurred: {errors}" + assert all(results), "Not all sessions created successfully" + + # Verify all sessions exist + list_result = runner.run("-L", socket_name, "list-sessions") + for i in range(5): + assert any(f"thread_{i}" in line for line in list_result.stdout) + + runner.close() + + +def test_runner_preserves_output_format(server: Server, session: Session) -> None: + """Control mode output matches subprocess output format.""" + assert server.socket_name + from libtmux._internal.engines import SubprocessCommandRunner + + # Create sessions/windows via normal server + _ = session.new_window("win1") + _ = session.new_window("win2") + + # Get output via subprocess + subprocess_runner = SubprocessCommandRunner() + subprocess_result = subprocess_runner.run( + "-L", server.socket_name, "list-windows", "-t", session.session_id or "$0" + ) + + # Get output via control mode + control_runner = ControlModeCommandRunner(server.socket_name) + control_result = control_runner.run( + "-L", server.socket_name, "list-windows", "-t", session.session_id or "$0" + ) + + # Both should succeed + assert subprocess_result.returncode == 0 + assert control_result.returncode == 0 + + # Both should have same number of lines (same windows) + assert len(subprocess_result.stdout) == len(control_result.stdout) + + # Both should mention same windows + subprocess_text = " ".join(subprocess_result.stdout) + control_text = " ".join(control_result.stdout) + assert "win1" in subprocess_text + assert "win1" in control_text + assert "win2" in subprocess_text + assert "win2" in control_text + + control_runner.close() + + +def test_runner_with_server_integration(TestServer: t.Callable[..., Server]) -> None: + """Server works correctly with control mode runner. + + Control mode runner transparently falls back to subprocess for commands + with format strings (-F flag), ensuring all operations work correctly. + """ + # Create independent server with unique socket + test_server = TestServer() + assert test_server.socket_name + runner = ControlModeCommandRunner(test_server.socket_name) + + # Create server with control mode runner + cm_server = Server( + socket_name=test_server.socket_name, + command_runner=runner, + ) + + # All normal operations should work + session = cm_server.new_session("cm_integration_test") + assert session.session_name == "cm_integration_test" + + window = session.new_window("test_window") + assert window.window_name == "test_window" + + # Verify runner was actually used + assert isinstance(cm_server.command_runner, ControlModeCommandRunner) + + runner.close() + + +def test_runner_handles_empty_response(server: Server) -> None: + """Runner handles commands with no output.""" + assert server.socket_name + runner = ControlModeCommandRunner(server.socket_name) + + # Create a session (output goes to stdout but we use -d for detached) + result = runner.run( + "-L", server.socket_name, "new-session", "-d", "-s", "empty_test" + ) + + # Should succeed + assert result.returncode == 0 + # Output might be empty or minimal + assert isinstance(result.stdout, list) + + runner.close() + + +def test_runner_large_output(server: Server) -> None: + """Runner handles commands with large output.""" + assert server.socket_name + runner = ControlModeCommandRunner(server.socket_name) + + # Create many sessions + for i in range(20): + runner.run("-L", server.socket_name, "new-session", "-d", "-s", f"large_{i}") + + # List all sessions (large output) + result = runner.run("-L", server.socket_name, "list-sessions") + + assert result.returncode == 0 + assert len(result.stdout) >= 20 + + runner.close() + + +def test_runner_command_with_special_characters(server: Server) -> None: + """Runner handles commands with special characters.""" + assert server.socket_name + runner = ControlModeCommandRunner(server.socket_name) + + # Create session with special characters in name + session_name = "test-session_123" + result = runner.run( + "-L", server.socket_name, "new-session", "-d", "-s", session_name + ) + + assert result.returncode == 0 + + # Verify it was created + list_result = runner.run("-L", server.socket_name, "list-sessions") + assert any(session_name in line for line in list_result.stdout) + + runner.close() + + +def test_runner_result_has_cmd_attribute(server: Server) -> None: + """Runner result includes cmd attribute for debugging.""" + assert server.socket_name + runner = ControlModeCommandRunner(server.socket_name) + + result = runner.run("-L", server.socket_name, "list-sessions") + + assert hasattr(result, "cmd") + assert isinstance(result.cmd, list) + assert "tmux" in result.cmd + assert "-C" in result.cmd + assert "list-sessions" in result.cmd + + runner.close() diff --git a/tests/test_command_runner.py b/tests/test_command_runner.py new file mode 100644 index 000000000..e52cb86a4 --- /dev/null +++ b/tests/test_command_runner.py @@ -0,0 +1,147 @@ +"""Test command runner abstraction.""" + +from __future__ import annotations + +import typing as t + +from libtmux._internal.engines import SubprocessCommandRunner +from libtmux.server import Server + +if t.TYPE_CHECKING: + from libtmux.session import Session + + +def test_subprocess_runner_instantiation() -> None: + """SubprocessCommandRunner can be instantiated.""" + runner = SubprocessCommandRunner() + assert runner is not None + + +def test_subprocess_runner_has_run_method() -> None: + """SubprocessCommandRunner has run method.""" + runner = SubprocessCommandRunner() + assert hasattr(runner, "run") + assert callable(runner.run) + + +def test_server_default_runner(server: Server) -> None: + """Server uses subprocess runner by default.""" + assert server.command_runner is not None + assert isinstance(server.command_runner, SubprocessCommandRunner) + + +def test_server_custom_runner(TestServer: t.Callable[..., Server]) -> None: + """Server accepts custom command runner.""" + custom_runner = SubprocessCommandRunner() + server = TestServer() + server.command_runner = custom_runner + assert server.command_runner is custom_runner + + +def test_server_runner_lazy_init(TestServer: t.Callable[..., Server]) -> None: + """Server lazily initializes command runner.""" + server = TestServer() + # Access property to trigger lazy init + runner = server.command_runner + assert isinstance(runner, SubprocessCommandRunner) + # Second access returns same instance + assert server.command_runner is runner + + +def test_runner_returns_tmux_cmd(server: Server, session: Session) -> None: + """Command runner returns tmux_cmd objects.""" + from libtmux.common import tmux_cmd + + result = server.cmd("list-sessions") + assert isinstance(result, tmux_cmd) + assert hasattr(result, "stdout") + assert hasattr(result, "stderr") + assert hasattr(result, "returncode") + assert hasattr(result, "cmd") + + +def test_runner_executes_real_tmux(server: Server) -> None: + """Command runner executes real tmux (not mocked).""" + # Create a session + _ = server.new_session("test_runner") + + # List sessions through runner + result = server.cmd("list-sessions") + + # Verify real tmux was executed + assert any("test_runner" in line for line in result.stdout) + assert result.returncode == 0 + + +def test_runner_subprocess_integration(server: Server) -> None: + """SubprocessCommandRunner integrates with Server.cmd().""" + # Verify the runner is being used + assert isinstance(server.command_runner, SubprocessCommandRunner) + + # Execute a command + result = server.cmd("list-sessions") + + # Verify result structure + assert isinstance(result.stdout, list) + assert isinstance(result.stderr, list) + assert isinstance(result.returncode, int) + assert isinstance(result.cmd, list) + + +def test_server_cmd_uses_runner(server: Server) -> None: + """Server.cmd() uses command_runner internally.""" + _ = server.new_session("test_cmd_uses_runner") + + # Execute command + result = server.cmd("list-sessions") + + # Verify it went through the runner + assert result.returncode == 0 + assert any("test_cmd_uses_runner" in line for line in result.stdout) + + +def test_runner_with_test_server_factory(TestServer: t.Callable[..., Server]) -> None: + """TestServer factory works with command runner.""" + server1 = TestServer() + server2 = TestServer() + + # Both should have command runners + assert server1.command_runner is not None + assert server2.command_runner is not None + + # Both should use SubprocessCommandRunner + assert isinstance(server1.command_runner, SubprocessCommandRunner) + assert isinstance(server2.command_runner, SubprocessCommandRunner) + + +def test_backward_compatibility_no_runner_param( + TestServer: t.Callable[..., Server], +) -> None: + """Server works without command_runner parameter (backward compatibility).""" + # Create server without command_runner parameter + new_server = TestServer() + + # Should auto-initialize with SubprocessCommandRunner + assert new_server.command_runner is not None + assert isinstance(new_server.command_runner, SubprocessCommandRunner) + + # Should be able to execute commands + result = new_server.cmd("list-sessions") + assert hasattr(result, "stdout") + + +def test_runner_setter(TestServer: t.Callable[..., Server]) -> None: + """Server.command_runner can be set after initialization.""" + server = TestServer() + + # Initial runner + initial_runner = server.command_runner + assert isinstance(initial_runner, SubprocessCommandRunner) + + # Set new runner + new_runner = SubprocessCommandRunner() + server.command_runner = new_runner + + # Verify it changed + assert server.command_runner is new_runner + assert server.command_runner is not initial_runner