Skip to content
Merged

Rtt #1945

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 55 additions & 36 deletions pyocd/utility/rtt_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ class RTTConfig:
_session: Session
_target: str
_core: int
has_rtt_config: bool = False
control_block: RTTControlBlock = None
channels: RTTChannelList = None
num_systemview_channels: int = 0

def __post_init__(self):
self._rtt_configuration()
Expand All @@ -53,42 +55,46 @@ def _add_channel(self, ch_list: List["RTTConfig.RTTChannel"], number: int, mode:

if number is None:
# Warn about missing channel number
LOG.warning("RTT channel configuration for core %d is missing channel number; channel disabled", self._core)
LOG.warning("RTT channel configuration for core %d: missing channel number; channel disabled", self._core)
return
if ch_list and any(number == ch[0] for ch in ch_list):
LOG.warning("RTT channel %d for core %d is already configured; skipping duplicate", number, self._core)
LOG.warning("RTT channel %d for core %d: already configured; skipping duplicate", number, self._core)
return
if mode is None:
# Warn about missing channel mode
LOG.warning("RTT channel %d configuration for core %d is missing mode; channel disabled", number, self._core)
LOG.warning("RTT channel %d configuration for core %d: missing mode; channel disabled", number, self._core)
return
if mode not in SUPPORTED_MODES:
# Warn about unsupported channel mode
LOG.warning("RTT channel %d configuration for core %d has unsupported mode '%s'; channel disabled",
LOG.warning("RTT channel %d configuration for core %d: unsupported mode '%s'; channel disabled",
number, self._core, mode)
return
# Server mode or SystemView server mode requires a port number
if mode in {'server', 'systemview-server'}:
if port is None:
LOG.warning("RTT channel %d configuration for core %d is missing port for %s mode; channel disabled", number, self._core, mode)
LOG.warning("RTT channel %d configuration for core %d: missing port for %s mode; channel disabled", number, self._core, mode)
return
conflict = next((ch for ch in ch_list if ch[1] == 'telnet' and ch[2] == port), None)
if next((ch for ch in ch_list if ch[1] == 'telnet' and ch[2] == port), None) is not None:
LOG.warning("RTT telnet port %d is already in use for channel %d on core %d; channel disabled", port, conflict[0], self._core)
conflict = next((ch for ch in ch_list if ch[1] in {'server', 'systemview-server'} and ch[2] == port), None)
if conflict is not None:
LOG.warning("RTT channel %d configuration for core %d: port %d is already in use for %s mode on channel %d; channel disabled",
number, self._core, port, conflict[1], conflict[0])
return
else:
port = None
if mode == 'systemview':
# Count the number of SystemView channels
self.num_systemview_channels += 1

port_str = f", port={port}" if port is not None else ""
LOG.debug("RTT channel %d configuration for core %d: mode=%s%s", number, self._core, mode, port_str)
ch_list.append((number, mode, port))

def _rtt_configuration(self) -> None:
rtt_config_list = self._session.options.get('rtt') or []
if not rtt_config_list:
LOG.debug("No RTT configuration found in session options for core %d", self._core)
return

self.has_rtt_config = True

rtt_config_by_pname = {cfg.get('pname'): cfg for cfg in rtt_config_list}
proc_name = self._target.node_name if self._target.node_name else None

Expand All @@ -107,8 +113,8 @@ def _rtt_configuration(self) -> None:
else:
address = cb_l.get('address') if (cb_l and cb_l.get('address') is not None) else (cb_g.get('address') if cb_g else None)
size = cb_l.get('size') if (cb_l and cb_l.get('size') is not None) else (cb_g.get('size') if cb_g else None)
auto_detect = cb_l.get('auto-detect') if (cb_l and cb_l.get('auto-detect') is not None) else (cb_g.get('auto-detect', False) if cb_g else False)
if address is not None or auto_detect:
auto_detect = cb_l.get('auto-detect') if (cb_l and cb_l.get('auto-detect') is not None) else (cb_g.get('auto-detect', None) if cb_g else None)
if address is not None or auto_detect is not None:
rtt_cb = (address, size, auto_detect)
else:
rtt_cb = None
Expand All @@ -126,7 +132,7 @@ def _rtt_configuration(self) -> None:
for ch in ch_g:
if ch_l is not None and any(ch.get('number') == local_ch.get('number') for local_ch in ch_l):
# Skip global channel configuration if a local channel with the same number exists
LOG.debug("RTT channel number %d configuration for core %d is overridden by pname specific configuration; skipping global channel configuration",
LOG.debug("RTT channel %d configuration for core %d: pname specific configuration used; skipping global channel configuration",
ch.get('number'), self._core)
else:
self._add_channel(rtt_ch, ch.get('number'), ch.get('mode'), ch.get('port'))
Expand Down Expand Up @@ -166,7 +172,7 @@ def _start_rtt_server(self, address: Optional[int], size: Optional[int]) -> Opti
try:
server = RTTServer(self._target, address, size, b'SEGGER RTT')
server.start()
LOG.info("RTT server started for core %d", self._core)
LOG.debug("RTT started for core %d", self._core)
return server
except exceptions.RTTError:
return None
Expand All @@ -193,107 +199,120 @@ def _find_segger_rtt_symbol(self) -> Optional[int]:
return symbol_info.address
return None
except Exception as e:
LOG.warning("Failed to get _SEGGER_RTT symbol address from ELF for core %d: %s", self._core, e)
LOG.warning("RTT for core %d: failed to get _SEGGER_RTT symbol address from ELF: %s", self._core, e)
return None

def start_server(self) -> Optional[RTTServer]:
"""@brief Create and start RTT server."""

if not self._rtt_config.has_rtt_config:
LOG.debug("RTT for core %d: no RTT configuration; RTT disabled", self._core)
return None

if self._rtt_config.channels is None:
LOG.warning("No RTT channels configured for core %d; cannot start RTT server", self._core)
LOG.warning("RTT for core %d: no channels configured; RTT disabled", self._core)
return None

if self._rtt_server is not None:
LOG.warning("RTT server for core %d is already running; start_server() call ignored", self._core)
LOG.warning("RTT for core %d: already running; start_server() call ignored", self._core)
return self._rtt_server

# Get RTT control block configuration for this core
rtt_cb = self._rtt_config.control_block

if rtt_cb is not None:
address, size, auto_detect = rtt_cb
if address is None and auto_detect is False:
LOG.warning("RTT for core %d: control block configuration is missing address while auto-detect is disabled; RTT disabled", self._core)
return None
if address is not None:
self._rtt_server = self._start_rtt_server(address, size)
if self._rtt_server is not None:
if size:
LOG.debug("RTT for core %d: RTT control block found via scan of memory specified with address 0x%X and size 0x%X", self._core, address, size)
else:
LOG.debug("RTT for core %d: RTT control block found via specified address 0x%X", self._core, address)
return self._rtt_server
else:
if size:
LOG.warning("Failed to start RTT server with specified address 0x%X and size 0x%X for core %d", address, size, self._core)
LOG.warning("RTT for core %d: failed to find RTT control block with specified address 0x%X and size 0x%X", self._core, address, size)
else:
LOG.warning("Failed to start RTT server with specified address 0x%X for core %d", address, self._core)
LOG.warning("RTT for core %d: failed to find RTT control block with specified address 0x%X", self._core, address)
if auto_detect:
# Fallback: auto-detect via memory scan in default memory region if no address specified
self._rtt_server = self._start_rtt_server(None, None)
if self._rtt_server is not None:
LOG.debug("RTT for core %d: RTT control block found via auto-detect memory scan in default memory region", self._core)
return self._rtt_server
else:
LOG.warning("Failed to start RTT server with auto-detected address for core %d", self._core)
LOG.warning("RTT for core %d: failed to find RTT control block with auto-detected address", self._core)
return None
else:
# Auto-detect via symbol "_SEGGER_RTT" lookup in the ELF file
address = self._find_segger_rtt_symbol()
if address is None:
LOG.warning("Failed to find _SEGGER_RTT symbol in ELF for core %d; cannot auto-detect RTT control block address", self._core)
LOG.warning("RTT for core %d: failed to find _SEGGER_RTT symbol in ELF; cannot auto-detect RTT control block", self._core)
return None
self._rtt_server = self._start_rtt_server(address, None)
if self._rtt_server is not None:
LOG.debug("RTT for core %d: RTT control block found via _SEGGER_RTT symbol lookup at address 0x%X", self._core, address)
return self._rtt_server

LOG.warning("Failed to start RTT server with _SEGGER_RTT symbol address 0x%X for core %d", address, self._core)
LOG.warning("RTT for core %d: failed to find RTT control block with _SEGGER_RTT symbol address 0x%X", self._core, address)
return None

def configure_channels(self, stdio_handler: Optional[StdioHandler] = None):
"""@brief Configure RTT channels."""

if self._rtt_server is None:
LOG.warning("RTT server not started; cannot configure RTT channels for core %d", self._core)
LOG.warning("RTT for core %d: RTT not started; cannot configure RTT channels", self._core)
return

stdio_enabled = False

for number, mode, server_port in self._rtt_config.channels:
if self._rtt_server.is_channel_idx_valid(number) is False:
LOG.warning("RTT channel index %d for core %d is out of range; skipping configuration for channel %d", number, self._core, number)
LOG.warning("RTT for core %d: channel index %d is out of range; skipping configuration for channel %d", self._core, number, number)
continue
if self._rtt_server.is_channel_configured(number):
LOG.warning("RTT channel %d for core %d is already configured; skipping configuration for channel %d", number, self._core, number)
LOG.warning("RTT for core %d: channel %d is already configured; skipping configuration for channel %d", self._core, number, number)
continue
# STDIO mode
if mode == 'stdio':
if stdio_handler is None:
LOG.warning("StdioHandler for core %d is not provided; skipping configuration for channel %d", self._core, number)
LOG.warning("RTT for core %d: StdioHandler is not provided; skipping configuration for channel %d", self._core, number)
continue
if stdio_enabled:
LOG.warning("STDIO mode is already enabled for core %d; skipping configuration for channel %d", self._core, number)
LOG.warning("RTT for core %d: stdio mode is already enabled; skipping configuration for channel %d", self._core, number)
continue
try:
self._rtt_server.add_channel_worker(number, lambda: RTTChanStdioWorker(channel=number, stdio=stdio_handler))
LOG.info("STDIO mode enabled on RTT channel %d for core %d", number, self._core)
LOG.info("RTT channel %d configuration for core %d: mode=%s", number, self._core, mode)
stdio_enabled = True
except exceptions.RTTError as e:
LOG.error("Failed to enable STDIO mode for RTT channel %d for core %d: %s", number, self._core, e)
LOG.error("RTT for core %d: failed to enable stdio mode for RTT channel %d: %s", self._core, number, e)
# Server mode
elif mode == 'server':
try:
self._rtt_server.add_channel_worker(number, lambda: RTTChanTCPWorker(server_port, listen=True))
LOG.info("Server mode on port %d enabled on RTT channel %d for core %d", server_port, number, self._core)
LOG.info("RTT channel %d configuration for core %d: mode=%s, port=%d", number, self._core, mode, server_port)
except exceptions.RTTError as e:
LOG.error("Failed to enable server mode for RTT channel %d for core %d: %s", number, self._core, e)
LOG.error("RTT for core %d: failed to enable server mode for RTT channel %d: %s", self._core, number, e)
# SystemView mode
elif mode == 'systemview':
try:
fname_root = self._systemview.file.rsplit('.', 1)[0]
fname = f'{fname_root}.core{self._core}.ch{number}.bin'
self._rtt_server.add_channel_worker(number, lambda: RTTChanSysViewFileWorker(rtt_server=self._rtt_server, rtt_channel=number, file_out=fname, auto_start=self._systemview.auto_start, auto_stop=self._systemview.auto_stop))
LOG.info("SystemView mode enabled on RTT channel %d for core %d", number, self._core)
LOG.info("RTT channel %d configuration for core %d: mode=%s", number, self._core, mode)
except (IOError, exceptions.RTTError) as e:
LOG.error("Failed to enable SystemView mode for RTT channel %d for core %d: %s", number, self._core, e)
LOG.error("RTT for core %d: failed to enable systemview mode for RTT channel %d: %s", self._core, number, e)
# SystemView server mode
elif mode == 'systemview-server':
try:
self._rtt_server.add_channel_worker(number, lambda: RTTChanSysViewTCPWorker(server_port, listen=True))
LOG.info("SystemView server mode on port %d enabled on RTT channel %d for core %d", server_port, number, self._core)
LOG.info("RTT channel %d configuration for core %d: mode=%s, port=%d", number, self._core, mode, server_port)
except exceptions.RTTError as e:
LOG.error("Failed to enable SystemView server mode for RTT channel %d for core %d: %s", number, self._core, e)
LOG.error("RTT for core %d: failed to enable systemview server mode for RTT channel %d: %s", self._core, number, e)
else:
LOG.warning("Unsupported RTT channel mode '%s' for channel %d for core %d; skipping configuration for channel %d", mode, number, self._core, number)
LOG.warning("RTT for core %d: unsupported channel mode '%s' for channel %d; skipping configuration for channel %d", self._core, mode, number, number)
Loading