diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cb3fbf5c..e27d4199 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,7 +22,7 @@ repos: language: pygrep types: [python] files: ^pysquared/ - exclude: ^pysquared/beacon\.py|^pysquared/logger\.py|^pysquared/rtc/manager/microcontroller\.py + exclude: ^pysquared/beacon\.py|^pysquared/logger\.py|^pysquared/rtc/manager/microcontroller\.py|pysquared/hardware/sd_card/manager/sd_card\.py - repo: local hooks: diff --git a/mocks/circuitpython/storage.py b/mocks/circuitpython/storage.py new file mode 100644 index 00000000..beaf59ff --- /dev/null +++ b/mocks/circuitpython/storage.py @@ -0,0 +1,26 @@ +"""Mock for the CircuitPython storage module. + +This module provides a mock implementation of the CircuitPython storage module +for testing purposes. It allows for simulating the behavior of the storage +module without the need for actual hardware. +""" + + +def disable_usb_drive() -> None: + """A mock function to disable the USB drive.""" + pass + + +def enable_usb_drive() -> None: + """A mock function to enable the USB drive.""" + pass + + +def remount(path: str, readonly: bool) -> None: + """A mock function to remount the filesystem. + + Args: + path: The path to remount. + readonly: Whether to mount as read-only. + """ + pass diff --git a/pyproject.toml b/pyproject.toml index 25ab6489..ee3ed6da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,26 +45,29 @@ docs = [ [tool.setuptools] packages = [ "pysquared", + "pysquared.boot", "pysquared.config", - "pysquared.hardware", - "pysquared.hardware.imu", + "pysquared.hardware.burnwire.manager", + "pysquared.hardware.burnwire", "pysquared.hardware.imu.manager", - "pysquared.hardware.magnetometer", + "pysquared.hardware.imu", + "pysquared.hardware.light_sensor.manager", "pysquared.hardware.magnetometer.manager", - "pysquared.hardware.radio", + "pysquared.hardware.magnetometer", + "pysquared.hardware.power_monitor.manager", + "pysquared.hardware.power_monitor", "pysquared.hardware.radio.manager", "pysquared.hardware.radio.packetizer", - "pysquared.hardware.power_monitor", - "pysquared.hardware.power_monitor.manager", - "pysquared.hardware.temperature_sensor", + "pysquared.hardware.radio", + "pysquared.hardware.sd_card.manager", + "pysquared.hardware.sd_card", "pysquared.hardware.temperature_sensor.manager", - "pysquared.hardware.burnwire", - "pysquared.hardware.burnwire.manager", - "pysquared.hardware.light_sensor.manager", + "pysquared.hardware.temperature_sensor", + "pysquared.hardware", "pysquared.nvm", "pysquared.protos", - "pysquared.rtc", "pysquared.rtc.manager", + "pysquared.rtc", "pysquared.sensor_reading" ] diff --git a/pysquared/beacon.py b/pysquared/beacon.py index 2caaafc9..0896210f 100644 --- a/pysquared/beacon.py +++ b/pysquared/beacon.py @@ -19,9 +19,9 @@ from collections import OrderedDict try: - from mocks.circuitpython.microcontroller import Processor -except ImportError: from microcontroller import Processor +except ImportError: + from mocks.circuitpython.microcontroller import Processor from .hardware.radio.packetizer.packet_manager import PacketManager from .logger import Logger diff --git a/pysquared/boot/__init__.py b/pysquared/boot/__init__.py new file mode 100644 index 00000000..84519555 --- /dev/null +++ b/pysquared/boot/__init__.py @@ -0,0 +1,3 @@ +""" +This module provides utilities that can run during the boot process by adding them to boot.py. +""" diff --git a/pysquared/boot/filesystem.py b/pysquared/boot/filesystem.py new file mode 100644 index 00000000..c8b8ad55 --- /dev/null +++ b/pysquared/boot/filesystem.py @@ -0,0 +1,52 @@ +"""File includes utilities for managing the filesystem during the boot process.""" + +import os +import time + +try: + import storage +except ImportError: + import mocks.circuitpython.storage as storage + + +def mkdir( + path: str, + storage_action_delay: float = 0.02, +) -> None: + """ + Create directories on internal storage during boot. + + In CircuitPython the internal storage is not writable by default. In order to mount + any external storage (such as an SD Card) the drive must be remounted in read/write mode. + This function handles the necessary steps to safely create a directory on the internal + storage during boot. + + Args: + mount_point: Path to mount point + storage_action_delay: Delay after storage actions to ensure stability + + Usage: + ```python + from pysquared.boot.filesystem import mkdir + mkdir("/sd") + ``` + """ + try: + storage.disable_usb_drive() + time.sleep(storage_action_delay) + print("Disabled USB drive") + + storage.remount("/", False) + time.sleep(storage_action_delay) + print("Remounted root filesystem") + + try: + os.mkdir(path) + print(f"Mount point {path} created.") + except OSError: + print(f"Mount point {path} already exists.") + + finally: + storage.enable_usb_drive() + time.sleep(storage_action_delay) + print("Enabled USB drive") diff --git a/pysquared/boot_functions.py b/pysquared/boot_functions.py deleted file mode 100644 index fdd1cd35..00000000 --- a/pysquared/boot_functions.py +++ /dev/null @@ -1,56 +0,0 @@ -"""Module that provides function relevant on boot""" - -import os -import time - -import storage - - -def set_mount_points( - mount_points=[ - "/sd", - ], - wait_time=0.02, - max_attempt=5, -) -> None: - """ - Mounts the specified mount points. - - Args: - mount_points ([String]): List of mount points - wait_time (float): Time to wait between mount attempts - max_attempts (int): Amount of attempts before failure - """ - mount_points = [ - "/sd", - ] - - wait_time = 0.02 - - storage.disable_usb_drive() - print("Disabling USB drive") - time.sleep(wait_time) - - storage.mount("/", False) - print("Remounting root filesystem") - time.sleep(wait_time) - - attempts = 0 - while attempts < max_attempt: - attempts += 1 - try: - for path in mount_points: - try: - os.mkdir(path) - print(f"Mount point {path} created.") - except OSError: - print(f"Mount point {path} already exists.") - except Exception as e: - print(f"Error creating mount point {path}: {e}") - time.sleep(wait_time) - continue - - break - - storage.enable_usb_drive() - print("Enabling USB drive") diff --git a/pysquared/hardware/sd_card/manager/__init__.py b/pysquared/hardware/sd_card/manager/__init__.py new file mode 100644 index 00000000..ff1ea6e4 --- /dev/null +++ b/pysquared/hardware/sd_card/manager/__init__.py @@ -0,0 +1,3 @@ +""" +This module provides an interface for controlling SD cards. +""" diff --git a/pysquared/sd_card.py b/pysquared/hardware/sd_card/manager/sd_card.py similarity index 61% rename from pysquared/sd_card.py rename to pysquared/hardware/sd_card/manager/sd_card.py index 9484730f..adced9ae 100644 --- a/pysquared/sd_card.py +++ b/pysquared/hardware/sd_card/manager/sd_card.py @@ -1,17 +1,11 @@ """This module provides a SD Card class to manipulate the sd card filesystem""" -# import os - import sdcardio import storage +from busio import SPI +from microcontroller import Pin -from .hardware.exception import HardwareInitializationError - -try: - from busio import SPI - from microcontroller import Pin -except ImportError: - pass +from ...exception import HardwareInitializationError class SDCardManager: @@ -26,13 +20,9 @@ def __init__( baudrate: int = 400000, mount_path: str = "/sd", ) -> None: - self.mounted = False - self.mount_path = mount_path - try: sd = sdcardio.SDCard(spi_bus, chip_select, baudrate) - vfs = storage.VfsFat(sd) - storage.mount(vfs, self.mount_path) - self.mounted = True - except (OSError, ValueError) as e: + vfs = storage.VfsFat(sd) # type: ignore # Issue: https://github.com/adafruit/Adafruit_CircuitPython_Typing/issues/51 + storage.mount(vfs, mount_path) + except Exception as e: raise HardwareInitializationError("Failed to initialize SD Card") from e diff --git a/pysquared/logger.py b/pysquared/logger.py index 8799abd4..20e08678 100644 --- a/pysquared/logger.py +++ b/pysquared/logger.py @@ -18,8 +18,6 @@ import traceback from collections import OrderedDict -import adafruit_pathlib - from .nvm.counter import Counter @@ -79,11 +77,11 @@ class LogLevel: class Logger: """Handles logging messages with different severity levels.""" + _log_dir: str | None = None + def __init__( self, error_counter: Counter, - sd_path: adafruit_pathlib.Path = None, - # sd_card: SDCardManager = None, log_level: int = LogLevel.NOTSET, colorized: bool = False, ) -> None: @@ -91,19 +89,13 @@ def __init__( Initializes the Logger instance. Args: - error_counter (Counter): Counter for error occurrences. - log_level (int): Initial log level. - colorized (bool): Whether to colorize output. + error_counter: Counter for error occurrences. + log_level: Initial log level. + colorized: Whether to colorize output. """ self._error_counter: Counter = error_counter - self.sd_path: adafruit_pathlib.Path = sd_path self._log_level: int = log_level - self.colorized: bool = colorized - - try: - self.sd_path = self.sd_path / "logs" - except TypeError as e: - print(f"path not set: {e}") + self._colorized: bool = colorized def _can_print_this_level(self, level_value: int) -> bool: """ @@ -162,30 +154,15 @@ def _log(self, level: str, level_value: int, message: str, **kwargs) -> None: json_order.update(kwargs) - try: - json_output = json.dumps(json_order) - except TypeError as e: - json_output = json.dumps( - OrderedDict( - [ - ("time", asctime), - ("level", "ERROR"), - ("msg", f"Failed to serialize log message: {e}"), - ] - ), - ) + json_output = json.dumps(json_order) if self._can_print_this_level(level_value): - # Write to sd card if mounted - if self.path: - if "logs" not in self.sd_path.iterdir(): - print("/sd/logs does not exist, creating...") - os.mkdir("/sd/logs") - - with open("/sd/logs/activity.log", "a") as f: + if self._log_dir is not None: + file = self._log_dir + os.sep + "activity.log" + with open(file, "a") as f: f.write(json_output + "\n") - if self.colorized: + if self._colorized: json_output = json_output.replace( f'"level": "{level}"', f'"level": "{LogColors[level]}"' ) @@ -256,3 +233,26 @@ def get_error_count(self) -> int: int: The number of errors logged. """ return self._error_counter.get() + + def set_log_dir(self, log_dir: str) -> None: + """ + Sets the log directory for file logging. + + Args: + log_dir (str): Directory to save log files. + + Raises: + ValueError: If the provided path is not a valid directory. + """ + try: + # Octal number 0o040000 is the stat mode indicating the file being stat'd is a directory + directory_mode: int = 0o040000 + st_mode = os.stat(log_dir)[0] + if st_mode != directory_mode: + raise ValueError( + f"Logging path must be a directory, received {st_mode}." + ) + except OSError as e: + raise ValueError("Invalid logging path.") from e + + self._log_dir = log_dir diff --git a/tests/unit/boot/test_filesystem.py b/tests/unit/boot/test_filesystem.py new file mode 100644 index 00000000..35118653 --- /dev/null +++ b/tests/unit/boot/test_filesystem.py @@ -0,0 +1,177 @@ +"""Unit tests for the filesystem class. + +This file contains unit tests for the `filesystem` class, which provides utilities +for managing the filesystem during the boot process. The tests focus on verifying that +the correct sequence of filesystem operations. +""" + +# pyright: reportAttributeAccessIssue=false + +import os +import shutil +import tempfile +from unittest.mock import MagicMock, patch + +import pytest + +from pysquared.boot.filesystem import mkdir + + +@pytest.fixture(autouse=True) +def test_dir(): + """Sets up a temporary directory for testing and cleans it up afterwards.""" + temp_dir = tempfile.mkdtemp() + + test_dir = os.path.join(temp_dir, "mytestdir") + yield test_dir + + shutil.rmtree(temp_dir, ignore_errors=True) + + +@patch("pysquared.boot.filesystem.storage") +@patch("pysquared.boot.filesystem.time.sleep") +def test_mkdir_success( + mock_sleep: MagicMock, + mock_storage: MagicMock, + test_dir: str, + capsys: pytest.CaptureFixture[str], +) -> None: + """Test the mkdir function for successful directory creation.""" + mkdir(test_dir, 0.02) + + # Verify each expected call was made + mock_storage.disable_usb_drive.assert_called_once() + mock_storage.remount.assert_called_once_with("/", False) + mock_storage.enable_usb_drive.assert_called_once() + + assert mock_sleep.call_count == 3 + mock_sleep.assert_called_with(0.02) + + # Verify correct print messages + captured = capsys.readouterr() + assert "Disabled USB drive" in captured.out + assert "Remounted root filesystem" in captured.out + assert f"Mount point {test_dir} created." in captured.out + assert "Enabled USB drive" in captured.out + + # directory exists + assert os.path.exists(test_dir) + + +@patch("pysquared.boot.filesystem.storage") +@patch("pysquared.boot.filesystem.time.sleep") +@patch("pysquared.boot.filesystem.os.mkdir") +def test_mkdir_directory_already_exists( + mock_os_mkdir: MagicMock, + mock_sleep: MagicMock, + mock_storage: MagicMock, + test_dir: str, + capsys: pytest.CaptureFixture[str], +) -> None: + """Test the mkdir function when directory already exists.""" + mock_os_mkdir.side_effect = OSError("Directory already exists") + + mkdir(test_dir, 0.02) + + # Verify all storage operations still performed + mock_storage.disable_usb_drive.assert_called_once() + mock_storage.remount.assert_called_once_with("/", False) + mock_os_mkdir.assert_called_once_with(test_dir) + mock_storage.enable_usb_drive.assert_called_once() + + # Verify correct print messages + captured = capsys.readouterr() + assert "Disabled USB drive" in captured.out + assert "Remounted root filesystem" in captured.out + assert f"Mount point {test_dir} already exists." in captured.out + assert "Enabled USB drive" in captured.out + + +@patch("pysquared.boot.filesystem.storage") +@patch("pysquared.boot.filesystem.time.sleep") +@patch("pysquared.boot.filesystem.os.mkdir") +def test_mkdir_custom_delay( + mock_os_mkdir: MagicMock, + mock_sleep: MagicMock, + mock_storage: MagicMock, + test_dir: str, +) -> None: + """Test the mkdir function with custom storage action delay.""" + custom_delay = 0.05 + mkdir(test_dir, custom_delay) + + # Verify sleep was called with custom delay + assert mock_sleep.call_count == 3 + mock_sleep.assert_called_with(custom_delay) + + +@patch("pysquared.boot.filesystem.storage") +@patch("pysquared.boot.filesystem.time.sleep") +@patch("pysquared.boot.filesystem.os.mkdir") +def test_mkdir_default_delay( + mock_os_mkdir: MagicMock, + mock_sleep: MagicMock, + mock_storage: MagicMock, + test_dir: str, +) -> None: + """Test the mkdir function with default storage action delay.""" + mkdir(test_dir) # No delay parameter provided + + # Verify sleep was called with default delay + assert mock_sleep.call_count == 3 + mock_sleep.assert_called_with(0.02) + + +@patch("pysquared.boot.filesystem.storage") +@patch("pysquared.boot.filesystem.time.sleep") +@patch("pysquared.boot.filesystem.os.mkdir") +def test_mkdir_storage_disable_exception( + mock_os_mkdir: MagicMock, + mock_sleep: MagicMock, + mock_storage: MagicMock, + test_dir: str, +) -> None: + """Test mkdir function when storage.disable_usb_drive() raises an exception.""" + mock_storage.disable_usb_drive.side_effect = Exception("USB disable failed") + + with pytest.raises(Exception, match="USB disable failed"): + mkdir(test_dir, 0.02) + + # Verify that enable_usb_drive is still called in finally block + mock_storage.enable_usb_drive.assert_called_once() + + +@patch("pysquared.boot.filesystem.storage") +@patch("pysquared.boot.filesystem.time.sleep") +def test_mkdir_storage_remount_exception( + mock_sleep: MagicMock, + mock_storage: MagicMock, + test_dir: str, +) -> None: + """Test mkdir function when storage.remount() raises an exception.""" + mock_storage.remount.side_effect = Exception("Remount failed") + + with pytest.raises(Exception, match="Remount failed"): + mkdir(test_dir, 0.02) + + # Verify that enable_usb_drive is still called in finally block + mock_storage.enable_usb_drive.assert_called_once() + + +@patch("pysquared.boot.filesystem.storage") +@patch("pysquared.boot.filesystem.time.sleep") +@patch("pysquared.boot.filesystem.os.mkdir") +def test_mkdir_mkdir_exception_not_oserror( + mock_os_mkdir: MagicMock, + mock_sleep: MagicMock, + mock_storage: MagicMock, + test_dir: str, +) -> None: + """Test mkdir function when os.mkdir() raises a non-OSError exception.""" + mock_os_mkdir.side_effect = ValueError("Invalid path") + + with pytest.raises(ValueError, match="Invalid path"): + mkdir(test_dir, 0.02) + + # Verify that enable_usb_drive is still called in finally block + mock_storage.enable_usb_drive.assert_called_once() diff --git a/tests/unit/hardware/test_burnwire.py b/tests/unit/hardware/burnwire/manager/test_burnwire.py similarity index 100% rename from tests/unit/hardware/test_burnwire.py rename to tests/unit/hardware/burnwire/manager/test_burnwire.py diff --git a/tests/unit/hardware/sd_card/manager/test_sd_card.py b/tests/unit/hardware/sd_card/manager/test_sd_card.py new file mode 100644 index 00000000..a2322d63 --- /dev/null +++ b/tests/unit/hardware/sd_card/manager/test_sd_card.py @@ -0,0 +1,72 @@ +"""Unit tests for the filesystem class. + +This file contains unit tests for the `filesystem` class, which provides utilities +for managing the filesystem during the boot process. The tests focus on verifying that +the correct sequence of filesystem operations. +""" + +# pyright: reportAttributeAccessIssue=false + +import sys +from unittest.mock import MagicMock, patch + +import pytest +from busio import SPI +from microcontroller import Pin + +from pysquared.hardware.exception import HardwareInitializationError + +sys.modules["storage"] = MagicMock() +sys.modules["sdcardio"] = MagicMock() + +from pysquared.hardware.sd_card.manager.sd_card import SDCardManager # noqa: E402 + + +@patch("pysquared.hardware.sd_card.manager.sd_card.storage") +@patch("pysquared.hardware.sd_card.manager.sd_card.sdcardio") +def test_sdcard_init_success( + mock_sdcardio: MagicMock, + mock_storage: MagicMock, +) -> None: + """Test SD Card successful initialization.""" + mock_sd_card = MagicMock() + mock_sdcardio.SDCard.return_value = mock_sd_card + + mock_block_device = MagicMock() + mock_storage.VfsFat.return_value = mock_block_device + + spi = MagicMock(spec=SPI) + cs = MagicMock(spec=Pin) + baudrate = 400000 + mount_path = "/sd" + _ = SDCardManager( + spi_bus=spi, + chip_select=cs, + baudrate=baudrate, + mount_path=mount_path, + ) + + # Verify each expected call was made + mock_sdcardio.SDCard.assert_called_once_with(spi, cs, baudrate) + mock_storage.VfsFat.assert_called_once_with(mock_sd_card) + mock_storage.mount.assert_called_once_with(mock_block_device, mount_path) + + +@patch("pysquared.hardware.sd_card.manager.sd_card.storage") +@patch("pysquared.hardware.sd_card.manager.sd_card.sdcardio") +def test_sdcard_init_error( + mock_sdcardio: MagicMock, + mock_storage: MagicMock, +) -> None: + """Test SD Card failing initialization""" + mock_sdcardio.SDCard.side_effect = Exception("Evan (SD) Ortiz") + + with pytest.raises( + HardwareInitializationError, match="Failed to initialize SD Card" + ): + _ = SDCardManager( + spi_bus=MagicMock(spec=SPI), + chip_select=MagicMock(spec=Pin), + baudrate=400000, + mount_path="/sd", + ) diff --git a/tests/unit/test_logger.py b/tests/unit/test_logger.py index 152f15d6..1dc790f0 100644 --- a/tests/unit/test_logger.py +++ b/tests/unit/test_logger.py @@ -4,7 +4,11 @@ functionality with different severity levels, colorized output, and error counting. """ -from unittest.mock import MagicMock +# pyright: reportAttributeAccessIssue=false + +import os +import tempfile +from unittest.mock import MagicMock, patch import pytest from microcontroller import Pin @@ -282,3 +286,75 @@ def test_invalid_json_type_pin(capsys, logger): logger.debug("Initializing watchdog", pin=mock_pin) captured = capsys.readouterr() assert "TypeError" not in captured.out + + +@patch("pysquared.logger.os.stat") +def test_logger_init_with_valid_directory( + mock_os_stat: MagicMock, +): + """Tests Logger initialization with a valid directory for log_dir.""" + count = MagicMock(spec=Counter) + + mock_os_stat.return_value = [0o040000] + + logger = Logger(error_counter=count) + logger.set_log_dir("placeholder") + assert logger._log_dir == "placeholder" + + +@patch("pysquared.logger.os.stat") +def test_logger_init_with_not_a_directory( + mock_os_stat: MagicMock, +): + """Tests Logger initialization with filesystem object that is not a directory for log_dir.""" + count = MagicMock(spec=Counter) + + mock_os_stat.return_value = [1234] + + logger = Logger(error_counter=count) + + with pytest.raises(ValueError): + logger.set_log_dir("placeholder") + + +@patch("pysquared.logger.os.stat") +def test_logger_init_with_invalid_directory( + mock_os_stat: MagicMock, +): + """Tests Logger initialization with invalid directory for log_dir.""" + count = MagicMock(spec=Counter) + + mock_os_stat.side_effect = OSError("Stat failed") + + logger = Logger(error_counter=count) + + with pytest.raises(ValueError): + logger.set_log_dir("placeholder") + + +@patch("pysquared.logger.os.stat") +def test_log_to_file(mock_os_stat: MagicMock): + """Tests logging messages to a file.""" + count = MagicMock(spec=Counter) + + mock_os_stat.return_value = [0o040000] + + logger = Logger(error_counter=count) + + with tempfile.TemporaryDirectory() as temp_dir: + log_path = os.path.join(temp_dir, "poke") + os.mkdir(log_path) + logger.set_log_dir(log_path) + logger.info("Aaron Siemsen rocks") + + with open(os.path.join(log_path, "activity.log"), "r") as f: + contents = f.read() + assert "Aaron Siemsen rocks" in contents + + +def test_get_error_count(): + """Tests retrieving the error count from the Logger.""" + count = MagicMock(spec=Counter) + count.get.return_value = 5 + logger = Logger(error_counter=count) + assert logger.get_error_count() == 5