Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Wokwi Python Client 🚀

Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API**
Typed Python SDK for the **Wokwi Simulation API**, with async and sync APIs

[![PyPI version](https://img.shields.io/pypi/v/wokwi-client?logo=pypi)](https://pypi.org/project/wokwi-client/)
[![Python versions](https://img.shields.io/pypi/pyversions/wokwi-client)](https://pypi.org/project/wokwi-client/)
Expand Down Expand Up @@ -32,7 +32,9 @@ The basic example is in the [examples/hello_esp32/main.py](examples/hello_esp32/
- Connect to the Wokwi Simulator
- Upload a diagram and firmware files
- Start a simulation
- Monitor the serial output
- Monitor the serial output and write to them
- Read GPIO pins
- Control peripherals (buttons, MPU6050, etc.)

You can run the example with:

Expand Down
103 changes: 71 additions & 32 deletions docs/index.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# Wokwi Python Client Library

Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API**.
Typed Python SDK for the **Wokwi Simulation API**, with async and sync APIs.

## Features

- Connect to the Wokwi Simulator from Python
- Upload diagrams and firmware files
- Start, pause, resume, and restart simulations
- Monitor serial output asynchronously
- Monitor serial output asynchronously and write to them
- Control peripherals and read GPIO pins
- Fully type-annotated and easy to use with asyncio
- Async and sync APIs

## Installation

Expand All @@ -24,37 +26,74 @@ Get your API token from [https://wokwi.com/dashboard/ci](https://wokwi.com/dashb

## Quickstart Example

```python
import asyncio
import os
from wokwi_client import WokwiClient, GET_TOKEN_URL


async def main():
token = os.getenv("WOKWI_CLI_TOKEN")
if not token:
raise SystemExit(
f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}."
)

client = WokwiClient(token)
await client.connect()
await client.upload_file("diagram.json")
await client.upload_file("firmware.bin")
await client.start_simulation(firmware="firmware.bin")
serial_task = asyncio.create_task(
client.serial_monitor_cat()
) # Stream serial output
await client.wait_until_simulation_time(10) # Run simulation for 10 seconds
serial_task.cancel()
await client.disconnect()


if __name__ == "__main__":
asyncio.run(main())
```
=== "Async (recommended)"

```python
import asyncio
import os
from wokwi_client import WokwiClient, GET_TOKEN_URL


async def main():
token = os.getenv("WOKWI_CLI_TOKEN")
if not token:
raise SystemExit(
f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}."
)

client = WokwiClient(token)
await client.connect()
await client.upload_file("diagram.json")
await client.upload_file("firmware.bin")
await client.start_simulation(firmware="firmware.bin")

serial_task = asyncio.create_task(
client.serial_monitor_cat()
) # Stream serial output
await client.wait_until_simulation_time(10) # Run simulation for 10 seconds
serial_task.cancel()
await client.disconnect()


if __name__ == "__main__":
asyncio.run(main())
```

=== "Sync"

```python
import os
from wokwi_client import WokwiClientSync, GET_TOKEN_URL


def main():
token = os.getenv("WOKWI_CLI_TOKEN")
if not token:
raise SystemExit(
f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}."
)

client = WokwiClientSync(token)
client.connect()
client.upload_file("diagram.json")
client.upload_file("firmware.bin")
client.start_simulation(firmware="firmware.bin")

# Stream serial output concurrently for 10 seconds
client.monitor_serial(lambda line: print(line.decode("utf-8"), end="", flush=True))
client.wait_until_simulation_time(10)
client.disconnect()


if __name__ == "__main__":
main()
```

## Examples

See the [examples/hello_esp32/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/hello_esp32/main.py) for a full example including serial monitoring, and [examples/micropython_esp32/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/micropython_esp32/main.py) for an example of running MicroPython on a simulated ESP32 board.
- [examples/hello_esp32/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/hello_esp32/main.py) full async example including serial monitoring
- [examples/hello_esp32_sync/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/hello_esp32_sync/main.py) sync wrapper usage
- [examples/micropython_esp32/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/micropython_esp32/main.py) example of running MicroPython on a simulated ESP32 board.

## API Reference

Expand Down
3 changes: 3 additions & 0 deletions examples/hello_esp32_sync/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Ignore the firmware files, as they are downloaded from the internet
hello_world.bin
hello_world.elf
Empty file.
22 changes: 22 additions & 0 deletions examples/hello_esp32_sync/diagram.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"version": 1,
"author": "Uri Shaked",
"editor": "wokwi",
"parts": [
{
"type": "wokwi-esp32-devkit-v1",
"id": "esp",
"top": 0,
"left": 0,
"attrs": { "fullBoot": "1" }
}
],
"connections": [
["esp:TX0", "$serialMonitor:RX", "", []],
["esp:RX0", "$serialMonitor:TX", "", []]
],
"serialMonitor": {
"display": "terminal"
},
"dependencies": {}
}
63 changes: 63 additions & 0 deletions examples/hello_esp32_sync/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# SPDX-License-Identifier: MIT
# Copyright (C) 2025, CodeMagic LTD

import os
from pathlib import Path

import requests

from wokwi_client import GET_TOKEN_URL, WokwiClientSync

EXAMPLE_DIR = Path(__file__).parent
HELLO_WORLD_URL = "https://github.com/wokwi/esp-idf-hello-world/raw/refs/heads/main/bin"
FIRMWARE_FILES = {
"hello_world.bin": f"{HELLO_WORLD_URL}/hello_world.bin",
"hello_world.elf": f"{HELLO_WORLD_URL}/hello_world.elf",
}
SLEEP_TIME = int(os.getenv("WOKWI_SLEEP_TIME", "10"))


def main() -> None:
token = os.getenv("WOKWI_CLI_TOKEN")
if not token:
raise SystemExit(
f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}."
)

for filename, url in FIRMWARE_FILES.items():
if (EXAMPLE_DIR / filename).exists():
continue
print(f"Downloading {filename} from {url}")
response = requests.get(url)
response.raise_for_status()
with open(EXAMPLE_DIR / filename, "wb") as f:
f.write(response.content)

client = WokwiClientSync(token)
print(f"Wokwi client library version: {client.version}")

hello = client.connect()
print("Connected to Wokwi Simulator, server version:", hello["version"])

# Upload the diagram and firmware files
client.upload_file("diagram.json", EXAMPLE_DIR / "diagram.json")
client.upload_file("hello_world.bin", EXAMPLE_DIR / "hello_world.bin")
client.upload_file("hello_world.elf", EXAMPLE_DIR / "hello_world.elf")

# Start the simulation
client.start_simulation(
firmware="hello_world.bin",
elf="hello_world.elf",
)

# Stream serial output for a few seconds (non-blocking)
client.monitor_serial(lambda line: print(line.decode("utf-8"), end="", flush=True))
print(f"Simulation started, waiting for {SLEEP_TIME} seconds…")
client.wait_until_simulation_time(SLEEP_TIME)

# Disconnect from the simulator
client.disconnect()


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ markdown_extensions:
- pymdownx.inlinehilite
- pymdownx.superfences
- pymdownx.tabbed
- pymdownx.tabbed:
alternate_style: true
- toc:
permalink: "¶"

Expand Down
3 changes: 2 additions & 1 deletion src/wokwi_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

from .__version__ import get_version
from .client import WokwiClient
from .client_sync import WokwiClientSync
from .constants import GET_TOKEN_URL

__version__ = get_version()
__all__ = ["WokwiClient", "__version__", "GET_TOKEN_URL"]
__all__ = ["WokwiClient", "WokwiClientSync", "__version__", "GET_TOKEN_URL"]
103 changes: 100 additions & 3 deletions src/wokwi_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,25 @@
#
# SPDX-License-Identifier: MIT

import typing
from pathlib import Path
from typing import Any, Optional

from wokwi_client.framebuffer import (
compare_framebuffer_png,
framebuffer_png_bytes,
framebuffer_read,
save_framebuffer_png,
)

from .__version__ import get_version
from .constants import DEFAULT_WS_URL
from .control import set_control
from .event_queue import EventQueue
from .file_ops import upload, upload_file
from .file_ops import download, download_file, upload, upload_file
from .pins import gpio_list, pin_listen, pin_read
from .protocol_types import EventMessage, ResponseMessage
from .serial import monitor_lines
from .serial import monitor_lines, write_serial
from .simulation import pause, restart, resume, start
from .transport import Transport

Expand Down Expand Up @@ -39,7 +49,8 @@ def __init__(self, token: str, server: Optional[str] = None):
self._transport = Transport(token, server or DEFAULT_WS_URL)
self.last_pause_nanos = 0
self._transport.add_event_listener("sim:pause", self._on_pause)
self._pause_queue = EventQueue(self._transport, "sim:pause")
# Lazily create the pause queue inside the running event loop
self._pause_queue: Optional[EventQueue] = None

async def connect(self) -> dict[str, Any]:
"""
Expand Down Expand Up @@ -84,6 +95,28 @@ async def upload_file(
"""
return await upload_file(self._transport, filename, local_path)

async def download(self, name: str) -> ResponseMessage:
"""
Download a file from the simulator.

Args:
name: The name of the file to download.

Returns:
The response message from the server.
"""
return await download(self._transport, name)

async def download_file(self, name: str, local_path: Optional[Path] = None) -> None:
"""
Download a file from the simulator and save it to a local path.

Args:
name: The name of the file to download.
local_path: The local path to save the downloaded file. If not provided, uses the name as the path.
"""
await download_file(self._transport, name, local_path)

async def start_simulation(
self,
firmware: str,
Expand Down Expand Up @@ -156,6 +189,8 @@ async def wait_until_simulation_time(self, seconds: float) -> None:
await pause(self._transport)
remaining_nanos = seconds * 1e9 - self.last_pause_nanos
if remaining_nanos > 0:
if self._pause_queue is None:
self._pause_queue = EventQueue(self._transport, "sim:pause")
self._pause_queue.flush()
await resume(self._transport, int(remaining_nanos))
await self._pause_queue.get()
Expand Down Expand Up @@ -191,5 +226,67 @@ async def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "repl
else:
print(line, end="", flush=True)

async def serial_write(self, data: typing.Union[bytes, str, list[int]]) -> None:
"""Write data to the simulation serial monitor interface."""
await write_serial(self._transport, data)

def _on_pause(self, event: EventMessage) -> None:
self.last_pause_nanos = int(event["nanos"])

async def read_pin(self, part: str, pin: str) -> ResponseMessage:
"""Read the current state of a pin.

Args:
part: The part id (e.g. "uno").
pin: The pin name (e.g. "A2").
"""
return await pin_read(self._transport, part=part, pin=pin)

async def listen_pin(self, part: str, pin: str, listen: bool = True) -> ResponseMessage:
"""Start or stop listening for changes on a pin.

When enabled, "pin:change" events will be delivered via the transport's
event mechanism.

Args:
part: The part id.
pin: The pin name.
listen: True to start listening, False to stop.
"""
return await pin_listen(self._transport, part=part, pin=pin, listen=listen)

async def gpio_list(self) -> ResponseMessage:
"""Get a list of all GPIO pins available in the simulation."""
return await gpio_list(self._transport)

async def set_control(
self, part: str, control: str, value: typing.Union[int, bool, float]
) -> ResponseMessage:
"""Set a control value (e.g. simulate button press).

Args:
part: Part id (e.g. "btn1").
control: Control name (e.g. "pressed").
value: Control value to set (float).
"""
return await set_control(self._transport, part=part, control=control, value=value)

async def framebuffer_read(self, id: str) -> ResponseMessage:
"""Read the current framebuffer for the given device id."""
return await framebuffer_read(self._transport, id=id)

async def framebuffer_png_bytes(self, id: str) -> bytes:
"""Return the current framebuffer as PNG bytes."""
return await framebuffer_png_bytes(self._transport, id=id)

async def save_framebuffer_png(self, id: str, path: Path, overwrite: bool = True) -> Path:
"""Save the current framebuffer as a PNG file."""
return await save_framebuffer_png(self._transport, id=id, path=path, overwrite=overwrite)

async def compare_framebuffer_png(
self, id: str, reference: Path, save_mismatch: Optional[Path] = None
) -> bool:
"""Compare the current framebuffer with a reference PNG file."""
return await compare_framebuffer_png(
self._transport, id=id, reference=reference, save_mismatch=save_mismatch
)
Loading
Loading