From 7e6c4993af14642d89082a2bf17e11e880200124 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Mon, 11 Aug 2025 12:36:38 +0200 Subject: [PATCH 01/10] feat: add pin control and reading functionalities --- src/wokwi_client/client.py | 34 +++++++++++++++++++++++++ src/wokwi_client/control.py | 31 +++++++++++++++++++++++ src/wokwi_client/pins.py | 49 +++++++++++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+) create mode 100644 src/wokwi_client/control.py create mode 100644 src/wokwi_client/pins.py diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 01354ae..7308300 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -7,8 +7,10 @@ from .__version__ import get_version from .constants import DEFAULT_WS_URL +from .control import set_control from .event_queue import EventQueue from .file_ops import upload, upload_file +from .pins import pin_listen, pin_read from .protocol_types import EventMessage, ResponseMessage from .serial import monitor_lines from .simulation import pause, restart, resume, start @@ -181,3 +183,35 @@ async def serial_monitor_cat(self) -> None: def _on_pause(self, event: EventMessage) -> None: self.last_pause_nanos = int(event["nanos"]) + + async def read_pin(self, part: str, pin: str) -> ResponseMessage: + """Read the current state of a pin. + + Args: + part: The part id (e.g. "uno"). + pin: The pin name (e.g. "A2"). + """ + return await pin_read(self._transport, part=part, pin=pin) + + async def listen_pin(self, part: str, pin: str, listen: bool = True) -> ResponseMessage: + """Start or stop listening for changes on a pin. + + When enabled, "pin:change" events will be delivered via the transport's + event mechanism. + + Args: + part: The part id. + pin: The pin name. + listen: True to start listening, False to stop. + """ + return await pin_listen(self._transport, part=part, pin=pin, listen=listen) + + async def set_control(self, part: str, control: str, value: int | bool | float) -> ResponseMessage: + """Set a control value (e.g. simulate button press). + + Args: + part: Part id (e.g. "btn1"). + control: Control name (e.g. "pressed"). + value: Control value to set (float). + """ + return await set_control(self._transport, part=part, control=control, value=value) diff --git a/src/wokwi_client/control.py b/src/wokwi_client/control.py new file mode 100644 index 0000000..0d5a612 --- /dev/null +++ b/src/wokwi_client/control.py @@ -0,0 +1,31 @@ +"""Control command helpers for virtual parts. + +Provides `set_control` to manipulate part controls (e.g. press a button). + +Assumptions: +* Underlying websocket command name: "control:set". +* Parameter names expected by server: part, control, value. +""" + +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from .protocol_types import ResponseMessage +from .transport import Transport + + +async def set_control( + transport: Transport, *, part: str, control: str, value: int | bool | float +) -> ResponseMessage: + """Set a control value on a part (e.g. simulate button press/release). + + Args: + transport: Active Transport. + part: Part identifier (e.g. "btn1"). + control: Control name (e.g. "pressed"). + value: Control value to set (float). + """ + return await transport.request( + "control:set", {"part": part, "control": control, "value": float(value)} + ) diff --git a/src/wokwi_client/pins.py b/src/wokwi_client/pins.py new file mode 100644 index 0000000..5e7ce7f --- /dev/null +++ b/src/wokwi_client/pins.py @@ -0,0 +1,49 @@ +"""Pin command helpers for the Wokwi Simulation API. + +This module exposes helper coroutines for issuing pin-related commands: + +* pin:read - Read the current state of a pin. +* pin:listen - Start/stop listening for changes on a pin (emits pin:change + events). +""" + +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from .protocol_types import ResponseMessage +from .transport import Transport + + +async def pin_read( + transport: Transport, *, part: str, pin: str +) -> ResponseMessage: + """Read the state of a pin. + + Args: + transport: The active Transport instance. + part: Part identifier (e.g. "uno"). + pin: Pin name (e.g. "A2", "13"). + """ + + return await transport.request("pin:read", {"part": part, "pin": pin}) + + +async def pin_listen( + transport: Transport, *, part: str, pin: str, listen: bool = True +) -> ResponseMessage: + """Enable or disable listening for changes on a pin. + + When listening is enabled, "pin:change" events will be emitted with the + pin state. + + Args: + transport: The active Transport instance. + part: Part identifier. + pin: Pin name. + listen: True to start listening, False to stop. + """ + + return await transport.request( + "pin:listen", {"part": part, "pin": pin, "listen": listen} + ) From 6f068f39af4d10d12005890007a0b83716ad26d1 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Mon, 11 Aug 2025 12:38:43 +0200 Subject: [PATCH 02/10] docs: update features list to include control of peripherals and GPIO pins --- docs/index.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/index.md b/docs/index.md index 953341f..6e5553d 100644 --- a/docs/index.md +++ b/docs/index.md @@ -8,6 +8,7 @@ Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API**. - Upload diagrams and firmware files - Start, pause, resume, and restart simulations - Monitor serial output asynchronously +- Control peripherals and read GPIO pins - Fully type-annotated and easy to use with asyncio ## Installation From 538191a0252942d9db8309eb52948bc558b46129 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Tue, 12 Aug 2025 12:36:18 +0200 Subject: [PATCH 03/10] feat: enhance serial monitor functionality with write support and update documentation --- README.md | 4 +++- docs/index.md | 2 +- src/wokwi_client/client.py | 6 +++++- src/wokwi_client/serial.py | 16 +++++++++++++++- 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index b3be48e..999efaa 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,9 @@ The basic example is in the [examples/hello_esp32/main.py](examples/hello_esp32/ - Connect to the Wokwi Simulator - Upload a diagram and firmware files - Start a simulation -- Monitor the serial output +- Monitor the serial output and write to them +- Read GPIO pins +- Control peripherals (buttons, MPU6050, etc.) You can run the example with: diff --git a/docs/index.md b/docs/index.md index 6e5553d..5f684c6 100644 --- a/docs/index.md +++ b/docs/index.md @@ -7,7 +7,7 @@ Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API**. - Connect to the Wokwi Simulator from Python - Upload diagrams and firmware files - Start, pause, resume, and restart simulations -- Monitor serial output asynchronously +- Monitor serial output asynchronously and write to them - Control peripherals and read GPIO pins - Fully type-annotated and easy to use with asyncio diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 7308300..9858c40 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -12,7 +12,7 @@ from .file_ops import upload, upload_file from .pins import pin_listen, pin_read from .protocol_types import EventMessage, ResponseMessage -from .serial import monitor_lines +from .serial import monitor_lines, write_serial from .simulation import pause, restart, resume, start from .transport import Transport @@ -181,6 +181,10 @@ async def serial_monitor_cat(self) -> None: async for line in monitor_lines(self._transport): print(line.decode("utf-8"), end="", flush=True) + async def serial_write(self, data: bytes | str | list[int]) -> None: + """Write data to the simulation serial monitor interface.""" + await write_serial(self._transport, data) + def _on_pause(self, event: EventMessage) -> None: self.last_pause_nanos = int(event["nanos"]) diff --git a/src/wokwi_client/serial.py b/src/wokwi_client/serial.py index fbaea4c..3563740 100644 --- a/src/wokwi_client/serial.py +++ b/src/wokwi_client/serial.py @@ -2,7 +2,7 @@ # # SPDX-License-Identifier: MIT -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Iterable from .event_queue import EventQueue from .transport import Transport @@ -14,3 +14,17 @@ async def monitor_lines(transport: Transport) -> AsyncGenerator[bytes, None]: while True: event_msg = await queue.get() yield bytes(event_msg["payload"]["bytes"]) + + +async def write_serial(transport: Transport, data: bytes | str | Iterable[int]) -> None: + """Write data to the serial monitor. + + Accepts bytes, str (encoded as utf-8), or an iterable of integer byte values. + """ + if isinstance(data, str): + payload = list(data.encode("utf-8")) + elif isinstance(data, bytes): + payload = list(data) + else: + payload = list(int(b) & 0xFF for b in data) + await transport.request("serial-monitor:write", {"bytes": payload}) From cc325dfcbc90ba89595e0e9368ba3c3828a3def0 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Tue, 12 Aug 2025 16:19:00 +0200 Subject: [PATCH 04/10] feat: add synchronous client wrapper and example usage for Wokwi simulation --- README.md | 2 +- docs/index.md | 100 +++++++++----- examples/hello_esp32_sync/.gitignore | 3 + examples/hello_esp32_sync/__init__.py | 0 examples/hello_esp32_sync/diagram.json | 22 +++ examples/hello_esp32_sync/main.py | 63 +++++++++ mkdocs.yml | 2 + src/wokwi_client/__init__.py | 3 +- src/wokwi_client/client_sync.py | 180 +++++++++++++++++++++++++ tests/test_hello_esp32.py | 23 ++-- tests/test_micropython_esp32.py | 14 ++ tests/utils.py | 37 +++++ 12 files changed, 401 insertions(+), 48 deletions(-) create mode 100644 examples/hello_esp32_sync/.gitignore create mode 100644 examples/hello_esp32_sync/__init__.py create mode 100644 examples/hello_esp32_sync/diagram.json create mode 100644 examples/hello_esp32_sync/main.py create mode 100644 src/wokwi_client/client_sync.py create mode 100644 tests/test_micropython_esp32.py create mode 100644 tests/utils.py diff --git a/README.md b/README.md index 999efaa..ff98020 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Wokwi Python Client 🚀 -Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API** +Typed Python SDK for the **Wokwi Simulation API**, with async and sync APIs [![PyPI version](https://img.shields.io/pypi/v/wokwi-client?logo=pypi)](https://pypi.org/project/wokwi-client/) [![Python versions](https://img.shields.io/pypi/pyversions/wokwi-client)](https://pypi.org/project/wokwi-client/) diff --git a/docs/index.md b/docs/index.md index 5f684c6..934d73f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,6 +1,6 @@ # Wokwi Python Client Library -Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API**. +Typed Python SDK for the **Wokwi Simulation API**, with async and sync APIs. ## Features @@ -10,6 +10,7 @@ Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API**. - Monitor serial output asynchronously and write to them - Control peripherals and read GPIO pins - Fully type-annotated and easy to use with asyncio +- Async and sync APIs ## Installation @@ -25,37 +26,74 @@ Get your API token from [https://wokwi.com/dashboard/ci](https://wokwi.com/dashb ## Quickstart Example -```python -import asyncio -import os -from wokwi_client import WokwiClient, GET_TOKEN_URL - - -async def main(): - token = os.getenv("WOKWI_CLI_TOKEN") - if not token: - raise SystemExit( - f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}." - ) - - client = WokwiClient(token) - await client.connect() - await client.upload_file("diagram.json") - await client.upload_file("firmware.bin") - await client.start_simulation(firmware="firmware.bin") - serial_task = asyncio.create_task( - client.serial_monitor_cat() - ) # Stream serial output - await client.wait_until_simulation_time(10) # Run simulation for 10 seconds - serial_task.cancel() - await client.disconnect() - - -if __name__ == "__main__": - asyncio.run(main()) -``` +=== "Async (recommended)" + + ```python + import asyncio + import os + from wokwi_client import WokwiClient, GET_TOKEN_URL + + + async def main(): + token = os.getenv("WOKWI_CLI_TOKEN") + if not token: + raise SystemExit( + f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}." + ) + + client = WokwiClient(token) + await client.connect() + await client.upload_file("diagram.json") + await client.upload_file("firmware.bin") + await client.start_simulation(firmware="firmware.bin") + + serial_task = asyncio.create_task( + client.serial_monitor_cat() + ) # Stream serial output + await client.wait_until_simulation_time(10) # Run simulation for 10 seconds + serial_task.cancel() + await client.disconnect() + + + if __name__ == "__main__": + asyncio.run(main()) + ``` + +=== "Sync" + + ```python + import os + from wokwi_client import WokwiClientSync, GET_TOKEN_URL + + + def main(): + token = os.getenv("WOKWI_CLI_TOKEN") + if not token: + raise SystemExit( + f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}." + ) + + client = WokwiClientSync(token) + client.connect() + client.upload_file("diagram.json") + client.upload_file("firmware.bin") + client.start_simulation(firmware="firmware.bin") + + # Stream serial output concurrently for 10 seconds + client.monitor_serial(lambda line: print(line.decode("utf-8"), end="", flush=True)) + client.wait_until_simulation_time(10) + client.disconnect() + + + if __name__ == "__main__": + main() + ``` + +## Examples -See the [examples/hello_esp32/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/hello_esp32/main.py) for a full example including serial monitoring, and [examples/micropython_esp32/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/micropython_esp32/main.py) for an example of running MicroPython on a simulated ESP32 board. +- [examples/hello_esp32/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/hello_esp32/main.py) full async example including serial monitoring +- [examples/hello_esp32_sync/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/hello_esp32_sync/main.py) sync wrapper usage +- [examples/micropython_esp32/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/micropython_esp32/main.py) example of running MicroPython on a simulated ESP32 board. ## API Reference diff --git a/examples/hello_esp32_sync/.gitignore b/examples/hello_esp32_sync/.gitignore new file mode 100644 index 0000000..359127f --- /dev/null +++ b/examples/hello_esp32_sync/.gitignore @@ -0,0 +1,3 @@ +# Ignore the firmware files, as they are downloaded from the internet +hello_world.bin +hello_world.elf diff --git a/examples/hello_esp32_sync/__init__.py b/examples/hello_esp32_sync/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/hello_esp32_sync/diagram.json b/examples/hello_esp32_sync/diagram.json new file mode 100644 index 0000000..6b9b1dd --- /dev/null +++ b/examples/hello_esp32_sync/diagram.json @@ -0,0 +1,22 @@ +{ + "version": 1, + "author": "Uri Shaked", + "editor": "wokwi", + "parts": [ + { + "type": "wokwi-esp32-devkit-v1", + "id": "esp", + "top": 0, + "left": 0, + "attrs": { "fullBoot": "1" } + } + ], + "connections": [ + ["esp:TX0", "$serialMonitor:RX", "", []], + ["esp:RX0", "$serialMonitor:TX", "", []] + ], + "serialMonitor": { + "display": "terminal" + }, + "dependencies": {} +} diff --git a/examples/hello_esp32_sync/main.py b/examples/hello_esp32_sync/main.py new file mode 100644 index 0000000..e1e009c --- /dev/null +++ b/examples/hello_esp32_sync/main.py @@ -0,0 +1,63 @@ +# SPDX-License-Identifier: MIT +# Copyright (C) 2025, CodeMagic LTD + +import os +from pathlib import Path + +import requests + +from wokwi_client import GET_TOKEN_URL, WokwiClientSync + +EXAMPLE_DIR = Path(__file__).parent +HELLO_WORLD_URL = "https://github.com/wokwi/esp-idf-hello-world/raw/refs/heads/main/bin" +FIRMWARE_FILES = { + "hello_world.bin": f"{HELLO_WORLD_URL}/hello_world.bin", + "hello_world.elf": f"{HELLO_WORLD_URL}/hello_world.elf", +} +SLEEP_TIME = int(os.getenv("WOKWI_SLEEP_TIME", "10")) + + +def main() -> None: + token = os.getenv("WOKWI_CLI_TOKEN") + if not token: + raise SystemExit( + f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}." + ) + + for filename, url in FIRMWARE_FILES.items(): + if (EXAMPLE_DIR / filename).exists(): + continue + print(f"Downloading {filename} from {url}") + response = requests.get(url) + response.raise_for_status() + with open(EXAMPLE_DIR / filename, "wb") as f: + f.write(response.content) + + client = WokwiClientSync(token) + print(f"Wokwi client library version: {client.version}") + + hello = client.connect() + print("Connected to Wokwi Simulator, server version:", hello["version"]) + + # Upload the diagram and firmware files + client.upload_file("diagram.json", EXAMPLE_DIR / "diagram.json") + client.upload_file("hello_world.bin", EXAMPLE_DIR / "hello_world.bin") + client.upload_file("hello_world.elf", EXAMPLE_DIR / "hello_world.elf") + + # Start the simulation + client.start_simulation( + firmware="hello_world.bin", + elf="hello_world.elf", + ) + + # Stream serial output for a few seconds (non-blocking) + client.monitor_serial(lambda line: print(line.decode("utf-8"), end="", flush=True)) + print(f"Simulation started, waiting for {SLEEP_TIME} seconds…") + client.wait_until_simulation_time(SLEEP_TIME) + + # Disconnect from the simulator + client.disconnect() + + +if __name__ == "__main__": + main() diff --git a/mkdocs.yml b/mkdocs.yml index 8bbdf5d..303ad77 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -42,6 +42,8 @@ markdown_extensions: - pymdownx.inlinehilite - pymdownx.superfences - pymdownx.tabbed + - pymdownx.tabbed: + alternate_style: true - toc: permalink: "¶" diff --git a/src/wokwi_client/__init__.py b/src/wokwi_client/__init__.py index 7fbe634..0b91573 100644 --- a/src/wokwi_client/__init__.py +++ b/src/wokwi_client/__init__.py @@ -12,7 +12,8 @@ from .__version__ import get_version from .client import WokwiClient +from .client_sync import WokwiClientSync from .constants import GET_TOKEN_URL __version__ = get_version() -__all__ = ["WokwiClient", "__version__", "GET_TOKEN_URL"] +__all__ = ["WokwiClient", "WokwiClientSync", "__version__", "GET_TOKEN_URL"] diff --git a/src/wokwi_client/client_sync.py b/src/wokwi_client/client_sync.py new file mode 100644 index 0000000..6abfac1 --- /dev/null +++ b/src/wokwi_client/client_sync.py @@ -0,0 +1,180 @@ +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +import asyncio +import logging +import threading +import typing as t +from pathlib import Path + +from wokwi_client import WokwiClient +from wokwi_client.serial import monitor_lines as monitor_serial_lines + + +class WokwiClientSync: + """Synchronous wrapper around the async WokwiClient.""" + + def __init__(self, token: str, server: t.Optional[str] = None): + self.token = token + self.server = server + self._loop = None + self._loop_thread = None + self._client = None + self._connected = False + + def _ensure_loop(self): + """Ensure the async event loop is running.""" + if self._loop is None: + self._loop = asyncio.new_event_loop() + self._loop_thread = threading.Thread(target=self._loop.run_forever, daemon=True) + self._loop_thread.start() + + def _run_async(self, coro, timeout=30): + """Run an async coroutine synchronously.""" + self._ensure_loop() + future = asyncio.run_coroutine_threadsafe(coro, self._loop) + return future.result(timeout=timeout) + + def connect(self): + """Connect to Wokwi server.""" + if not self._connected: + self._client = WokwiClient(self.token, self.server) + result = self._run_async(self._client.connect()) + self._connected = True + return result + return {} + + def disconnect(self): + """Disconnect from Wokwi server.""" + if self._connected and self._client: + try: + # Stop any ongoing monitor task + if hasattr(self, '_monitor_task') and self._monitor_task: + self._monitor_task.cancel() + + # Disconnect the client + self._run_async(self._client.disconnect(), timeout=5) + except Exception as e: + logging.debug(f"Error during disconnect: {e}") + finally: + self._connected = False + self._client = None + if self._loop and self._loop_thread: + try: + self._loop.call_soon_threadsafe(self._loop.stop) + self._loop_thread.join(timeout=2) + except Exception as e: + logging.debug(f"Error stopping event loop: {e}") + finally: + self._loop = None + self._loop_thread = None + + def upload(self, name: str, content: bytes): + """Upload a file to the simulator from bytes content.""" + if not self._connected: + raise RuntimeError("Client not connected") + return self._run_async(self._client.upload(name, content)) + + def upload_file(self, filename: str, local_path: t.Optional[Path] = None): + """Upload a file to the simulator.""" + if not self._connected: + raise RuntimeError("Client not connected") + return self._run_async(self._client.upload_file(filename, local_path)) + + def start_simulation(self, firmware: str, elf: t.Optional[str] = None, pause: bool = False, chips: list[str] = []): + """Start a simulation.""" + if not self._connected: + raise RuntimeError("Client not connected") + return self._run_async(self._client.start_simulation(firmware, elf, pause, chips)) + + def pause_simulation(self): + """Pause the running simulation.""" + if not self._connected: + raise RuntimeError("Client not connected") + return self._run_async(self._client.pause_simulation()) + + def resume_simulation(self, pause_after: t.Optional[int] = None): + """Resume the simulation, optionally pausing after a given number of nanoseconds.""" + if not self._connected: + raise RuntimeError("Client not connected") + return self._run_async(self._client.resume_simulation(pause_after)) + + def wait_until_simulation_time(self, seconds: float): + """Pause and resume the simulation until the given simulation time (in seconds) is reached.""" + if not self._connected: + raise RuntimeError("Client not connected") + return self._run_async(self._client.wait_until_simulation_time(seconds)) + + def restart_simulation(self, pause: bool = False): + """Restart the simulation, optionally starting paused.""" + if not self._connected: + raise RuntimeError("Client not connected") + return self._run_async(self._client.restart_simulation(pause)) + + def serial_monitor_cat(self): + """Print serial monitor output to stdout as it is received from the simulation.""" + if not self._connected: + raise RuntimeError("Client not connected") + return self._run_async(self._client.serial_monitor_cat()) + + def write_serial(self, data: t.Union[bytes, str, list[int]]): + """Write data to serial.""" + if not self._connected: + raise RuntimeError("Client not connected") + return self._run_async(self._client.serial_write(data)) + + def read_pin(self, part: str, pin: str): + """Read the current state of a pin.""" + if not self._connected: + raise RuntimeError("Client not connected") + return self._run_async(self._client.read_pin(part, pin)) + + def listen_pin(self, part: str, pin: str, listen: bool = True): + """Start or stop listening for changes on a pin.""" + if not self._connected: + raise RuntimeError("Client not connected") + return self._run_async(self._client.listen_pin(part, pin, listen)) + + def monitor_serial(self, callback): + """Start monitoring serial output with a callback.""" + if not self._connected: + raise RuntimeError("Client not connected") + + async def _monitor(): + try: + async for line in monitor_serial_lines(self._client._transport): + if not self._connected: + break + try: + callback(line) + except Exception as e: + logging.error(f"Error in serial monitor callback: {e}") + break + except Exception as e: + logging.error(f"Error in serial monitor: {e}") + + # Start monitoring in background + self._monitor_task = asyncio.run_coroutine_threadsafe(_monitor(), self._loop) + + def set_control(self, part: str, control: str, value: t.Union[int, bool, float]): + """Set control value for a part.""" + if not self._connected: + raise RuntimeError("Client not connected") + return self._run_async(self._client.set_control(part, control, value)) + + @property + def version(self): + """Get client version.""" + if self._client: + return self._client.version + # Return a default version if client not initialized yet + client = WokwiClient(self.token, self.server) + return client.version + + @property + def last_pause_nanos(self): + """Get the last pause time in nanoseconds.""" + if self._client: + return self._client.last_pause_nanos + return 0 diff --git a/tests/test_hello_esp32.py b/tests/test_hello_esp32.py index 348233d..21f0e31 100644 --- a/tests/test_hello_esp32.py +++ b/tests/test_hello_esp32.py @@ -2,25 +2,18 @@ # # SPDX-License-Identifier: MIT -import os -import subprocess -import sys +from .utils import run_example_module def test_hello_esp32_example() -> None: - """`python -m examples.hello_esp32.main` runs the hello_esp32 example and exits with 0.""" - - assert os.environ.get("WOKWI_CLI_TOKEN") is not None, ( - "WOKWI_CLI_TOKEN environment variable is not set. You can get it from https://wokwi.com/dashboard/ci." - ) + """Async hello_esp32 example should run and exit with 0.""" + result = run_example_module("examples.hello_esp32.main") + assert result.returncode == 0 + assert "main_task: Calling app_main()" in result.stdout - result = subprocess.run( - [sys.executable, "-m", "examples.hello_esp32.main"], - check=False, - capture_output=True, - text=True, - env={**os.environ, "WOKWI_SLEEP_TIME": "1"}, - ) +def test_hello_esp32_sync_example() -> None: + """Sync hello_esp32 example should run and exit with 0.""" + result = run_example_module("examples.hello_esp32_sync.main") assert result.returncode == 0 assert "main_task: Calling app_main()" in result.stdout diff --git a/tests/test_micropython_esp32.py b/tests/test_micropython_esp32.py new file mode 100644 index 0000000..e7be202 --- /dev/null +++ b/tests/test_micropython_esp32.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from .utils import run_example_module + + +def test_micropython_esp32_example() -> None: + """MicroPython ESP32 example should run and print MicroPython banner.""" + # MicroPython boot can take a bit longer; give it a few seconds + result = run_example_module("examples.micropython_esp32.main", sleep_time="3") + assert result.returncode == 0 + # Expect a line from the injected MicroPython script + assert "Hello, MicroPython! I'm running on a Wokwi ESP32 simulator." in result.stdout diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..5e8d6ff --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,37 @@ +""" +Test utilities for running example modules. + +Provides a helper to execute `python -m ` with a short sleep to keep +CI fast and shared environment handling (WOKWI_CLI_TOKEN, etc.). +""" + +from __future__ import annotations + +import os +import subprocess +import sys +from collections.abc import Mapping + + +def run_example_module(module: str, *, sleep_time: str = "1", extra_env: Mapping[str, str] | None = None) -> subprocess.CompletedProcess: + """Run an example module with a short simulation time. + + Requires WOKWI_CLI_TOKEN to be set in the environment. + Returns the CompletedProcess so tests can assert on return code and output. + """ + + assert os.environ.get("WOKWI_CLI_TOKEN") is not None, ( + "WOKWI_CLI_TOKEN environment variable is not set. You can get it from https://wokwi.com/dashboard/ci." + ) + + env = {**os.environ, "WOKWI_SLEEP_TIME": sleep_time} + if extra_env: + env.update(extra_env) + + return subprocess.run( + [sys.executable, "-m", module], + check=False, + capture_output=True, + text=True, + env=env, + ) From b4194a88e3de096a7e6a60f78d5a73d780ca1373 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Tue, 12 Aug 2025 16:59:45 +0200 Subject: [PATCH 05/10] feat: enhance type hinting across client and control modules for improved clarity --- src/wokwi_client/client.py | 7 ++- src/wokwi_client/client_sync.py | 108 ++++++++++++++++++-------------- src/wokwi_client/control.py | 4 +- src/wokwi_client/pins.py | 8 +-- src/wokwi_client/serial.py | 3 +- tests/utils.py | 5 +- 6 files changed, 76 insertions(+), 59 deletions(-) diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 9858c40..8099c5f 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: MIT +import typing from pathlib import Path from typing import Any, Optional @@ -181,7 +182,7 @@ async def serial_monitor_cat(self) -> None: async for line in monitor_lines(self._transport): print(line.decode("utf-8"), end="", flush=True) - async def serial_write(self, data: bytes | str | list[int]) -> None: + async def serial_write(self, data: typing.Union[bytes, str, list[int]]) -> None: """Write data to the simulation serial monitor interface.""" await write_serial(self._transport, data) @@ -210,7 +211,9 @@ async def listen_pin(self, part: str, pin: str, listen: bool = True) -> Response """ return await pin_listen(self._transport, part=part, pin=pin, listen=listen) - async def set_control(self, part: str, control: str, value: int | bool | float) -> ResponseMessage: + async def set_control( + self, part: str, control: str, value: typing.Union[int, bool, float] + ) -> ResponseMessage: """Set a control value (e.g. simulate button press). Args: diff --git a/src/wokwi_client/client_sync.py b/src/wokwi_client/client_sync.py index 6abfac1..ddc4c20 100644 --- a/src/wokwi_client/client_sync.py +++ b/src/wokwi_client/client_sync.py @@ -2,58 +2,67 @@ # # SPDX-License-Identifier: MIT +from __future__ import annotations + import asyncio import logging import threading import typing as t +from concurrent.futures import Future from pathlib import Path from wokwi_client import WokwiClient from wokwi_client.serial import monitor_lines as monitor_serial_lines +if t.TYPE_CHECKING: + from collections.abc import Iterable + class WokwiClientSync: """Synchronous wrapper around the async WokwiClient.""" - def __init__(self, token: str, server: t.Optional[str] = None): + token: str + server: t.Optional[str] + _loop: t.Optional[asyncio.AbstractEventLoop] + _loop_thread: t.Optional[threading.Thread] + _client: t.Optional[WokwiClient] + _monitor_task: t.Optional[Future[t.Any]] + _connected: bool + + def __init__(self, token: str, server: t.Optional[str] = None) -> None: self.token = token self.server = server self._loop = None self._loop_thread = None self._client = None + self._monitor_task = None self._connected = False - def _ensure_loop(self): - """Ensure the async event loop is running.""" + def _ensure_loop(self) -> None: if self._loop is None: self._loop = asyncio.new_event_loop() self._loop_thread = threading.Thread(target=self._loop.run_forever, daemon=True) self._loop_thread.start() - def _run_async(self, coro, timeout=30): - """Run an async coroutine synchronously.""" + def _run_async(self, coro: t.Coroutine[t.Any, t.Any, t.Any], timeout: float = 30) -> t.Any: self._ensure_loop() + assert self._loop is not None future = asyncio.run_coroutine_threadsafe(coro, self._loop) return future.result(timeout=timeout) - def connect(self): - """Connect to Wokwi server.""" + def connect(self) -> t.Dict[str, t.Any]: if not self._connected: self._client = WokwiClient(self.token, self.server) - result = self._run_async(self._client.connect()) + result: t.Dict[str, t.Any] = t.cast(t.Dict[str, t.Any], self._run_async(self._client.connect())) self._connected = True return result return {} - def disconnect(self): - """Disconnect from Wokwi server.""" + def disconnect(self) -> None: if self._connected and self._client: try: - # Stop any ongoing monitor task - if hasattr(self, '_monitor_task') and self._monitor_task: + if self._monitor_task: self._monitor_task.cancel() - - # Disconnect the client self._run_async(self._client.disconnect(), timeout=5) except Exception as e: logging.debug(f"Error during disconnect: {e}") @@ -70,79 +79,85 @@ def disconnect(self): self._loop = None self._loop_thread = None - def upload(self, name: str, content: bytes): - """Upload a file to the simulator from bytes content.""" + def upload(self, name: str, content: bytes) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") + assert self._client is not None return self._run_async(self._client.upload(name, content)) - def upload_file(self, filename: str, local_path: t.Optional[Path] = None): - """Upload a file to the simulator.""" + def upload_file(self, filename: str, local_path: t.Optional[Path] = None) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") + assert self._client is not None return self._run_async(self._client.upload_file(filename, local_path)) - def start_simulation(self, firmware: str, elf: t.Optional[str] = None, pause: bool = False, chips: list[str] = []): - """Start a simulation.""" + def start_simulation( + self, + firmware: str, + elf: t.Optional[str] = None, + pause: bool = False, + chips: t.Optional[t.List[str]] = None, + ) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") - return self._run_async(self._client.start_simulation(firmware, elf, pause, chips)) + assert self._client is not None + return self._run_async(self._client.start_simulation(firmware, elf, pause, chips or [])) - def pause_simulation(self): - """Pause the running simulation.""" + def pause_simulation(self) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") + assert self._client is not None return self._run_async(self._client.pause_simulation()) - def resume_simulation(self, pause_after: t.Optional[int] = None): - """Resume the simulation, optionally pausing after a given number of nanoseconds.""" + def resume_simulation(self, pause_after: t.Optional[int] = None) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") + assert self._client is not None return self._run_async(self._client.resume_simulation(pause_after)) - def wait_until_simulation_time(self, seconds: float): - """Pause and resume the simulation until the given simulation time (in seconds) is reached.""" + def wait_until_simulation_time(self, seconds: float) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") + assert self._client is not None return self._run_async(self._client.wait_until_simulation_time(seconds)) - def restart_simulation(self, pause: bool = False): - """Restart the simulation, optionally starting paused.""" + def restart_simulation(self, pause: bool = False) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") + assert self._client is not None return self._run_async(self._client.restart_simulation(pause)) - def serial_monitor_cat(self): - """Print serial monitor output to stdout as it is received from the simulation.""" + def serial_monitor_cat(self) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") + assert self._client is not None return self._run_async(self._client.serial_monitor_cat()) - def write_serial(self, data: t.Union[bytes, str, list[int]]): - """Write data to serial.""" + def write_serial(self, data: t.Union[bytes, str, t.List[int]]) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") + assert self._client is not None return self._run_async(self._client.serial_write(data)) - def read_pin(self, part: str, pin: str): - """Read the current state of a pin.""" + def read_pin(self, part: str, pin: str) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") + assert self._client is not None return self._run_async(self._client.read_pin(part, pin)) - def listen_pin(self, part: str, pin: str, listen: bool = True): - """Start or stop listening for changes on a pin.""" + def listen_pin(self, part: str, pin: str, listen: bool = True) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") + assert self._client is not None return self._run_async(self._client.listen_pin(part, pin, listen)) - def monitor_serial(self, callback): - """Start monitoring serial output with a callback.""" + def monitor_serial(self, callback: t.Callable[[bytes], None]) -> None: if not self._connected: raise RuntimeError("Client not connected") - async def _monitor(): + async def _monitor() -> None: try: + assert self._client is not None async for line in monitor_serial_lines(self._client._transport): if not self._connected: break @@ -154,27 +169,24 @@ async def _monitor(): except Exception as e: logging.error(f"Error in serial monitor: {e}") - # Start monitoring in background + assert self._loop is not None self._monitor_task = asyncio.run_coroutine_threadsafe(_monitor(), self._loop) - def set_control(self, part: str, control: str, value: t.Union[int, bool, float]): - """Set control value for a part.""" + def set_control(self, part: str, control: str, value: t.Union[int, bool, float]) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") + assert self._client is not None return self._run_async(self._client.set_control(part, control, value)) @property - def version(self): - """Get client version.""" + def version(self) -> str: if self._client: return self._client.version - # Return a default version if client not initialized yet client = WokwiClient(self.token, self.server) return client.version @property - def last_pause_nanos(self): - """Get the last pause time in nanoseconds.""" + def last_pause_nanos(self) -> int: if self._client: return self._client.last_pause_nanos return 0 diff --git a/src/wokwi_client/control.py b/src/wokwi_client/control.py index 0d5a612..93e6dbd 100644 --- a/src/wokwi_client/control.py +++ b/src/wokwi_client/control.py @@ -11,12 +11,14 @@ # # SPDX-License-Identifier: MIT +import typing + from .protocol_types import ResponseMessage from .transport import Transport async def set_control( - transport: Transport, *, part: str, control: str, value: int | bool | float + transport: Transport, *, part: str, control: str, value: typing.Union[int, bool, float] ) -> ResponseMessage: """Set a control value on a part (e.g. simulate button press/release). diff --git a/src/wokwi_client/pins.py b/src/wokwi_client/pins.py index 5e7ce7f..82879ad 100644 --- a/src/wokwi_client/pins.py +++ b/src/wokwi_client/pins.py @@ -15,9 +15,7 @@ from .transport import Transport -async def pin_read( - transport: Transport, *, part: str, pin: str -) -> ResponseMessage: +async def pin_read(transport: Transport, *, part: str, pin: str) -> ResponseMessage: """Read the state of a pin. Args: @@ -44,6 +42,4 @@ async def pin_listen( listen: True to start listening, False to stop. """ - return await transport.request( - "pin:listen", {"part": part, "pin": pin, "listen": listen} - ) + return await transport.request("pin:listen", {"part": part, "pin": pin, "listen": listen}) diff --git a/src/wokwi_client/serial.py b/src/wokwi_client/serial.py index 3563740..7d7ad4f 100644 --- a/src/wokwi_client/serial.py +++ b/src/wokwi_client/serial.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: MIT +import typing from collections.abc import AsyncGenerator, Iterable from .event_queue import EventQueue @@ -16,7 +17,7 @@ async def monitor_lines(transport: Transport) -> AsyncGenerator[bytes, None]: yield bytes(event_msg["payload"]["bytes"]) -async def write_serial(transport: Transport, data: bytes | str | Iterable[int]) -> None: +async def write_serial(transport: Transport, data: typing.Union[bytes, str, Iterable[int]]) -> None: """Write data to the serial monitor. Accepts bytes, str (encoded as utf-8), or an iterable of integer byte values. diff --git a/tests/utils.py b/tests/utils.py index 5e8d6ff..1bdaabe 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -11,9 +11,12 @@ import subprocess import sys from collections.abc import Mapping +from subprocess import CompletedProcess -def run_example_module(module: str, *, sleep_time: str = "1", extra_env: Mapping[str, str] | None = None) -> subprocess.CompletedProcess: +def run_example_module( + module: str, *, sleep_time: str = "1", extra_env: Mapping[str, str] | None = None +) -> CompletedProcess[str]: """Run an example module with a short simulation time. Requires WOKWI_CLI_TOKEN to be set in the environment. From 0f7246c141d95ada0dc8fe27c4094195fca878a1 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Tue, 12 Aug 2025 17:06:36 +0200 Subject: [PATCH 06/10] feat: enhance type hinting in WokwiClientSync for improved clarity and consistency --- src/wokwi_client/client_sync.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/wokwi_client/client_sync.py b/src/wokwi_client/client_sync.py index ddc4c20..742cb1c 100644 --- a/src/wokwi_client/client_sync.py +++ b/src/wokwi_client/client_sync.py @@ -15,21 +15,21 @@ from wokwi_client.serial import monitor_lines as monitor_serial_lines if t.TYPE_CHECKING: - from collections.abc import Iterable + pass class WokwiClientSync: """Synchronous wrapper around the async WokwiClient.""" token: str - server: t.Optional[str] - _loop: t.Optional[asyncio.AbstractEventLoop] - _loop_thread: t.Optional[threading.Thread] - _client: t.Optional[WokwiClient] - _monitor_task: t.Optional[Future[t.Any]] + server: str | None + _loop: asyncio.AbstractEventLoop | None + _loop_thread: threading.Thread | None + _client: WokwiClient | None + _monitor_task: Future[t.Any] | None _connected: bool - def __init__(self, token: str, server: t.Optional[str] = None) -> None: + def __init__(self, token: str, server: str | None = None) -> None: self.token = token self.server = server self._loop = None @@ -50,10 +50,12 @@ def _run_async(self, coro: t.Coroutine[t.Any, t.Any, t.Any], timeout: float = 30 future = asyncio.run_coroutine_threadsafe(coro, self._loop) return future.result(timeout=timeout) - def connect(self) -> t.Dict[str, t.Any]: + def connect(self) -> dict[str, t.Any]: if not self._connected: self._client = WokwiClient(self.token, self.server) - result: t.Dict[str, t.Any] = t.cast(t.Dict[str, t.Any], self._run_async(self._client.connect())) + result: dict[str, t.Any] = t.cast( + dict[str, t.Any], self._run_async(self._client.connect()) + ) self._connected = True return result return {} @@ -85,7 +87,7 @@ def upload(self, name: str, content: bytes) -> t.Any: assert self._client is not None return self._run_async(self._client.upload(name, content)) - def upload_file(self, filename: str, local_path: t.Optional[Path] = None) -> t.Any: + def upload_file(self, filename: str, local_path: Path | None = None) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") assert self._client is not None @@ -94,9 +96,9 @@ def upload_file(self, filename: str, local_path: t.Optional[Path] = None) -> t.A def start_simulation( self, firmware: str, - elf: t.Optional[str] = None, + elf: str | None = None, pause: bool = False, - chips: t.Optional[t.List[str]] = None, + chips: list[str] | None = None, ) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") @@ -109,7 +111,7 @@ def pause_simulation(self) -> t.Any: assert self._client is not None return self._run_async(self._client.pause_simulation()) - def resume_simulation(self, pause_after: t.Optional[int] = None) -> t.Any: + def resume_simulation(self, pause_after: int | None = None) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") assert self._client is not None @@ -133,7 +135,7 @@ def serial_monitor_cat(self) -> t.Any: assert self._client is not None return self._run_async(self._client.serial_monitor_cat()) - def write_serial(self, data: t.Union[bytes, str, t.List[int]]) -> t.Any: + def write_serial(self, data: bytes | str | list[int]) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") assert self._client is not None @@ -172,7 +174,7 @@ async def _monitor() -> None: assert self._loop is not None self._monitor_task = asyncio.run_coroutine_threadsafe(_monitor(), self._loop) - def set_control(self, part: str, control: str, value: t.Union[int, bool, float]) -> t.Any: + def set_control(self, part: str, control: str, value: int | bool | float) -> t.Any: if not self._connected: raise RuntimeError("Client not connected") assert self._client is not None From be09efa2e0bc2b4eb428c5b0cf3d79fc8debc9b6 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Wed, 13 Aug 2025 10:44:38 +0200 Subject: [PATCH 07/10] feat: lazily initialize pause queue in WokwiClient and ensure thread-safe event handling in EventQueue --- src/wokwi_client/client.py | 5 ++++- src/wokwi_client/event_queue.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 8099c5f..bf0befe 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -42,7 +42,8 @@ def __init__(self, token: str, server: Optional[str] = None): self._transport = Transport(token, server or DEFAULT_WS_URL) self.last_pause_nanos = 0 self._transport.add_event_listener("sim:pause", self._on_pause) - self._pause_queue = EventQueue(self._transport, "sim:pause") + # Lazily create the pause queue inside the running event loop + self._pause_queue: Optional[EventQueue] = None async def connect(self) -> dict[str, Any]: """ @@ -159,6 +160,8 @@ async def wait_until_simulation_time(self, seconds: float) -> None: await pause(self._transport) remaining_nanos = seconds * 1e9 - self.last_pause_nanos if remaining_nanos > 0: + if self._pause_queue is None: + self._pause_queue = EventQueue(self._transport, "sim:pause") self._pause_queue.flush() await resume(self._transport, int(remaining_nanos)) await self._pause_queue.get() diff --git a/src/wokwi_client/event_queue.py b/src/wokwi_client/event_queue.py index c4c17c9..c49dede 100644 --- a/src/wokwi_client/event_queue.py +++ b/src/wokwi_client/event_queue.py @@ -23,12 +23,15 @@ class EventQueue: """A queue for events from a specific event type.""" def __init__(self, transport: Transport, event_type: str) -> None: + # Bind the queue to the current running loop + self._loop = asyncio.get_running_loop() self._queue: asyncio.Queue[EventMessage] = asyncio.Queue() self._transport = transport self._event_type = event_type def listener(event: EventMessage) -> None: - self._queue.put_nowait(event) + # Ensure put happens on the queue's loop (safe across threads/loops) + self._loop.call_soon_threadsafe(self._queue.put_nowait, event) self._listener = listener self._transport.add_event_listener(self._event_type, self._listener) From 089d3a965333f162e215dae6a1b7a794eb64b58c Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Wed, 13 Aug 2025 16:44:08 +0200 Subject: [PATCH 08/10] feat: add download and GPIO listing functionality to WokwiClient and WokwiClientSync --- src/wokwi_client/client.py | 30 ++++++++++++++++++++++++++++-- src/wokwi_client/client_sync.py | 18 ++++++++++++++++++ src/wokwi_client/file_ops.py | 13 +++++++++++++ src/wokwi_client/models.py | 4 ++++ src/wokwi_client/pins.py | 10 ++++++++++ 5 files changed, 73 insertions(+), 2 deletions(-) diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index bf0befe..8aaf234 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -10,8 +10,8 @@ from .constants import DEFAULT_WS_URL from .control import set_control from .event_queue import EventQueue -from .file_ops import upload, upload_file -from .pins import pin_listen, pin_read +from .file_ops import download, download_file, upload, upload_file +from .pins import gpio_list, pin_listen, pin_read from .protocol_types import EventMessage, ResponseMessage from .serial import monitor_lines, write_serial from .simulation import pause, restart, resume, start @@ -88,6 +88,28 @@ async def upload_file( """ return await upload_file(self._transport, filename, local_path) + async def download(self, name: str) -> ResponseMessage: + """ + Download a file from the simulator. + + Args: + name: The name of the file to download. + + Returns: + The response message from the server. + """ + return await download(self._transport, name) + + async def download_file(self, name: str, local_path: Optional[Path] = None) -> None: + """ + Download a file from the simulator and save it to a local path. + + Args: + name: The name of the file to download. + local_path: The local path to save the downloaded file. If not provided, uses the name as the path. + """ + await download_file(self._transport, name, local_path) + async def start_simulation( self, firmware: str, @@ -214,6 +236,10 @@ async def listen_pin(self, part: str, pin: str, listen: bool = True) -> Response """ return await pin_listen(self._transport, part=part, pin=pin, listen=listen) + async def gpio_list(self) -> ResponseMessage: + """Get a list of all GPIO pins available in the simulation.""" + return await gpio_list(self._transport) + async def set_control( self, part: str, control: str, value: typing.Union[int, bool, float] ) -> ResponseMessage: diff --git a/src/wokwi_client/client_sync.py b/src/wokwi_client/client_sync.py index 742cb1c..6a02289 100644 --- a/src/wokwi_client/client_sync.py +++ b/src/wokwi_client/client_sync.py @@ -93,6 +93,18 @@ def upload_file(self, filename: str, local_path: Path | None = None) -> t.Any: assert self._client is not None return self._run_async(self._client.upload_file(filename, local_path)) + def download(self, name: str) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.download(name)) + + def download_file(self, name: str, local_path: Path | None = None) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.download_file(name, local_path)) + def start_simulation( self, firmware: str, @@ -153,6 +165,12 @@ def listen_pin(self, part: str, pin: str, listen: bool = True) -> t.Any: assert self._client is not None return self._run_async(self._client.listen_pin(part, pin, listen)) + def gpio_list(self) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.gpio_list()) + def monitor_serial(self, callback: t.Callable[[bytes], None]) -> None: if not self._connected: raise RuntimeError("Client not connected") diff --git a/src/wokwi_client/file_ops.py b/src/wokwi_client/file_ops.py index 032bfd9..0907509 100644 --- a/src/wokwi_client/file_ops.py +++ b/src/wokwi_client/file_ops.py @@ -22,3 +22,16 @@ async def upload_file( async def upload(transport: Transport, name: str, content: bytes) -> ResponseMessage: params = UploadParams(name=name, binary=base64.b64encode(content).decode()) return await transport.request("file:upload", params.model_dump()) + + +async def download(transport: Transport, name: str) -> ResponseMessage: + return await transport.request("file:download", {"name": name}) + + +async def download_file(transport: Transport, name: str, local_path: Optional[Path] = None) -> None: + if local_path is None: + local_path = Path(name) + + result = await download(transport, name) + with open(local_path, "wb") as f: + f.write(base64.b64decode(result["result"]["binary"])) diff --git a/src/wokwi_client/models.py b/src/wokwi_client/models.py index e085a31..58a006f 100644 --- a/src/wokwi_client/models.py +++ b/src/wokwi_client/models.py @@ -10,6 +10,10 @@ class UploadParams(BaseModel): binary: str # base64 +class DownloadParams(BaseModel): + binary: str # base64 + + class SimulationParams(BaseModel): firmware: str elf: str diff --git a/src/wokwi_client/pins.py b/src/wokwi_client/pins.py index 82879ad..87d96a9 100644 --- a/src/wokwi_client/pins.py +++ b/src/wokwi_client/pins.py @@ -43,3 +43,13 @@ async def pin_listen( """ return await transport.request("pin:listen", {"part": part, "pin": pin, "listen": listen}) + + +async def gpio_list(transport: Transport) -> ResponseMessage: + """List all GPIO pins and their current states. + + Args: + transport: The active Transport instance. + """ + + return await transport.request("gpio:list", {}) From c8d3388de0b50ccec425e0b7c035478c4dad5a70 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Wed, 13 Aug 2025 16:51:12 +0200 Subject: [PATCH 09/10] feat: add framebuffer utilities for reading, saving, and comparing PNG images --- src/wokwi_client/client.py | 27 ++++++++++ src/wokwi_client/client_sync.py | 35 +++++++++++++ src/wokwi_client/framebuffer.py | 93 +++++++++++++++++++++++++++++++++ 3 files changed, 155 insertions(+) create mode 100644 src/wokwi_client/framebuffer.py diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 8aaf234..419d2b3 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -6,6 +6,13 @@ from pathlib import Path from typing import Any, Optional +from wokwi_client.framebuffer import ( + compare_framebuffer_png, + framebuffer_png_bytes, + framebuffer_read, + save_framebuffer_png, +) + from .__version__ import get_version from .constants import DEFAULT_WS_URL from .control import set_control @@ -251,3 +258,23 @@ async def set_control( value: Control value to set (float). """ return await set_control(self._transport, part=part, control=control, value=value) + + async def framebuffer_read(self, id: str) -> ResponseMessage: + """Read the current framebuffer for the given device id.""" + return await framebuffer_read(self._transport, id=id) + + async def framebuffer_png_bytes(self, id: str) -> bytes: + """Return the current framebuffer as PNG bytes.""" + return await framebuffer_png_bytes(self._transport, id=id) + + async def save_framebuffer_png(self, id: str, path: Path, overwrite: bool = True) -> Path: + """Save the current framebuffer as a PNG file.""" + return await save_framebuffer_png(self._transport, id=id, path=path, overwrite=overwrite) + + async def compare_framebuffer_png( + self, id: str, reference: Path, save_mismatch: Optional[Path] = None + ) -> bool: + """Compare the current framebuffer with a reference PNG file.""" + return await compare_framebuffer_png( + self._transport, id=id, reference=reference, save_mismatch=save_mismatch + ) diff --git a/src/wokwi_client/client_sync.py b/src/wokwi_client/client_sync.py index 6a02289..1099fd2 100644 --- a/src/wokwi_client/client_sync.py +++ b/src/wokwi_client/client_sync.py @@ -198,6 +198,41 @@ def set_control(self, part: str, control: str, value: int | bool | float) -> t.A assert self._client is not None return self._run_async(self._client.set_control(part, control, value)) + def framebuffer_read(self, id: str) -> t.Any: + """Read the current framebuffer for the given device id.""" + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.framebuffer_read(id)) + + def framebuffer_png_bytes(self, id: str) -> bytes: + """Return the current framebuffer as PNG bytes.""" + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return t.cast(bytes, self._run_async(self._client.framebuffer_png_bytes(id))) + + def save_framebuffer_png(self, id: str, path: Path, overwrite: bool = True) -> Path: + """Save the current framebuffer as a PNG file.""" + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return t.cast( + Path, self._run_async(self._client.save_framebuffer_png(id, path, overwrite=overwrite)) + ) + + def compare_framebuffer_png( + self, id: str, reference: Path, save_mismatch: Path | None = None + ) -> bool: + """Compare the current framebuffer with a reference PNG file.""" + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return t.cast( + bool, + self._run_async(self._client.compare_framebuffer_png(id, reference, save_mismatch)), + ) + @property def version(self) -> str: if self._client: diff --git a/src/wokwi_client/framebuffer.py b/src/wokwi_client/framebuffer.py new file mode 100644 index 0000000..ef3ae53 --- /dev/null +++ b/src/wokwi_client/framebuffer.py @@ -0,0 +1,93 @@ +"""Framebuffer command helpers. + +Provides utilities to interact with devices exposing a framebuffer (e.g. LCD +modules) via the `framebuffer:read` command. + +Exposed helpers: +* framebuffer_read -> raw response (contains base64 PNG at result.png) +* framebuffer_png_bytes -> decoded PNG bytes +* save_framebuffer_png -> save PNG to disk +* compare_framebuffer_png -> compare current framebuffer against reference +""" + +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from __future__ import annotations + +import base64 +from pathlib import Path + +from .exceptions import WokwiError +from .protocol_types import ResponseMessage +from .transport import Transport + +__all__ = [ + "framebuffer_read", + "framebuffer_png_bytes", + "save_framebuffer_png", + "compare_framebuffer_png", +] + + +async def framebuffer_read(transport: Transport, *, id: str) -> ResponseMessage: + """Issue `framebuffer:read` for the given device id and return raw response.""" + return await transport.request("framebuffer:read", {"id": id}) + + +def _extract_png_b64(resp: ResponseMessage) -> str: + result = resp.get("result", {}) + png_b64 = result.get("png") + if not isinstance(png_b64, str): # pragma: no cover - defensive + raise WokwiError("Malformed framebuffer:read response: missing 'png' base64 string") + return png_b64 + + +async def framebuffer_png_bytes(transport: Transport, *, id: str) -> bytes: + """Return decoded PNG bytes for the framebuffer of device `id`.""" + resp = await framebuffer_read(transport, id=id) + return base64.b64decode(_extract_png_b64(resp)) + + +async def save_framebuffer_png( + transport: Transport, *, id: str, path: Path, overwrite: bool = True +) -> Path: + """Save the framebuffer PNG to `path` and return the path. + + Args: + transport: Active transport. + id: Device id (e.g. "lcd1"). + path: Destination file path. + overwrite: Overwrite existing file (default True). If False and file + exists, raises WokwiError. + """ + if path.exists() and not overwrite: + raise WokwiError(f"File already exists and overwrite=False: {path}") + data = await framebuffer_png_bytes(transport, id=id) + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "wb") as f: + f.write(data) + return path + + +async def compare_framebuffer_png( + transport: Transport, *, id: str, reference: Path, save_mismatch: Path | None = None +) -> bool: + """Compare the current framebuffer PNG with a reference file. + + Performs a byte-for-byte comparison. If different and `save_mismatch` is + provided, writes the current framebuffer PNG there. + + Returns True if identical, False otherwise. + """ + if not reference.exists(): + raise WokwiError(f"Reference image does not exist: {reference}") + current = await framebuffer_png_bytes(transport, id=id) + ref_bytes = reference.read_bytes() + if current == ref_bytes: + return True + if save_mismatch: + save_mismatch.parent.mkdir(parents=True, exist_ok=True) + save_mismatch.write_bytes(current) + return False From 09a3efd3959c99309d9e522ca32ea90e746d2600 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Fri, 15 Aug 2025 09:34:04 +0200 Subject: [PATCH 10/10] feat: enhance serial monitor functionality with UTF-8 decoding options --- src/wokwi_client/client.py | 16 ++++++++++++++-- src/wokwi_client/client_sync.py | 6 ++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 419d2b3..c0cde89 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -207,12 +207,24 @@ async def restart_simulation(self, pause: bool = False) -> ResponseMessage: """ return await restart(self._transport, pause) - async def serial_monitor_cat(self) -> None: + async def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> None: """ Print serial monitor output to stdout as it is received from the simulation. + + Args: + decode_utf8: Whether to decode bytes as UTF-8. If False, prints raw bytes (default: True). + errors: How to handle UTF-8 decoding errors. Options: 'strict', 'ignore', 'replace' (default: 'replace'). """ async for line in monitor_lines(self._transport): - print(line.decode("utf-8"), end="", flush=True) + if decode_utf8: + try: + output = line.decode("utf-8", errors=errors) + print(output, end="", flush=True) + except UnicodeDecodeError: + # Fallback to raw bytes if decoding fails completely + print(line, end="", flush=True) + else: + print(line, end="", flush=True) async def serial_write(self, data: typing.Union[bytes, str, list[int]]) -> None: """Write data to the simulation serial monitor interface.""" diff --git a/src/wokwi_client/client_sync.py b/src/wokwi_client/client_sync.py index 1099fd2..7b10277 100644 --- a/src/wokwi_client/client_sync.py +++ b/src/wokwi_client/client_sync.py @@ -141,11 +141,13 @@ def restart_simulation(self, pause: bool = False) -> t.Any: assert self._client is not None return self._run_async(self._client.restart_simulation(pause)) - def serial_monitor_cat(self) -> t.Any: + def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> t.Any: if not self._connected: raise RuntimeError("Client not connected") assert self._client is not None - return self._run_async(self._client.serial_monitor_cat()) + return self._run_async( + self._client.serial_monitor_cat(decode_utf8=decode_utf8, errors=errors) + ) def write_serial(self, data: bytes | str | list[int]) -> t.Any: if not self._connected: