Skip to content

Commit 5f22143

Browse files
committed
ControlModeEngine(fix[timeout]): Add timeouts and cleanup
1 parent 723774b commit 5f22143

File tree

5 files changed

+173
-64
lines changed

5 files changed

+173
-64
lines changed

src/libtmux/_internal/engines/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def run(
1818
cmd: str,
1919
cmd_args: t.Sequence[str | int] | None = None,
2020
server_args: t.Sequence[str | int] | None = None,
21+
timeout: float | None = None,
2122
) -> tmux_cmd:
2223
"""Run a tmux command and return the result."""
2324
...

src/libtmux/_internal/engines/control_mode.py

Lines changed: 117 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,23 @@
1919
class ControlModeEngine(Engine):
2020
"""Engine that runs tmux commands via a persistent Control Mode process."""
2121

22-
def __init__(self) -> None:
22+
def __init__(self, command_timeout: float | None = 10.0) -> None:
2323
self.process: subprocess.Popen[str] | None = None
2424
self._lock = threading.Lock()
25-
self._server_args: t.Sequence[str | int] | None = None
25+
self._server_args: tuple[str | int, ...] | None = None
26+
self.command_timeout = command_timeout
2627

2728
def close(self) -> None:
2829
"""Terminate the tmux control mode process."""
2930
if self.process:
3031
self.process.terminate()
31-
self.process.wait()
32+
try:
33+
self.process.wait(timeout=1)
34+
except subprocess.TimeoutExpired:
35+
self.process.kill()
36+
self.process.wait()
3237
self.process = None
38+
self._server_args = None
3339

3440
def __del__(self) -> None:
3541
"""Cleanup the process on destruction."""
@@ -41,13 +47,15 @@ def _start_process(self, server_args: t.Sequence[str | int] | None) -> None:
4147
if not tmux_bin:
4248
raise exc.TmuxCommandNotFound
4349

50+
normalized_args: tuple[str | int, ...] = tuple(server_args or ())
51+
4452
cmd = [tmux_bin]
45-
if server_args:
46-
cmd.extend(str(a) for a in server_args)
53+
if normalized_args:
54+
cmd.extend(str(a) for a in normalized_args)
4755
cmd.append("-C")
4856
cmd.extend(["new-session", "-A", "-s", "libtmux_control_mode"])
4957

50-
logger.debug(f"Starting Control Mode process: {cmd}")
58+
logger.debug("Starting Control Mode process: %s", cmd)
5159
self.process = subprocess.Popen(
5260
cmd,
5361
stdin=subprocess.PIPE,
@@ -57,38 +65,58 @@ def _start_process(self, server_args: t.Sequence[str | int] | None) -> None:
5765
bufsize=0, # Unbuffered
5866
errors="backslashreplace",
5967
)
60-
self._server_args = server_args
68+
self._server_args = normalized_args
6169

62-
# Consume startup command output
70+
# Consume startup command output with the same timeout used for commands.
6371
assert self.process.stdout is not None
64-
while True:
65-
line = self.process.stdout.readline()
66-
if not line:
67-
# EOF immediately?
68-
logger.warning("Control Mode process exited immediately")
69-
break
70-
if line.startswith("%end") or line.startswith("%error"):
71-
break
72+
73+
def bootstrap_reader() -> None:
74+
assert self.process is not None
75+
assert self.process.stdout is not None
76+
while True:
77+
line = self.process.stdout.readline()
78+
if not line:
79+
logger.warning("Control Mode process exited immediately")
80+
break
81+
if line.startswith("%end") or line.startswith("%error"):
82+
break
83+
84+
if self.command_timeout is None:
85+
bootstrap_reader()
86+
else:
87+
bootstrap_thread = threading.Thread(target=bootstrap_reader, daemon=True)
88+
bootstrap_thread.start()
89+
bootstrap_thread.join(timeout=self.command_timeout)
90+
if bootstrap_thread.is_alive():
91+
logger.error("Control Mode bootstrap command timed out")
92+
self.close()
93+
msg = "tmux control mode command timed out"
94+
raise exc.ControlModeTimeout(msg)
7295

7396
def run(
7497
self,
7598
cmd: str,
7699
cmd_args: t.Sequence[str | int] | None = None,
77100
server_args: t.Sequence[str | int] | None = None,
101+
timeout: float | None = None,
78102
) -> tmux_cmd:
79103
"""Run a tmux command via Control Mode."""
80104
with self._lock:
105+
incoming_server_args = tuple(server_args or ())
106+
81107
if self.process is None:
82-
self._start_process(server_args)
83-
elif server_args != self._server_args:
108+
self._start_process(incoming_server_args)
109+
elif incoming_server_args != self._server_args:
84110
# If server_args changed, we might need a new process.
85111
# For now, just warn or restart. Restarting is safer.
86112
logger.warning(
87-
"Server args changed, restarting Control Mode process. "
88-
f"Old: {self._server_args}, New: {server_args}"
113+
"Server args changed; restarting Control Mode process. "
114+
"Old: %s, New: %s",
115+
self._server_args,
116+
incoming_server_args,
89117
)
90118
self.close()
91-
self._start_process(server_args)
119+
self._start_process(incoming_server_args)
92120

93121
assert self.process is not None
94122
assert self.process.stdin is not None
@@ -103,15 +131,15 @@ def run(
103131

104132
command_line = shlex.join(full_args)
105133

106-
logger.debug(f"Sending to Control Mode: {command_line}")
134+
logger.debug("Sending to Control Mode: %s", command_line)
107135
try:
108136
self.process.stdin.write(command_line + "\n")
109137
self.process.stdin.flush()
110138
except BrokenPipeError:
111139
# Process died?
112140
logger.exception("Control Mode process died, restarting...")
113141
self.close()
114-
self._start_process(server_args)
142+
self._start_process(incoming_server_args)
115143
assert self.process is not None
116144
assert self.process.stdin is not None
117145
assert self.process.stdout is not None
@@ -123,47 +151,72 @@ def run(
123151
stderr_lines: list[str] = []
124152
returncode = 0
125153

126-
while True:
127-
line = self.process.stdout.readline()
128-
if not line:
129-
# EOF
130-
logger.error("Unexpected EOF from Control Mode process")
131-
returncode = 1
132-
break
133-
134-
line = line.rstrip("\n")
135-
136-
if line.startswith("%begin"):
137-
# Start of response
138-
# %begin time id flags
139-
parts = line.split()
140-
if len(parts) > 3:
141-
flags = int(parts[3])
142-
if flags & 1:
143-
returncode = 1
144-
continue
145-
elif line.startswith("%end"):
146-
# End of success response
147-
# %end time id flags
148-
parts = line.split()
149-
if len(parts) > 3:
150-
flags = int(parts[3])
151-
if flags & 1:
152-
returncode = 1
153-
break
154-
elif line.startswith("%error"):
155-
# End of error response
156-
returncode = 1
157-
# Captured lines are the error message
158-
stderr_lines = stdout_lines
159-
stdout_lines = []
160-
break
161-
elif line.startswith("%"):
162-
# Notification (ignore for now)
163-
logger.debug(f"Control Mode Notification: {line}")
164-
continue
165-
else:
166-
stdout_lines.append(line)
154+
def reader() -> None:
155+
nonlocal returncode, stdout_lines, stderr_lines
156+
assert self.process is not None
157+
assert self.process.stdout is not None
158+
while True:
159+
line = self.process.stdout.readline()
160+
if not line:
161+
# EOF
162+
logger.error("Unexpected EOF from Control Mode process")
163+
returncode = 1
164+
break
165+
166+
line = line.rstrip("\n")
167+
168+
if line.startswith("%begin"):
169+
# %begin time id flags
170+
parts = line.split()
171+
if len(parts) > 3:
172+
flags = int(parts[3])
173+
if flags & 1:
174+
returncode = 1
175+
continue
176+
elif line.startswith("%end"):
177+
# %end time id flags
178+
parts = line.split()
179+
if len(parts) > 3:
180+
flags = int(parts[3])
181+
if flags & 1:
182+
returncode = 1
183+
break
184+
elif line.startswith("%error"):
185+
returncode = 1
186+
stderr_lines = stdout_lines
187+
stdout_lines = []
188+
break
189+
elif line.startswith("%"):
190+
logger.debug("Control Mode Notification: %s", line)
191+
continue
192+
else:
193+
stdout_lines.append(line)
194+
195+
reader_exc: list[BaseException] = []
196+
197+
def wrapped_reader() -> None:
198+
try:
199+
reader()
200+
except BaseException as read_exc: # pragma: no cover - defensive
201+
reader_exc.append(read_exc)
202+
203+
effective_timeout = timeout if timeout is not None else self.command_timeout
204+
if effective_timeout is None:
205+
wrapped_reader()
206+
else:
207+
reader_thread = threading.Thread(target=wrapped_reader, daemon=True)
208+
reader_thread.start()
209+
reader_thread.join(timeout=effective_timeout)
210+
if reader_thread.is_alive():
211+
logger.error(
212+
"Control Mode command timed out waiting for response: %s",
213+
command_line,
214+
)
215+
self.close()
216+
msg = "tmux control mode command timed out"
217+
raise exc.ControlModeTimeout(msg)
218+
if reader_exc:
219+
raise reader_exc[0]
167220

168221
# Tmux usually puts error message in stdout (captured above) for %error
169222
# But we moved it to stderr_lines if %error occurred.

src/libtmux/_internal/engines/subprocess_engine.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ def run(
2222
cmd: str,
2323
cmd_args: t.Sequence[str | int] | None = None,
2424
server_args: t.Sequence[str | int] | None = None,
25+
timeout: float | None = None,
2526
) -> tmux_cmd:
2627
"""Run a tmux command using subprocess.Popen."""
28+
# Kept for API parity with ControlModeEngine; subprocess variant is synchronous.
29+
_ = timeout
2730
tmux_bin = shutil.which("tmux")
2831
if not tmux_bin:
2932
raise exc.TmuxCommandNotFound

src/libtmux/exc.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ class WaitTimeout(LibTmuxException):
9292
"""Function timed out without meeting condition."""
9393

9494

95+
class ControlModeTimeout(LibTmuxException):
96+
"""tmux control mode command did not return within the timeout."""
97+
98+
9599
class VariableUnpackingError(LibTmuxException):
96100
"""Error unpacking variable."""
97101

tests/test_control_mode_engine.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,15 @@
22

33
from __future__ import annotations
44

5+
import io
56
import pathlib
7+
import subprocess
8+
import time
9+
import typing as t
610

11+
import pytest
12+
13+
from libtmux import exc
714
from libtmux._internal.engines.control_mode import ControlModeEngine
815
from libtmux.server import Server
916

@@ -50,3 +57,44 @@ def test_control_mode_engine_basic(tmp_path: pathlib.Path) -> None:
5057

5158
engine.process.wait(timeout=2)
5259
assert engine.process.poll() is not None
60+
61+
62+
def test_control_mode_timeout(monkeypatch: pytest.MonkeyPatch) -> None:
63+
"""ControlModeEngine should surface timeouts and clean up the process."""
64+
65+
class BlockingStdout:
66+
def readline(self) -> str:
67+
time.sleep(0.05)
68+
return ""
69+
70+
class FakeProcess:
71+
def __init__(self) -> None:
72+
self.stdin = io.StringIO()
73+
self.stdout = BlockingStdout()
74+
self._terminated = False
75+
76+
def poll(self) -> None:
77+
return None
78+
79+
def terminate(self) -> None:
80+
self._terminated = True
81+
82+
def wait(self, timeout: float | None = None) -> None:
83+
return None
84+
85+
engine = ControlModeEngine(command_timeout=0.01)
86+
87+
fake_process = FakeProcess()
88+
89+
def fake_start(
90+
server_args: t.Sequence[str | int] | None,
91+
) -> None: # pragma: no cover - simple stub
92+
engine.process = t.cast(subprocess.Popen[str], fake_process)
93+
engine._server_args = tuple(server_args or ())
94+
95+
monkeypatch.setattr(engine, "_start_process", fake_start)
96+
97+
with pytest.raises(exc.ControlModeTimeout):
98+
engine.run("list-sessions")
99+
100+
assert engine.process is None

0 commit comments

Comments
 (0)