diff --git a/pyproject.toml b/pyproject.toml index 52c0527..543db64 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "zview" -version = "0.7.3" +version = "0.8.0" authors = [{ name = "Paulo Santos", email = "pauloxrms@gmail.com" }] description = "ZView, a Zephyr RTOS runtime visualizer" dependencies = ["PyYAML", "pyelftools", "pyocd", "pylink-square"] diff --git a/src/backend/z_scraper.py b/src/backend/z_scraper.py index 0dfc750..1feac19 100644 --- a/src/backend/z_scraper.py +++ b/src/backend/z_scraper.py @@ -489,6 +489,7 @@ def __init__( self._polling_thread: threading.Thread | None = None self._thread_pool: list[ThreadInfo] | None = None self._stop_event: Event | None = None + self.inspection_period = 0.2 self.has_heaps: bool = True self.has_usage: bool = True @@ -756,6 +757,7 @@ def start_polling_thread( data_queue.put({"error": "Already started..."}) return + self.inspection_period = inspection_period self._polling_thread = threading.Thread( target=self._poll_thread_worker, args=(data_queue, stop_event, inspection_period), diff --git a/src/frontend/tui_widgets.py b/src/frontend/tui_widgets.py new file mode 100644 index 0000000..1e12bfe --- /dev/null +++ b/src/frontend/tui_widgets.py @@ -0,0 +1,327 @@ +# Copyright (c) 2025 Paulo Santos (@wkhadgar) +# +# SPDX-License-Identifier: Apache-2.0 + +import curses + +from backend.z_scraper import HeapInfo, ThreadInfo, ThreadRuntime + + +def _truncate_str(text: str, max_size: int) -> str: + return text if len(text) < max_size else text[: max_size - 3] + "..." + + +class TUIProgressBar: + def __init__( + self, + width: int, + std_attribute: int, + medium_threshold: tuple[float, int], + high_threshold: tuple[float, int], + ): + self.width = width + self._bar_width = self.width - 2 + + self._low_threshold_attr: int = std_attribute + + self._medium_threshold: float = medium_threshold[0] + self._medium_threshold_attr: int = medium_threshold[1] + + self._high_threshold: float = high_threshold[0] + self._high_threshold_attr: int = high_threshold[1] + + def draw( + self, + stdscr: curses.window, + y: int, + x: int, + percentage: float, + ): + if percentage > self._high_threshold: + bar_color_attr = self._high_threshold_attr + elif percentage > self._medium_threshold: + bar_color_attr = self._medium_threshold_attr + else: + bar_color_attr = self._low_threshold_attr + + completed_chars = int(self._bar_width * (percentage / 100)) + stdscr.addstr(y, x, "│" + "·" * self._bar_width + "│") + x += 1 + + stdscr.attron(bar_color_attr) + stdscr.addstr(y, x, "█" * completed_chars) + + percent_display = f"{percentage:.1f}%" + percent_start_x = x + (self.width // 2) - (len(percent_display) // 2) + bar_end_x = x + completed_chars + + split_point = max(0, min(len(percent_display), bar_end_x - percent_start_x)) + + text_over_bar = percent_display[:split_point] + if text_over_bar: + stdscr.attron(curses.A_REVERSE) + stdscr.addstr(y, percent_start_x, text_over_bar) + stdscr.attroff(curses.A_REVERSE) + + text_outside_bar = percent_display[split_point:] + if text_outside_bar: + stdscr.addstr(y, percent_start_x + split_point, text_outside_bar) + + stdscr.attroff(bar_color_attr) + + +class TUIBox: + def __init__(self, title: str, description: str, attribute: int): + self._title: str = title + self._description: str = description + self._attr: int = attribute + + def draw( + self, + stdscr: curses.window, + y: int, + x: int, + height: int, + width: int, + **kwargs, + ): + title = _truncate_str(self._title, width - 2) + description = _truncate_str(self._description, width - 2) + + horizontal_bar = "─" * (width - 2) + top_str = "┌" + title + horizontal_bar[len(title) :] + "┐" + bottom_str = "└" + description + horizontal_bar[len(description) :] + "┘" + side_str = "│" + + stdscr.attron(self._attr) + + stdscr.addstr(y, x, top_str) + for row in range(1, height - 1): + stdscr.addstr(y + row, x, side_str) + stdscr.addstr(y + row, x + width - 1, side_str) + stdscr.addstr(y + height - 1, x, bottom_str) + + stdscr.attroff(self._attr) + + +class TUIGraph(TUIBox): + def __init__(self, title: str, description: str, limits: tuple[int, int], attribute: int): + super().__init__(title, description, attribute) + + self._max_limit: int = max(limits) or 1 + self._min_limit: int = min(limits) + self._max_limit_str = f"{self._max_limit}" + self._min_limit_str = f"{self._min_limit}" + + self._blocks = [" ", "▁", "▂", "▃", "▄", "▅", "▆", "▇", "█"] + self._blocks_res = len(self._blocks) + + def _process_points(self, points: list[int | float], target_len: int): + n = len(points) + + if n < target_len: + return [0.0] * (target_len - n) + points + + # n >= target_len + res = [0.0] * target_len + for i in range(target_len): + start = (i * n) // target_len + end = ((i + 1) * n) // target_len + + bucket = points[start:end] + res[i] = sum(bucket) // len(bucket) if bucket else 0 + + return res + + def draw( + self, + stdscr: curses.window, + y: int, + x: int, + height: int, + width: int, + **kwargs, + ): + super().draw(stdscr, y, x, height, width) + + all_points: list[float | int] = kwargs.get("points", []) + if not all_points: + return + + norm_points = self._process_points(all_points, width - 2) + + internal_height = height - 2 + internal_width = width - 2 + stdscr.attron(self._attr) + for x_step in range(internal_width): + x_pos = x + x_step + 1 + full_blocks_f = (norm_points[x_step] / self._max_limit) * internal_height + full_blocks_count = int(full_blocks_f) + last_block_idx = int((full_blocks_f - full_blocks_count) * (self._blocks_res - 1)) + + for y_step in range(internal_height): + y_pos = y + internal_height - y_step + if y_step < full_blocks_count: + stdscr.addstr(y_pos, x_pos, self._blocks[-1]) + elif y_step == full_blocks_count: + stdscr.addstr(y_pos, x_pos, self._blocks[last_block_idx]) + else: + stdscr.addstr(y_pos, x_pos, " ") + + stdscr.addstr(y + 1, x + width - len(self._max_limit_str), self._max_limit_str) + stdscr.addstr( + y + internal_height, x + width - len(self._min_limit_str), self._min_limit_str + ) + stdscr.attroff(self._attr) + + +class TUIThreadInfo: + def __init__( + self, + selected_attribute: int, + active_attribute: int, + inactive_attribute: int, + bar_attributes: tuple[int, int, int], + ): + self._selected_attribute: int = selected_attribute + self._active_attribute: int = active_attribute + self._inactive_attribute: int = inactive_attribute + + # These are nice values to default to + self._thread_name_width = 30 + self._cpu_usage_width = 8 + self._load_usage_width = 8 + self._stack_bytes_width = 18 + + self.watermark_bar = TUIProgressBar( + 32, + bar_attributes[0], + (75, bar_attributes[1]), + (90, bar_attributes[2]), + ) + + def set_field_widths( + self, name: int, cpu_usage: int, load_usage: int, stack_bar: int, stack_bytes: int + ): + self._thread_name_width = name + self._cpu_usage_width = cpu_usage + self._load_usage_width = load_usage + self._stack_bytes_width = stack_bytes + + self.watermark_bar.width = stack_bar + + def draw( + self, stdscr: curses.window, y: int, x: int, thread_info: ThreadInfo, selected: bool = False + ): + col_pos = x + + runtime = thread_info.runtime or ThreadRuntime( + cpu=-1.0, + cpu_normalized=-1.0, + active=False, + stack_watermark=0, + stack_watermark_percent=0.0, + ) + + # Thread name + thread_name_attr = ( + self._selected_attribute + if selected + else (self._active_attribute if runtime.active else self._inactive_attribute) + ) + stdscr.addstr( + y, col_pos, _truncate_str(thread_info.name, self._thread_name_width), thread_name_attr + ) + col_pos += self._thread_name_width + 1 + + # Thread CPUs + if runtime.cpu >= 0: + cpu_display = f"{runtime.cpu_normalized:.2f}%".center(self._cpu_usage_width) + else: + cpu_display = f"{'-':^{self._cpu_usage_width}}" + stdscr.addstr(y, col_pos, cpu_display) + col_pos += self._cpu_usage_width + 1 + + # Thread Loads + if runtime.cpu >= 0: + load_display = f"{runtime.cpu:.1f}%".center(self._load_usage_width) + else: + load_display = f"{'-':^{self._load_usage_width}}" + stdscr.addstr(y, col_pos, load_display) + col_pos += self._load_usage_width + 1 + + # Thread Watermark Progress Bar + self.watermark_bar.draw(stdscr, y, col_pos, runtime.stack_watermark_percent) + col_pos += self.watermark_bar.width + 1 + + # Thread Watermark Bytes + watermark_bytes_display = f"{runtime.stack_watermark} / {thread_info.stack_size}".center( + self._stack_bytes_width + ) + stdscr.addstr(y, col_pos, watermark_bytes_display) + + +class TUIHeapInfo: + def __init__( + self, + selected_attribute: int, + default_attribute: int, + bar_attributes: tuple[int, int, int], + ): + self._selected_attribute: int = selected_attribute + self._default_attribute: int = default_attribute + + # These are nice values to default to + self._heap_name_width = 30 + self._free_bytes_width = 8 + self._allocated_bytes_width = 8 + self._watermark_width = 18 + + self.usage_bar = TUIProgressBar( + 32, + bar_attributes[0], + (75, bar_attributes[1]), + (90, bar_attributes[2]), + ) + + def set_field_widths( + self, name: int, free_bytes: int, allocated_bytes: int, usage_bar: int, watermark: int + ): + self._heap_name_width = name + self._free_bytes_width = free_bytes + self._allocated_bytes_width = allocated_bytes + self._watermark_width = watermark + + self.usage_bar.width = usage_bar + + def draw( + self, stdscr: curses.window, y: int, x: int, heap_info: HeapInfo, selected: bool = False + ): + col_pos = x + + # Heap name + heap_name_display = _truncate_str(heap_info.name, self._heap_name_width) + heap_name_attr = self._selected_attribute if selected else self._default_attribute + stdscr.addstr(y, col_pos, heap_name_display, heap_name_attr) + col_pos += self._heap_name_width + 1 + + # Free bytes + free_bytes_display = f"{heap_info.free_bytes:^{self._free_bytes_width}}" + stdscr.addstr(y, col_pos, free_bytes_display) + col_pos += self._free_bytes_width + 1 + + # Allocated bytes + allocated_bytes_display = f"{heap_info.allocated_bytes:^{self._allocated_bytes_width}}" + stdscr.addstr(y, col_pos, allocated_bytes_display) + col_pos += self._allocated_bytes_width + 1 + + # Heap Usage Progress Bar + heap_size = heap_info.allocated_bytes + heap_info.free_bytes + self.usage_bar.draw(stdscr, y, col_pos, heap_info.usage_percent) + col_pos += self.usage_bar.width + 1 + + # Heap Watermark Bytes + watermark_bytes_display = f"{heap_info.max_allocated_bytes} / {heap_size}".ljust( + self._watermark_width + ) + stdscr.addstr(y, col_pos, watermark_bytes_display) diff --git a/src/frontend/zview_tui.py b/src/frontend/zview_tui.py index 544c775..933b55a 100755 --- a/src/frontend/zview_tui.py +++ b/src/frontend/zview_tui.py @@ -8,7 +8,9 @@ import queue import threading import time +from abc import abstractmethod from dataclasses import dataclass +from typing import Any from backend.z_scraper import ( HeapInfo, @@ -16,19 +18,34 @@ ThreadRuntime, ZScraper, ) +from frontend.tui_widgets import TUIBox, TUIGraph, TUIHeapInfo, TUIThreadInfo @dataclass -class ZViewTUIScheme: - col_widths: dict[str, int] +class ZViewTUIAttributes: + ACTIVE: int + INACTIVE: int + PROGRESS_BAR_LOW: int + PROGRESS_BAR_MEDIUM: int + PROGRESS_BAR_HIGH: int + HEADER_FOOTER: int + ERROR: int + CURSOR: int + GRAPH_A: int + GRAPH_B: int + + @classmethod + def create_mono(cls): + """Returns a default monochromatic theme for featureless consoles.""" + return cls(0, 0, 0, 0, 0, 0, 0, 0, 0, 0) class ZViewState(enum.Enum): FATAL_ERROR = 1 - DEFAULT_VIEW = 2 - THREAD_DETAIL = 3 - HEAPS_VIEW = 4 - HEAPS_DETAIL = 5 + THREAD_LIST_VIEW = 2 + THREAD_DETAIL_VIEW = 3 + HEAP_LIST_VIEW = 4 + HEAPS_DETAIL_VIEW = 5 class SpecialCode: @@ -41,384 +58,177 @@ class SpecialCode: RECONNECT = ord("r") -class ZView: - """ - A curses-based application for viewing Zephyr RTOS thread runtime information. - - This class manages the curses UI, starts a background thread for data polling, - and updates the display with real-time thread statistics from a connected MCU. - """ - - def __init__(self, scraper: ZScraper, stdscr): - """ - Initializes the ZView application. - - Args: - stdscr: The main curses window object provided by curses.wrapper. - """ - self.min_dimensions = (14, 86) - self.stdscr = stdscr - self.scraper: ZScraper = scraper - self.running = True - self.threads_data: list[ThreadInfo] = [] - self.heaps_data: list[HeapInfo] = [] - self.status_message: str = "" - self.data_queue = queue.Queue() - self.stop_event = threading.Event() - self.update_count = 0 - - self.state: ZViewState = ZViewState.DEFAULT_VIEW - self.ui: dict[ZViewState, ZViewTUIScheme] = {} - - self.cursor: dict[ZViewState, int] = {ZViewState.DEFAULT_VIEW: 0, ZViewState.HEAPS_VIEW: 0} - self.top_line: int = 0 - - self.sort_keys: dict[ZViewState, list] = { - ZViewState.DEFAULT_VIEW: [ - lambda t: t.name, - lambda t: t.runtime.cpu if t.runtime else -1, - lambda t: t.runtime.cpu_normalized if t.runtime else -1, - lambda t: t.runtime.stack_watermark_percent if t.runtime else -1, - lambda t: t.runtime.stack_watermark if t.runtime else -1, - ], - ZViewState.HEAPS_VIEW: [ - lambda h: h.name, - lambda h: h.free_bytes, - lambda h: h.allocated_bytes, - lambda h: h.allocated_bytes / (h.allocated_bytes + h.free_bytes), - lambda h: h.max_allocated_bytes, - ], - ZViewState.FATAL_ERROR: [], - } - - self.current_sort: dict[ZViewState, int] = { - ZViewState.DEFAULT_VIEW: 0, - ZViewState.HEAPS_VIEW: 0, - ZViewState.FATAL_ERROR: 0, - } - self.invert_sorting: bool = False - self.detailing_thread: str | None = None - self.detailing_thread_usages = {} - self.idle_thread: ThreadInfo | None = None - - self._init_curses() - self._set_ui_schemes() - - def _init_curses(self): +class BaseStateView: + def __init__(self, controller: Any, theme: ZViewTUIAttributes): """ - Initializes curses settings and defines color pairs used in the UI. + The controller reference allows the view to access global state + (like colors or the max threads limit) without owning it. """ - curses.curs_set(0) - curses.noecho() - curses.cbreak() - self.stdscr.keypad(True) - self.stdscr.nodelay(True) - - if curses.has_colors(): - curses.start_color() - # Active thread name - curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK) - # Inactive thread name - curses.init_pair(2, curses.COLOR_WHITE, curses.COLOR_BLACK) - # Progress bar: low usage - curses.init_pair(3, curses.COLOR_GREEN, curses.COLOR_BLACK) - # Progress bar: medium usage - curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK) - # Progress bar: high usage - curses.init_pair(5, curses.COLOR_RED, curses.COLOR_BLACK) - # Header/Footer background - curses.init_pair(6, curses.COLOR_WHITE, curses.COLOR_BLUE) - # Error message text - curses.init_pair(7, curses.COLOR_RED, curses.COLOR_BLACK) - # Cursor selection - curses.init_pair(8, curses.COLOR_BLACK, curses.COLOR_WHITE) - # Graph A - curses.init_pair(9, curses.COLOR_MAGENTA, curses.COLOR_BLACK) - # Graph B - curses.init_pair(10, curses.COLOR_CYAN, curses.COLOR_BLACK) - - self.ATTR_ACTIVE_THREAD = curses.color_pair(1) - self.ATTR_INACTIVE_THREAD = curses.color_pair(2) - self.ATTR_PROGRESS_BAR_LOW = curses.color_pair(3) - self.ATTR_PROGRESS_BAR_MEDIUM = curses.color_pair(4) - self.ATTR_PROGRESS_BAR_HIGH = curses.color_pair(5) - self.ATTR_HEADER_FOOTER = curses.color_pair(6) - self.ATTR_ERROR = curses.color_pair(7) - self.ATTR_CURSOR = curses.color_pair(8) - self.ATTR_GRAPH_A = curses.color_pair(9) - self.ATTR_GRAPH_B = curses.color_pair(10) - - def purge_queue(self): - with self.data_queue.mutex: - self.data_queue.queue.clear() + self.controller = controller + self.cursor: int = 0 + self._frame_attr: int = theme.HEADER_FOOTER + self._error_attr: int = theme.ERROR - def _set_ui_schemes(self): - thread_basic_info_scheme = { - "Thread": 30, - "CPU %": 8, - "Load %": 8, - "Stack Usage % (Watermark)": 32, - "Watermark Bytes": 18, - } - heaps_info_scheme = { - "Heap": 30, - "Free B": 8, - "Used B": 8, - "Heap Usage %": 32, - "Watermark Bytes": 18, - } - - thread_scheme = ZViewTUIScheme(thread_basic_info_scheme) - heap_scheme = ZViewTUIScheme(heaps_info_scheme) - - self.ui[ZViewState.DEFAULT_VIEW] = thread_scheme - self.ui[ZViewState.THREAD_DETAIL] = thread_scheme - self.ui[ZViewState.HEAPS_VIEW] = heap_scheme - self.ui[ZViewState.HEAPS_DETAIL] = heap_scheme - self.ui[ZViewState.FATAL_ERROR] = ZViewTUIScheme({}) - - def _draw_graph( + def _render_status( self, - y, - x, - h, - w, - history_list, - title, - attribute, - maximum=100, + stdscr: curses.window, + width: int, + y: int, ): - horizontal_limit = "─" * (w - 2) - self.stdscr.attron(attribute) - self.stdscr.addstr(y, x, "┌" + horizontal_limit + "┐") - self.stdscr.addstr(y + h, x, "└" + horizontal_limit + "┘") - self.stdscr.addstr(y, x + 1, title) - - blocks = ["▁", "▂", "▃", "▄", "▅", "▆", "▇", "█"] - x_idx = len(history_list) - for x_step in range(w): - for y_step in range(h - 1): - y_pos = y + y_step + 1 - x_pos = x + w - x_step - 1 - if x_step == 0 or x_step == w - 1: - self.stdscr.addstr(y_pos, x_pos, "│") - else: - full_blocks = int((history_list[x_idx] / maximum) * (h - 2)) - last_block = int( - (((history_list[x_idx] / maximum) * (h - 2)) - full_blocks) - * (len(blocks) - 1) - ) - self.stdscr.addstr( - y_pos, - x_pos, - " " if y_step < ((h - 1) - full_blocks) else blocks[-1], - ) - self.stdscr.addstr( - y_pos, - x_pos, - blocks[last_block] if y_step == ((h - 2) - full_blocks) else "", - ) - x_idx -= 1 if x_idx else 0 - - self.stdscr.addstr(y + 1, x + w - (len(f"{maximum}")), str(maximum)) - self.stdscr.addstr(y + h - 1, x + w - 1, "0") - self.stdscr.attroff(attribute) + is_error = self.controller.status_message.startswith("Error") + attr = stdscr.getbkgd() + attr = attr & ~0xFF if isinstance(attr, int) else attr[0] + status_row = y + stdscr.addstr( + status_row, + 0, + self.controller.status_message[:width], + (attr | self._error_attr) if is_error else attr, + ) - def _draw_progress_bar( + def _render_frame( self, - y, - x, + stdscr: curses.window, + footer_hint: str, + height: int, width: int, - percentage: float, - medium_threshold: float, - high_threshold: float, ): - if percentage > high_threshold: - bar_color_attr = self.ATTR_PROGRESS_BAR_HIGH - elif percentage > medium_threshold: - bar_color_attr = self.ATTR_PROGRESS_BAR_MEDIUM - else: - bar_color_attr = self.ATTR_PROGRESS_BAR_LOW - bar_width = width - 2 - completed_chars = int(bar_width * (percentage / 100)) - bar_str = "█" * completed_chars - self.stdscr.addstr(y, x, "│" + "·" * bar_width + "│") - self.stdscr.attron(bar_color_attr) - x += 1 - self.stdscr.addstr(y, x, bar_str) - - percent_display = f"{percentage:.1f}%" - percent_start_x = x + (width // 2) - (len(percent_display) // 2) - bar_end_x = x + completed_chars - - split_point = max(0, min(len(percent_display), bar_end_x - percent_start_x)) - - text_over_bar = percent_display[:split_point] - if text_over_bar: - self.stdscr.attron(curses.A_REVERSE) - self.stdscr.addstr(y, percent_start_x, text_over_bar) - self.stdscr.attroff(curses.A_REVERSE) - - text_outside_bar = percent_display[split_point:] - if text_outside_bar: - self.stdscr.addstr(y, percent_start_x + split_point, text_outside_bar) - - self.stdscr.attroff(bar_color_attr) - - def get_sparsity_map(self, chunks: list[dict], width: int, height: int) -> list[str]: - """ - Compresses a linear map of physical heap chunks into a 2D terminal grid. + header_text = "ZView - Zephyr RTOS Runtime Viewer" - Args: - chunks: List of dicts, e.g., [{"used": True, "size": 32}, {"used": False, "size": 128}] - width: The exact integer number of columns available for rendering. - height: The exact integer number of rows available for rendering. + stdscr.move(0, 0) + stdscr.clrtoeol() + stdscr.addstr(0, 0, f"{header_text:^{width}}", self._frame_attr) + footer_row = height - 1 + stdscr.move(footer_row, 0) + stdscr.clrtoeol() + with contextlib.suppress(curses.error): + # This is needed since curses try to advance the cursor to the next + # position, wich is outside the terminal, we safely ignore this. + stdscr.addstr(footer_row, 0, f"{footer_hint:>{width}}", self._frame_attr) - Returns: - A list of strings, where each string is exactly `width` characters long. + @abstractmethod + def render(self, stdscr: curses.window, height: int, width: int) -> None: """ - total_chars = width * height - if not chunks or total_chars <= 0: - return [] - - total_bytes = sum(chunk["size"] for chunk in chunks) - if total_bytes == 0: - return [] - - bytes_per_char = total_bytes / total_chars - - output = [] - chunk_idx = 0 - - chunk_rem = float(chunks[0]["size"]) - chunk_is_used = chunks[0]["used"] - - for _ in range(total_chars): - bucket_used = 0.0 - bucket_rem = bytes_per_char - - while bucket_rem > 0 and chunk_idx < len(chunks): - take = min(chunk_rem, bucket_rem) - - if chunk_is_used: - bucket_used += take - - chunk_rem -= take - bucket_rem -= take - - if chunk_rem <= 0: - chunk_idx += 1 - if chunk_idx < len(chunks): - chunk_rem = float(chunks[chunk_idx]["size"]) - chunk_is_used = chunks[chunk_idx]["used"] - - ratio = bucket_used / bytes_per_char - - if ratio == 0: - output.append(" ") - elif ratio <= 0.33: - output.append("░") - elif ratio <= 0.66: - output.append("▒") - elif ratio <= 0.99: - output.append("▓") - else: - output.append("█") - - return ["".join(output[i : i + width]) for i in range(0, len(output), width)] - - def _get_fragmentation_metrics(self, chunks: list[dict]) -> dict: - if not chunks: - return {} + Draw the state to the provided curses window. + Must be implemented by all subclasses. + """ + pass - total_chunks = len(chunks) - allocated_chunks = sum(1 for c in chunks if c["used"]) - free_bytes = sum(c["size"] for c in chunks if not c["used"]) - largest_free = max((c["size"] for c in chunks if not c["used"]), default=0) - ratio = (1 - largest_free / free_bytes) * 100 if free_bytes > 0 else 0.0 + @abstractmethod + def handle_input(self, key: int) -> ZViewState | None: + """ + Process navigation and state-specific inputs. - return { - "Largest free": (largest_free, "bytes"), - "Frag ratio": (ratio, "percent"), - "Chunks": (f"{allocated_chunks}/{total_chunks}", "raw"), - } + Returns: + A state enum (e.g., ZViewState.THREAD_DETAIL) to request a + context switch from the Router, or None if the state remains unchanged. + """ + pass - def _draw_heap_details_footer(self, y: int, x: int, metrics: dict): - if not metrics: - return - def fmt(value, hint): - if hint == "bytes": - if value >= 1024: - return f"{value / 1024:.1f} KB" - return f"{value} B" - if hint == "percent": - return f"{value:.1f}%" - return str(value) +class FatalErrorView(BaseStateView): + def __init__(self, controller: Any, theme: ZViewTUIAttributes): + super().__init__(controller, theme) - parts = [f"{k}: {fmt(v, h)}" for k, (v, h) in metrics.items()] - bar = " · ".join(parts) + def render(self, stdscr: curses.window, height: int, width: int) -> None: + stdscr.erase() - with contextlib.suppress(curses.error): - self.stdscr.attron(self.ATTR_GRAPH_B) - self.stdscr.addstr(y, x, bar) - self.stdscr.attroff(self.ATTR_GRAPH_B) + self._render_frame(stdscr, "Quit: q | Reconnect: r ", height, width) - def _base_draw(self, height, width): - self.stdscr.erase() + stdscr.attron(self._error_attr) + msg_lines = self.controller.status_message.split('\n') + start_y = (height // 2) - (len(msg_lines) // 2) - header_text = "ZView - Zephyr RTOS Runtime Viewer" - footer_text = { - ZViewState.DEFAULT_VIEW: "Quit: q | Sort: s | Invert: i | Details: ", - ZViewState.THREAD_DETAIL: "Quit: q | All threads: ", - ZViewState.HEAPS_VIEW: "Quit: q | Threads: h | Details: ", - ZViewState.HEAPS_DETAIL: "Quit: q | All heaps: ", - } + for i, line in enumerate(msg_lines): + if 0 <= start_y + i < height - 2: + clean_line = line[: width - 2] + x_pos = max(0, (width // 2) - (len(clean_line) // 2)) + stdscr.addstr(start_y + i, x_pos, clean_line) + + stdscr.attroff(self._error_attr) + stdscr.refresh() + + def handle_input(self, key: int) -> ZViewState | None: + if key == SpecialCode.QUIT: + self.controller.running = False + + elif key == SpecialCode.RECONNECT: + self.controller.attempt_reconnect() + + return None + + +class ThreadListView(BaseStateView): + SCHEMA: dict[str, int] = { + "Thread": 30, + "CPU %": 8, + "Load %": 8, + "Stack Usage % (Watermark)": 32, + "Watermark Bytes": 18, + } + COLLUMS: list[str] = list(SCHEMA.keys()) + COLLUM_WIDTHS: list[int] = list(SCHEMA.values()) + + def __init__(self, controller: Any, theme: ZViewTUIAttributes): + super().__init__(controller, theme) + + self._current_sort_idx = 0 + self._invert_sorting = False + self._sort_keys = [ + lambda t: t.name, + lambda t: t.runtime.cpu if t.runtime else -1, + lambda t: t.runtime.cpu_normalized if t.runtime else -1, + lambda t: t.runtime.stack_watermark_percent if t.runtime else -1, + lambda t: t.runtime.stack_watermark if t.runtime else -1, + ] - if self.scraper.has_heaps: - footer_text[ZViewState.DEFAULT_VIEW] += "| Heaps: h " + self.top_line: int = 0 - self.stdscr.attron(self.ATTR_HEADER_FOOTER) - self.stdscr.move(0, 0) - self.stdscr.clrtoeol() - self.stdscr.addstr(0, 0, f"{header_text:^{width}}") + bar_theme = (theme.PROGRESS_BAR_LOW, theme.PROGRESS_BAR_MEDIUM, theme.PROGRESS_BAR_HIGH) + self._tui_thread_info: TUIThreadInfo = TUIThreadInfo( + theme.CURSOR, theme.ACTIVE, theme.INACTIVE, bar_theme + ) + self._tui_thread_info.set_field_widths( + self.COLLUM_WIDTHS[0], + self.COLLUM_WIDTHS[1], + self.COLLUM_WIDTHS[2], + self.COLLUM_WIDTHS[3], + self.COLLUM_WIDTHS[4], + ) - footer_row = height - 1 - self.stdscr.move(footer_row, 0) - self.stdscr.clrtoeol() - with contextlib.suppress(curses.error): - # This is needed since curses try to advance the cursor to the next - # position, wich is outside the terminal, we safely ignore this. - self.stdscr.addstr(footer_row, 0, f"{footer_text[self.state]:>{width}}") - self.stdscr.attroff(self.ATTR_HEADER_FOOTER) + def render(self, stdscr: curses.window, height: int, width: int) -> None: + """ + Draws the thread data table and its general informations. + """ - is_error = self.status_message.startswith("Error") + stdscr.erase() - if is_error: - self.stdscr.attron(self.ATTR_ERROR) + self._render_frame( + stdscr, + "Quit: q | Sort: s | Invert: i | Details: " + "| Heaps: h " + if self.controller.scraper.has_heaps + else "", + height, + width, + ) - status_row = footer_row - 1 - self.stdscr.addstr(status_row, 0, self.status_message[:width]) + max_table_rows = height - 6 + total_threads = len(self.controller.threads_data) + start_num = self.top_line + 1 if total_threads > 0 else 0 + end_num = min(self.top_line + max_table_rows, total_threads) + thread_column_width = self.COLLUM_WIDTHS[0] - if is_error: - self.stdscr.attroff(self.ATTR_ERROR) + self.cursor = max(min(total_threads - 1, self.cursor), 0) + if self.cursor >= self.top_line + max_table_rows: + self.top_line = self.cursor - max_table_rows + 1 + elif self.cursor < self.top_line: + self.top_line = self.cursor - if height <= 5: # Realistic minimum height check - self.stdscr.addstr(2, 0, "Terminal too small.") - return + order_symbol = " ▼" if self._invert_sorting else " ▲" + sorting_header = self.COLLUMS[self._current_sort_idx] - ui_cfg = self.ui[self.state] curr_x = 0 - col_headers = list(ui_cfg.col_widths.keys()) - - if self.state in (ZViewState.DEFAULT_VIEW, ZViewState.HEAPS_VIEW): - order_symbol = " ▼" if self.invert_sorting else " ▲" - sorting_header = col_headers[self.current_sort[self.state]] - else: - sorting_header = "" - order_symbol = "" - - for col_header, h_width in ui_cfg.col_widths.items(): + for col_header, h_width in self.SCHEMA.items(): if curr_x >= width: break @@ -426,176 +236,33 @@ def _base_draw(self, height, width): col_header += order_symbol txt = f"{col_header:^{h_width}}"[: width - curr_x] - self.stdscr.addstr(1, curr_x, txt) + stdscr.addstr(1, curr_x, txt) curr_x += h_width + 1 - def _draw_thread_info(self, y, thread_info: ThreadInfo, selected: bool = False): - col_pos = 0 - - # Widths - scheme = self.ui[self.state] - columns = list(scheme.col_widths.keys()) - thread_name_width = scheme.col_widths[columns[0]] - cpu_usage_width = scheme.col_widths[columns[1]] - load_usage_width = scheme.col_widths[columns[2]] - stack_usage_width = scheme.col_widths[columns[3]] - stack_bytes_width = scheme.col_widths[columns[4]] - - # Thread name - if len(thread_info.name) > thread_name_width: - thread_name_display = thread_info.name[: thread_name_width - 3] + "..." - else: - thread_name_display = f"{thread_info.name:<{thread_name_width}}" - - runtime = thread_info.runtime or ThreadRuntime( - cpu=-1.0, - cpu_normalized=-1.0, - active=False, - stack_watermark=0, - stack_watermark_percent=0.0, - ) - - thread_name_attr = ( - self.ATTR_CURSOR - if selected - else (self.ATTR_ACTIVE_THREAD if runtime.active else self.ATTR_INACTIVE_THREAD) - ) - self.stdscr.attron(thread_name_attr) - self.stdscr.addstr(y, col_pos, thread_name_display) - self.stdscr.attroff(thread_name_attr) - col_pos += thread_name_width + 1 - - # Thread CPUs - if runtime.cpu >= 0: - cpu_display = f"{runtime.cpu_normalized:.2f}%".center(cpu_usage_width) - else: - cpu_display = f"{'-':^{cpu_usage_width}}" - self.stdscr.addstr(y, col_pos, cpu_display) - col_pos += cpu_usage_width + 1 - - # Thread Loads - if runtime.cpu >= 0: - load_display = f"{runtime.cpu:.1f}%".center(load_usage_width) - else: - load_display = f"{'-':^{load_usage_width}}" - self.stdscr.addstr(y, col_pos, load_display) - col_pos += load_usage_width + 1 - - # Thread Watermark Progress Bar - self._draw_progress_bar( - y, col_pos, stack_usage_width, runtime.stack_watermark_percent, 70, 90 - ) - col_pos += stack_usage_width + 1 - - # Thread Watermark Bytes - watermark_bytes_display = f"{runtime.stack_watermark} / {thread_info.stack_size}".ljust( - stack_bytes_width - ) - self.stdscr.addstr(y, col_pos, watermark_bytes_display) - - def _draw_heap_info(self, y, heap_info: HeapInfo, selected: bool = False): - col_pos = 0 - - # Widths - scheme = self.ui[self.state] - columns = list(scheme.col_widths.keys()) - heap_name_width = scheme.col_widths[columns[0]] - free_bytes_width = scheme.col_widths[columns[1]] - allocated_bytes_width = scheme.col_widths[columns[2]] - heap_usage_width = scheme.col_widths[columns[3]] - watermark_width = scheme.col_widths[columns[4]] - - # Heap name - heap_name_display = heap_info.name[:heap_name_width].ljust(heap_name_width) - if len(heap_info.name) > heap_name_width: - heap_name_display = heap_name_display[:-3] + "..." - - heap_name_attr = self.ATTR_CURSOR if selected else self.ATTR_ACTIVE_THREAD - self.stdscr.attron(heap_name_attr) - self.stdscr.addstr(y, col_pos, heap_name_display) - self.stdscr.attroff(heap_name_attr) - col_pos += heap_name_width + 1 - - # Free bytes - free_bytes_display = f"{heap_info.free_bytes:^{free_bytes_width}}" - self.stdscr.addstr(y, col_pos, free_bytes_display) - col_pos += free_bytes_width + 1 - - # Allocated bytes - allocated_bytes_display = f"{heap_info.allocated_bytes:^{allocated_bytes_width}}" - self.stdscr.addstr(y, col_pos, allocated_bytes_display) - col_pos += allocated_bytes_width + 1 - - # Heap Usage Progress Bar - heap_size = heap_info.allocated_bytes + heap_info.free_bytes - self._draw_progress_bar(y, col_pos, heap_usage_width, heap_info.usage_percent, 70, 90) - col_pos += heap_usage_width + 1 - - # Heap Watermark Bytes - watermark_bytes_display = f"{heap_info.max_allocated_bytes} / {heap_size}".ljust( - watermark_width - ) - self.stdscr.addstr(y, col_pos, watermark_bytes_display) - - def _draw_fatal_error_view(self, height: int, width: int): - self.stdscr.erase() - self.stdscr.attron(self.ATTR_HEADER_FOOTER) - self.stdscr.addstr(0, 0, f"{'ZView - Zephyr RTOS Runtime Viewer':^{width - 1}}") - self.stdscr.addstr(height - 1, 0, f"{' Quit: q | Reconnect: r ':>{width - 1}}") - self.stdscr.attroff(self.ATTR_HEADER_FOOTER) - - self.stdscr.attron(self.ATTR_ERROR) - msg_lines = self.status_message.split('\n') - - start_y = (height // 2) - (len(msg_lines) // 2) - for i, line in enumerate(msg_lines): - if 0 <= start_y + i < height - 2: - clean_line = line[: width - 2] - self.stdscr.addstr(start_y + i, (width // 2) - (len(clean_line) // 2), clean_line) - self.stdscr.attroff(self.ATTR_ERROR) - self.stdscr.refresh() - - def _draw_default_view(self, height, width): - """ - Draws the thread data table and its general informations. - """ - max_table_rows = height - 6 - total_threads = len(self.threads_data) - start_num = self.top_line + 1 if total_threads > 0 else 0 - end_num = min(self.top_line + max_table_rows, total_threads) - thread_column_width = list(self.ui[self.state].col_widths.values())[0] - scroll_indicator = f" Threads: {start_num}-{end_num} of {total_threads} " - - self.stdscr.attron(self.ATTR_HEADER_FOOTER) - self.stdscr.addstr(height - 1, 0, scroll_indicator[:width]) - self.stdscr.attroff(self.ATTR_HEADER_FOOTER) + stdscr.addstr(height - 1, 0, scroll_indicator[:width], self._frame_attr) table_start = 4 - stack_size_sum = sum(t.stack_size for t in self.threads_data) + stack_size_sum = sum(t.stack_size for t in self.controller.threads_data) stack_watermark_sum = sum( - t.runtime.stack_watermark if t.runtime else 0 for t in self.threads_data + t.runtime.stack_watermark if t.runtime else 0 for t in self.controller.threads_data ) is_any_thread_active = any( - t.runtime.active if t.runtime else False for t in self.threads_data + t.runtime.active if t.runtime else False for t in self.controller.threads_data ) - aggregate_stack_usage_pct = ( (stack_watermark_sum / stack_size_sum * 100) if stack_size_sum > 0 else 0.0 ) - aggregate_stack_usage_pct = ( (stack_watermark_sum / stack_size_sum * 100) if stack_size_sum > 0 else 0.0 ) - - # Dynamically calculate true system load and CPU usage aggregate_load = sum( - t.runtime.cpu for t in self.threads_data if t.runtime and t.runtime.cpu > 0 + t.runtime.cpu for t in self.controller.threads_data if t.runtime and t.runtime.cpu > 0 ) aggregate_cpu = sum( t.runtime.cpu_normalized - for t in self.threads_data + for t in self.controller.threads_data if t.runtime and t.runtime.cpu_normalized > 0 ) @@ -603,7 +270,7 @@ def _draw_default_view(self, height, width): address=0, stack_start=0, stack_size=stack_size_sum, - name="All Threads".center(thread_column_width), + name="All Threads".center(thread_column_width - 1), runtime=ThreadRuntime( cpu=aggregate_load, cpu_normalized=aggregate_cpu, @@ -613,14 +280,14 @@ def _draw_default_view(self, height, width): ), ) - self._draw_thread_info(2, all_threads_info, False) - self.stdscr.hline(3, 0, curses.ACS_S3, width) + self._tui_thread_info.draw(stdscr, 2, 0, all_threads_info, False) + stdscr.addstr(3, 0, "─" * width) - key_func = self.sort_keys[ZViewState.DEFAULT_VIEW][ - self.current_sort[ZViewState.DEFAULT_VIEW] - ] + key_func = self._sort_keys[self._current_sort_idx] - sorted_threads = sorted(self.threads_data, key=key_func, reverse=self.invert_sorting) + sorted_threads = sorted( + self.controller.threads_data, key=key_func, reverse=self._invert_sorting + ) for idx, thread in enumerate( sorted_threads[self.top_line : self.top_line + max_table_rows] @@ -631,84 +298,245 @@ def _draw_default_view(self, height, width): break absolute_idx = self.top_line + idx - self._draw_thread_info( - target_y, thread, selected=(absolute_idx == self.cursor[ZViewState.DEFAULT_VIEW]) + self._tui_thread_info.draw( + stdscr, + target_y, + 0, + thread, + selected=(absolute_idx == self.cursor), ) - self.stdscr.refresh() + self._render_status(stdscr, width, height - 2) - def _draw_thread_detail_view(self, h, w, y=2): - """ - Draws a single thread details, and its recent CPU usage as a graph. - """ - thread = next((t for t in self.threads_data if t.name == self.detailing_thread), None) + stdscr.refresh() + + def handle_input(self, key: int) -> ZViewState | None: + match key: + case curses.KEY_DOWN: + self.cursor += 1 + case curses.KEY_UP: + self.cursor -= 1 + case curses.KEY_ENTER | SpecialCode.NEWLINE | SpecialCode.RETURN: + if not self.controller.threads_data: + return None + + key_func = self._sort_keys[self._current_sort_idx] + sorted_threads = sorted( + self.controller.threads_data, key=key_func, reverse=self._invert_sorting + ) + + self.controller.detailing_thread = sorted_threads[self.cursor].name + + return ZViewState.THREAD_DETAIL_VIEW + + case SpecialCode.SORT: + self._current_sort_idx = (self._current_sort_idx + 1) % len(self._sort_keys) + + case SpecialCode.INVERSE: + self._invert_sorting = not self._invert_sorting + + case SpecialCode.HEAPS: + if not self.controller.scraper.has_heaps: + return None + + return ZViewState.HEAP_LIST_VIEW + + case SpecialCode.QUIT: + self.controller.running = False + case _: + return None + + +class ThreadDetailView(BaseStateView): + def __init__(self, controller: Any, theme: ZViewTUIAttributes): + super().__init__(controller, theme) + self._cpu_graph: TUIGraph = TUIGraph( + "CPU %", "Thread cycles / Cycles", (0, 100), theme.GRAPH_B + ) + self._load_graph: TUIGraph = TUIGraph( + "Load %", "Thread cycles / Non-idle cycles", (0, 100), theme.GRAPH_A + ) + + self._scheme: dict[str, int] = ThreadListView.SCHEMA + bar_theme = (theme.PROGRESS_BAR_LOW, theme.PROGRESS_BAR_MEDIUM, theme.PROGRESS_BAR_HIGH) + self._tui_thread_info: TUIThreadInfo = TUIThreadInfo( + theme.CURSOR, theme.ACTIVE, theme.INACTIVE, bar_theme + ) + self._tui_thread_info.set_field_widths( + ThreadListView.COLLUM_WIDTHS[0], + ThreadListView.COLLUM_WIDTHS[1], + ThreadListView.COLLUM_WIDTHS[2], + ThreadListView.COLLUM_WIDTHS[3], + ThreadListView.COLLUM_WIDTHS[4], + ) + self._current_thread_name: str | None = None + self._usages: dict[str, list[int]] = {"cpu": [], "load": []} + + def render(self, stdscr: curses.window, height: int, width: int) -> None: + stdscr.erase() + + self._render_frame(stdscr, "Quit: q | All threads: ", height, width) + + curr_x = 0 + for col_header, h_width in self._scheme.items(): + if curr_x >= width: + break + + txt = f"{col_header:^{h_width}}"[: width - curr_x] + stdscr.addstr(1, curr_x, txt) + curr_x += h_width + 1 + + thread = next( + (t for t in self.controller.threads_data if t.name == self.controller.detailing_thread), + None, + ) if not thread or thread.runtime is None: + self._render_status(stdscr, width, height - 2) + stdscr.refresh() return - self._draw_thread_info(y, thread) + # Reset history if switching to a new thread + if self._current_thread_name != thread.name: + self._current_thread_name = thread.name + self._usages = {"cpu": [], "load": []} - if not self.detailing_thread_usages.get(thread.name): - self.detailing_thread_usages[thread.name] = {"cpu": [], "load": []} + self._tui_thread_info.draw(stdscr, 2, 0, thread) - self.detailing_thread_usages[thread.name]["cpu"].append(int(thread.runtime.cpu_normalized)) - self.detailing_thread_usages[thread.name]["load"].append(int(thread.runtime.cpu)) + self._usages["cpu"].append(int(thread.runtime.cpu_normalized)) + self._usages["load"].append(int(thread.runtime.cpu)) - graph_height = max(self.min_dimensions[0] - 6, h - 7) - graph_width = w // 2 + graph_height = max(self.controller.min_dimensions[0] - 6, height - 7) + graph_width = width // 2 - if len(self.detailing_thread_usages[thread.name]["load"]) > graph_width - 2: - self.detailing_thread_usages[thread.name]["load"].pop(0) - self.detailing_thread_usages[thread.name]["cpu"].pop(0) + if len(self._usages["load"]) > graph_width - 2: + self._usages["load"].pop(0) + self._usages["cpu"].pop(0) - y += 2 - self._draw_graph( + y = 4 + + self._cpu_graph.draw( + stdscr, y, 0, graph_height, graph_width, - self.detailing_thread_usages[thread.name]["cpu"], - "CPU %", - self.ATTR_GRAPH_B, + points=self._usages["cpu"], ) - self._draw_graph( + + self._load_graph.draw( + stdscr, y, graph_width, graph_height, graph_width, - self.detailing_thread_usages[thread.name]["load"], - "Load %", - self.ATTR_GRAPH_A, + points=self._usages["load"], ) - self.stdscr.refresh() + self._render_status(stdscr, width, height - 2) + + stdscr.refresh() + + def handle_input(self, key: int) -> ZViewState | None: + if key in (curses.KEY_ENTER, SpecialCode.NEWLINE, SpecialCode.RETURN): + return ZViewState.THREAD_LIST_VIEW + elif key == SpecialCode.QUIT: + self.controller.running = False + return None + + +class HeapListView(BaseStateView): + SCHEMA = { + "Heap": 30, + "Free B": 8, + "Used B": 8, + "Heap Usage %": 32, + "Watermark Bytes": 18, + } + COLLUMS: list[str] = list(SCHEMA.keys()) + COLLUM_WIDTHS: list[int] = list(SCHEMA.values()) + + def __init__(self, controller: Any, theme: ZViewTUIAttributes): + super().__init__(controller, theme) + + self._current_sort_idx = 0 + self._invert_sorting = False + + self._sort_keys = [ + lambda h: h.name, + lambda h: h.free_bytes, + lambda h: h.allocated_bytes, + lambda h: h.max_allocated_bytes, + lambda h: ( + (h.allocated_bytes / (h.allocated_bytes + h.free_bytes)) + if (h.allocated_bytes + h.free_bytes) > 0 + else 0 + ), + ] + + self.top_line: int = 0 + + bar_theme = (theme.PROGRESS_BAR_LOW, theme.PROGRESS_BAR_MEDIUM, theme.PROGRESS_BAR_HIGH) + self._tui_heap_info: TUIHeapInfo = TUIHeapInfo(theme.CURSOR, theme.ACTIVE, bar_theme) + self._tui_heap_info.set_field_widths( + self.COLLUM_WIDTHS[0], + self.COLLUM_WIDTHS[1], + self.COLLUM_WIDTHS[2], + self.COLLUM_WIDTHS[3], + self.COLLUM_WIDTHS[4], + ) + + def render(self, stdscr: curses.window, height: int, width: int) -> None: + """ + Draws the heap data table and its aggregate information. + """ + stdscr.erase() + + self._render_frame(stdscr, "Quit: q | Threads: h | Details: ", height, width) - def _draw_heaps_view(self, height, width): max_table_rows = height - 6 - total_heaps = len(self.heaps_data) + total_heaps = len(self.controller.heaps_data) start_num = self.top_line + 1 if total_heaps > 0 else 0 end_num = min(self.top_line + max_table_rows, total_heaps) - heaps_column_width = list(self.ui[self.state].col_widths.values())[0] + heap_column_width = self.COLLUM_WIDTHS[0] - scroll_indicator = f" Heaps: {start_num}-{end_num} of {total_heaps} " + self.cursor = max(min(total_heaps - 1, self.cursor), 0) + if self.cursor >= self.top_line + max_table_rows: + self.top_line = self.cursor - max_table_rows + 1 + elif self.cursor < self.top_line: + self.top_line = self.cursor - self.stdscr.attron(self.ATTR_HEADER_FOOTER) - self.stdscr.addstr(height - 1, 0, scroll_indicator[:width]) - self.stdscr.attroff(self.ATTR_HEADER_FOOTER) + order_symbol = " ▼" if self._invert_sorting else " ▲" + sorting_header = self.COLLUM_WIDTHS[self._current_sort_idx] - table_start = 4 + curr_x = 0 + for col_header, h_width in self.SCHEMA.items(): + if curr_x >= width: + break - free_bytes_sum = sum(h.free_bytes for h in self.heaps_data) - allocated_bytes_sum = sum(h.allocated_bytes for h in self.heaps_data) - max_allocated_bytes_sum = sum(h.max_allocated_bytes for h in self.heaps_data) + if col_header == sorting_header: + col_header += order_symbol + + txt = f"{col_header:^{h_width}}"[: width - curr_x] + stdscr.addstr(1, curr_x, txt) + curr_x += h_width + 1 + + scroll_indicator = f" Heaps: {start_num}-{end_num} of {total_heaps} " + stdscr.addstr(height - 1, 0, scroll_indicator[:width], self._frame_attr) + + table_start = 4 + free_bytes_sum = sum(h.free_bytes for h in self.controller.heaps_data) + allocated_bytes_sum = sum(h.allocated_bytes for h in self.controller.heaps_data) + max_allocated_bytes_sum = sum(h.max_allocated_bytes for h in self.controller.heaps_data) total_heap_bytes = free_bytes_sum + allocated_bytes_sum + aggregate_usage_pct = ( (allocated_bytes_sum / total_heap_bytes * 100) if total_heap_bytes > 0 else 0.0 ) all_heaps_info = HeapInfo( - name="All Heaps".center(heaps_column_width), + name="All Heaps".center(heap_column_width - 1), address=0, free_bytes=free_bytes_sum, allocated_bytes=allocated_bytes_sum, @@ -717,12 +545,13 @@ def _draw_heaps_view(self, height, width): chunks=None, ) - self._draw_heap_info(2, all_heaps_info, False) - self.stdscr.hline(3, 0, curses.ACS_S3, width) - - key_func = self.sort_keys[ZViewState.HEAPS_VIEW][self.current_sort[ZViewState.HEAPS_VIEW]] + self._tui_heap_info.draw(stdscr, 2, 0, all_heaps_info, False) + stdscr.hline(3, 0, curses.ACS_S3, width) - sorted_heaps = sorted(self.heaps_data, key=key_func, reverse=self.invert_sorting) + key_func = self._sort_keys[self._current_sort_idx] + sorted_heaps = sorted( + self.controller.heaps_data, key=key_func, reverse=self._invert_sorting + ) for idx, heap in enumerate(sorted_heaps[self.top_line : self.top_line + max_table_rows]): target_y = table_start + idx @@ -731,225 +560,395 @@ def _draw_heaps_view(self, height, width): break absolute_idx = self.top_line + idx - self._draw_heap_info( - target_y, heap, selected=(absolute_idx == self.cursor[ZViewState.HEAPS_VIEW]) + self._tui_heap_info.draw( + stdscr, + target_y, + 0, + heap, + selected=(absolute_idx == self.cursor), ) - self.stdscr.refresh() + self._render_status(stdscr, width, height - 2) + + stdscr.refresh() + + def handle_input(self, key: int) -> ZViewState | None: + match key: + case curses.KEY_DOWN: + self.cursor += 1 + case curses.KEY_UP: + self.cursor -= 1 + case curses.KEY_ENTER | SpecialCode.NEWLINE | SpecialCode.RETURN: + if not self.controller.heaps_data: + return None + + key_func = self._sort_keys[self._current_sort_idx] + sorted_heaps = sorted( + self.controller.heaps_data, key=key_func, reverse=self._invert_sorting + ) + + self.controller.detailing_heap_address = sorted_heaps[self.cursor].address + return ZViewState.HEAPS_DETAIL_VIEW + + case SpecialCode.SORT: + self._current_sort_idx = (self._current_sort_idx + 1) % len(self._sort_keys) + + case SpecialCode.INVERSE: + self._invert_sorting = not self._invert_sorting + + case SpecialCode.HEAPS: + return ZViewState.THREAD_LIST_VIEW + + case SpecialCode.QUIT: + self.controller.running = False + + case _: + return None + + +class HeapDetailView(BaseStateView): + def __init__(self, controller: Any, theme: ZViewTUIAttributes): + super().__init__(controller, theme) + self._scheme = HeapListView.SCHEMA + self._graph_a_attr = theme.GRAPH_A + self._frag_map_frame: TUIBox = TUIBox("Fragmentation Map", "", theme.GRAPH_B) + + bar_theme = (theme.PROGRESS_BAR_LOW, theme.PROGRESS_BAR_MEDIUM, theme.PROGRESS_BAR_HIGH) + self._tui_heap_info: TUIHeapInfo = TUIHeapInfo(theme.CURSOR, theme.ACTIVE, bar_theme) + self._tui_heap_info.set_field_widths( + HeapListView.COLLUM_WIDTHS[0], + HeapListView.COLLUM_WIDTHS[1], + HeapListView.COLLUM_WIDTHS[2], + HeapListView.COLLUM_WIDTHS[3], + HeapListView.COLLUM_WIDTHS[4], + ) + + @staticmethod + def get_sparsity_map(chunks: list[dict], width: int, height: int) -> list[str]: + total_chars = width * height + if not chunks or total_chars <= 0: + return [] + + total_bytes = sum(chunk["size"] for chunk in chunks) + if total_bytes == 0: + return [] + + bytes_per_char = total_bytes / total_chars + output = [] + chunk_idx = 0 + chunk_rem = float(chunks[0]["size"]) + chunk_is_used = chunks[0]["used"] + + for _ in range(total_chars): + bucket_used = 0.0 + bucket_rem = bytes_per_char + + while bucket_rem > 0 and chunk_idx < len(chunks): + take = min(chunk_rem, bucket_rem) + if chunk_is_used: + bucket_used += take + chunk_rem -= take + bucket_rem -= take + if chunk_rem <= 0: + chunk_idx += 1 + if chunk_idx < len(chunks): + chunk_rem = float(chunks[chunk_idx]["size"]) + chunk_is_used = chunks[chunk_idx]["used"] + + ratio = bucket_used / bytes_per_char + if ratio == 0: + output.append(" ") + elif ratio <= 0.33: + output.append("░") + elif ratio <= 0.66: + output.append("▒") + elif ratio <= 0.99: + output.append("▓") + else: + output.append("█") + + return ["".join(output[i : i + width]) for i in range(0, len(output), width)] + + @staticmethod + def _get_fragmentation_metrics(chunks: list[dict]) -> dict: + if not chunks: + return {} + total_chunks = len(chunks) + allocated_chunks = sum(1 for c in chunks if c["used"]) + free_bytes = sum(c["size"] for c in chunks if not c["used"]) + largest_free = max((c["size"] for c in chunks if not c["used"]), default=0) + ratio = (1 - largest_free / free_bytes) * 100 if free_bytes > 0 else 0.0 + return { + "Largest free": (largest_free, "bytes"), + "Frag ratio": (ratio, "percent"), + "Chunks": (f"{allocated_chunks}/{total_chunks}", "raw"), + } + + @staticmethod + def _get_heap_details_footer(metrics: dict): + if not metrics: + return "" - def _draw_heaps_detail_view(self, height: int, width: int): - for heap in self.heaps_data: - if heap.address != self.scraper.extra_info_heap_address or not heap.chunks: - continue + def fmt(value, hint): + if hint == "bytes": + return f"{value / 1024:.1f} KB" if value >= 1024 else f"{value} B" + if hint == "percent": + return f"{value:.1f}%" + return str(value) - self._draw_heap_info(2, heap, selected=False) + return " · ".join([f"{k}: {fmt(v, h)}" for k, (v, h) in metrics.items()]) - start_y = 5 - start_x = 1 + def render(self, stdscr: curses.window, height: int, width: int) -> None: + stdscr.erase() - map_height = height - start_y - 4 - map_width = width - start_x - 1 + self._render_frame(stdscr, "Quit: q | All heaps: ", height, width) - if map_height <= 0 or map_width <= 0: + curr_x = 0 + for col_header, h_width in self._scheme.items(): + if curr_x >= width: break + txt = f"{col_header:^{h_width}}"[: width - curr_x] + stdscr.addstr(1, curr_x, txt) + curr_x += h_width + 1 + + heap = next( + ( + h + for h in self.controller.heaps_data + if h.address == self.controller.detailing_heap_address + ), + None, + ) + + if not heap or not heap.chunks: + self._render_status(stdscr, width, height - 2) + stdscr.refresh() + return + + self._tui_heap_info.draw(stdscr, 2, 0, heap, False) + + start_y = 5 + start_x = 1 + map_height = height - start_y - 4 + map_width = width - start_x - 1 + + if map_height > 0 and map_width > 0: sparsity_matrix = self.get_sparsity_map(heap.chunks, map_width, map_height) + metrics = self._get_fragmentation_metrics(heap.chunks) + desc = self._get_heap_details_footer(metrics) + + self._frag_map_frame._description = desc + self._frag_map_frame.draw( + stdscr, + start_y - 1, + start_x - 1, + map_height + 2, + map_width + 2, + ) - horizontal_limit = "─" * (map_width) - self.stdscr.attron(self.ATTR_GRAPH_B) - self.stdscr.addstr(start_y - 1, start_x - 1, "┌" + horizontal_limit + "┐") - self.stdscr.addstr(start_y + map_height, start_x - 1, "└" + horizontal_limit + "┘") - self.stdscr.addstr(start_y - 1, start_x, f"Fragmentation Map ({heap.name})") for i, row_str in enumerate(sparsity_matrix): - with contextlib.suppress(curses.error): - self.stdscr.addstr(start_y + i, start_x - 1, "│" + row_str + "│") - self.stdscr.attroff(self.ATTR_GRAPH_B) + stdscr.addstr(start_y + i, start_x, row_str, self._graph_a_attr) - metrics = self._get_fragmentation_metrics(heap.chunks) - self._draw_heap_details_footer(height - 4, start_x, metrics) + self._render_status(stdscr, width, height - 2) - break + stdscr.refresh() - self.stdscr.refresh() + def handle_input(self, key: int) -> ZViewState | None: + if key in (curses.KEY_ENTER, SpecialCode.NEWLINE, SpecialCode.RETURN): + return ZViewState.HEAP_LIST_VIEW + elif key == SpecialCode.QUIT: + self.controller.running = False + return None - def draw_state(self, state: ZViewState, height: int, width: int): - if state == ZViewState.FATAL_ERROR: - self._draw_fatal_error_view(height, width) - return - self._base_draw(height, width) - match state: - case ZViewState.DEFAULT_VIEW: - self._draw_default_view(height, width) - case ZViewState.THREAD_DETAIL: - self._draw_thread_detail_view(height, width) - case ZViewState.HEAPS_VIEW: - self._draw_heaps_view(height, width) - case ZViewState.HEAPS_DETAIL: - self._draw_heaps_detail_view(height, width) - - def draw_terminal_size_warning(self, height: int, width: int): - self.stdscr.erase() - - msgs = [ - "Terminal is too small.", - "Please resize your terminal to at least " - f"{self.min_dimensions[1]}x{self.min_dimensions[0]}", - f"Current: {width}x{height}", - ] +class ZView: + """ + A curses-based application for viewing Zephyr RTOS thread runtime information. - mid_y = height // 2 - start_y = mid_y - 1 + This class manages the curses UI, starts a background thread for data polling, + and updates the display with real-time thread statistics from a connected MCU. + """ - for i, msg in enumerate(msgs): - if 0 <= start_y + i < height: - centered_line = f"{msg:^{width}}"[: width - 1] - self.stdscr.addstr(start_y + i, 0, centered_line) + def __init__(self, scraper: ZScraper, stdscr): + """ + Initializes the ZView application. - def draw_tui(self, height, width): - if height < self.min_dimensions[0] or width < self.min_dimensions[1]: - self.draw_terminal_size_warning(height, width) + Args: + stdscr: The main curses window object provided by curses.wrapper. + """ + self.min_dimensions = (14, 86) + self.stdscr: curses.window = stdscr + self.scraper: ZScraper = scraper + self.running = True + self.threads_data: list[ThreadInfo] = [] + self.heaps_data: list[HeapInfo] = [] + self.status_message: str = "" + self.data_queue = queue.Queue() + self.stop_event = threading.Event() + self.update_count = 0 + + self.state: ZViewState = ZViewState.THREAD_LIST_VIEW + + self.detailing_thread: str | None = None + self.detailing_heap_address: int | None = None + self.idle_thread: ThreadInfo | None = None + + theme = self._init_curses() + + self.views: dict[ZViewState, BaseStateView] = { + ZViewState.FATAL_ERROR: FatalErrorView(self, theme), + ZViewState.THREAD_LIST_VIEW: ThreadListView(self, theme), + ZViewState.THREAD_DETAIL_VIEW: ThreadDetailView(self, theme), + ZViewState.HEAP_LIST_VIEW: HeapListView(self, theme), + ZViewState.HEAPS_DETAIL_VIEW: HeapDetailView(self, theme), + } + + def _init_curses(self) -> ZViewTUIAttributes: + """ + Initializes curses settings and defines color pairs used in the UI. + """ + curses.curs_set(0) + curses.noecho() + curses.cbreak() + self.stdscr.keypad(True) + self.stdscr.nodelay(True) + + if not curses.has_colors(): + return ZViewTUIAttributes.create_mono() else: - self.draw_state(self.state, height, width) + curses.start_color() + # Active thread name + curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK) + # Inactive thread name + curses.init_pair(2, curses.COLOR_WHITE, curses.COLOR_BLACK) + # Progress bar: low usage + curses.init_pair(3, curses.COLOR_GREEN, curses.COLOR_BLACK) + # Progress bar: medium usage + curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK) + # Progress bar: high usage + curses.init_pair(5, curses.COLOR_RED, curses.COLOR_BLACK) + # Header/Footer background + curses.init_pair(6, curses.COLOR_WHITE, curses.COLOR_BLUE) + # Error message text + curses.init_pair(7, curses.COLOR_RED, curses.COLOR_BLACK) + # Cursor selection + curses.init_pair(8, curses.COLOR_BLACK, curses.COLOR_WHITE) + # Graph A + curses.init_pair(9, curses.COLOR_MAGENTA, curses.COLOR_BLACK) + # Graph B + curses.init_pair(10, curses.COLOR_CYAN, curses.COLOR_BLACK) - def process_events(self, height, inspection_period): - key = self.stdscr.getch() + return ZViewTUIAttributes( + curses.color_pair(1), + curses.color_pair(2), + curses.color_pair(3), + curses.color_pair(4), + curses.color_pair(5), + curses.color_pair(6), + curses.color_pair(7), + curses.color_pair(8), + curses.color_pair(9), + curses.color_pair(10), + ) - if self.state == ZViewState.FATAL_ERROR: - if key == SpecialCode.QUIT: - self.running = False - elif key == SpecialCode.RECONNECT: - self.status_message = "Attempting to reconnect..." - self.scraper.finish_polling_thread() - self.scraper._m_scraper.disconnect() - self.purge_queue() - self.state = ZViewState.DEFAULT_VIEW + def purge_queue(self): + with self.data_queue.mutex: + self.data_queue.queue.clear() - self.stop_event.clear() + def attempt_reconnect(self): + """Executes hardware reconnection and data pipeline reset.""" + self.status_message = "Attempting to reconnect..." + self.scraper.finish_polling_thread() + self.scraper._m_scraper.disconnect() + self.purge_queue() - # Re-initialize the connection cleanly - try: - self.scraper.update_available_threads() - self.scraper.reset_thread_pool() - self.scraper.start_polling_thread( - self.data_queue, self.stop_event, inspection_period - ) - except Exception as e: - self.process_data({"fatal_error": f"Reconnection failed: {e}"}) - return # Block all other input during fatal error + self.stop_event.clear() - if self.state in (ZViewState.DEFAULT_VIEW, ZViewState.HEAPS_VIEW): - max_table_size = len( - self.threads_data if self.state is ZViewState.DEFAULT_VIEW else self.heaps_data + try: + self.scraper.update_available_threads() + self.scraper.reset_thread_pool() + self.scraper.start_polling_thread( + self.data_queue, self.stop_event, self.scraper.inspection_period ) + self.transition_to(ZViewState.THREAD_LIST_VIEW) + self.stdscr.clear() + except Exception as e: + self.process_data({"fatal_error": f"Reconnection failed: {e}"}) - match key: - case curses.KEY_DOWN: - if max_table_size > 0: - self.cursor[self.state] = min( - max_table_size - 1, self.cursor[self.state] + 1 - ) - if self.cursor[self.state] >= self.top_line + (height - 6): - self.top_line = self.cursor[self.state] - (height - 7) - return - case curses.KEY_UP: - if max_table_size > 0: - self.cursor[self.state] = max(0, self.cursor[self.state] - 1) - if self.cursor[self.state] < self.top_line: - self.top_line = self.cursor[self.state] - return + def draw_tui(self, height, width): + if height < self.min_dimensions[0] or width < self.min_dimensions[1]: + self.stdscr.erase() + + msgs = [ + "Terminal is too small.", + "Please resize your terminal to at least " + f"{self.min_dimensions[1]}x{self.min_dimensions[0]}", + f"Current: {width}x{height}", + ] + + mid_y = height // 2 + start_y = mid_y - 1 + + for i, msg in enumerate(msgs): + if 0 <= start_y + i < height: + centered_line = f"{msg:^{width}}"[: width - 1] + self.stdscr.addstr(start_y + i, 0, centered_line) + return - match key: - case curses.KEY_ENTER | SpecialCode.NEWLINE | SpecialCode.RETURN: - match self.state: - case ZViewState.DEFAULT_VIEW: - if not self.threads_data: - return - - key_func = self.sort_keys[ZViewState.DEFAULT_VIEW][ - self.current_sort[ZViewState.DEFAULT_VIEW] - ] - sorted_threads = sorted( - self.threads_data, key=key_func, reverse=self.invert_sorting - ) - - self.state = ZViewState.THREAD_DETAIL - self.detailing_thread = sorted_threads[ - self.cursor[ZViewState.DEFAULT_VIEW] - ].name - - new_pool = [self.scraper.all_threads[self.detailing_thread]] - idle_t = next( - ( - t - for t in self.scraper.all_threads.values() - if t.address == self.scraper.idle_threads_address - ), - None, - ) - if idle_t and idle_t.address != new_pool[0].address: - new_pool.append(idle_t) - - self.scraper.thread_pool = new_pool - self.purge_queue() - - case ZViewState.THREAD_DETAIL: - self.state = ZViewState.DEFAULT_VIEW - self.scraper.thread_pool = list(self.scraper.all_threads.values()) - self.purge_queue() - - case ZViewState.HEAPS_VIEW: - if not self.heaps_data: - return - - key_func = self.sort_keys[ZViewState.HEAPS_VIEW][ - self.current_sort[ZViewState.HEAPS_VIEW] - ] - sorted_heaps = sorted( - self.heaps_data, key=key_func, reverse=self.invert_sorting - ) - - self.state = ZViewState.HEAPS_DETAIL - self.scraper.extra_info_heap_address = sorted_heaps[ - self.cursor[ZViewState.HEAPS_VIEW] - ].address - self.purge_queue() - - case ZViewState.HEAPS_DETAIL: - self.state = ZViewState.HEAPS_VIEW - self.scraper.extra_info_heap_address = None - self.purge_queue() + self.views[self.state].render(self.stdscr, height, width) - case SpecialCode.SORT: - if self.state not in (ZViewState.DEFAULT_VIEW, ZViewState.HEAPS_VIEW): - pass + def transition_to(self, new_state: ZViewState): + """Centralized state transition and data pipeline management.""" + if new_state not in self.views: + self.status_message = f"Warning: {new_state.name} is not yet implemented." + return - self.current_sort[self.state] = (self.current_sort[self.state] + 1) % len( - self.sort_keys[self.state] - ) - case SpecialCode.INVERSE: - if self.state not in (ZViewState.DEFAULT_VIEW, ZViewState.HEAPS_VIEW): - pass + match new_state: + case ZViewState.THREAD_LIST_VIEW: + self.scraper.thread_pool = list(self.scraper.all_threads.values()) + self.purge_queue() - self.invert_sorting = not self.invert_sorting - case SpecialCode.HEAPS: - if not self.scraper.has_heaps: + case ZViewState.THREAD_DETAIL_VIEW: + if self.detailing_thread is None: return - if self.state is ZViewState.HEAPS_VIEW: - self.scraper.reset_thread_pool() - self.state = ZViewState.DEFAULT_VIEW - elif self.state is ZViewState.DEFAULT_VIEW: - self.scraper.thread_pool = [] - self.state = ZViewState.HEAPS_VIEW + target_thread = self.scraper.all_threads.get(self.detailing_thread) + if target_thread: + new_pool = [target_thread] + idle_t = next( + ( + t + for t in self.scraper.all_threads.values() + if t.address == self.scraper.idle_threads_address + ), + None, + ) + if idle_t and idle_t.address != new_pool[0].address: + new_pool.append(idle_t) + + self.scraper.thread_pool = new_pool + self.purge_queue() + case ZViewState.HEAP_LIST_VIEW: + self.scraper.extra_info_heap_address = None + self.scraper.thread_pool = [] self.purge_queue() - case SpecialCode.QUIT: - self.running = False - case _: - return - return + case ZViewState.HEAPS_DETAIL_VIEW: + self.scraper.extra_info_heap_address = self.detailing_heap_address + self.purge_queue() + + self.state = new_state + + def process_events(self): + key = self.stdscr.getch() + if key == -1: + return + + new_state = self.views[self.state].handle_input(key) + if new_state and new_state != self.state: + self.transition_to(new_state) def process_data(self, data): if data.get("fatal_error"): @@ -995,15 +994,17 @@ def run(self, inspection_period): self.scraper.start_polling_thread(self.data_queue, self.stop_event, inspection_period) while self.running: - with contextlib.suppress(queue.Empty): - data = self.data_queue.get_nowait() - self.process_data(data) + # Drain the queue completely on every frame + while not self.data_queue.empty(): + with contextlib.suppress(queue.Empty): + data = self.data_queue.get_nowait() + self.process_data(data) h, w = self.stdscr.getmaxyx() self.draw_tui(h, w) - self.process_events(h, inspection_period) + self.process_events() time.sleep(0.01)