44
55import io
66import pathlib
7+ import queue
8+ import threading
79import time
810import typing as t
911from collections import deque
@@ -321,27 +323,138 @@ def test_iter_notifications_survives_overflow(
321323 assert first .kind .name == "SESSIONS_CHANGED"
322324
323325
326+ class ScriptedStdin :
327+ """Fake stdin that can optionally raise BrokenPipeError on write."""
328+
329+ def __init__ (self , broken : bool = False ) -> None :
330+ """Initialize stdin.
331+
332+ Parameters
333+ ----------
334+ broken : bool
335+ If True, write() and flush() raise BrokenPipeError.
336+ """
337+ self ._broken = broken
338+ self ._buf : list [str ] = []
339+
340+ def write (self , data : str ) -> int :
341+ """Write data or raise BrokenPipeError if broken."""
342+ if self ._broken :
343+ raise BrokenPipeError
344+ self ._buf .append (data )
345+ return len (data )
346+
347+ def flush (self ) -> None :
348+ """Flush or raise BrokenPipeError if broken."""
349+ if self ._broken :
350+ raise BrokenPipeError
351+
352+
353+ class ScriptedStdout :
354+ """Queue-backed stdout that blocks like real subprocess I/O.
355+
356+ Lines are fed from a background thread, simulating the pacing of real
357+ process output. The iterator blocks on __next__ until a line is available
358+ or EOF.
359+ """
360+
361+ def __init__ (self , lines : list [str ], delay : float = 0.0 ) -> None :
362+ """Initialize stdout iterator.
363+
364+ Parameters
365+ ----------
366+ lines : list[str]
367+ Lines to emit (without trailing newlines).
368+ delay : float
369+ Optional delay between lines in seconds.
370+ """
371+ self ._queue : queue .Queue [str | None ] = queue .Queue ()
372+ self ._delay = delay
373+ self ._closed = threading .Event ()
374+ self ._lines_fed = threading .Event ()
375+
376+ # Start feeder thread that pushes lines with optional delay
377+ self ._feeder = threading .Thread (
378+ target = self ._feed ,
379+ args = (lines ,),
380+ daemon = True ,
381+ )
382+ self ._feeder .start ()
383+
384+ def _feed (self , lines : list [str ]) -> None :
385+ """Feed lines into the queue from a background thread."""
386+ for line in lines :
387+ if self ._delay > 0 :
388+ time .sleep (self ._delay )
389+ self ._queue .put (line )
390+ # Sentinel signals EOF
391+ self ._queue .put (None )
392+ self ._lines_fed .set ()
393+
394+ def __iter__ (self ) -> ScriptedStdout :
395+ """Return iterator (self)."""
396+ return self
397+
398+ def __next__ (self ) -> str :
399+ """Block until next line or raise StopIteration at EOF."""
400+ item = self ._queue .get () # Blocks until available
401+ if item is None :
402+ self ._closed .set ()
403+ raise StopIteration
404+ return item
405+
406+ def wait_until_fed (self , timeout : float | None = None ) -> bool :
407+ """Wait until all lines have been put into the queue."""
408+ return self ._lines_fed .wait (timeout = timeout )
409+
410+ def wait_until_consumed (self , timeout : float | None = None ) -> bool :
411+ """Wait until the iterator has reached EOF."""
412+ return self ._closed .wait (timeout = timeout )
413+
414+
324415@dataclass
325416class ScriptedProcess :
326- """Fake control-mode process that plays back scripted stdout and errors."""
417+ """Fake control-mode process that plays back scripted stdout and errors.
418+
419+ Uses ScriptedStdout (queue-backed iterator) instead of a tuple to match
420+ real subprocess I/O semantics where reads are blocking/async.
421+ """
327422
328423 stdin : t .TextIO | None
329424 stdout : t .Iterable [str ] | None
330425 stderr : t .Iterable [str ] | None
331426 pid : int | None = 4242
332427 broken_on_write : bool = False
333428 writes : list [str ] | None = None
429+ _stdin_impl : ScriptedStdin | None = None
430+ _stdout_impl : ScriptedStdout | None = None
334431
335432 def __init__ (
336433 self ,
337434 stdout_lines : list [str ],
338435 * ,
339436 broken_on_write : bool = False ,
340437 pid : int | None = 4242 ,
438+ line_delay : float = 0.0 ,
341439 ) -> None :
342- self .stdin = io .StringIO ()
343- self .stdout = tuple (stdout_lines )
344- self .stderr = ()
440+ """Initialize scripted process.
441+
442+ Parameters
443+ ----------
444+ stdout_lines : list[str]
445+ Lines to emit on stdout (without trailing newlines).
446+ broken_on_write : bool
447+ If True, writes to stdin raise BrokenPipeError.
448+ pid : int | None
449+ Process ID to report.
450+ line_delay : float
451+ Delay between stdout lines in seconds. Use for timeout tests.
452+ """
453+ self ._stdin_impl = ScriptedStdin (broken = broken_on_write )
454+ self .stdin = t .cast (t .TextIO , self ._stdin_impl )
455+ self ._stdout_impl = ScriptedStdout (stdout_lines , delay = line_delay )
456+ self .stdout : t .Iterable [str ] | None = self ._stdout_impl
457+ self .stderr : t .Iterable [str ] | None = iter (())
345458 self .pid = pid
346459 self .broken_on_write = broken_on_write
347460 self .writes = []
@@ -369,6 +482,18 @@ def write_line(self, line: str) -> None:
369482 assert self .writes is not None
370483 self .writes .append (line )
371484
485+ def wait_stdout_fed (self , timeout : float | None = None ) -> bool :
486+ """Wait until all stdout lines have been queued."""
487+ if self ._stdout_impl is None :
488+ return True
489+ return self ._stdout_impl .wait_until_fed (timeout )
490+
491+ def wait_stdout_consumed (self , timeout : float | None = None ) -> bool :
492+ """Wait until stdout iteration has reached EOF."""
493+ if self ._stdout_impl is None :
494+ return True
495+ return self ._stdout_impl .wait_until_consumed (timeout )
496+
372497
373498class ProcessFactory :
374499 """Scriptable process factory for control-mode tests."""
@@ -420,7 +545,14 @@ class RetryOutcome(t.NamedTuple):
420545def test_run_result_retries_with_process_factory (
421546 case : RetryOutcome ,
422547) -> None :
423- """run_result should restart and succeed after broken pipe or timeout."""
548+ """run_result should restart and succeed after broken pipe or timeout.
549+
550+ This test verifies that after a failure (broken pipe or timeout) on the
551+ first attempt, a subsequent call to run_result() succeeds with a fresh
552+ process.
553+
554+ Uses max_retries=0 so errors surface immediately on the first call.
555+ """
424556 # First process: either breaks on write or hangs (timeout path).
425557 if case .expect_timeout :
426558 first_stdout : list [str ] = [] # no output triggers timeout
@@ -431,22 +563,29 @@ def test_run_result_retries_with_process_factory(
431563
432564 first = ScriptedProcess (first_stdout , broken_on_write = broken , pid = 1111 )
433565
434- # Second process: successful %begin/%end for list-sessions.
566+ # Second process: successful %begin/%end for bootstrap AND list-sessions.
567+ # The reader will consume all lines, so we need output for:
568+ # 1. Bootstrap command (new-session): %begin/%end
569+ # 2. list-sessions command: %begin/%end
570+ # Small delay allows command registration before response is parsed.
435571 second = ScriptedProcess (
436572 [
437- "%begin 1 1 0" ,
438- "%end 1 1 0" ,
573+ "%begin 1 1 0" , # bootstrap begin
574+ "%end 1 1 0" , # bootstrap end
575+ "%begin 2 1 0" , # list-sessions begin
576+ "%end 2 1 0" , # list-sessions end
439577 ],
440578 pid = 2222 ,
579+ line_delay = 0.01 , # 10ms between lines for proper sequencing
441580 )
442581
443582 factory = ProcessFactory (deque ([first , second ]))
444583
445584 engine = ControlModeEngine (
446- command_timeout = 0.01 if case .expect_timeout else 5.0 ,
585+ command_timeout = 0.05 if case .expect_timeout else 5.0 ,
447586 process_factory = factory ,
448587 start_threads = True ,
449- max_retries = 1 ,
588+ max_retries = 0 , # No internal retries - error surfaces immediately
450589 )
451590
452591 if case .expect_timeout :
@@ -456,10 +595,11 @@ def test_run_result_retries_with_process_factory(
456595 with pytest .raises (exc .ControlModeConnectionError ):
457596 engine .run ("list-sessions" )
458597
598+ # After failure, _restarts should be incremented
459599 assert engine ._restarts == 1
460- assert factory .calls == 1 or factory . calls == 2
600+ assert factory .calls == 1
461601
462- # Second attempt should succeed.
602+ # Second attempt should succeed with fresh process .
463603 res = engine .run_result ("list-sessions" )
464604 assert res .exit_status is ExitStatus .OK
465605 assert engine ._restarts >= 1
@@ -489,11 +629,17 @@ class BackpressureFixture(t.NamedTuple):
489629)
490630def test_notifications_overflow_then_iter (case : BackpressureFixture ) -> None :
491631 """Flood notif queue then ensure iter_notifications still yields."""
492- # Build scripted process that emits many notifications and a single command block.
632+ # Build scripted process that emits:
633+ # 1. Bootstrap command response (%begin/%end)
634+ # 2. Many notifications (to overflow the queue)
635+ # 3. A command response for list-sessions
636+ bootstrap_block = ["%begin 1 1 0" , "%end 1 1 0" ]
493637 notif_lines = ["%sessions-changed" ] * case .overflow
494638 command_block = ["%begin 99 1 0" , "%end 99 1 0" ]
495- script = [* notif_lines , * command_block , "%exit" ]
496- factory = ProcessFactory (deque ([ScriptedProcess (script , pid = 3333 )]))
639+ script = [* bootstrap_block , * notif_lines , * command_block , "%exit" ]
640+ factory = ProcessFactory (
641+ deque ([ScriptedProcess (script , pid = 3333 , line_delay = 0.01 )])
642+ )
497643
498644 engine = ControlModeEngine (
499645 process_factory = factory ,
0 commit comments