From de005de23bba3c851c5db1cbc1aa17a42d823dc6 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Thu, 21 Aug 2025 14:39:49 +0200 Subject: [PATCH 1/2] feat: add synchronous Wokwi client and corresponding tests --- .pre-commit-config.yaml | 1 + examples/hello_esp32/main.py | 6 + examples/hello_esp32_sync/.gitignore | 3 + examples/hello_esp32_sync/__init__.py | 0 examples/hello_esp32_sync/diagram.json | 22 +++ examples/hello_esp32_sync/main.py | 66 ++++++++ src/wokwi_client/__init__.py | 3 +- src/wokwi_client/client.py | 51 +++++- src/wokwi_client/client_sync.py | 212 +++++++++++++++++++++++++ src/wokwi_client/client_sync.pyi | 59 +++++++ tests/test_hello_esp32.py | 7 + 11 files changed, 428 insertions(+), 2 deletions(-) create mode 100644 examples/hello_esp32_sync/.gitignore create mode 100644 examples/hello_esp32_sync/__init__.py create mode 100644 examples/hello_esp32_sync/diagram.json create mode 100644 examples/hello_esp32_sync/main.py create mode 100644 src/wokwi_client/client_sync.py create mode 100644 src/wokwi_client/client_sync.pyi diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5f9399b..6a5b2f6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,6 +13,7 @@ repos: rev: v1.16.1 hooks: - id: mypy + exclude: ^src/wokwi_client/client_sync\.pyi$ additional_dependencies: [pydantic==2.8.0, typing-extensions, types-click, types-requests] diff --git a/examples/hello_esp32/main.py b/examples/hello_esp32/main.py index 99e39df..c688d67 100644 --- a/examples/hello_esp32/main.py +++ b/examples/hello_esp32/main.py @@ -53,6 +53,12 @@ async def main() -> None: # Stream serial output for a few seconds serial_task = asyncio.create_task(client.serial_monitor_cat()) + + # Alternative lambda version + # serial_task = client.serial_monitor( + # lambda line: print(line.decode("utf-8", errors="replace"), end="", flush=True) + # ) + print(f"Simulation started, waiting for {SLEEP_TIME} seconds…") await client.wait_until_simulation_time(SLEEP_TIME) serial_task.cancel() diff --git a/examples/hello_esp32_sync/.gitignore b/examples/hello_esp32_sync/.gitignore new file mode 100644 index 0000000..359127f --- /dev/null +++ b/examples/hello_esp32_sync/.gitignore @@ -0,0 +1,3 @@ +# Ignore the firmware files, as they are downloaded from the internet +hello_world.bin +hello_world.elf diff --git a/examples/hello_esp32_sync/__init__.py b/examples/hello_esp32_sync/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/hello_esp32_sync/diagram.json b/examples/hello_esp32_sync/diagram.json new file mode 100644 index 0000000..6b9b1dd --- /dev/null +++ b/examples/hello_esp32_sync/diagram.json @@ -0,0 +1,22 @@ +{ + "version": 1, + "author": "Uri Shaked", + "editor": "wokwi", + "parts": [ + { + "type": "wokwi-esp32-devkit-v1", + "id": "esp", + "top": 0, + "left": 0, + "attrs": { "fullBoot": "1" } + } + ], + "connections": [ + ["esp:TX0", "$serialMonitor:RX", "", []], + ["esp:RX0", "$serialMonitor:TX", "", []] + ], + "serialMonitor": { + "display": "terminal" + }, + "dependencies": {} +} diff --git a/examples/hello_esp32_sync/main.py b/examples/hello_esp32_sync/main.py new file mode 100644 index 0000000..44476d6 --- /dev/null +++ b/examples/hello_esp32_sync/main.py @@ -0,0 +1,66 @@ +# SPDX-License-Identifier: MIT +# Copyright (C) 2025, CodeMagic LTD + +import os +from pathlib import Path + +import requests + +from wokwi_client import GET_TOKEN_URL, WokwiClientSync + +EXAMPLE_DIR = Path(__file__).parent +HELLO_WORLD_URL = "https://github.com/wokwi/esp-idf-hello-world/raw/refs/heads/main/bin" +FIRMWARE_FILES = { + "hello_world.bin": f"{HELLO_WORLD_URL}/hello_world.bin", + "hello_world.elf": f"{HELLO_WORLD_URL}/hello_world.elf", +} +SLEEP_TIME = int(os.getenv("WOKWI_SLEEP_TIME", "10")) + + +def main() -> None: + token = os.getenv("WOKWI_CLI_TOKEN") + if not token: + raise SystemExit( + f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}." + ) + + for filename, url in FIRMWARE_FILES.items(): + if (EXAMPLE_DIR / filename).exists(): + continue + print(f"Downloading {filename} from {url}") + response = requests.get(url) + response.raise_for_status() + with open(EXAMPLE_DIR / filename, "wb") as f: + f.write(response.content) + + client = WokwiClientSync(token) + print(f"Wokwi client library version: {client.version}") + + hello = client.connect() + print("Connected to Wokwi Simulator, server version:", hello["version"]) + + # Upload the diagram and firmware files + client.upload_file("diagram.json", EXAMPLE_DIR / "diagram.json") + client.upload_file("hello_world.bin", EXAMPLE_DIR / "hello_world.bin") + client.upload_file("hello_world.elf", EXAMPLE_DIR / "hello_world.elf") + + # Start the simulation + client.start_simulation( + firmware="hello_world.bin", + elf="hello_world.elf", + ) + + # Stream serial output for a few seconds (non-blocking) + client.serial_monitor_cat() + # Alternative lambda version + # client.serial_monitor(lambda line: print(line.decode("utf-8", errors="replace"), end="", flush=True)) + + print(f"Simulation started, waiting for {SLEEP_TIME} seconds…") + client.wait_until_simulation_time(SLEEP_TIME) + + # Disconnect from the simulator + client.disconnect() + + +if __name__ == "__main__": + main() diff --git a/src/wokwi_client/__init__.py b/src/wokwi_client/__init__.py index 7fbe634..0b91573 100644 --- a/src/wokwi_client/__init__.py +++ b/src/wokwi_client/__init__.py @@ -12,7 +12,8 @@ from .__version__ import get_version from .client import WokwiClient +from .client_sync import WokwiClientSync from .constants import GET_TOKEN_URL __version__ = get_version() -__all__ = ["WokwiClient", "__version__", "GET_TOKEN_URL"] +__all__ = ["WokwiClient", "WokwiClientSync", "__version__", "GET_TOKEN_URL"] diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 03cf878..9acbb92 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -2,9 +2,11 @@ # # SPDX-License-Identifier: MIT +import asyncio import base64 +import inspect from pathlib import Path -from typing import Any, Optional, Union, cast +from typing import Any, Callable, Optional, Union, cast from .__version__ import get_version from .constants import DEFAULT_WS_URL @@ -48,6 +50,7 @@ def __init__(self, token: str, server: Optional[str] = None): self.last_pause_nanos = 0 self._transport.add_event_listener("sim:pause", self._on_pause) self._pause_queue = EventQueue(self._transport, "sim:pause") + self._serial_monitor_tasks: set[asyncio.Task[None]] = set() async def connect(self) -> dict[str, Any]: """ @@ -61,7 +64,10 @@ async def connect(self) -> dict[str, Any]: async def disconnect(self) -> None: """ Disconnect from the Wokwi simulator server. + + This also stops all active serial monitors. """ + self.stop_serial_monitors() await self._transport.close() async def upload(self, name: str, content: bytes) -> None: @@ -188,6 +194,49 @@ async def restart_simulation(self, pause: bool = False) -> None: """ await restart(self._transport, pause) + def serial_monitor(self, callback: Callable[[bytes], Any]) -> asyncio.Task[None]: + """ + Start monitoring the serial output in the background and invoke `callback` for each line. + + This method **does not block**: it creates and returns an asyncio.Task that runs until the + transport is closed or the task is cancelled. The callback may be synchronous or async. + + Example: + task = client.serial_monitor(lambda line: print(line.decode(), end="")) + ... do other async work ... + task.cancel() + """ + + async def _runner() -> None: + try: + async for line in monitor_lines(self._transport): + try: + result = callback(line) + if inspect.isawaitable(result): + await result + except Exception: + # Swallow callback exceptions to keep the monitor alive. + # Users can add their own error handling inside the callback. + pass + finally: + # Clean up task from the set when it completes + self._serial_monitor_tasks.discard(task) + + task = asyncio.create_task(_runner(), name="wokwi-serial-monitor") + self._serial_monitor_tasks.add(task) + return task + + def stop_serial_monitors(self) -> None: + """ + Stop all active serial monitor tasks. + + This method cancels all tasks created by the serial_monitor method. + After calling this method, all active serial monitors will stop receiving data. + """ + for task in self._serial_monitor_tasks.copy(): + task.cancel() + self._serial_monitor_tasks.clear() + async def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> None: """ Print serial monitor output to stdout as it is received from the simulation. diff --git a/src/wokwi_client/client_sync.py b/src/wokwi_client/client_sync.py new file mode 100644 index 0000000..d6cf3a1 --- /dev/null +++ b/src/wokwi_client/client_sync.py @@ -0,0 +1,212 @@ +""" +Synchronous Wokwi Client Library + +This module exposes a blocking (sync) interface that mirrors the async +`WokwiClient` API by running an asyncio loop in a dedicated thread and +delegating all coroutine calls to that loop. +""" + +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from __future__ import annotations + +import asyncio +import contextlib +import inspect +import threading +from collections.abc import Coroutine +from concurrent.futures import Future +from concurrent.futures import TimeoutError as FutureTimeoutError +from typing import Any, Callable, TypeVar + +from .client import WokwiClient +from .serial import monitor_lines + +T = TypeVar("T") + + +class WokwiClientSync: + """ + Synchronous client for the Wokwi Simulation API. + + Design: + • A private asyncio loop runs on a dedicated background thread. + • Public methods mirror the async API by submitting the underlying + coroutine calls onto that loop and waiting for results (blocking). + • Long-lived streamers (serial monitors) are scheduled on the loop and + tracked, so we can cancel & drain them on `disconnect()`. + """ + + # Public attributes mirrored for convenience + version: str + last_pause_nanos: int # this proxy resolves via __getattr__ + + def __init__(self, token: str, server: str | None = None): + # Create a fresh event loop + thread (daemon so it won't prevent process exit). + self._loop = asyncio.new_event_loop() + self._thread = threading.Thread( + target=self._run_loop, args=(self._loop,), daemon=True, name="wokwi-sync-loop" + ) + self._thread.start() + + # Underlying async client + self._async_client = WokwiClient(token, server) + + # Mirror library version for quick access + self.version = self._async_client.version + + # Track background tasks created via run_coroutine_threadsafe (serial monitors) + self._bg_futures: set[Future[Any]] = set() + + # Idempotent disconnect guard + self._closed = False + + @staticmethod + def _run_loop(loop: asyncio.AbstractEventLoop) -> None: + """Background thread loop runner.""" + asyncio.set_event_loop(loop) + loop.run_forever() + + # ----- Internal helpers ------------------------------------------------- + def _submit(self, coro: Coroutine[Any, Any, T]) -> Future[T]: + """Submit a coroutine to the loop and return its concurrent.futures.Future.""" + return asyncio.run_coroutine_threadsafe(coro, self._loop) + + def _call(self, coro: Coroutine[Any, Any, T]) -> T: + """Submit a coroutine to the loop and block until it completes (or raises).""" + return self._submit(coro).result() + + def _add_bg_future(self, fut: Future[Any]) -> None: + """Track a background future so we can cancel & drain on shutdown.""" + self._bg_futures.add(fut) + + # ----- Context manager sugar ------------------------------------------- + def __enter__(self) -> WokwiClientSync: + self.connect() + return self + + def __exit__(self, exc_type: type, exc_val: Exception, exc_tb: Any) -> None: + self.disconnect() + + # ----- Lifecycle -------------------------------------------------------- + def connect(self) -> dict[str, Any]: + """Connect to the simulator (blocking) and return server info.""" + return self._call(self._async_client.connect()) + + def disconnect(self) -> None: + """Disconnect and stop the background loop. + + Order matters: + 1) Cancel and drain background serial-monitor futures. + 2) Disconnect the underlying transport. + 3) Stop the loop and join the thread. + Safe to call multiple times. + """ + if self._closed: + return + self._closed = True + + # (1) Cancel + drain monitors + for fut in list(self._bg_futures): + fut.cancel() + for fut in list(self._bg_futures): + with contextlib.suppress(FutureTimeoutError, Exception): + # Give each monitor a short window to handle cancellation cleanly. + fut.result(timeout=1.0) + self._bg_futures.discard(fut) + + # (2) Disconnect transport + with contextlib.suppress(Exception): + self._call(self._async_client.disconnect()) + + # (3) Stop loop / join thread + if self._loop.is_running(): + self._loop.call_soon_threadsafe(self._loop.stop) + if self._thread.is_alive(): + self._thread.join(timeout=5.0) + + # ----- Serial monitoring ------------------------------------------------ + def serial_monitor(self, callback: Callable[[bytes], Any]) -> None: + """ + Start monitoring the serial output in the background and invoke `callback` + for each line. Non-blocking. Runs until `disconnect()`. + + The callback may be sync or async. Exceptions raised by the callback are + swallowed to keep the monitor alive (add your own logging as needed). + """ + + async def _runner() -> None: + async for line in monitor_lines(self._async_client._transport): + try: + maybe_awaitable = callback(line) + if inspect.isawaitable(maybe_awaitable): + await maybe_awaitable + except Exception: + # Keep the monitor alive even if the callback throws. + pass + + fut = self._submit(_runner()) + self._add_bg_future(fut) + + def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> None: + """ + Print serial monitor output in the background (non-blocking). Runs until `disconnect()`. + + Args: + decode_utf8: Whether to decode bytes as UTF-8 (default True). + errors: UTF-8 decoding error strategy ('strict'|'ignore'|'replace'). + """ + + async def _runner() -> None: + async for line in monitor_lines(self._async_client._transport): + try: + if decode_utf8: + try: + print(line.decode("utf-8", errors=errors), end="", flush=True) + except UnicodeDecodeError: + print(line, end="", flush=True) + else: + print(line, end="", flush=True) + except Exception: + # Keep the monitor alive even if printing raises intermittently. + pass + + fut = self._submit(_runner()) + self._add_bg_future(fut) + + def stop_serial_monitors(self) -> None: + """ + Cancel and drain all running serial monitors without disconnecting. + + Useful if you want to stop printing but keep the connection alive. + """ + for fut in list(self._bg_futures): + fut.cancel() + for fut in list(self._bg_futures): + with contextlib.suppress(FutureTimeoutError, Exception): + fut.result(timeout=1.0) + self._bg_futures.discard(fut) + + # ----- Dynamic method wrapping ----------------------------------------- + def __getattr__(self, name: str) -> Any: + """ + Delegate attribute access to the underlying async client. + + If the attribute on `WokwiClient` is a coroutine function, return a + sync wrapper that blocks until the coroutine completes. + """ + # Explicit methods above (serial monitors) take precedence. + attr = getattr(self._async_client, name) + if callable(attr): + func = getattr(WokwiClient, name, None) + if func is not None and inspect.iscoroutinefunction(func): + + def sync_wrapper(*args: Any, **kwargs: Any) -> Any: + return self._call(attr(*args, **kwargs)) + + sync_wrapper.__name__ = name + sync_wrapper.__doc__ = func.__doc__ + return sync_wrapper + return attr diff --git a/src/wokwi_client/client_sync.pyi b/src/wokwi_client/client_sync.pyi new file mode 100644 index 0000000..a6edea5 --- /dev/null +++ b/src/wokwi_client/client_sync.pyi @@ -0,0 +1,59 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any, Callable + +from .pins import PinReadMessage + +__all__ = ["WokwiClientSync"] + +class WokwiClientSync: + """ + Synchronous client for the Wokwi Simulation API. + + This stub provides precise types for IDEs and type checkers. The runtime + implementation mirrors the async API of `WokwiClient`, but executes calls + synchronously by delegating to an internal event loop. + """ + + version: str + last_pause_nanos: int + + def __init__(self, token: str, server: str | None = None) -> None: ... + + # Context manager + def __enter__(self) -> WokwiClientSync: ... + def __exit__(self, exc_type: type, exc_val: Exception, exc_tb: Any) -> None: ... + + # Lifecycle + def connect(self) -> dict[str, Any]: ... + def disconnect(self) -> None: ... + + # Serial monitoring (non-blocking background tasks managed internally) + def serial_monitor(self, callback: Callable[[bytes], Any]) -> None: ... + def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> None: ... + def stop_serial_monitors(self) -> None: ... + + # Methods mirrored from WokwiClient (sync variants) + def upload(self, name: str, content: bytes) -> None: ... + def upload_file(self, filename: str, local_path: Path | None = None) -> None: ... + def download(self, name: str) -> bytes: ... + def download_file(self, name: str, local_path: Path | None = None) -> None: ... + def start_simulation( + self, + firmware: str, + elf: str | None = None, + pause: bool = False, + chips: list[str] = ..., # default empty list at runtime + ) -> None: ... + def pause_simulation(self) -> None: ... + def resume_simulation(self, pause_after: int | None = None) -> None: ... + def wait_until_simulation_time(self, seconds: float) -> None: ... + def restart_simulation(self, pause: bool = False) -> None: ... + def serial_write(self, data: bytes | str | list[int]) -> None: ... + def read_pin(self, part: str, pin: str) -> PinReadMessage: ... + def listen_pin(self, part: str, pin: str, listen: bool = True) -> None: ... + def gpio_list(self) -> list[str]: ... + def set_control(self, part: str, control: str, value: int | bool | float) -> None: ... + def read_framebuffer_png_bytes(self, id: str) -> bytes: ... + def save_framebuffer_png(self, id: str, path: Path, overwrite: bool = True) -> Path: ... diff --git a/tests/test_hello_esp32.py b/tests/test_hello_esp32.py index 8045b2c..21f0e31 100644 --- a/tests/test_hello_esp32.py +++ b/tests/test_hello_esp32.py @@ -10,3 +10,10 @@ def test_hello_esp32_example() -> None: result = run_example_module("examples.hello_esp32.main") assert result.returncode == 0 assert "main_task: Calling app_main()" in result.stdout + + +def test_hello_esp32_sync_example() -> None: + """Sync hello_esp32 example should run and exit with 0.""" + result = run_example_module("examples.hello_esp32_sync.main") + assert result.returncode == 0 + assert "main_task: Calling app_main()" in result.stdout From 1a64d845f7a2eae4b1fbb6adcd2a3a7f31c715df Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Thu, 21 Aug 2025 15:01:42 +0200 Subject: [PATCH 2/2] fix: update disconnect method to close transport directly --- src/wokwi_client/client.py | 5 ++++- src/wokwi_client/client_sync.py | 2 +- src/wokwi_client/event_queue.py | 6 +++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 9acbb92..20f0fd0 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -49,7 +49,8 @@ def __init__(self, token: str, server: Optional[str] = None): self._transport = Transport(token, server or DEFAULT_WS_URL) self.last_pause_nanos = 0 self._transport.add_event_listener("sim:pause", self._on_pause) - self._pause_queue = EventQueue(self._transport, "sim:pause") + # Lazily create in an active event loop (important for py3.9 and sync client) + self._pause_queue: Optional[EventQueue] = None self._serial_monitor_tasks: set[asyncio.Task[None]] = set() async def connect(self) -> dict[str, Any]: @@ -181,6 +182,8 @@ async def wait_until_simulation_time(self, seconds: float) -> None: await pause(self._transport) remaining_nanos = seconds * 1e9 - self.last_pause_nanos if remaining_nanos > 0: + if self._pause_queue is None: + self._pause_queue = EventQueue(self._transport, "sim:pause") self._pause_queue.flush() await resume(self._transport, int(remaining_nanos)) await self._pause_queue.get() diff --git a/src/wokwi_client/client_sync.py b/src/wokwi_client/client_sync.py index d6cf3a1..daca415 100644 --- a/src/wokwi_client/client_sync.py +++ b/src/wokwi_client/client_sync.py @@ -119,7 +119,7 @@ def disconnect(self) -> None: # (2) Disconnect transport with contextlib.suppress(Exception): - self._call(self._async_client.disconnect()) + self._call(self._async_client._transport.close()) # (3) Stop loop / join thread if self._loop.is_running(): diff --git a/src/wokwi_client/event_queue.py b/src/wokwi_client/event_queue.py index c4c17c9..346515a 100644 --- a/src/wokwi_client/event_queue.py +++ b/src/wokwi_client/event_queue.py @@ -23,12 +23,16 @@ class EventQueue: """A queue for events from a specific event type.""" def __init__(self, transport: Transport, event_type: str) -> None: + # Bind the queue to the current running loop (important for py3.9) + self._loop = asyncio.get_running_loop() self._queue: asyncio.Queue[EventMessage] = asyncio.Queue() self._transport = transport self._event_type = event_type def listener(event: EventMessage) -> None: - self._queue.put_nowait(event) + # Ensure we enqueue on the loop that owns the queue, even if + # the listener is invoked from a different loop/thread. + self._loop.call_soon_threadsafe(self._queue.put_nowait, event) self._listener = listener self._transport.add_event_listener(self._event_type, self._listener)