diff --git a/affinity-setup.sh b/affinity-setup.sh old mode 100644 new mode 100755 index b953db4..998d9c8 --- a/affinity-setup.sh +++ b/affinity-setup.sh @@ -1,256 +1,69 @@ #!/bin/sh -# Klipper stack affinity + PREEMPT_RT realtime setup (no unit file edits on disk) +# Minimal display affinity helper: +# - optionally set CPU governor to performance +# - de-prioritize display.service so UI work is less likely to interfere +# with Klipper/Moonraker on low-power SBCs set -eu -TAG="affinity-setup" +TAG="display-affinity-minimal" +ENABLE_PERFORMANCE_GOVERNOR="${ENABLE_PERFORMANCE_GOVERNOR:-yes}" + log() { - logger -t "$TAG" -- "$@" + logger -t "$TAG" -- "$@" 2>/dev/null || true printf '%s: %s\n' "$TAG" "$*" } -# Re-exec as root if needed +have() { command -v "$1" >/dev/null 2>&1; } + +# Re-exec as root because systemd may call this via affinity.service. if [ "$(id -u)" != 0 ]; then exec sudo -E -- "$0" "$@" fi -# --- CPU layout (0-based) ---------------------------------------------------- -MISC_CPU=0 # moonraker/mobileraker/mjpg-streamer/power_monitor + ttyS2 IRQ -DISPLAY_TTY_CPU=1 # display.service + ttyS1 IRQ -KLIPPER_MCU_RPI_CPU=2 # klipper-mcu.service (host-MCU tasks) -KLIPPER_MCU_TTY_CPU=3 # klipper.service + ttyS0 IRQ - -# --- basic env checks (warn-only) -------------------------------------------- -have() { command -v "$1" >/dev/null 2>&1; } -if systemctl is-active --quiet irqbalance.service 2>/dev/null; then - log "WARN: irqbalance is active; it may override IRQ affinities." -fi -for bin in systemctl awk ps sed chrt taskset ionice renice stty sysctl logger; do - have "$bin" || log "WARN: missing helper '$bin' (some steps may be skipped)." -done - -cpu_online() { - c="$1" - [ -d "/sys/devices/system/cpu/cpu$c" ] || return 1 - onf="/sys/devices/system/cpu/cpu$c/online" - [ ! -f "$onf" ] || [ "$(cat "$onf" 2>/dev/null || echo 1)" = "1" ] -} - -# --- helpers ----------------------------------------------------------------- -wait_active() { # wait until systemd unit is active and has a MainPID - unit="$1"; t=0 - while [ "$t" -lt 30 ]; do - state=$(systemctl is-active "$unit" 2>/dev/null || true) - pid=$(systemctl show -p MainPID --value "$unit" 2>/dev/null || echo 0) - if [ "$state" = "active" ] && [ "${pid:-0}" -gt 0 ]; then - return 0 - fi - sleep 0.5 - t=$((t+1)) - done - log "WARN: $unit did not become active in time; continuing." - return 0 -} - -wait_irq_present() { # wait until /proc/interrupts shows the device name - dev="$1"; t=0 - while [ "$t" -lt 20 ]; do - irq=$(awk -v n="$dev" '$NF==n{gsub(":", "", $1); print $1; exit}' /proc/interrupts) - [ -n "$irq" ] && return 0 - sleep 0.5 - t=$((t+1)) - done - log "WARN: IRQ for $dev not found; continuing." - return 0 -} - -irq_for() { - awk -v name="$1" '$NF==name{gsub(":", "", $1); print $1; exit}' /proc/interrupts -} - -pin_irq() { - irq="$1"; cpu="$2" - [ -n "$irq" ] || return 0 - p="/proc/irq/$irq" - [ -d "$p" ] || return 0 - cpu_online "$cpu" || { log "WARN: CPU $cpu not online; skip pin IRQ $irq"; return 0; } - - if [ -w "$p/smp_affinity_list" ]; then - echo "$cpu" > "$p/smp_affinity_list" 2>/dev/null || true - else - # list path preferred; mask unsafe for cpu>=32 - printf '%x\n' "$((1< "$p/smp_affinity" 2>/dev/null || true - fi - log "Pinned IRQ $irq to CPU $cpu" -} - -set_unit_cpus() { - unit="$1"; cpus="$2" - cpu_online "$cpus" || { log "WARN: CPU $cpus not online; skip $unit CPU pin"; return 0; } - if systemctl set-property --runtime "$unit" "AllowedCPUs=$cpus" >/dev/null 2>&1; then - log "AllowedCPUs for $unit -> $cpus" - else - pid=$(systemctl show -p MainPID --value "$unit" 2>/dev/null || echo 0) - if [ "${pid:-0}" -gt 0 ] && taskset -pc "$cpus" "$pid" >/dev/null 2>&1; then - log "taskset fallback for $unit(pid=$pid) -> $cpus" - fi - fi +mainpid() { + unit="$1" + systemctl show -p MainPID --value "$unit" 2>/dev/null || echo 0 } renice_unit() { - unit="$1"; niceval="$2" - pid=$(systemctl show -p MainPID --value "$unit" 2>/dev/null || echo 0) - if [ "${pid:-0}" -gt 0 ] && renice -n "$niceval" -p "$pid" >/dev/null 2>&1; then - log "renice $unit(pid=$pid) -> $niceval" - fi - return 0 + unit="$1" + nice_val="$2" + pid="$(mainpid "$unit")" + [ "$pid" -gt 0 ] || return 0 + renice "$nice_val" -p "$pid" >/dev/null 2>&1 || true } ionice_idle_unit() { unit="$1" - pid=$(systemctl show -p MainPID --value "$unit" 2>/dev/null || echo 0) - if [ "${pid:-0}" -gt 0 ] && ionice -c3 -p "$pid" >/dev/null 2>&1; then - log "ionice idle $unit(pid=$pid)" - fi - return 0 + pid="$(mainpid "$unit")" + [ "$pid" -gt 0 ] || return 0 + have ionice || return 0 + ionice -c3 -p "$pid" >/dev/null 2>&1 || true } -ps_line() { - pid="$1" - ps -o pid,cls,rtprio,psr,cmd -p "$pid" --no-headers 2>/dev/null | sed 's/^/ /' -} - -# Ensure a unit runs as SCHED_FIFO:prio even if the process set its own (-r/49) -promote_unit_fifo() { - unit="$1"; prio="$2" - systemctl set-property --runtime "$unit" CPUSchedulingPolicy=fifo CPUSchedulingPriority="$prio" >/dev/null 2>&1 || true - pid=$(systemctl show -p MainPID --value "$unit" 2>/dev/null || echo 0) - [ "${pid:-0}" -gt 0 ] || return 0 - n=0 - while [ "$n" -lt 10 ]; do - chrt -a -f -p "$prio" "$pid" >/dev/null 2>&1 || true - cls=$(ps -o cls= -p "$pid" 2>/dev/null | xargs || true) - rt=$(ps -o rtprio= -p "$pid" 2>/dev/null | xargs || true) - if [ "$cls" = "FF" ] && [ "$rt" = "$prio" ]; then - log "$unit(pid=$pid) -> FIFO $prio (all threads)" - return 0 - fi - sleep 0.3 - n=$((n+1)) - done - log "WARN: $unit(pid=$pid) did not reach FIFO $prio (last: cls=$cls rtprio=$rt)" - return 0 -} +set_performance_governor() { + [ "$ENABLE_PERFORMANCE_GOVERNOR" = "yes" ] || { + log "Skipping CPU governor change" + return 0 + } -# Promote a threaded IRQ kernel thread (irq/-*) to FIFO priority -# Handles kernel threads displayed as "[irq/-...]" by stripping brackets. -chrt_irq_thread() { - irq="$1"; prio="$2"; t=0 - [ -n "$irq" ] || return 0 - while [ "$t" -lt 20 ]; do - pid=$( - ps -eLo pid=,cmd= 2>/dev/null | awk -v irq="$irq" ' - { - pid=$1; $1=""; sub(/^[ \t]+/,""); name=$0; - gsub(/^\[/,"",name); gsub(/\]$/,"",name); - if (name ~ ("^irq/" irq "-")) { print pid; exit } - }' - ) - if [ -n "$pid" ]; then - chrt -f -p "$prio" "$pid" >/dev/null 2>&1 || true - log "IRQ thread irq/$irq -> FIFO $prio (pid=$pid)" - return 0 - fi - sleep 0.5 - t=$((t+1)) - done - log "WARN: did not find threaded IRQ for $irq" - return 0 + if have cpupower; then + cpupower frequency-set -g performance >/dev/null 2>&1 || true + else + for g in /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor; do + [ -w "$g" ] && echo performance > "$g" 2>/dev/null || true + done + fi + log "CPU governor set to performance (best effort)" } -# --- wait for services ------------------------------------------------------- -for u in klipper.service klipper-mcu.service moonraker.service display.service; do - wait_active "$u" -done - -# --- wait for UART IRQs ------------------------------------------------------ -wait_irq_present ttyS0 -wait_irq_present ttyS1 -wait_irq_present ttyS2 - -# --- pin UART IRQs ----------------------------------------------------------- -IRQ_S0="$(irq_for ttyS0 || true)" -IRQ_S1="$(irq_for ttyS1 || true)" -IRQ_S2="$(irq_for ttyS2 || true)" - -[ -n "${IRQ_S0:-}" ] && pin_irq "$IRQ_S0" "$KLIPPER_MCU_TTY_CPU" -[ -n "${IRQ_S1:-}" ] && pin_irq "$IRQ_S1" "$DISPLAY_TTY_CPU" -[ -n "${IRQ_S2:-}" ] && pin_irq "$IRQ_S2" "$MISC_CPU" +set_performance_governor -# --- place units on CPUs ----------------------------------------------------- -set_unit_cpus klipper-mcu.service "$KLIPPER_MCU_RPI_CPU" -set_unit_cpus klipper.service "$KLIPPER_MCU_TTY_CPU" -set_unit_cpus display.service "$DISPLAY_TTY_CPU" -set_unit_cpus moonraker.service "$MISC_CPU" -set_unit_cpus mjpg-streamer-webcam1.service "$MISC_CPU" || true -set_unit_cpus mobileraker.service "$MISC_CPU" || true -set_unit_cpus power_monitor.service "$MISC_CPU" || true - -# --- serial tuning for /dev/ttyS0 ------------------------------------------- -if [ -e /dev/ttyS0 ]; then - have setserial && setserial /dev/ttyS0 low_latency || true - stty -F /dev/ttyS0 cs8 -parenb -cstopb -ixon -ixoff -crtscts \ - -icanon -echo -echoe -echok -echoctl -echoke -iexten \ - -inlcr -igncr -icrnl -opost -hupcl min 1 time 0 || true -else - log "WARN: /dev/ttyS0 not present; skipped stty/setserial" -fi - -# --- make display gentle ----------------------------------------------------- -renice_unit display.service 19 +# Keep the UI process gentle. Do not touch klipper/klipper-mcu scheduling, +# CPU affinity, IRQ affinity, or serial driver tuning here. +renice_unit display.service 19 ionice_idle_unit display.service +log "Applied gentle priority tuning to display.service" -# --- RT budget --------------------------------------------------------------- -sysctl -w kernel.sched_rt_runtime_us=-1 >/dev/null 2>&1 || \ - echo -1 > /proc/sys/kernel/sched_rt_runtime_us 2>/dev/null || \ - log "WARN: failed to set sched_rt_runtime_us" - -# --- promote Klippy + host MCU to SCHED_FIFO 60 (sticky) --------------------- -promote_unit_fifo klipper-mcu.service 60 -promote_unit_fifo klipper.service 60 - -# --- bump ttyS0 IRQ thread if present ---------------------------------------- -if [ -n "${IRQ_S0:-}" ]; then - chrt_irq_thread "$IRQ_S0" 70 -fi - -# --- summary ----------------------------------------------------------------- -if [ -n "${IRQ_S0:-}" ]; then - aff0=$( - cat "/proc/irq/$IRQ_S0/smp_affinity_list" 2>/dev/null || - cat "/proc/irq/$IRQ_S0/smp_affinity" 2>/dev/null || - echo "?" - ) - log "ttyS0 irq=$IRQ_S0 aff=$aff0" -fi -if [ -n "${IRQ_S1:-}" ]; then - aff1=$( - cat "/proc/irq/$IRQ_S1/smp_affinity_list" 2>/dev/null || - cat "/proc/irq/$IRQ_S1/smp_affinity" 2>/dev/null || - echo "?" - ) - log "ttyS1 irq=$IRQ_S1 aff=$aff1" -fi -if [ -n "${IRQ_S2:-}" ]; then - aff2=$( - cat "/proc/irq/$IRQ_S2/smp_affinity_list" 2>/dev/null || - cat "/proc/irq/$IRQ_S2/smp_affinity" 2>/dev/null || - echo "?" - ) - log "ttyS2 irq=$IRQ_S2 aff=$aff2" -fi - -log "klipper-mcu:$(ps_line "$(systemctl show -p MainPID --value klipper-mcu.service 2>/dev/null || echo 0)" || true)" -log "klipper: $(ps_line "$(systemctl show -p MainPID --value klipper.service 2>/dev/null || echo 0)" || true)" -log "done (IRQs: ttyS0:${IRQ_S0:-?} ttyS1:${IRQ_S1:-?} ttyS2:${IRQ_S2:-?})" exit 0 diff --git a/affinity.service b/affinity.service index 8b630c6..9d9bf91 100644 --- a/affinity.service +++ b/affinity.service @@ -1,20 +1,16 @@ [Unit] -Description=Pin UART IRQs and set CPU affinities/priorities for Klipper stack (dynamic detection) +Description=Apply minimal display priority tuning -# Start after these are up, and pull them in if needed -After=klipper-mcu.service klipper.service display.service moonraker.service -Wants=klipper-mcu.service klipper.service display.service moonraker.service +# Run after display is up so the script can tune display.service safely. +After=display.service -# If any of these restart/stop, restart this oneshot too -PartOf=klipper-mcu.service klipper.service display.service +# Re-run when display.service is restarted/stopped. +PartOf=display.service [Service] Type=oneshot RemainAfterExit=yes -ExecStart=/usr/local/sbin/affinity-setup.sh +ExecStart=/home/mks/display_connector/affinity-setup.sh [Install] -WantedBy=multi-user.target -WantedBy=klipper-mcu.service -WantedBy=klipper.service WantedBy=display.service diff --git a/display-service-installer.sh b/display-service-installer.sh index dfc4b73..7bd8fb5 100755 --- a/display-service-installer.sh +++ b/display-service-installer.sh @@ -58,9 +58,8 @@ echo "Reloading systemd units..." sudo systemctl daemon-reload # --- Enable + start services ------------------------------------------------- -echo "Enabling and starting affinity.service..." +echo "Enabling affinity.service..." sudo systemctl enable affinity.service -sudo systemctl start affinity.service echo "Enabling and starting display.service..." sudo systemctl enable display.service diff --git a/display.py b/display.py index def1ae3..d0e763b 100644 --- a/display.py +++ b/display.py @@ -105,6 +105,7 @@ def signal_handler(signum, frame): PRINTING_PAGES = [ PAGE_PRINTING, + PAGE_PRINTING_KAMP, PAGE_PRINTING_FILAMENT, PAGE_PRINTING_PAUSE, PAGE_PRINTING_STOP, @@ -264,6 +265,7 @@ def __init__(self, config, loop): self.bed_leveling_counts = [0, 0] self.bed_leveling_probed_count = 0 self.bed_leveling_last_position = None + self._rapid_scan_mode = False self.klipper_restart_event = asyncio.Event() @@ -444,8 +446,9 @@ async def special_page_handling(self, current_page): await self.display.draw_initial_zprobe_leveling(self.z_probe_step, self.z_probe_distance) self._loop.create_task(self.handle_zprobe_leveling()) elif current_page == PAGE_PRINTING_KAMP: - await self.display.draw_kamp_page(self.bed_leveling_counts) - return + if not self._rapid_scan_mode: + await self.display.draw_kamp_page(self.bed_leveling_counts) + return await self.display.special_page_handling(current_page) @@ -708,9 +711,10 @@ async def _handle_file_selection(): selected = self.dir_contents[(self.files_page * 5) + index] is_dir = selected["type"] == "dir" file_path = selected["path"] + if is_dir: + self.current_dir = file_path + self.files_page = 0 if is_dir: - self.current_dir = file_path - self.files_page = 0 await self._load_files() else: async with self._filename_lock: @@ -993,7 +997,7 @@ async def _load_files(self): {"path": "/".join(["gcodes", self.current_dir])}, ) dir_info = data["result"] - self.dir_contents = [] + dirs = [] for item in dir_info["dirs"]: if not item["dirname"].startswith("."): @@ -1023,15 +1027,18 @@ async def _load_files(self): sort_folders_first = self.config["files"].getboolean( "sort_folders_first", fallback=True ) - if sort_folders_first: - self.dir_contents = self.sort_dir_contents(dirs) + self.sort_dir_contents( - files - ) - else: - self.dir_contents = self.sort_dir_contents(dirs + files) - await self.display.show_files_page( - self.current_dir, self.dir_contents, self.files_page - ) + + # Update shared state under lock + async with self._files_lock: + if sort_folders_first: + self.dir_contents = self.sort_dir_contents(dirs) + self.sort_dir_contents(files) + else: + self.dir_contents = self.sort_dir_contents(dirs + files) + current_dir = self.current_dir + dir_contents = self.dir_contents + files_page = self.files_page + + await self.display.show_files_page(current_dir, dir_contents, files_page) def _page_id(self, page): return self.display.mapper.map_page(page) @@ -1052,17 +1059,22 @@ async def _go_back(self): history_len = len(self.history) if history_len <= 1: logger.debug("Already at the main page.") - return # ← This is the ONLY place this message should appear + return # Get current page WITHOUT releasing lock (safe - no await) current_page = self.history[-1] if self.history else None - # PHASE 2: Handle FILES special case (outside lock) - if current_page == PAGE_FILES and self.current_dir != "": - self.current_dir = "/".join(self.current_dir.split("/")[:-1]) - self.files_page = 0 - await self._load_files() # I/O outside lock - return + # PHASE 2: Handle FILES special case (with proper file locking) + if current_page == PAGE_FILES: + should_load_files = False + async with self._files_lock: + if self.current_dir != "": + self.current_dir = "/".join(self.current_dir.split("/")[:-1]) + self.files_page = 0 + should_load_files = True + if should_load_files: + await self._load_files() + return # PHASE 3: Pop history and determine navigation (under lock - DATA ONLY) back_page = None @@ -1085,15 +1097,12 @@ async def _go_back(self): # Map page under lock (it's just a dict lookup - fast and safe) mapped_page = self.display.mapper.map_page(back_page) - # PHASE 4: Navigate (outside lock - I/O) - if back_page is None or mapped_page is None: - logger.debug("No valid page to navigate back to.") - return - - await self.display.navigate_to(mapped_page) - logger.debug(f"Navigating back to {back_page}") + # PHASE 4: Perform navigation (outside lock - I/O) + if back_page is not None and mapped_page is not None: + await self.display.navigate_to(mapped_page) + logger.debug(f"Navigating back to {back_page}") - # PHASE 5: Special page handling (outside lock - I/O) + # PHASE 5: Handle special page logic (outside lock - may do I/O) try: await self.special_page_handling(back_page) except Exception as e: @@ -1197,9 +1206,10 @@ async def listen(self): logger.error(f"Unexpected response format from printer.objects.subscribe: {ret}") raise Exception("Failed to subscribe to printer objects") - # Now wait for the process_stream task to complete (keeps connection alive) + # Keep the listen task alive while the Moonraker stream reader runs. logger.info("Listen task now monitoring connection...") if self._process_stream_task: + await asyncio.shield(self._process_stream_task) logger.info("Process stream task completed, connection closed") else: logger.warning("No process_stream task found!") @@ -1242,6 +1252,9 @@ async def _cleanup_stale_requests(self): await asyncio.sleep(60) async def _send_moonraker_request(self, method, params=None): + if not self.connected or self.writer is None: + raise ConnectionError("Not connected to Moonraker") + if params is None: params = {} message = self._make_rpc_msg(method, **params) @@ -1260,13 +1273,22 @@ async def _send_moonraker_request(self, method, params=None): self.writer.write(data) await self.writer.drain() return await asyncio.wait_for(fut, timeout=self.REQUEST_TIMEOUT) - except asyncio.TimeoutError: + except asyncio.CancelledError: + # External cancellation (shutdown, reconnect) - always propagate + async with self.pending_reqs_lock: + self.pending_reqs.pop(message["id"], None) + raise + except (asyncio.TimeoutError, ConnectionError): + # Timeout or connection closed - let caller handle reconnect logic async with self.pending_reqs_lock: self.pending_reqs.pop(message["id"], None) raise except Exception: + # Unexpected error - log but don't call close() here + # Let the error propagate; caller/listen() handles reconnection logger.exception("Unexpected error _send_moonraker_request") - await self.close() + async with self.pending_reqs_lock: + self.pending_reqs.pop(message["id"], None) raise def _find_ips(self, network): @@ -1311,6 +1333,7 @@ async def connect_moonraker(self) -> None: logger.error( "KeyError encountered in software_version_response. Attempting to reconnect." ) + await self.close() await asyncio.sleep(5) # Wait before reconnecting continue # Retry the connection loop @@ -1318,6 +1341,7 @@ async def connect_moonraker(self) -> None: raise except Exception as e: logger.error(f"Error connecting to Moonraker: {e}") + await self.close() await asyncio.sleep(5) # Wait before reconnecting continue @@ -1434,21 +1458,23 @@ async def _process_stream(self, reader: asyncio.StreamReader) -> None: errors_remaining: int = 10 while not reader.at_eof(): if self.klipper_restart_event.is_set(): - await self._attempt_reconnect() self.klipper_restart_event.clear() + await self._attempt_reconnect() + return # Exit - new connection started try: data = await reader.readuntil(b"\x03") decoded = data[:-1].decode(encoding="utf-8") item = json.loads(decoded) except (ConnectionError, asyncio.IncompleteReadError): await self._attempt_reconnect() - break + return # Exit - new connection started except asyncio.CancelledError: raise except Exception: errors_remaining -= 1 if not errors_remaining or not self.connected: await self._attempt_reconnect() + return # Exit - new connection started continue errors_remaining = 10 if "id" in item: @@ -1456,11 +1482,12 @@ async def _process_stream(self, reader: asyncio.StreamReader) -> None: request_data = self.pending_reqs.pop(item["id"], None) if request_data is not None: fut, _ = request_data - fut.set_result(item) - elif item["method"] == "notify_status_update": + if not fut.done(): + fut.set_result(item) + elif item.get("method") == "notify_status_update": await self.handle_status_update(item["params"][0]) - elif item["method"] == "notify_gcode_response": - await self.handle_gcode_response(item["params"][0]) + elif item.get("method") == "notify_gcode_response": + await self.handle_gcode_response(item["params"][0]) logger.info("Unix Socket Disconnection from _process_stream()") await self.close() @@ -1498,29 +1525,25 @@ def safe_int_convert(value, default=0): async def _attempt_reconnect(self): async with self._reconnect_lock: if self._is_reconnecting: - logger.debug("Reconnection already in progress, skipping...") return self._is_reconnecting = True try: - # Close existing connection - if self.writer and not self.writer.is_closing(): + # Cancel existing listen task first + if self._listen_task and not self._listen_task.done(): + self._listen_task.cancel() try: - self.writer.close() - await self.writer.wait_closed() - except Exception as e: - logger.debug(f"Error closing writer: {e}") + await self._listen_task + except asyncio.CancelledError: + pass - self.connected = False + # Close existing connection + await self.close() logger.info("Attempting to reconnect to Moonraker...") await asyncio.sleep(1) - # Clear the listening flag if it's stuck self._is_listening = False - - # Allow thread pool recreation self.resources.allow_new_pool() - self.start_listening() finally: self._is_reconnecting = False @@ -1859,19 +1882,28 @@ async def handle_status_update(self, new_data, data_mapping=None): # Load thumbnail if needed if should_load_thumbnail and thumbnail_filename: logger.info(f"Loading thumbnail for {thumbnail_filename} on printing page") - if self._thumbnail_task and not self._thumbnail_task.done(): - self._thumbnail_task.cancel() + + # Cancel existing task under lock + task_to_cancel = None + async with self._filename_lock: + if self._thumbnail_task and not self._thumbnail_task.done(): + task_to_cancel = self._thumbnail_task + self._thumbnail_task = None + + if task_to_cancel: + task_to_cancel.cancel() try: - await self._thumbnail_task + await task_to_cancel except asyncio.CancelledError: pass - self._thumbnail_task = self._loop.create_task( - self.load_thumbnail_for_page( - thumbnail_filename, - thumbnail_page_id + async with self._filename_lock: + self._thumbnail_task = self._loop.create_task( + self.load_thumbnail_for_page( + thumbnail_filename, + thumbnail_page_id + ) ) - ) elif state_to_process == "complete": if current_page is None or current_page != PAGE_PRINTING_COMPLETE: @@ -1988,14 +2020,23 @@ async def close(self): if not self.connected: return self.connected = False + + # Cancel all pending request futures + async with self.pending_reqs_lock: + for fut, _ in list(self.pending_reqs.values()): + if not fut.done(): + fut.set_exception(ConnectionError("Connection closed")) + self.pending_reqs.clear() - # Cancel the process_stream task if it's still running + # Cancel process_stream task only if it's not the current task (avoid self-cancellation) + current_task = asyncio.current_task() if self._process_stream_task and not self._process_stream_task.done(): - self._process_stream_task.cancel() - try: - await self._process_stream_task - except asyncio.CancelledError: - pass + if self._process_stream_task is not current_task: + self._process_stream_task.cancel() + try: + await self._process_stream_task + except asyncio.CancelledError: + pass if self.writer: self.writer.close() @@ -2026,7 +2067,7 @@ async def handle_zprobe_leveling(self): async def handle_gcode_response(self, response): if self.leveling_mode == "screw": - if "probe at" in response: + if "probe: at" in response: self.screw_probe_count += 1 self._loop.create_task( self.display.update_screw_level_description( @@ -2054,11 +2095,38 @@ async def handle_gcode_response(self, response): self.bed_leveling_counts = [x_count, y_count] elif response.startswith("// Adapted mesh bounds"): self.bed_leveling_probed_count = 0 - current_page = await self._get_current_page() + self.bed_leveling_last_position = None + self._bed_leveling_complete = False + # Don't reset _rapid_scan_mode here - it may have been set by + # "Beginning rapid surface scan" which can come before or after this message + # Reset leveling_mode for KAMP during printing (not manual full bed level) + if self.current_state in ("printing", "paused"): + self.leveling_mode = None + current_page = await self._get_current_page() if current_page != PAGE_PRINTING_KAMP: self._loop.create_task(self._navigate_to_page(PAGE_PRINTING_KAMP, clear_history=True)) - elif response.startswith("// probe at"): - current_page = await self._get_current_page() + elif "Beginning rapid surface scan" in response or "Touch home at" in response: + # Rapid scan mode (Eddy/Cartographer/Beacon) - these probes don't send + # "Adapted mesh bounds" or "probe: at" messages, so we handle everything here + self._rapid_scan_mode = True + self.bed_leveling_probed_count = 0 + self.bed_leveling_last_position = None + self._bed_leveling_complete = False + # Reset leveling_mode for KAMP during printing + if self.current_state in ("printing", "paused"): + self.leveling_mode = None + logger.info("Rapid scan mode detected - using simplified visualization") + current_page = await self._get_current_page() + if current_page != PAGE_PRINTING_KAMP: + self._loop.create_task(self._navigate_to_page(PAGE_PRINTING_KAMP, clear_history=True)) + self._loop.create_task( + self.display.update_kamp_text("Scanning bed surface...") + ) + elif response.startswith("// probe: at"): + # Skip all probe messages during rapid scan to avoid overwhelming the system + if self._rapid_scan_mode: + return + current_page = await self._get_current_page() if current_page != PAGE_PRINTING_KAMP: # We are not leveling, likely response came from manual probe e.g. from console, # Skip updating the state, otherwise it messes up bed leveling screen when printing @@ -2088,13 +2156,23 @@ async def handle_gcode_response(self, response): ) ) - elif response.startswith("// Mesh Bed Leveling Complete"): + elif response.startswith("// Mesh Bed Leveling Complete") or "Collecting samples along the scanning path completed" in response: + # If rapid scan mode was active, show completion + # Draw boxes if we received probe counts (some probes send both rapid scan AND counts) + if self._rapid_scan_mode: + self._loop.create_task( + self.display.update_kamp_text("Scan complete!") + ) + self.bed_leveling_probed_count = 0 self.bed_leveling_counts = self.full_bed_leveling_counts - current_page = await self._get_current_page() + self._rapid_scan_mode = False + + current_page = await self._get_current_page() if current_page == PAGE_PRINTING_KAMP: self._bed_leveling_complete = True - if self.leveling_mode == "full_bed": + # Only show bed mesh final for manual full bed leveling (not during printing) + if self.leveling_mode == "full_bed" and self.current_state not in ("printing", "paused"): self._loop.create_task(self.display.show_bed_mesh_final()) else: self._loop.create_task(self._handle_bed_leveling_complete()) @@ -2225,22 +2303,31 @@ async def watchdog_ping(): if config_observer.is_alive(): config_observer.join(timeout=5) - # Ensure all tasks are cancelled (Py 3.13 safe) + # Best-effort controller shutdown before closing the loop. + if "controller" in locals() and not loop.is_closed(): + try: + loop.run_until_complete(controller.close()) + except Exception as e: + logger.error(f"Error during controller close(): {e}") + + # Ensure all tasks are cancelled and given time to handle CancelledError. try: - asyncio.get_running_loop() # raises if no loop is running - pending = asyncio.all_tasks() - except RuntimeError: + pending = {t for t in asyncio.all_tasks(loop=loop) if not t.done()} + except Exception: pending = set() for task in list(pending): task.cancel() - if pending: + if pending and not loop.is_closed(): try: - loop.run_until_complete(asyncio.wait(pending, timeout=5)) + loop.run_until_complete( + asyncio.gather(*pending, return_exceptions=True) + ) + loop.run_until_complete(loop.shutdown_asyncgens()) except RuntimeError: # Loop may already be closed or not runnable here; best-effort shutdown. pass - + loop.close() logger.info("Service stopped") diff --git a/display.service b/display.service index 6eee02d..41c8721 100644 --- a/display.service +++ b/display.service @@ -9,6 +9,13 @@ Type=simple User=mks Group=mks WorkingDirectory=/home/mks/display_connector +# Self-sync systemd units from the repo on startup so Moonraker-triggered restarts +# can apply updated unit files without re-running the installer manually. +ExecStartPre=+/usr/bin/install -m 0644 /home/mks/display_connector/affinity.service /etc/systemd/system/affinity.service +ExecStartPre=+/usr/bin/install -m 0644 /home/mks/display_connector/display.service /etc/systemd/system/display.service +ExecStartPre=+/bin/chmod 0755 /home/mks/display_connector/affinity-setup.sh +ExecStartPre=+/bin/systemctl daemon-reload +ExecStartPre=+/bin/systemctl enable affinity.service ExecStartPre=/bin/sleep 10 ExecStart=/home/mks/display_connector/venv/bin/python /home/mks/display_connector/display.py diff --git a/src/communicator.py b/src/communicator.py index 5481b44..518ea3a 100644 --- a/src/communicator.py +++ b/src/communicator.py @@ -68,7 +68,7 @@ async def _execute_command(self, data, timeout=None): # Any other error: log and continue self.logger.warning(f"Unexpected error writing to display: {e}") - async def write(self, data, timeout=None, blocked_key=None): + async def write(self, data, timeout=None, blocked_key=None, auto_unblock=True): # Fast path: decide blocking under lock async with self._write_lock: # If someone else is blocking, queue this command @@ -85,7 +85,7 @@ async def write(self, data, timeout=None, blocked_key=None): await self._execute_command(data, timeout) finally: # If this was a blocking op, release block and send the next queued command (if any) - if blocked_key: + if blocked_key and auto_unblock: await self.unblock(blocked_key) async def unblock(self, blocked_key): @@ -127,9 +127,11 @@ async def retrieve_nested_data(self, path): async def navigate_to(self, page_id): # Block other writes while we switch pages - await self.write(f"page {page_id}", blocked_key="__nav__") - await asyncio.sleep(0.25) # give the HMI time to swap pages - await self.unblock("__nav__") + try: + await self.write(f"page {page_id}", blocked_key="__nav__", auto_unblock=False) + await asyncio.sleep(0.25) # give the HMI time to swap pages + finally: + await self.unblock("__nav__") async def update_data(self, new_data, data_mapping=None, current_data=None): if data_mapping is None: diff --git a/src/elegoo_display.py b/src/elegoo_display.py index 82f1f93..5c5a020 100644 --- a/src/elegoo_display.py +++ b/src/elegoo_display.py @@ -338,6 +338,7 @@ def __init__(self, logger: Logger, model: str, port: str, event_handler, baudrat self._cached_wifi_status = None self._last_wifi_check = 0 self._wifi_check_interval = 30 # Only check WiFi every 30 seconds + self._warning_task = None async def get_firmware_version(self) -> str: if self._firmware_version is None: # Check if the firmware version is cached @@ -345,10 +346,13 @@ async def get_firmware_version(self) -> str: return self._firmware_version async def send_warning_message(self): - await asyncio.sleep(0.6) # Add delay if needed - await self.write( - f'xstr 0,464,320,16,2,{TEXT_WARNING},{BACKGROUND_GRAY},1,1,1,"WARNING: Unsupported Display Firmware Version"' - ) + try: + await asyncio.sleep(0.6) # Add delay if needed + await self.write( + f'xstr 0,464,320,16,2,{TEXT_WARNING},{BACKGROUND_GRAY},1,1,1,"WARNING: Unsupported Display Firmware Version"' + ) + finally: + self._warning_task = None async def check_valid_version(self): version = await self.get_firmware_version() @@ -361,7 +365,8 @@ async def check_valid_version(self): "Unsupported firmware version. Consider updating to a supported version: " + ", ".join(self.supported_firmware_versions) ) - asyncio.create_task(self.send_warning_message()) # Send warning asynchronously + if self._warning_task is None or self._warning_task.done(): + self._warning_task = asyncio.create_task(self.send_warning_message()) return False return True @@ -555,7 +560,7 @@ async def update_wifi_ui(self): if self._cached_wifi_status is not None and (current_time - self._last_wifi_check) < self._wifi_check_interval: has_wifi, ssid, rssi_category = self._cached_wifi_status else: - has_wifi, ssid, rssi_category = get_wlan0_status() + has_wifi, ssid, rssi_category = await asyncio.to_thread(get_wlan0_status) self._cached_wifi_status = (has_wifi, ssid, rssi_category) self._last_wifi_check = current_time @@ -650,34 +655,27 @@ async def draw_completed_screw_leveling(self, screw_levels): await self.write("vis b[7],0") await self.write("vis b[8],1") await self.write("fill 0,110,320,290,10665") + await self.write('xstr 12,320,100,20,1,65535,10665,1,1,1,"front left"') await self.draw_screw_level_info_at("12,340,100,20", screw_levels["front left"]) await self.write('xstr 170,320,100,20,1,65535,10665,1,1,1,"front right"') - await self.draw_screw_level_info_at( - "170,340,100,20", screw_levels["front right"] - ) - - await self.write('xstr 170,120,100,20,1,65535,10665,1,1,1,"rear right"') - await self.draw_screw_level_info_at( - "170,140,100,20", screw_levels["rear right"] - ) + await self.draw_screw_level_info_at("170,340,100,20", screw_levels["front right"]) await self.write('xstr 12,120,100,20,1,65535,10665,1,1,1,"rear left"') await self.draw_screw_level_info_at("12,140,100,20", screw_levels["rear left"]) + await self.write('xstr 170,120,100,20,1,65535,10665,1,1,1,"rear right"') + await self.draw_screw_level_info_at("170,140,100,20", screw_levels["rear right"]) + if "center right" in screw_levels: - await self.write('xstr 12,220,100,30,1,65535,10665,1,1,1,"center\\rright"') - await self.draw_screw_level_info_at( - "170,240,100,20", screw_levels["center right"] - ) + await self.write('xstr 172,220,100,20,1,65535,10665,1,1,1,"center right"') + await self.draw_screw_level_info_at("172,240,100,20", screw_levels["center right"]) if "center left" in screw_levels: - await self.write('xstr 12,120,100,20,1,65535,10665,1,1,1,"center\\rleft"') - await self.draw_screw_level_info_at( - "12,240,100,20", screw_levels["center left"] - ) + await self.write('xstr 0,220,100,20,1,65535,10665,1,1,1,"center left"') + await self.draw_screw_level_info_at("0,240,100,20", screw_levels["center left"]) - await self.write('xstr 96,215,100,50,1,65535,15319,1,1,1,"Retry"') + await self.write('xstr 106,214,60,60,1,65535,15319,1,1,1,"Retry"') async def draw_screw_level_info_at(self, position, level): if level == "base": @@ -769,7 +767,8 @@ async def display_thumbnail(self, page_number, thumbnail): for part in parts: await self.write( str(page_number) + '.cp0.write("' + str(part) + '")', - blocked_key=f"thumbnail_{page_number}" + blocked_key=f"thumbnail_{page_number}", + auto_unblock=False, ) self.logger.debug("Thumbnail sent to display") await self.unblock(f"thumbnail_{page_number}") diff --git a/tests/communicator_test.py b/tests/communicator_test.py index 5767852..7fc32fa 100644 --- a/tests/communicator_test.py +++ b/tests/communicator_test.py @@ -29,4 +29,8 @@ async def test_navigate(communicator): await communicator.navigate_to("1") # navigate_to blocks other writes with a blocked_key="__nav__" - communicator.write.assert_awaited_once_with("page 1", blocked_key="__nav__") + communicator.write.assert_awaited_once_with( + "page 1", + blocked_key="__nav__", + auto_unblock=False, + )