Skip to content

Commit cfcc986

Browse files
committed
ControlModeEngine(test): Backpressure integration via scripted process
why: Replace xfail with deterministic notification overflow test using process_factory. what: - Add scripted-process factory to flood %sessions-changed then deliver a command block - Assert dropped_notifications increments and iter_notifications still yields - Remove backpressure xfail placeholder
1 parent 0969eb1 commit cfcc986

File tree

1 file changed

+21
-9
lines changed

1 file changed

+21
-9
lines changed

tests/test_control_mode_engine.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -471,10 +471,6 @@ class BackpressureFixture(t.NamedTuple):
471471
expect_iter: bool
472472

473473

474-
@pytest.mark.xfail(
475-
reason="control-mode notification backpressure integration not stable yet",
476-
strict=False,
477-
)
478474
@pytest.mark.parametrize(
479475
"case",
480476
[
@@ -489,13 +485,29 @@ class BackpressureFixture(t.NamedTuple):
489485
)
490486
def test_notifications_overflow_then_iter(case: BackpressureFixture) -> None:
491487
"""Flood notif queue then ensure iter_notifications still yields."""
492-
engine = ControlModeEngine()
493-
engine._protocol = ControlProtocol(notification_queue_size=case.queue_size)
494-
for _ in range(case.overflow):
495-
engine._protocol.feed_line("%sessions-changed")
488+
# Build scripted process that emits many notifications and a single command block.
489+
notif_lines = ["%sessions-changed"] * case.overflow
490+
command_block = ["%begin 99 1 0", "%end 99 1 0"]
491+
script = [*notif_lines, *command_block, "%exit"]
492+
factory = ProcessFactory(deque([ScriptedProcess(script, pid=3333)]))
493+
494+
engine = ControlModeEngine(
495+
process_factory=factory,
496+
start_threads=True,
497+
notification_queue_size=case.queue_size,
498+
)
499+
500+
# Run a dummy command to consume the %begin/%end.
501+
res = engine.run_result("list-sessions")
502+
assert res.exit_status is ExitStatus.OK
503+
504+
stats = engine.get_stats()
505+
assert stats.dropped_notifications >= case.overflow - case.queue_size
506+
496507
if case.expect_iter:
497-
notif = next(engine.iter_notifications(timeout=0.05), None)
508+
notif = next(engine.iter_notifications(timeout=0.1), None)
498509
assert notif is not None
510+
assert notif.kind.name == "SESSIONS_CHANGED"
499511

500512

501513
class TimeoutRestartFixture(t.NamedTuple):

0 commit comments

Comments
 (0)