diff --git a/adapters/cli_adapter.py b/adapters/cli_adapter.py index e756cb0..b1e4c73 100644 --- a/adapters/cli_adapter.py +++ b/adapters/cli_adapter.py @@ -2,9 +2,12 @@ Reads lines from stdin (or a pipe), outputs responses to stdout. Non-blocking: returns empty list if no input available. + +Works on both Unix (select) and Windows (msvcrt.kbhit for tty, +threading for pipes). """ -import select import sys +import threading import time from adapters.base import BaseAdapter, Message @@ -17,41 +20,84 @@ class CLIAdapter(BaseAdapter): def __init__(self, config): super().__init__(config) - self._seen = set() + self._queue = [] + self._lock = threading.Lock() + self._eof = False + self._reader_started = False + + def _start_pipe_reader(self): + """Start background thread to read from piped stdin without blocking.""" + if self._reader_started: + return + self._reader_started = True + + def reader(): + try: + for line in sys.stdin: + text = line.strip() + if text: + with self._lock: + self._queue.append(text) + except (EOFError, OSError): + pass + finally: + self._eof = True + + t = threading.Thread(target=reader, daemon=True) + t.start() def poll(self): """Read a line from stdin if available (non-blocking).""" - # On Windows, select doesn't work on stdin — use a simple approach - if hasattr(select, 'select') and sys.platform != 'win32': - ready, _, _ = select.select([sys.stdin], [], [], 0) - if not ready: + if sys.stdin.isatty(): + return self._poll_tty() + else: + return self._poll_pipe() + + def _poll_tty(self): + """Non-blocking read from interactive terminal.""" + if sys.platform == 'win32': + import msvcrt + if not msvcrt.kbhit(): return [] else: - # Windows: check if stdin has data (works for pipes) - if sys.stdin.isatty(): - return [] # Skip in interactive mode during poll loop - # For pipes, just try to read - pass + import select as _select + ready, _, _ = _select.select([sys.stdin], [], [], 0) + if not ready: + return [] try: line = sys.stdin.readline() except (EOFError, OSError): return [] - if not line: - return [] - - text = line.strip() + text = (line or '').strip() if not text: return [] - msg = Message( + return [Message( message_id=Message.make_id(text, 'cli-user'), sender='cli-user', text=text, timestamp=time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), - ) - return [msg] + )] + + def _poll_pipe(self): + """Non-blocking read from piped stdin via background thread.""" + self._start_pipe_reader() + + with self._lock: + lines = list(self._queue) + self._queue.clear() + + messages = [] + for text in lines: + messages.append(Message( + message_id=Message.make_id(text, 'cli-user'), + sender='cli-user', + text=text, + timestamp=time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), + )) + return messages def send(self, text): """Write response to stdout.""" diff --git a/scripts/test/test-webhook.sh b/scripts/test/test-webhook.sh index bebeaee..7463047 100644 --- a/scripts/test/test-webhook.sh +++ b/scripts/test/test-webhook.sh @@ -204,7 +204,8 @@ cfg = {'webhook_port': 18996, 'webhook_path': '/webhook/inbound'} adapter = WebhookAdapter(cfg) time.sleep(0.2) -# Oversized payload — should get 413 +# Oversized payload — should get 413 (or ConnectionAbortedError on Windows +# when server closes socket while client is still sending large body) big_body = b'x' * (MAX_BODY_SIZE + 1) req = urllib.request.Request('http://127.0.0.1:18996/webhook/inbound', data=big_body, method='POST') req.add_header('Content-Type', 'application/json') @@ -214,6 +215,8 @@ try: assert False, 'should have gotten 413' except urllib.error.HTTPError as e: assert e.code == 413, f'expected 413, got {e.code}' +except (ConnectionAbortedError, ConnectionResetError, urllib.error.URLError): + pass # Windows: server closes connection while client sends large body adapter.shutdown() print(' PASS: oversized payload rejected with 413')