|
| 1 | +import json |
| 2 | +import logging |
| 3 | +import os |
| 4 | +import typing as t |
| 5 | +from pathlib import Path |
| 6 | + |
| 7 | +from packaging.version import Version |
| 8 | +from pytest_embedded.log import DuplicateStdoutPopen, MessageQueue |
| 9 | +from pytest_embedded.utils import Meta |
| 10 | +from wokwi_client import GET_TOKEN_URL, WokwiClientSync |
| 11 | + |
| 12 | +from pytest_embedded_wokwi import WOKWI_CLI_MINIMUM_VERSION |
| 13 | + |
| 14 | +from .idf import IDFFirmwareResolver |
| 15 | + |
| 16 | +if t.TYPE_CHECKING: # pragma: no cover |
| 17 | + from pytest_embedded_idf.app import IdfApp |
| 18 | + |
| 19 | + |
| 20 | +target_to_board = { |
| 21 | + 'esp32': 'board-esp32-devkit-c-v4', |
| 22 | + 'esp32c3': 'board-esp32-c3-devkitm-1', |
| 23 | + 'esp32c6': 'board-esp32-c6-devkitc-1', |
| 24 | + 'esp32h2': 'board-esp32-h2-devkitm-1', |
| 25 | + 'esp32p4': 'board-esp32-p4-function-ev', |
| 26 | + 'esp32s2': 'board-esp32-s2-devkitm-1', |
| 27 | + 'esp32s3': 'board-esp32-s3-devkitc-1', |
| 28 | +} |
| 29 | + |
| 30 | + |
| 31 | +class Wokwi(DuplicateStdoutPopen): |
| 32 | + """Synchronous Wokwi integration that inherits from DuplicateStdoutPopen. |
| 33 | +
|
| 34 | + This class provides a synchronous interface to the Wokwi simulator while maintaining |
| 35 | + compatibility with pytest-embedded's logging and message queue infrastructure. |
| 36 | + """ |
| 37 | + |
| 38 | + SOURCE = 'Wokwi' |
| 39 | + REDIRECT_CLS = None # We'll handle output redirection manually |
| 40 | + |
| 41 | + def __init__( |
| 42 | + self, |
| 43 | + msg_queue: MessageQueue, |
| 44 | + firmware_resolver: IDFFirmwareResolver, |
| 45 | + wokwi_diagram: str | None = None, |
| 46 | + app: t.Optional['IdfApp'] = None, |
| 47 | + meta: Meta | None = None, |
| 48 | + **kwargs, |
| 49 | + ): |
| 50 | + self.app = app |
| 51 | + |
| 52 | + # Get Wokwi API token |
| 53 | + token = os.getenv('WOKWI_CLI_TOKEN') |
| 54 | + if not token: |
| 55 | + raise SystemExit(f'Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}.') |
| 56 | + |
| 57 | + # Initialize synchronous Wokwi client |
| 58 | + self.client = WokwiClientSync(token) |
| 59 | + |
| 60 | + # Check version compatibility |
| 61 | + if Version(self.client.version) < Version(WOKWI_CLI_MINIMUM_VERSION): |
| 62 | + logging.warning( |
| 63 | + 'Wokwi client version %s < required %s (compatibility not guaranteed)', |
| 64 | + self.client.version, |
| 65 | + WOKWI_CLI_MINIMUM_VERSION, |
| 66 | + ) |
| 67 | + logging.info('Wokwi client library version: %s', self.client.version) |
| 68 | + |
| 69 | + # Prepare diagram file if not supplied |
| 70 | + if wokwi_diagram is None: |
| 71 | + self.create_diagram_json() |
| 72 | + wokwi_diagram = os.path.join(self.app.app_path, 'diagram.json') |
| 73 | + |
| 74 | + # Initialize parent class |
| 75 | + super().__init__(msg_queue=msg_queue, meta=meta, **kwargs) |
| 76 | + |
| 77 | + # Connect and start simulation |
| 78 | + try: |
| 79 | + flasher_args = firmware_resolver.resolve_firmware(app) |
| 80 | + firmware_path = Path(flasher_args).as_posix() |
| 81 | + elf_path = Path(app.elf_file).as_posix() |
| 82 | + |
| 83 | + self._setup_simulation(wokwi_diagram, firmware_path, elf_path) |
| 84 | + self._start_serial_monitoring() |
| 85 | + except Exception as e: |
| 86 | + self.close() |
| 87 | + raise e |
| 88 | + |
| 89 | + def _setup_simulation(self, diagram: str, firmware_path: str, elf_path: str): |
| 90 | + """Set up the Wokwi simulation.""" |
| 91 | + hello = self.client.connect() |
| 92 | + logging.info('Connected to Wokwi Simulator, server version: %s', hello.get('version', 'unknown')) |
| 93 | + |
| 94 | + # Upload files |
| 95 | + self.client.upload_file('diagram.json', diagram) |
| 96 | + firmware = self.client.upload_file('pytest.bin', firmware_path) |
| 97 | + |
| 98 | + self.client.upload_file('pytest.elf', elf_path) |
| 99 | + |
| 100 | + logging.info('Uploaded diagram and firmware to Wokwi. Starting simulation...') |
| 101 | + |
| 102 | + # Start simulation |
| 103 | + self.client.start_simulation(firmware, elf='pytest.elf') |
| 104 | + |
| 105 | + def _start_serial_monitoring(self): |
| 106 | + """Start monitoring serial output and forward to stdout and message queue.""" |
| 107 | + |
| 108 | + def serial_callback(data: bytes): |
| 109 | + # Write to stdout for live monitoring |
| 110 | + try: |
| 111 | + decoded = data.decode('utf-8', errors='replace') |
| 112 | + print(decoded, end='', flush=True) |
| 113 | + except Exception as e: |
| 114 | + logging.debug(f'Error writing to stdout: {e}') |
| 115 | + |
| 116 | + # Write to log file if available |
| 117 | + try: |
| 118 | + if hasattr(self, '_fw') and self._fw and not self._fw.closed: |
| 119 | + decoded = data.decode('utf-8', errors='replace') |
| 120 | + self._fw.write(decoded) |
| 121 | + self._fw.flush() |
| 122 | + except Exception as e: |
| 123 | + logging.debug(f'Error writing to log file: {e}') |
| 124 | + |
| 125 | + # Put in message queue for expect() functionality |
| 126 | + try: |
| 127 | + if hasattr(self, '_q') and self._q: |
| 128 | + self._q.put(data) |
| 129 | + except Exception as e: |
| 130 | + logging.debug(f'Error putting data in message queue: {e}') |
| 131 | + |
| 132 | + # Start monitoring in background |
| 133 | + self.client.serial_monitor(serial_callback) |
| 134 | + |
| 135 | + def write(self, s: str | bytes) -> None: |
| 136 | + """Write data to the Wokwi serial interface.""" |
| 137 | + try: |
| 138 | + data = s if isinstance(s, bytes) else s.encode('utf-8') |
| 139 | + self.client.serial_write(data) |
| 140 | + logging.debug(f'{self.SOURCE} ->: {s}') |
| 141 | + except Exception as e: |
| 142 | + logging.error(f'Failed to write to Wokwi serial: {e}') |
| 143 | + |
| 144 | + def close(self): |
| 145 | + """Clean up resources.""" |
| 146 | + try: |
| 147 | + if hasattr(self, 'client') and self.client: |
| 148 | + self.client.disconnect() |
| 149 | + except Exception as e: |
| 150 | + logging.debug(f'Error during Wokwi cleanup: {e}') |
| 151 | + finally: |
| 152 | + super().close() |
| 153 | + |
| 154 | + def __del__(self): |
| 155 | + """Destructor to ensure cleanup when object is garbage collected.""" |
| 156 | + self.close() |
| 157 | + super().__del__() |
| 158 | + |
| 159 | + def terminate(self): |
| 160 | + """Terminate the Wokwi connection.""" |
| 161 | + self.close() |
| 162 | + super().terminate() |
| 163 | + |
| 164 | + def create_diagram_json(self): |
| 165 | + """Create a diagram.json file for the simulation.""" |
| 166 | + app = self.app |
| 167 | + target_board = target_to_board[app.target] |
| 168 | + |
| 169 | + # Check for existing diagram.json file |
| 170 | + diagram_json_path = os.path.join(app.app_path, 'diagram.json') |
| 171 | + if os.path.exists(diagram_json_path): |
| 172 | + with open(diagram_json_path) as f: |
| 173 | + json_data = json.load(f) |
| 174 | + if not any(part['type'] == target_board for part in json_data['parts']): |
| 175 | + logging.warning( |
| 176 | + f'diagram.json exists, no part with type "{target_board}" found. ' |
| 177 | + + 'You may need to update the diagram.json file manually to match the target board.' |
| 178 | + ) |
| 179 | + return |
| 180 | + |
| 181 | + # Create default diagram |
| 182 | + if app.target == 'esp32p4': |
| 183 | + rx_pin = '38' |
| 184 | + tx_pin = '37' |
| 185 | + else: |
| 186 | + rx_pin = 'RX' |
| 187 | + tx_pin = 'TX' |
| 188 | + |
| 189 | + diagram = { |
| 190 | + 'version': 1, |
| 191 | + 'author': 'Uri Shaked', |
| 192 | + 'editor': 'wokwi', |
| 193 | + 'parts': [{'type': target_board, 'id': 'esp'}], |
| 194 | + 'connections': [ |
| 195 | + ['esp:' + tx_pin, '$serialMonitor:RX', ''], |
| 196 | + ['esp:' + rx_pin, '$serialMonitor:TX', ''], |
| 197 | + ], |
| 198 | + } |
| 199 | + |
| 200 | + with open(diagram_json_path, 'w') as f: |
| 201 | + json.dump(diagram, f, indent=2) |
| 202 | + |
| 203 | + def _hard_reset(self): |
| 204 | + """Fake hard_reset to maintain API consistency.""" |
| 205 | + raise NotImplementedError('Hard reset not supported in Wokwi simulation') |
0 commit comments