Skip to content

Commit 9659359

Browse files
committed
ControlModeEngine(fix[robustness]): Harden control mode I/O
1 parent 5f22143 commit 9659359

File tree

1 file changed

+144
-110
lines changed

1 file changed

+144
-110
lines changed

src/libtmux/_internal/engines/control_mode.py

Lines changed: 144 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
from __future__ import annotations
44

5+
import contextlib
56
import logging
7+
import queue
68
import shlex
79
import shutil
810
import subprocess
911
import threading
12+
import time
1013
import typing as t
1114

1215
from libtmux import exc
@@ -24,6 +27,10 @@ def __init__(self, command_timeout: float | None = 10.0) -> None:
2427
self._lock = threading.Lock()
2528
self._server_args: tuple[str | int, ...] | None = None
2629
self.command_timeout = command_timeout
30+
self._queue: queue.Queue[str | None] = queue.Queue(maxsize=1024)
31+
self._reader_thread: threading.Thread | None = None
32+
self._stderr_thread: threading.Thread | None = None
33+
self.tmux_bin: str | None = None
2734

2835
def close(self) -> None:
2936
"""Terminate the tmux control mode process."""
@@ -36,16 +43,53 @@ def close(self) -> None:
3643
self.process.wait()
3744
self.process = None
3845
self._server_args = None
46+
# Unblock any waiters
47+
with contextlib.suppress(queue.Full):
48+
self._queue.put_nowait(None)
3949

4050
def __del__(self) -> None:
4151
"""Cleanup the process on destruction."""
4252
self.close()
4353

54+
def _reader(self, process: subprocess.Popen[str]) -> None:
55+
"""Background thread to read stdout and push to queue."""
56+
assert process.stdout is not None
57+
while True:
58+
try:
59+
line = process.stdout.readline()
60+
except (ValueError, OSError):
61+
break
62+
63+
if not line:
64+
# EOF
65+
with contextlib.suppress(queue.Full):
66+
self._queue.put_nowait(None)
67+
break
68+
69+
if line.startswith("%") and not line.startswith(
70+
("%begin", "%end", "%error"),
71+
):
72+
logger.debug("Control Mode Notification: %s", line.rstrip("\n"))
73+
continue
74+
75+
try:
76+
self._queue.put(line, timeout=1)
77+
except queue.Full:
78+
logger.warning("Control Mode queue full; dropping line")
79+
80+
def _drain_stderr(self, process: subprocess.Popen[str]) -> None:
81+
"""Continuously drain stderr to prevent child blocking."""
82+
if process.stderr is None:
83+
return
84+
for err_line in process.stderr:
85+
logger.debug("Control Mode stderr: %s", err_line.rstrip("\n"))
86+
4487
def _start_process(self, server_args: t.Sequence[str | int] | None) -> None:
4588
"""Start the tmux control mode process."""
4689
tmux_bin = shutil.which("tmux")
4790
if not tmux_bin:
4891
raise exc.TmuxCommandNotFound
92+
self.tmux_bin = tmux_bin
4993

5094
normalized_args: tuple[str | int, ...] = tuple(server_args or ())
5195

@@ -67,31 +111,95 @@ def _start_process(self, server_args: t.Sequence[str | int] | None) -> None:
67111
)
68112
self._server_args = normalized_args
69113

114+
# reset queues from prior runs
115+
while not self._queue.empty():
116+
self._queue.get_nowait()
117+
118+
# Start IO threads
119+
self._reader_thread = threading.Thread(
120+
target=self._reader,
121+
args=(self.process,),
122+
daemon=True,
123+
)
124+
self._reader_thread.start()
125+
126+
self._stderr_thread = threading.Thread(
127+
target=self._drain_stderr,
128+
args=(self.process,),
129+
daemon=True,
130+
)
131+
self._stderr_thread.start()
132+
70133
# Consume startup command output with the same timeout used for commands.
71-
assert self.process.stdout is not None
134+
try:
135+
self._read_response(cmd="new-session", timeout=self.command_timeout)
136+
except exc.ControlModeTimeout:
137+
logger.exception("Control Mode bootstrap command timed out")
138+
self.close()
139+
raise
140+
141+
def _read_response(self, cmd: str, timeout: float | None) -> tmux_cmd:
142+
"""Read response from the queue."""
143+
stdout_lines: list[str] = []
144+
stderr_lines: list[str] = []
145+
returncode = 0
146+
147+
start_time = time.monotonic()
148+
149+
while True:
150+
if timeout is None:
151+
remaining: float | None = None
152+
else:
153+
elapsed = time.monotonic() - start_time
154+
remaining = timeout - elapsed
155+
if remaining <= 0:
156+
msg = "tmux control mode command timed out"
157+
raise exc.ControlModeTimeout(msg)
72158

73-
def bootstrap_reader() -> None:
74-
assert self.process is not None
75-
assert self.process.stdout is not None
76-
while True:
77-
line = self.process.stdout.readline()
78-
if not line:
79-
logger.warning("Control Mode process exited immediately")
80-
break
81-
if line.startswith("%end") or line.startswith("%error"):
82-
break
83-
84-
if self.command_timeout is None:
85-
bootstrap_reader()
86-
else:
87-
bootstrap_thread = threading.Thread(target=bootstrap_reader, daemon=True)
88-
bootstrap_thread.start()
89-
bootstrap_thread.join(timeout=self.command_timeout)
90-
if bootstrap_thread.is_alive():
91-
logger.error("Control Mode bootstrap command timed out")
92-
self.close()
159+
try:
160+
line = self._queue.get(timeout=remaining)
161+
except queue.Empty:
93162
msg = "tmux control mode command timed out"
94-
raise exc.ControlModeTimeout(msg)
163+
raise exc.ControlModeTimeout(msg) from None
164+
165+
if line is None:
166+
logger.error("Unexpected EOF from Control Mode process")
167+
returncode = 1
168+
break
169+
170+
line = line.rstrip("\n")
171+
172+
if line.startswith("%begin"):
173+
parts = line.split()
174+
if len(parts) > 3:
175+
flags = int(parts[3])
176+
if flags & 1:
177+
returncode = 1
178+
continue
179+
if line.startswith("%end"):
180+
parts = line.split()
181+
if len(parts) > 3:
182+
flags = int(parts[3])
183+
if flags & 1:
184+
returncode = 1
185+
break
186+
if line.startswith("%error"):
187+
returncode = 1
188+
stderr_lines = stdout_lines
189+
stdout_lines = []
190+
break
191+
192+
stdout_lines.append(line)
193+
194+
if cmd == "has-session" and stderr_lines and not stdout_lines:
195+
stdout_lines = [stderr_lines[0]]
196+
197+
return tmux_cmd(
198+
cmd=[],
199+
stdout=stdout_lines,
200+
stderr=stderr_lines,
201+
returncode=returncode,
202+
)
95203

96204
def run(
97205
self,
@@ -120,11 +228,7 @@ def run(
120228

121229
assert self.process is not None
122230
assert self.process.stdin is not None
123-
assert self.process.stdout is not None
124-
125231
# Construct the command line
126-
# We use shlex.join for correct shell-like quoting, required by tmux control
127-
# mode.
128232
full_args = [cmd]
129233
if cmd_args:
130234
full_args.extend(str(a) for a in cmd_args)
@@ -146,89 +250,19 @@ def run(
146250
self.process.stdin.write(command_line + "\n")
147251
self.process.stdin.flush()
148252

149-
# Read response
150-
stdout_lines: list[str] = []
151-
stderr_lines: list[str] = []
152-
returncode = 0
253+
effective_timeout = timeout if timeout is not None else self.command_timeout
153254

154-
def reader() -> None:
155-
nonlocal returncode, stdout_lines, stderr_lines
156-
assert self.process is not None
157-
assert self.process.stdout is not None
158-
while True:
159-
line = self.process.stdout.readline()
160-
if not line:
161-
# EOF
162-
logger.error("Unexpected EOF from Control Mode process")
163-
returncode = 1
164-
break
165-
166-
line = line.rstrip("\n")
167-
168-
if line.startswith("%begin"):
169-
# %begin time id flags
170-
parts = line.split()
171-
if len(parts) > 3:
172-
flags = int(parts[3])
173-
if flags & 1:
174-
returncode = 1
175-
continue
176-
elif line.startswith("%end"):
177-
# %end time id flags
178-
parts = line.split()
179-
if len(parts) > 3:
180-
flags = int(parts[3])
181-
if flags & 1:
182-
returncode = 1
183-
break
184-
elif line.startswith("%error"):
185-
returncode = 1
186-
stderr_lines = stdout_lines
187-
stdout_lines = []
188-
break
189-
elif line.startswith("%"):
190-
logger.debug("Control Mode Notification: %s", line)
191-
continue
192-
else:
193-
stdout_lines.append(line)
194-
195-
reader_exc: list[BaseException] = []
196-
197-
def wrapped_reader() -> None:
198-
try:
199-
reader()
200-
except BaseException as read_exc: # pragma: no cover - defensive
201-
reader_exc.append(read_exc)
255+
try:
256+
result = self._read_response(cmd=cmd, timeout=effective_timeout)
257+
except exc.ControlModeTimeout:
258+
self.close()
259+
raise
202260

203-
effective_timeout = timeout if timeout is not None else self.command_timeout
204-
if effective_timeout is None:
205-
wrapped_reader()
206-
else:
207-
reader_thread = threading.Thread(target=wrapped_reader, daemon=True)
208-
reader_thread.start()
209-
reader_thread.join(timeout=effective_timeout)
210-
if reader_thread.is_alive():
211-
logger.error(
212-
"Control Mode command timed out waiting for response: %s",
213-
command_line,
214-
)
215-
self.close()
216-
msg = "tmux control mode command timed out"
217-
raise exc.ControlModeTimeout(msg)
218-
if reader_exc:
219-
raise reader_exc[0]
220-
221-
# Tmux usually puts error message in stdout (captured above) for %error
222-
# But we moved it to stderr_lines if %error occurred.
223-
224-
# If we detected failure via flags but got %end, treat stdout as potentially
225-
# containing info?
226-
# For now, keep stdout as is.
227-
228-
# Mimic subprocess.communicate output structure
229-
return tmux_cmd(
230-
cmd=[cmd] + (list(map(str, cmd_args)) if cmd_args else []),
231-
stdout=stdout_lines,
232-
stderr=stderr_lines,
233-
returncode=returncode,
234-
)
261+
full_cmd = [self.tmux_bin or "tmux"]
262+
if incoming_server_args:
263+
full_cmd.extend(str(x) for x in incoming_server_args)
264+
full_cmd.append(cmd)
265+
if cmd_args:
266+
full_cmd.extend(str(x) for x in cmd_args)
267+
result.cmd = full_cmd
268+
return result

0 commit comments

Comments
 (0)