diff --git a/.gitignore b/.gitignore index 04f9bea8..1b98f3f2 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ __pycache__/ .coverage* .coverage-reports/ .DS_Store +.hypothesis .venv artifacts/ coverage-reports/ diff --git a/docs/design-guide.md b/docs/design-guide.md index 675359cb..0bce48fd 100644 --- a/docs/design-guide.md +++ b/docs/design-guide.md @@ -120,7 +120,7 @@ The following table lists possible sensor properties, their corresponding types | duty_cycle | int | 16-bit PWM duty cycle | | eCO2 | float | equivalent/estimated CO₂ in ppm | | frequency | int | Hertz (Hz) | -| gyro | (float, float, float)| x, y, z radians per second | +| angular velocity | (float, float, float)| x, y, z radians per second | | light | float | non-unit-specific light levels | | lux | float | SI lux | | magnetic | (float, float, float)| x, y, z micro-Tesla (uT) | diff --git a/mocks/adafruit_lis2mdl/lis2mdl.py b/mocks/adafruit_lis2mdl/lis2mdl.py index 17a12345..36e041e9 100644 --- a/mocks/adafruit_lis2mdl/lis2mdl.py +++ b/mocks/adafruit_lis2mdl/lis2mdl.py @@ -5,6 +5,8 @@ need for actual hardware. """ +from pysquared.sensor_reading.magnetic import Magnetic + class LIS2MDL: """A mock LIS2MDL magnetometer.""" @@ -17,4 +19,7 @@ def __init__(self, i2c) -> None: """ self.i2c = i2c - magnetic: tuple[float, float, float] = (0.0, 0.0, 0.0) + @property + async def async_magnetic(self): + """Asynchronously returns a mock magnetic field vector.""" + return Magnetic(0.0, 0.0, 0.0) diff --git a/mocks/adafruit_lsm6ds/lsm6dsox.py b/mocks/adafruit_lsm6ds/lsm6dsox.py index 041bc74a..62704431 100644 --- a/mocks/adafruit_lsm6ds/lsm6dsox.py +++ b/mocks/adafruit_lsm6ds/lsm6dsox.py @@ -21,5 +21,5 @@ def __init__(self, i2c_bus: I2C, address: int) -> None: ... acceleration: tuple[float, float, float] = (0.0, 0.0, 0.0) - gyro: tuple[float, float, float] = (0.0, 0.0, 0.0) + angular_velocity: tuple[float, float, float] = (0.0, 0.0, 0.0) temperature: float = 0.0 diff --git a/pyproject.toml b/pyproject.toml index 85ef457b..8bb063e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dev = [ "pre-commit==4.2.0", "pyright[nodejs]==1.1.402", "pytest==8.4.1", + "hypothesis==6.136.7", ] docs = [ "mkdocs-material==9.6.14", diff --git a/pysquared/attitude_control/b_dot_detumble.py b/pysquared/attitude_control/b_dot_detumble.py new file mode 100644 index 00000000..ac83d83b --- /dev/null +++ b/pysquared/attitude_control/b_dot_detumble.py @@ -0,0 +1,134 @@ +"""This file provides functions for detumbling the satellite using the B-dot algorithm. + +Coding style for this file foregoes more complex programming constructs in favor readability. +We assume that non-programmers may read this code to understand and validate detumbling logic. + +Units and concepts used in this file: +- B-dot detumbling algorithm + - https://en.wikipedia.org/wiki/Spacecraft_detumbling#Magnetic_control:_B-dot +- Magnetic field (B-field) strength in micro-Tesla (uT) + - https://en.wikipedia.org/wiki/Magnetic_field + - https://en.wikipedia.org/wiki/Tesla_(unit) +- Dipole moment in Ampere-square meter (A⋅m²) + - https://en.wikipedia.org/wiki/Magnetic_dipole + - https://en.wikipedia.org/wiki/Ampere + - https://en.wikipedia.org/wiki/Square_metre +""" + +import math + +from ..sensor_reading.magnetic import Magnetic + + +class BDotDetumble: + """B-dot detumbling for attitude control. + + Example usage: + ```python + b_dot_detumble = BDotDetumble(gain=1.0) + current_mag_field = Magnetic(value=(0.1, 0.2, 0.3), timestamp=1234567890) + previous_mag_field = Magnetic(value=(0.1, 0.2, 0.3), timestamp=1234567880) + dipole_moment = b_dot_detumble.dipole_moment(current_mag_field, previous_mag_field) + print(dipole_moment) + ``` + """ + + def __init__(self, gain: float = 1.0): + """Initializes the BDotDetumble class. + + Args: + gain: Gain constant for the B-dot detumbling algorithm. + + TODO(nateinaction): Create system for teams to set values that compute gain for them. + """ + self._gain = gain + + @staticmethod + def _magnitude(current_mag_field: Magnetic) -> float: + """ + Computes the magnitude of the magnetic field vector. + + Args: + current_mag_field: Magnetic object containing the current magnetic field vector. + + Returns: + The magnitude of the magnetic field vector. + """ + return math.sqrt( + current_mag_field.value[0] ** 2 + + current_mag_field.value[1] ** 2 + + current_mag_field.value[2] ** 2 + ) + + @staticmethod + def _dB_dt( + current_mag_field: Magnetic, previous_mag_field: Magnetic + ) -> tuple[float, float, float]: + """ + Computes the time derivative of the magnetic field vector. + + Args: + current_mag_field: Magnetic object containing the current magnetic field vector + previous_mag_field: Magnetic object containing the previous magnetic field vector + + Returns: + dB_dt: tuple of dB/dt (dBx/dt, dBy/dt, dBz/dt) in micro-Tesla per second (uT/s). + + Raises: + ValueError: If the time difference between the current and previous magnetic field readings is too small to compute dB/dt. + """ + dt = current_mag_field.timestamp - previous_mag_field.timestamp + if dt < 1e-6: + raise ValueError( + "Timestamp difference between current and previous magnetic field readings is too small to compute dB/dt." + ) + + dBx_dt = (current_mag_field.value[0] - previous_mag_field.value[0]) / dt + dBy_dt = (current_mag_field.value[1] - previous_mag_field.value[1]) / dt + dBz_dt = (current_mag_field.value[2] - previous_mag_field.value[2]) / dt + return (dBx_dt, dBy_dt, dBz_dt) + + def dipole_moment( + self, current_mag_field: Magnetic, previous_mag_field: Magnetic + ) -> tuple[float, float, float]: + """ + Computes the required dipole moment to detumble the satellite. + + m = -k * (dB/dt) / |B| + + m is the dipole moment in A⋅m² + k is a gain constant + dB/dt is the time derivative of the magnetic field reading in micro-Tesla per second (uT/s) + |B| is the magnitude of the magnetic field vector in micro-Tesla (uT) + + Args: + current_mag_field: Magnetic object containing the current magnetic field vector. + previous_mag_field: Magnetic object containing the previous magnetic field vector. + + Returns: + The dipole moment in A⋅m² as a tuple (moment_x, moment_y, moment_z). + + Raises: + ValueError: If the magnitude of the current magnetic field is too small to compute the dipole moment. + or if the time difference between the current and previous magnetic field readings is less than or equal to 0. + """ + magnitude = self._magnitude(current_mag_field) + if magnitude < 1e-6: + raise ValueError( + "Current magnetic field magnitude is too small to compute dipole moment." + ) + + if current_mag_field.timestamp <= previous_mag_field.timestamp: + raise ValueError( + "Current magnetic field timestamp must be greater than previous magnetic field timestamp." + ) + + try: + dBx_dt, dBy_dt, dBz_dt = self._dB_dt(current_mag_field, previous_mag_field) + except ValueError: + raise + + moment_x = -self._gain * dBx_dt / magnitude + moment_y = -self._gain * dBy_dt / magnitude + moment_z = -self._gain * dBz_dt / magnitude + return (moment_x, moment_y, moment_z) diff --git a/pysquared/attitude_control/detumble_service.py b/pysquared/attitude_control/detumble_service.py new file mode 100644 index 00000000..4a8a17f8 --- /dev/null +++ b/pysquared/attitude_control/detumble_service.py @@ -0,0 +1,80 @@ +"""Detumble attitude control service using B-dot algorithm with magnetorquers.""" + +import asyncio +import time + +from ..logger import Logger +from ..protos.magnetometer import MagnetometerProto +from ..protos.magnetorquer import MagnetorquerProto +from ..sensor_reading.magnetic import Magnetic +from .b_dot_detumble import BDotDetumble + + +class DetumbleService: + """Attitude control service implementing B-dot detumble algorithm.""" + + previous_mag_field: Magnetic | None = None + + def __init__( + self, + logger: Logger, + magnetometer: MagnetometerProto, + magnetorquer: MagnetorquerProto, + control_period: float = 1.0, + ) -> None: + """Initialize the detumble service. + + Args: + logger: Logger instance + magnetometer: Magnetometer sensor interface + magnetorquer: Magnetorquer control interface + control_period: Control loop period in seconds + """ + self._logger = logger + self._magnetometer = magnetometer + self._magnetorquer = magnetorquer + self._control_period = control_period + + def execute_control_step(self) -> None: + """Execute one step of the detumble control algorithm.""" + # Get sensor readings + try: + magnetic_field = self._magnetometer.get_magnetic_field() + except Exception: + raise + + if self.previous_mag_field is None: + self.previous_mag_field = magnetic_field + return + + # Calculate required dipole moment using B-dot algorithm + try: + dipole_moment = BDotDetumble().dipole_moment( + current_mag_field=magnetic_field, + previous_mag_field=self.previous_mag_field, + ) + except Exception: + raise + finally: + self.previous_mag_field = magnetic_field + + # Apply dipole moment to magnetorquers + self._magnetorquer.set_dipole_moment(dipole_moment) + + async def run_detumble_loop(self, max_iterations: int = 1000) -> None: + """Run the detumble control loop for a specified number of iterations. + + Args: + max_iterations: Maximum number of control iterations + """ + for i in range(max_iterations): + start_time = time.monotonic() + + success = self.execute_control_step() + if not success: + self._logger.warning(f"Control step {i} failed, continuing...") + + # Maintain control period timing + elapsed = time.monotonic() - start_time + if elapsed < self._control_period: + await asyncio.sleep(self._control_period - elapsed) diff --git a/pysquared/beacon.py b/pysquared/beacon.py index b624828c..22bf9b5a 100644 --- a/pysquared/beacon.py +++ b/pysquared/beacon.py @@ -31,9 +31,10 @@ from .protos.power_monitor import PowerMonitorProto from .protos.radio import RadioProto from .protos.temperature_sensor import TemperatureSensorProto +from .sensor_reading.avg import avg_readings try: - from typing import Callable, OrderedDict + from typing import OrderedDict except Exception: pass @@ -85,7 +86,27 @@ def send(self) -> bool: Returns: True if the beacon was sent successfully, False otherwise. """ + state = self._build_beacon_state() + beacon_data = json.dumps(state, separators=(",", ":")).encode("utf-8") + return self._packet_manager.send(beacon_data) + + def _build_beacon_state(self) -> OrderedDict[str, object]: + """Builds the beacon state dictionary with system info and sensor data. + + Returns: + OrderedDict containing the beacon state data. + """ state: OrderedDict[str, object] = OrderedDict() + self._add_system_info(state) + self._add_sensor_data(state) + return state + + def _add_system_info(self, state: OrderedDict[str, object]) -> None: + """Adds system information to the beacon state. + + Args: + state: The state dictionary to update. + """ state["name"] = self._name # Warning: CircuitPython does not support time.gmtime(), when testing this code it will use your local timezone @@ -96,63 +117,149 @@ def send(self) -> bool: state["uptime"] = time.time() - self._boot_time + def _add_sensor_data(self, state: OrderedDict[str, object]) -> None: + """Adds sensor data to the beacon state. + + Args: + state: The state dictionary to update. + """ for index, sensor in enumerate(self._sensors): if isinstance(sensor, Processor): - sensor_name = sensor.__class__.__name__ - state[f"{sensor_name}_{index}_temperature"] = sensor.temperature - if isinstance(sensor, Flag): - state[f"{sensor.get_name()}_{index}"] = sensor.get() - if isinstance(sensor, Counter): - state[f"{sensor.get_name()}_{index}"] = sensor.get() - if isinstance(sensor, RadioProto): - sensor_name = sensor.__class__.__name__ - state[f"{sensor_name}_{index}_modulation"] = ( - sensor.get_modulation().__name__ - ) - if isinstance(sensor, IMUProto): - sensor_name: str = sensor.__class__.__name__ - state[f"{sensor_name}_{index}_acceleration"] = sensor.get_acceleration() - state[f"{sensor_name}_{index}_gyroscope"] = sensor.get_gyro_data() - if isinstance(sensor, PowerMonitorProto): - sensor_name: str = sensor.__class__.__name__ - state[f"{sensor_name}_{index}_current_avg"] = self.avg_readings( - sensor.get_current - ) - state[f"{sensor_name}_{index}_bus_voltage_avg"] = self.avg_readings( - sensor.get_bus_voltage - ) - state[f"{sensor_name}_{index}_shunt_voltage_avg"] = self.avg_readings( - sensor.get_shunt_voltage - ) - if isinstance(sensor, TemperatureSensorProto): - sensor_name = sensor.__class__.__name__ - reading = sensor.get_temperature() - state[f"{sensor_name}_{index}_temperature"] = reading.value - state[f"{sensor_name}_{index}_temperature_timestamp"] = ( - reading.timestamp - ) - - b = json.dumps(state, separators=(",", ":")).encode("utf-8") - return self._packet_manager.send(b) - - def avg_readings( - self, func: Callable[..., float | None], num_readings: int = 50 - ) -> float | None: - """Gets the average of the readings from a function. + self._add_processor_data(state, sensor, index) + elif isinstance(sensor, Flag): + self._add_flag_data(state, sensor, index) + elif isinstance(sensor, Counter): + self._add_counter_data(state, sensor, index) + elif isinstance(sensor, RadioProto): + self._add_radio_data(state, sensor, index) + elif isinstance(sensor, IMUProto): + self._add_imu_data(state, sensor, index) + elif isinstance(sensor, PowerMonitorProto): + self._add_power_monitor_data(state, sensor, index) + elif isinstance(sensor, TemperatureSensorProto): + self._add_temperature_sensor_data(state, sensor, index) - Args: - func: The function to call. - num_readings: The number of readings to take. + def _add_processor_data( + self, state: OrderedDict[str, object], sensor: Processor, index: int + ) -> None: + """Adds processor data to the beacon state.""" + sensor_name = sensor.__class__.__name__ + state[f"{sensor_name}_{index}_temperature"] = sensor.temperature - Returns: - The average of the readings, or None if the readings could not be taken. + def _add_flag_data( + self, state: OrderedDict[str, object], sensor: Flag, index: int + ) -> None: + """Adds flag data to the beacon state.""" + state[f"{sensor.get_name()}_{index}"] = sensor.get() + + def _add_counter_data( + self, state: OrderedDict[str, object], sensor: Counter, index: int + ) -> None: + """Adds counter data to the beacon state.""" + state[f"{sensor.get_name()}_{index}"] = sensor.get() + + def _add_radio_data( + self, state: OrderedDict[str, object], sensor: RadioProto, index: int + ) -> None: + """Adds radio data to the beacon state.""" + sensor_name = sensor.__class__.__name__ + state[f"{sensor_name}_{index}_modulation"] = sensor.get_modulation().__name__ + + def _add_imu_data( + self, state: OrderedDict[str, object], sensor: IMUProto, index: int + ) -> None: + """Adds IMU data to the beacon state.""" + sensor_name = sensor.__class__.__name__ + + self._safe_add_sensor_reading( + state, + f"{sensor_name}_{index}_acceleration", + lambda: sensor.get_acceleration().to_dict(), + "Error retrieving acceleration", + sensor_name, + index, + ) + + self._safe_add_sensor_reading( + state, + f"{sensor_name}_{index}_angular_velocityscope", + lambda: sensor.get_angular_velocity().to_dict(), + "Error retrieving angular velocity", + sensor_name, + index, + ) + + def _add_power_monitor_data( + self, state: OrderedDict[str, object], sensor: PowerMonitorProto, index: int + ) -> None: + """Adds power monitor data to the beacon state.""" + sensor_name = sensor.__class__.__name__ + + self._safe_add_sensor_reading( + state, + f"{sensor_name}_{index}_current_avg", + lambda: avg_readings(sensor.get_current), + "Error retrieving current", + sensor_name, + index, + ) + + self._safe_add_sensor_reading( + state, + f"{sensor_name}_{index}_bus_voltage_avg", + lambda: avg_readings(sensor.get_bus_voltage), + "Error retrieving bus voltage", + sensor_name, + index, + ) + + self._safe_add_sensor_reading( + state, + f"{sensor_name}_{index}_shunt_voltage_avg", + lambda: avg_readings(sensor.get_shunt_voltage), + "Error retrieving shunt voltage", + sensor_name, + index, + ) + + def _add_temperature_sensor_data( + self, + state: OrderedDict[str, object], + sensor: TemperatureSensorProto, + index: int, + ) -> None: + """Adds temperature sensor data to the beacon state.""" + sensor_name = sensor.__class__.__name__ + + self._safe_add_sensor_reading( + state, + f"{sensor_name}_{index}_temperature", + lambda: sensor.get_temperature().to_dict(), + "Error retrieving temperature", + sensor_name, + index, + ) + + def _safe_add_sensor_reading( + self, + state: OrderedDict[str, object], + key: str, + reading_func, + error_msg: str, + sensor_name: str, + index: int, + ) -> None: + """Safely adds a sensor reading to the state with error handling. + + Args: + state: The state dictionary to update. + key: The key to store the reading under. + reading_func: Function that returns the sensor reading. + error_msg: Error message to log if reading fails. + sensor_name: Name of the sensor for logging. + index: Index of the sensor for logging. """ - readings: float = 0 - for _ in range(num_readings): - reading = func() - if reading is None: - self._log.warning(f"Couldn't acquire {func.__name__}") - return - - readings += reading - return readings / num_readings + try: + state[key] = reading_func() + except Exception as e: + self._log.error(error_msg, e, sensor=sensor_name, index=index) diff --git a/pysquared/detumble.py b/pysquared/detumble.py deleted file mode 100644 index e5f93b65..00000000 --- a/pysquared/detumble.py +++ /dev/null @@ -1,64 +0,0 @@ -"""This module provides functions for satellite detumbling using magnetorquers. -Includes vector math utilities and the main dipole calculation for attitude control. -""" - - -def dot_product(vector1: tuple, vector2: tuple) -> float: - """ - Computes the dot product of two 3-element vectors. - - Args: - vector1 (tuple): First vector (length 3). - vector2 (tuple): Second vector (length 3). - - Returns: - float: The dot product of the two vectors. - """ - return sum([a * b for a, b in zip(vector1, vector2)]) - - -def x_product(vector1: tuple, vector2: tuple) -> list: - """ - Computes the cross product of two 3-element vectors. - - Args: - vector1 (tuple): First vector (length 3). - vector2 (tuple): Second vector (length 3). - - Returns: - list: The cross product vector (length 3). - """ - return [ - vector1[1] * vector2[2] - vector1[2] * vector2[1], - vector1[0] * vector2[2] - vector1[2] * vector2[0], - vector1[0] * vector2[1] - vector1[1] * vector2[0], - ] - - -def gain_func() -> float: - """ - Returns the gain value for the detumble control law. - - Returns: - float: Gain value (default 1.0). - """ - return 1.0 - - -def magnetorquer_dipole(mag_field: tuple, ang_vel: tuple) -> list: - """ - Calculates the required dipole moment for the magnetorquers to detumble the satellite. - - Args: - mag_field (tuple): The measured magnetic field vector (length 3). - ang_vel (tuple): The measured angular velocity vector (length 3). - - Returns: - list: The dipole moment vector to be applied (length 3). - """ - gain = gain_func() - scalar_coef = -gain / pow(dot_product(mag_field, mag_field), 0.5) - dipole_out = x_product(mag_field, ang_vel) - for i in range(3): - dipole_out[i] *= scalar_coef - return dipole_out diff --git a/pysquared/hardware/imu/manager/lsm6dsox.py b/pysquared/hardware/imu/manager/lsm6dsox.py index da8bcd39..06c7074a 100644 --- a/pysquared/hardware/imu/manager/lsm6dsox.py +++ b/pysquared/hardware/imu/manager/lsm6dsox.py @@ -1,13 +1,13 @@ """This module defines the `LSM6DSOXManager` class, which provides a high-level interface for interacting with the LSM6DSOX inertial measurement unit. It handles the initialization of the sensor and -provides methods for reading gyroscope, acceleration, and temperature data. +provides methods for reading angular_velocityscope, acceleration, and temperature data. **Usage:** ```python logger = Logger() i2c = busio.I2C(board.SCL, board.SDA) imu = LSM6DSOXManager(logger, i2c, 0x6A) -gyro_data = imu.get_gyro_data() +angular_velocity = imu.get_angular_velocity() accel_data = imu.get_acceleration() temp_data = imu.get_temperature() ``` @@ -19,7 +19,11 @@ from ....logger import Logger from ....protos.imu import IMUProto from ....protos.temperature_sensor import TemperatureSensorProto -from ....sensor_reading.error import SensorReadingUnknownError +from ....sensor_reading.acceleration import Acceleration +from ....sensor_reading.angular_velocity import AngularVelocity +from ....sensor_reading.error import ( + SensorReadingUnknownError, +) from ....sensor_reading.temperature import Temperature from ...exception import HardwareInitializationError @@ -51,29 +55,41 @@ def __init__( except Exception as e: raise HardwareInitializationError("Failed to initialize IMU") from e - def get_gyro_data(self) -> tuple[float, float, float] | None: - """Gets the gyroscope data from the IMU. + def get_angular_velocity(self) -> AngularVelocity: + """Gets the angular velocity from the IMU. Returns: - A tuple containing the x, y, and z angular acceleration values in - radians per second, or None if the data is not available. + An AngularVelocity object containing the x, y, and z angular velocity in radians per second. + + Raises: + SensorReadingUnknownError: If an unknown error occurs while reading the angular velocity. """ try: - return self._imu.gyro + return AngularVelocity( + self._imu.gyro[0], + self._imu.gyro[1], + self._imu.gyro[2], + ) except Exception as e: - self._log.error("Error retrieving IMU gyro sensor values", e) + raise SensorReadingUnknownError("Failed to read angular velocity") from e - def get_acceleration(self) -> tuple[float, float, float] | None: + def get_acceleration(self) -> Acceleration: """Gets the acceleration data from the IMU. Returns: - A tuple containing the x, y, and z acceleration values in m/s^2, or - None if the data is not available. + An Acceleration object containing the x, y, and z acceleration values in m/s². + + Raises: + SensorReadingUnknownError: If an unknown error occurs while reading the acceleration data. """ try: - return self._imu.acceleration + return Acceleration( + self._imu.acceleration[0], + self._imu.acceleration[1], + self._imu.acceleration[2], + ) except Exception as e: - self._log.error("Error retrieving IMU acceleration sensor values", e) + raise SensorReadingUnknownError("Failed to read acceleration") from e def get_temperature(self) -> Temperature: """Gets the temperature reading from the IMU. @@ -87,6 +103,4 @@ def get_temperature(self) -> Temperature: try: return Temperature(self._imu.temperature) except Exception as e: - raise SensorReadingUnknownError( - "Failed to read temperature from IMU" - ) from e + raise SensorReadingUnknownError("Failed to read temperature") from e diff --git a/pysquared/hardware/magnetometer/manager/lis2mdl.py b/pysquared/hardware/magnetometer/manager/lis2mdl.py index 3c5900a7..6da48aad 100644 --- a/pysquared/hardware/magnetometer/manager/lis2mdl.py +++ b/pysquared/hardware/magnetometer/manager/lis2mdl.py @@ -7,7 +7,7 @@ logger = Logger() i2c = busio.I2C(board.SCL, board.SDA) magnetometer = LIS2MDLManager(logger, i2c) -mag_data = magnetometer.get_vector() +mag_field = magnetometer.get_magnetic_field() ``` """ @@ -16,6 +16,10 @@ from ....logger import Logger from ....protos.magnetometer import MagnetometerProto +from ....sensor_reading.error import ( + SensorReadingUnknownError, +) +from ....sensor_reading.magnetic import Magnetic from ...exception import HardwareInitializationError @@ -46,14 +50,24 @@ def __init__( "Failed to initialize magnetometer" ) from e - def get_vector(self) -> tuple[float, float, float] | None: + def get_magnetic_field(self) -> Magnetic: """Gets the magnetic field vector from the magnetometer. Returns: - A tuple containing the x, y, and z magnetic field values in Gauss, or - None if the data is not available. + A Magnetic object containing the x, y, and z magnetic field values in micro-Tesla (uT) + + Raises: + SensorReadingTimeoutError: If the reading times out. + SensorReadingUnknownError: If an unknown error occurs while reading the magnetometer. """ try: - return self._magnetometer.magnetic + m = self._magnetometer.magnetic + return Magnetic( + x=m[0], + y=m[1], + z=m[2], + ) except Exception as e: - self._log.error("Error retrieving magnetometer sensor values", e) + raise SensorReadingUnknownError( + "Unknown error while reading magnetometer data" + ) from e diff --git a/pysquared/hardware/magnetorquer/__init__.py b/pysquared/hardware/magnetorquer/__init__.py new file mode 100644 index 00000000..28cf1410 --- /dev/null +++ b/pysquared/hardware/magnetorquer/__init__.py @@ -0,0 +1 @@ +"""This module implements the magnetorquer protocol.""" diff --git a/pysquared/hardware/magnetorquer/manager/__init__.py b/pysquared/hardware/magnetorquer/manager/__init__.py new file mode 100644 index 00000000..28cf1410 --- /dev/null +++ b/pysquared/hardware/magnetorquer/manager/__init__.py @@ -0,0 +1 @@ +"""This module implements the magnetorquer protocol.""" diff --git a/pysquared/hardware/magnetorquer/manager/proves_v3_manager.py b/pysquared/hardware/magnetorquer/manager/proves_v3_manager.py new file mode 100644 index 00000000..68426009 --- /dev/null +++ b/pysquared/hardware/magnetorquer/manager/proves_v3_manager.py @@ -0,0 +1,130 @@ +"""This module implements the Proves V3 magnetorquer manager. +It provides an interface for controlling the magnetorquers on the Proves V3 hardware. +It inherits from the MagnetorquerProto protocol to ensure it implements the required methods. + +Coding style for this file foregoes more complex programming constructs in favor readability. +We assume that non-programmers may read this code to understand and validate detumbling logic. +""" + +import math + +from ....protos.magnetorquer import MagnetorquerProto + + +class ProvesV3Manager(MagnetorquerProto): + """Manager for Proves V3 hardware components.""" + + _coil_voltage = 3.3 + """Voltage supplied to the magnetorquers in Volts.""" + + _coil_num_turns_x_y = 2 * 24 + """Number of turns in the x and y-axis magnetorquer coil. + The x and y magenetorquer coil consists of 2 layers of 24 turns each. + """ + + _coil_length_x_y = 0.053 + """Length of the x and y-axis coil in meters.""" + + _coil_width_x_y = 0.045 + """Width of the x and y-axis coil in meters.""" + + _coil_area_x_y = _coil_length_x_y * _coil_width_x_y + """Area of the x and y-axis coil in square meters.""" + + _coil_resistance_x_y = 57.2 + """Resistance of the x and y-axis coil in ohms.""" + + _coil_max_current_x_y = _coil_voltage / _coil_resistance_x_y + """Maximum current for the x and y-axis magnetorquers in Amperes.""" + + _coil_num_turns_z = 3 * 51 + """Number of turns in the z-axis magnetorquer coil. + The z magenetorquer coil consists of 3 layers of 51 turns each. + """ + + _coil_diameter_z = 0.05755 + """Diameter of the z-axis coil in meters.""" + + _coil_area_z = math.pi * (_coil_diameter_z / 2) ** 2 + """Area of the z-axis coil in square meters.""" + + _coil_resistance_z = 248.8 + """Resistance of the z-axis coil in ohms.""" + + _coil_max_current_z = _coil_voltage / _coil_resistance_z + """Maximum current for the z-axis magnetorquer in Amperes.""" + + def __init__(self) -> None: + """Initializes the Proves V3 Manager.""" + pass + + def set_dipole_moment(self, dipole_moment: tuple[float, float, float]) -> None: + """Set the magnetic dipole moment for all three axes. + + Args: + dipole_moment: A tuple containing the dipole moment for each axis (x, y, z) in A⋅m². + """ + # Convert dipole moment to current for each axis. + current = self._current_from_dipole_moment(dipole_moment) + + # Limit the current to the maximum allowed current. + limited_current = self._limit_current(current) + + # On Proves V3 we have 2 x-axis, 2 y-axis, and 1 z-axis magnetorquers. + # To not cancel out the x and y components we reverse the current for one of each. + _ = limited_current[0] # x1 + _ = -limited_current[0] # x2 + _ = limited_current[1] # y1 + _ = -limited_current[1] # y2 + _ = limited_current[2] # z1 + + # TODO(nateinaction): Set the current for each magnetorquer + + def _limit_current( + self, current: tuple[float, float, float] + ) -> tuple[float, float, float]: + """Limits the current for each axis to max_current. + + TODO(nateinaction): Michael said that we may want to output a percentage of max current instead of the actual current. + + Args: + current: A tuple containing the current for each axis (x, y, z) in Amperes. + + Returns: + A tuple containing the limited current for each axis. + """ + # Take the minimum of the absolute value of the current and the maximum current for that axis. + # If the current is negative, we keep the sign. + # This ensures that we do not exceed the maximum current for each axis. + return ( + min(abs(current[0]), self._coil_max_current_x_y) + * (1 if current[0] >= 0 else -1), + min(abs(current[1]), self._coil_max_current_x_y) + * (1 if current[1] >= 0 else -1), + min(abs(current[2]), self._coil_max_current_z) + * (1 if current[2] >= 0 else -1), + ) + + def _current_from_dipole_moment( + self, dipole_moment: tuple[float, float, float] + ) -> tuple[float, float, float]: + """ + Converts the dipole moment to current for each axis. + + I = m / (N * A) + + I is the coil current in A + m is magnetic dipole moment in A·m² + N is the number of turns of the coil + A is the area of one turn of the coil in m² + + Args: + dipole_moment: A tuple containing the dipole moment for each axis (x, y, z) in A⋅m². + + Returns: + A tuple containing the current for each axis (current_x, current_y, current_z) in Amperes. + """ + current_x = dipole_moment[0] / (self._coil_num_turns_x_y * self._coil_area_x_y) + current_y = dipole_moment[1] / (self._coil_num_turns_x_y * self._coil_area_x_y) + current_z = dipole_moment[2] / (self._coil_num_turns_z * self._coil_area_z) + return (current_x, current_y, current_z) diff --git a/pysquared/hardware/power_monitor/manager/ina219.py b/pysquared/hardware/power_monitor/manager/ina219.py index 63fcb1c4..7a00789f 100644 --- a/pysquared/hardware/power_monitor/manager/ina219.py +++ b/pysquared/hardware/power_monitor/manager/ina219.py @@ -18,6 +18,11 @@ from ....logger import Logger from ....protos.power_monitor import PowerMonitorProto +from ....sensor_reading.current import Current +from ....sensor_reading.error import ( + SensorReadingUnknownError, +) +from ....sensor_reading.voltage import Voltage from ...exception import HardwareInitializationError @@ -49,35 +54,44 @@ def __init__( "Failed to initialize INA219 power monitor" ) from e - def get_bus_voltage(self) -> float | None: + def get_bus_voltage(self) -> Voltage: """Gets the bus voltage from the INA219. Returns: - The bus voltage in volts, or None if the data is not available. + A Voltage object containing the bus voltage in volts. + + Raises: + SensorReadingUnknownError: If an unknown error occurs while reading the light sensor. """ try: - return self._ina219.bus_voltage + return Voltage(self._ina219.bus_voltage) except Exception as e: - self._log.error("Error retrieving bus voltage", e) + raise SensorReadingUnknownError("Failed to get bus voltage") from e - def get_shunt_voltage(self) -> float | None: + def get_shunt_voltage(self) -> Voltage: """Gets the shunt voltage from the INA219. Returns: - The shunt voltage in volts, or None if the data is not available. + A Voltage object containing the shunt voltage in volts. + + Raises: + SensorReadingUnknownError: If an unknown error occurs while reading the light sensor. """ try: - return self._ina219.shunt_voltage + return Voltage(self._ina219.shunt_voltage) except Exception as e: - self._log.error("Error retrieving shunt voltage", e) + raise SensorReadingUnknownError("Failed to get shunt voltage") from e - def get_current(self) -> float | None: + def get_current(self) -> Current: """Gets the current from the INA219. Returns: - The current in amps, or None if the data is not available. + A Current object containing the current in milliamps (mA) + + Raises: + SensorReadingUnknownError: If an unknown error occurs while reading the light sensor. """ try: - return self._ina219.current + return Current(self._ina219.current) except Exception as e: - self._log.error("Error retrieving current", e) + raise SensorReadingUnknownError("Failed to get current") from e diff --git a/pysquared/power_health.py b/pysquared/power_health.py index b6598a8a..cde8cd31 100644 --- a/pysquared/power_health.py +++ b/pysquared/power_health.py @@ -17,12 +17,7 @@ from .config.config import Config from .logger import Logger from .protos.power_monitor import PowerMonitorProto - -try: - from typing import Callable, List - -except Exception: - pass +from .sensor_reading.avg import avg_readings class State: @@ -81,76 +76,44 @@ def get(self) -> NOMINAL | DEGRADED | CRITICAL | UNKNOWN: Returns: The current power health state. """ - errors: List[str] = [] - self.logger.debug("Power monitor: ", sensor=self._power_monitor) - - # Wrap sensor reading calls in try/catch and handle None values try: - bus_voltage = self._avg_reading(self._power_monitor.get_bus_voltage) - if bus_voltage is None: - self.logger.warning( - "Power monitor failed to provide bus voltage reading" - ) - return UNKNOWN() - - current = self._avg_reading(self._power_monitor.get_current) - if current is None: - self.logger.warning("Power monitor failed to provide current reading") - return UNKNOWN() + bus_voltage = avg_readings(self._power_monitor.get_bus_voltage) + except Exception as e: + self.logger.error("Error retrieving bus voltage", e) + return UNKNOWN() + try: + current = avg_readings(self._power_monitor.get_current) except Exception as e: - self.logger.error("Exception occurred while reading from power monitor", e) + self.logger.error("Error retrieving current", e) return UNKNOWN() - # Critical check first - if battery voltage is below critical threshold if bus_voltage <= self.config.critical_battery_voltage: self.logger.warning( - f"CRITICAL: Battery voltage {bus_voltage:.1f}V is at or below critical threshold {self.config.critical_battery_voltage}V" + "Power is CRITICAL", + voltage=bus_voltage, + threshold=self.config.critical_battery_voltage, ) return CRITICAL() - # Check current deviation from normal if ( abs(current - self.config.normal_charge_current) > self.config.normal_charge_current ): - errors.append( - f"Current reading {current:.1f} is outside of normal range {self.config.normal_charge_current}" + self.logger.warning( + "Power is DEGRADED: Current above threshold", + current=current, + threshold=self.config.normal_charge_current, ) + return DEGRADED() - # Check if bus voltage is below degraded threshold if bus_voltage <= self.config.degraded_battery_voltage: - errors.append( - f"Bus voltage reading {bus_voltage:.1f}V is at or below degraded threshold {self.config.degraded_battery_voltage}V" - ) - - if len(errors) > 0: - self.logger.info( - "Power health is NOMINAL with minor deviations", errors=errors + self.logger.warning( + "Power is DEGRADED: Bus voltage below threshold", + voltage=bus_voltage, + threshold=self.config.degraded_battery_voltage, ) return DEGRADED() - else: - self.logger.info("Power health is NOMINAL") - return NOMINAL() - - def _avg_reading( - self, func: Callable[..., float | None], num_readings: int = 50 - ) -> float | None: - """Gets the average reading from a sensor. - Args: - func: The function to call to get a reading. - num_readings: The number of readings to take. - - Returns: - The average of the readings, or None if a reading could not be taken. - """ - readings: float = 0.0 - for _ in range(num_readings): - reading = func() - if reading is None: - func_name = getattr(func, "__name__", "unknown_function") - self.logger.warning(f"Couldn't get reading from {func_name}") - return - readings += reading - return readings / num_readings + self.logger.debug("Power health is NOMINAL") + return NOMINAL() diff --git a/pysquared/protos/imu.py b/pysquared/protos/imu.py index 777ae90a..c2ff038b 100644 --- a/pysquared/protos/imu.py +++ b/pysquared/protos/imu.py @@ -2,30 +2,35 @@ ensuring consistent behavior across different IMU hardware. """ +from ..sensor_reading.acceleration import Acceleration +from ..sensor_reading.angular_velocity import AngularVelocity + class IMUProto: """Protocol defining the interface for an Inertial Measurement Unit (IMU).""" - def get_gyro_data(self) -> tuple[float, float, float] | None: - """Gets the gyroscope data from the inertial measurement unit. + def get_angular_velocity(self) -> AngularVelocity: + """Gets the angular velocity from the inertial measurement unit. Returns: - A tuple containing the x, y, and z angular acceleration values in - radians per second, or None if not available. + An AngularVelocity object containing the x, y, and z angular velocity in radians per second. Raises: - Exception: If there is an error retrieving the values. + SensorReadingValueError: If the reading returns an invalid value. + SensorReadingTimeoutError: If the reading times out. + SensorReadingUnknownError: If an unknown error occurs while reading the light sensor. """ ... - def get_acceleration(self) -> tuple[float, float, float] | None: + def get_acceleration(self) -> Acceleration: """Gets the acceleration data from the inertial measurement unit. Returns: - A tuple containing the x, y, and z acceleration values in m/s^2, or - None if not available. + An Acceleration object containing the x, y, and z acceleration values in m/s². Raises: - Exception: If there is an error retrieving the values. + SensorReadingValueError: If the reading returns an invalid value. + SensorReadingTimeoutError: If the reading times out. + SensorReadingUnknownError: If an unknown error occurs while reading the light sensor. """ ... diff --git a/pysquared/protos/light_sensor.py b/pysquared/protos/light_sensor.py index 2461c0f4..8d4c1b0d 100644 --- a/pysquared/protos/light_sensor.py +++ b/pysquared/protos/light_sensor.py @@ -24,7 +24,7 @@ def get_light(self) -> Light: ... def get_lux(self) -> Lux: - """Gets the light reading of the sensor. + """Gets the lux reading of the sensor. Returns: A Lux object containing the light level in SI lux. diff --git a/pysquared/protos/magnetometer.py b/pysquared/protos/magnetometer.py index d0fca3d4..4b076781 100644 --- a/pysquared/protos/magnetometer.py +++ b/pysquared/protos/magnetometer.py @@ -2,18 +2,20 @@ adhere to, ensuring consistent behavior across different magnetometer hardware. """ +from ..sensor_reading.magnetic import Magnetic + class MagnetometerProto: """Protocol defining the interface for a Magnetometer.""" - def get_vector(self) -> tuple[float, float, float] | None: + def get_magnetic_field(self) -> Magnetic: """Gets the magnetic field vector from the magnetometer. Returns: - A tuple containing the x, y, and z magnetic field values in Gauss, or - None if not available. + A Magnetic object containing the x, y, and z magnetic field values in micro-Tesla (uT) Raises: - Exception: If there is an error retrieving the values. + SensorReadingTimeoutError: If the reading times out. + SensorReadingUnknownError: If an unknown error occurs while reading the magnetometer. """ ... diff --git a/pysquared/protos/magnetorquer.py b/pysquared/protos/magnetorquer.py new file mode 100644 index 00000000..35c51515 --- /dev/null +++ b/pysquared/protos/magnetorquer.py @@ -0,0 +1,15 @@ +"""This protocol specifies the interface that any magnetorquer implementation must +adhere to, ensuring consistent behavior across different magnetorquer hardware. +""" + + +class MagnetorquerProto: + """Protocol defining the interface for magnetorquer control.""" + + def set_dipole_moment(self, dipole_moment: tuple[float, float, float]) -> None: + """Set the magnetic dipole moment for all three axes. + + Args: + dipole_moment: A tuple containing the dipole moment for each axis (x, y, z) in A⋅m². + """ + ... diff --git a/pysquared/protos/power_monitor.py b/pysquared/protos/power_monitor.py index b2a1e7a3..e8a2388d 100644 --- a/pysquared/protos/power_monitor.py +++ b/pysquared/protos/power_monitor.py @@ -2,30 +2,48 @@ adhere to, ensuring consistent behavior across different power monitor hardware. """ +from ..sensor_reading.current import Current +from ..sensor_reading.voltage import Voltage + class PowerMonitorProto: """Protocol defining the interface for a Power Monitor.""" - def get_bus_voltage(self) -> float | None: + def get_bus_voltage(self) -> Voltage: """Gets the bus voltage from the power monitor. Returns: - The bus voltage in volts, or None if not available. + A Voltage object containing the bus voltage in volts. + + Raises: + SensorReadingValueError: If the reading returns an invalid value. + SensorReadingTimeoutError: If the reading times out. + SensorReadingUnknownError: If an unknown error occurs while reading the light sensor. """ ... - def get_shunt_voltage(self) -> float | None: + def get_shunt_voltage(self) -> Voltage: """Gets the shunt voltage from the power monitor. Returns: - The shunt voltage in volts, or None if not available. + A Voltage object containing the shunt voltage in volts. + + Raises: + SensorReadingValueError: If the reading returns an invalid value. + SensorReadingTimeoutError: If the reading times out. + SensorReadingUnknownError: If an unknown error occurs while reading the light sensor. """ ... - def get_current(self) -> float | None: + def get_current(self) -> Current: """Gets the current from the power monitor. Returns: - The current in amps, or None if not available. + A Current object containing the current in milliamps (mA) + + Raises: + SensorReadingValueError: If the reading returns an invalid value. + SensorReadingTimeoutError: If the reading times out. + SensorReadingUnknownError: If an unknown error occurs while reading the light sensor. """ ... diff --git a/pysquared/protos/reading.py b/pysquared/protos/reading.py new file mode 100644 index 00000000..0368b152 --- /dev/null +++ b/pysquared/protos/reading.py @@ -0,0 +1,35 @@ +"""This protocol specifies the interface that any sensor reading protocol implementation must +adhere to, ensuring consistent behavior across different types of sensor readings. +""" + +try: + from typing import Tuple +except ImportError: + pass + + +class ReadingProto: + """Protocol defining the interface for a sensor reading.""" + + @property + def timestamp(self) -> float: + """Gets the timestamp of the reading. + + Returns: + The timestamp of the reading in seconds since the epoch. + """ + ... + + @property + def value(self) -> Tuple[float, float, float] | float: + """Gets the value of the sensor reading. + + Returns: + The reading value, which may be a float or a tuple of floats, depending on the implementation. + + Raises: + SensorReadingValueError: If the sensor reading returns an invalid value. + SensorReadingTimeoutError: If the sensor reading times out. + SensorReadingUnknownError: If an unknown error occurs while reading the sensor. + """ + ... diff --git a/pysquared/sensor_reading/acceleration.py b/pysquared/sensor_reading/acceleration.py new file mode 100644 index 00000000..c0c92306 --- /dev/null +++ b/pysquared/sensor_reading/acceleration.py @@ -0,0 +1,34 @@ +"""Acceleration sensor reading.""" + +try: + from typing import Tuple +except ImportError: + pass + +from .base import Reading + + +class Acceleration(Reading): + """Acceleration sensor reading in meter per second².""" + + def __init__(self, x: float, y: float, z: float) -> None: + """Initialize the acceleration sensor reading. + + Args: + x: The x acceleration in meter per second² + y: The y acceleration in meter per second² + z: The z acceleration in meter per second² + """ + super().__init__() + self.x = x + self.y = y + self.z = z + + @property + def value(self) -> Tuple[float, float, float]: + """Acceleration in x, y, z meter per second². + + Returns: + A tuple containing the x, y, and z components of the acceleration. + """ + return (self.x, self.y, self.z) diff --git a/pysquared/sensor_reading/angular_velocity.py b/pysquared/sensor_reading/angular_velocity.py new file mode 100644 index 00000000..3723942c --- /dev/null +++ b/pysquared/sensor_reading/angular_velocity.py @@ -0,0 +1,34 @@ +"""AngularVelocity sensor reading.""" + +try: + from typing import Tuple +except ImportError: + pass + +from .base import Reading + + +class AngularVelocity(Reading): + """AngularVelocity sensor reading in radians per second.""" + + def __init__(self, x: float, y: float, z: float) -> None: + """Initialize the angular_velocity sensor reading. + + Args: + x: The x angular velocity in radians per second + y: The y angular velocity in radians per second + z: The z angular velocity in radians per second + """ + super().__init__() + self.x = x + self.y = y + self.z = z + + @property + def value(self) -> Tuple[float, float, float]: + """Angular velocity in x, y, z radians per second + + Returns: + A tuple containing the x, y, and z components of the angular velocity. + """ + return (self.x, self.y, self.z) diff --git a/pysquared/sensor_reading/avg.py b/pysquared/sensor_reading/avg.py new file mode 100644 index 00000000..2fc253b7 --- /dev/null +++ b/pysquared/sensor_reading/avg.py @@ -0,0 +1,35 @@ +"""File with helper for averaging sensor readings.""" + +from .current import Current +from .voltage import Voltage + +try: + from typing import Callable +except ImportError: + pass + + +def avg_readings( + func: Callable[..., Current | Voltage], num_readings: int = 50 +) -> float: + """Gets the average of the readings from a function. + + Args: + func: The function to call. + num_readings: The number of readings to take. + + Returns: + The average of the readings, or None if the readings could not be taken. + + Raises: + RuntimeError: If there is an error retrieving the reading from the function. + """ + readings: float = 0 + for _ in range(num_readings): + try: + reading = func() + except Exception as e: + raise RuntimeError(f"Error retrieving reading from {func.__name__}") from e + + readings += reading.value + return readings / num_readings diff --git a/pysquared/sensor_reading/base.py b/pysquared/sensor_reading/base.py index f795b1a8..793426f0 100644 --- a/pysquared/sensor_reading/base.py +++ b/pysquared/sensor_reading/base.py @@ -2,10 +2,37 @@ import time +from ..protos.reading import ReadingProto -class Reading: +try: + from typing import Tuple +except ImportError: + pass + + +class Reading(ReadingProto): """A sensor reading.""" def __init__(self) -> None: """Initialize the sensor reading with a timestamp.""" - self.timestamp = time.time() + self._timestamp = time.time() + + @property + def timestamp(self): + """Get the timestamp of the reading.""" + return self._timestamp + + @property + def value(self) -> Tuple[float, float, float] | float: + """Get the value of the reading. + + This method should be overridden by subclasses to return the specific sensor reading value. + """ + raise NotImplementedError("Subclasses must implement this method.") + + def to_dict(self) -> dict: + """Convert reading to dictionary for JSON serialization.""" + return { + "timestamp": self.timestamp, + "value": self.value, + } diff --git a/pysquared/sensor_reading/current.py b/pysquared/sensor_reading/current.py new file mode 100644 index 00000000..e6ef2a33 --- /dev/null +++ b/pysquared/sensor_reading/current.py @@ -0,0 +1,28 @@ +"""Current sensor reading.""" + +from .base import Reading + + +class Current(Reading): + """Current sensor reading in milliamps (mA).""" + + _value: float + """Current in milliamps (mA).""" + + def __init__(self, value: float) -> None: + """Initialize the current sensor reading. + + Args: + value: The current in milliamps (mA) + """ + super().__init__() + self._value = value + + @property + def value(self) -> float: + """Get the current value in milliamps (mA). + + Returns: + The current in milliamps (mA). + """ + return self._value diff --git a/pysquared/sensor_reading/light.py b/pysquared/sensor_reading/light.py index d2aa9efb..c75844e1 100644 --- a/pysquared/sensor_reading/light.py +++ b/pysquared/sensor_reading/light.py @@ -6,7 +6,7 @@ class Light(Reading): """Light sensor reading (non-unit-specific light levels).""" - value: float + _value: float """Light level (non-unit-specific).""" def __init__(self, value: float) -> None: @@ -16,4 +16,13 @@ def __init__(self, value: float) -> None: value: The light level (non-unit-specific) """ super().__init__() - self.value = value + self._value = value + + @property + def value(self) -> float: + """Get the light level (non-unit-specific). + + Returns: + The light level (non-unit-specific). + """ + return self._value diff --git a/pysquared/sensor_reading/lux.py b/pysquared/sensor_reading/lux.py index 8c893c87..9b23806a 100644 --- a/pysquared/sensor_reading/lux.py +++ b/pysquared/sensor_reading/lux.py @@ -6,7 +6,7 @@ class Lux(Reading): """Lux sensor reading in SI lux.""" - value: float + _value: float """Light level in SI lux.""" def __init__(self, value: float) -> None: @@ -16,4 +16,13 @@ def __init__(self, value: float) -> None: value: The light level in SI lux """ super().__init__() - self.value = value + self._value = value + + @property + def value(self) -> float: + """Get the light level in SI lux. + + Returns: + The light level in SI lux. + """ + return self._value diff --git a/pysquared/sensor_reading/magnetic.py b/pysquared/sensor_reading/magnetic.py new file mode 100644 index 00000000..c93032e8 --- /dev/null +++ b/pysquared/sensor_reading/magnetic.py @@ -0,0 +1,41 @@ +"""Magnetic sensor reading.""" + +try: + from typing import Tuple +except ImportError: + pass + +from .base import Reading + + +class Magnetic(Reading): + """Magnetic sensor reading in micro-Tesla (uT). + + Tesla is the SI unit of magnetic flux density (also called magnetic B-field strength), + and 1 Tesla = 1,000,000 micro-Tesla (uT). + """ + + def __init__(self, x: float, y: float, z: float) -> None: + """Initialize the magnetic sensor reading. + + Args: + x: The x magnetic field in micro-Tesla (uT) + y: The y magnetic field in micro-Tesla (uT) + z: The z magnetic field in micro-Tesla (uT) + """ + super().__init__() + self.x = x + self.y = y + self.z = z + + @property + def value(self) -> Tuple[float, float, float]: + """Magnetic field in x, y, z micro-Tesla (uT). + + Tesla is the SI unit of magnetic flux density (also called magnetic B-field strength), + and 1 Tesla = 1,000,000 micro-Tesla (uT). + + Returns: + A tuple containing the x, y, and z components of the magnetic field. + """ + return (self.x, self.y, self.z) diff --git a/pysquared/sensor_reading/temperature.py b/pysquared/sensor_reading/temperature.py index eafab9d2..9f53cf4b 100644 --- a/pysquared/sensor_reading/temperature.py +++ b/pysquared/sensor_reading/temperature.py @@ -6,7 +6,7 @@ class Temperature(Reading): """Temperature sensor reading in degrees celsius.""" - value: float + _value: float """Temperature in degrees celsius.""" def __init__(self, value: float) -> None: @@ -16,4 +16,13 @@ def __init__(self, value: float) -> None: value: Temperature in degrees Celsius. """ super().__init__() - self.value = value + self._value = value + + @property + def value(self) -> float: + """Get the temperature value in degrees celsius. + + Returns: + The temperature in degrees Celsius. + """ + return self._value diff --git a/pysquared/sensor_reading/voltage.py b/pysquared/sensor_reading/voltage.py new file mode 100644 index 00000000..20633fc1 --- /dev/null +++ b/pysquared/sensor_reading/voltage.py @@ -0,0 +1,28 @@ +"""Voltage sensor reading.""" + +from .base import Reading + + +class Voltage(Reading): + """Voltage sensor reading.""" + + _value: float + """Voltage in volts (V)""" + + def __init__(self, value: float) -> None: + """Initialize the voltage sensor reading in volts (V). + + Args: + value: The voltage in volts (V) + """ + super().__init__() + self._value = value + + @property + def value(self) -> float: + """Get the voltage value in volts (V). + + Returns: + The voltage in volts (V). + """ + return self._value diff --git a/tests/simulation/proves_v3_detumble/test_detumble.py b/tests/simulation/proves_v3_detumble/test_detumble.py new file mode 100644 index 00000000..f7ebcf69 --- /dev/null +++ b/tests/simulation/proves_v3_detumble/test_detumble.py @@ -0,0 +1,5 @@ +"""Simulate proves v3 detumbling logic in ISS orbit + +Using geomagnetic model from International Geomagnetic Reference Field (IGRF) https://github.com/IAGA-VMOD/ppigrf +And orbit location determination with https://github.com/ut-astria/orbdetpy +""" diff --git a/tests/unit/attitude_control/test_b_dot_detumble.py b/tests/unit/attitude_control/test_b_dot_detumble.py new file mode 100644 index 00000000..cb1ad8b8 --- /dev/null +++ b/tests/unit/attitude_control/test_b_dot_detumble.py @@ -0,0 +1,287 @@ +"""Unit tests for the b_dot_detumble module. + +This module contains property-based unit tests using Hypothesis for the +`b_dot_detumble` module, which provides functions for spacecraft detumbling +using the B-dot algorithm. +""" + +import math +from unittest.mock import patch + +import pytest +from hypothesis import assume, example, given +from hypothesis import strategies as st + +from pysquared.attitude_control.b_dot_detumble import BDotDetumble +from pysquared.sensor_reading.magnetic import Magnetic + +# Strategy for generating finite float values +finite_floats = st.floats( + min_value=-1e3, max_value=1e3, allow_nan=False, allow_infinity=False +) + +# Strategy for generating positive finite float values +positive_floats = st.floats( + min_value=1e-6, max_value=1e3, allow_nan=False, allow_infinity=False +) + +# Strategy for generating magnetic field tuples +magnetic_field_tuples = st.tuples(finite_floats, finite_floats, finite_floats) + +# Strategy for generating timestamps with reasonable differences +timestamps = st.floats( + min_value=0, max_value=1e6, allow_nan=False, allow_infinity=False +) + + +def magnetic_with_timestamp(x: float, y: float, z: float, timestamp: float) -> Magnetic: + """Create a Magnetic object with a specific timestamp.""" + with patch("time.time", return_value=timestamp): + return Magnetic(x, y, z) + + +class TestBDotDetumble: + """Test class for BDotDetumble functionality.""" + + @given( + st.floats(min_value=0.1, max_value=100, allow_nan=False, allow_infinity=False) + ) + def test_initialization(self, gain: float): + """Test that BDotDetumble initializes correctly with various gain values.""" + detumble = BDotDetumble(gain=gain) + assert detumble._gain == gain + + def test_default_initialization(self): + """Test that BDotDetumble initializes with default gain of 1.0.""" + detumble = BDotDetumble() + assert detumble._gain == 1.0 + + @given(magnetic_field_tuples) + @example((0, 0, 0)) # Test zero vector + @example((1, 0, 0)) # Test unit vectors + @example((0, 1, 0)) + @example((0, 0, 1)) + def test_magnitude_properties(self, mag_field: tuple[float, float, float]): + """Test mathematical properties of the magnitude function.""" + x, y, z = mag_field + magnetic = magnetic_with_timestamp(x, y, z, 0.0) + + magnitude = BDotDetumble._magnitude(magnetic) + + # Magnitude should always be non-negative + assert magnitude >= 0 + + # Test that magnitude matches expected mathematical formula + expected_magnitude = math.sqrt(x**2 + y**2 + z**2) + assert pytest.approx(expected_magnitude, 1e-10) == magnitude + + # Magnitude should be zero if and only if all components are effectively zero + if abs(x) < 1e-12 and abs(y) < 1e-12 and abs(z) < 1e-12: + assert pytest.approx(0, 1e-12) == magnitude + else: + # If any component is non-zero, magnitude should be positive + if max(abs(x), abs(y), abs(z)) > 1e-12: + assert magnitude > 0 + + @given( + magnetic_field_tuples, + magnetic_field_tuples, + st.floats(min_value=0.1, max_value=1000, allow_nan=False, allow_infinity=False), + ) + def test_dB_dt_basic_properties( + self, + prev_field: tuple[float, float, float], + curr_field: tuple[float, float, float], + dt: float, + ): + """Test basic mathematical properties of dB_dt calculation.""" + prev_x, prev_y, prev_z = prev_field + curr_x, curr_y, curr_z = curr_field + + prev_magnetic = magnetic_with_timestamp(prev_x, prev_y, prev_z, 0.0) + curr_magnetic = magnetic_with_timestamp(curr_x, curr_y, curr_z, dt) + + dB_dt = BDotDetumble._dB_dt(curr_magnetic, prev_magnetic) + + # Should return a tuple of three floats + assert isinstance(dB_dt, tuple) + assert len(dB_dt) == 3 + assert all(isinstance(x, float) for x in dB_dt) + + # Verify the calculation matches expected formula + expected_dx_dt = (curr_x - prev_x) / dt + expected_dy_dt = (curr_y - prev_y) / dt + expected_dz_dt = (curr_z - prev_z) / dt + + assert pytest.approx(expected_dx_dt, 1e-10) == dB_dt[0] + assert pytest.approx(expected_dy_dt, 1e-10) == dB_dt[1] + assert pytest.approx(expected_dz_dt, 1e-10) == dB_dt[2] + + @given(magnetic_field_tuples, positive_floats) + def test_dB_dt_zero_change(self, mag_field: tuple[float, float, float], dt: float): + """Test that dB_dt is zero when magnetic field doesn't change.""" + x, y, z = mag_field + + prev_magnetic = magnetic_with_timestamp(x, y, z, 0.0) + curr_magnetic = magnetic_with_timestamp(x, y, z, dt) + + dB_dt = BDotDetumble._dB_dt(curr_magnetic, prev_magnetic) + + # All components should be zero + assert pytest.approx(0, 1e-10) == dB_dt[0] + assert pytest.approx(0, 1e-10) == dB_dt[1] + assert pytest.approx(0, 1e-10) == dB_dt[2] + + @given( + magnetic_field_tuples, + st.floats(min_value=-1000, max_value=0, allow_nan=False, allow_infinity=False), + ) + @example((0, 0, 0), 0) # Test zero dt + @example((0, 0, 0), -1.0) # Test negative dt + def test_dB_dt_zero_division( + self, mag_field: tuple[float, float, float], dt: float + ): + """Test behavior when time difference is zero or negative.""" + x, y, z = mag_field + + prev_magnetic = magnetic_with_timestamp(x, y, z, 0.0) + curr_magnetic = magnetic_with_timestamp(x, y, z, dt) + + # This should raise a ValueError due to zero time difference + with pytest.raises(ValueError): + BDotDetumble._dB_dt(curr_magnetic, prev_magnetic) + + @given( + magnetic_field_tuples, magnetic_field_tuples, positive_floats, positive_floats + ) + def test_dipole_moment_properties( + self, + prev_field: tuple[float, float, float], + curr_field: tuple[float, float, float], + dt: float, + gain: float, + ): + """Test properties of dipole moment calculation.""" + prev_x, prev_y, prev_z = prev_field + curr_x, curr_y, curr_z = curr_field + + # Skip test if current magnetic field magnitude is too small + curr_magnitude = math.sqrt(curr_x**2 + curr_y**2 + curr_z**2) + assume(curr_magnitude > 1e-6) + + detumble = BDotDetumble(gain) + + prev_magnetic = magnetic_with_timestamp(prev_x, prev_y, prev_z, 0.0) + curr_magnetic = magnetic_with_timestamp(curr_x, curr_y, curr_z, dt) + + dipole = detumble.dipole_moment(curr_magnetic, prev_magnetic) + + # Should return a tuple of three floats + assert isinstance(dipole, tuple) + assert len(dipole) == 3 + assert all(isinstance(x, float) for x in dipole) + + @given(magnetic_field_tuples, positive_floats, positive_floats) + def test_dipole_moment_zero_change( + self, mag_field: tuple[float, float, float], dt: float, gain: float + ): + """Test that dipole moment is zero when magnetic field doesn't change.""" + x, y, z = mag_field + + # Skip test if magnetic field magnitude is too small + magnitude = math.sqrt(x**2 + y**2 + z**2) + assume(magnitude > 1e-6) + + detumble = BDotDetumble(gain=gain) + + prev_magnetic = magnetic_with_timestamp(x, y, z, 0.0) + curr_magnetic = magnetic_with_timestamp(x, y, z, dt) + + dipole = detumble.dipole_moment(curr_magnetic, prev_magnetic) + + # All components should be approximately zero + assert abs(dipole[0]) < 1e-6 + assert abs(dipole[1]) < 1e-6 + assert abs(dipole[2]) < 1e-6 + + def test_dipole_moment_zero_magnitude(self): + """Test behavior when current magnetic field magnitude is zero.""" + detumble = BDotDetumble(gain=1.0) + + # Create zero magnetic field + prev_magnetic = magnetic_with_timestamp(0, 0, 0, 0.0) + curr_magnetic = magnetic_with_timestamp(0, 0, 0, 1.0) + + # This should raise a ValueError due to zero magnitude + with pytest.raises(ValueError): + detumble.dipole_moment(curr_magnetic, prev_magnetic) + + @given( + magnetic_field_tuples, + st.floats(min_value=-1000, max_value=0, allow_nan=False, allow_infinity=False), + ) + @example((1, 0, 0), 0.0) # Test zero timestamp + @example((1, 0, 0), -1.0) # Test negative timestamp + def test_dipole_moment_zero_or_negative_timestamp( + self, mag_field: tuple[float, float, float], dt: float + ): + """Test behavior when current magnetic field timestamp is zero or negative.""" + x, y, z = mag_field + + # Skip test if current magnetic field magnitude is too small + curr_magnitude = math.sqrt(x**2 + y**2 + z**2) + assume(curr_magnitude > 1e-6) + + detumble = BDotDetumble() + + prev_magnetic = magnetic_with_timestamp(x, y, z, 0.0) + curr_magnetic = magnetic_with_timestamp(x, y, z, dt) + + # This should raise a ValueError due to zero or negative timestamp + with pytest.raises(ValueError): + detumble.dipole_moment(curr_magnetic, prev_magnetic) + + @pytest.mark.parametrize( + "gain, current_mag_field, previous_mag_field, dipole_moment", + [ + ( + 1.0, + magnetic_with_timestamp(1, 0, 0, 1.0), + magnetic_with_timestamp(0, 1, 0, 0.0), + (-1.0, 1.0, 0.0), + ), + ( + 2.0, + magnetic_with_timestamp(1, 0, 0, 1.0), + magnetic_with_timestamp(0, 1, 0, 0.0), + (-2.0, 2.0, 0.0), + ), # Test scaling gain + ( + 1.0, + magnetic_with_timestamp(3, 0, 0, 1.0), + magnetic_with_timestamp(0, 3, 0, 0.0), + (-1.0, 1.0, 0.0), + ), # Test scaling magnitude + ( + 1.7, + magnetic_with_timestamp(-9.5, 8.2, 3.3, 37.0), + magnetic_with_timestamp(-3, -8.4, 6.1, 0.0), + (0.0230152290, -0.058777354, 0.009914252), + ), + ], + ) + def test_dipole_moment_formula_verification( + self, gain, current_mag_field, previous_mag_field, dipole_moment + ): + """Test that dipole moment calculation matches the expected formula: m = -k * (dB/dt) / |B|.""" + detumble = BDotDetumble(gain) + + # Calculate the dipole moment + calculated_dipole = detumble.dipole_moment( + current_mag_field, previous_mag_field + ) + + # Verify the calculated dipole moment matches the expected values + assert pytest.approx(calculated_dipole[0], 1e-6) == dipole_moment[0] + assert pytest.approx(calculated_dipole[1], 1e-6) == dipole_moment[1] + assert pytest.approx(calculated_dipole[2], 1e-6) == dipole_moment[2] diff --git a/tests/unit/hardware/imu/manager/test_lsm6dsox_manager.py b/tests/unit/hardware/imu/manager/test_lsm6dsox_manager.py index 2bc4cb15..7a874311 100644 --- a/tests/unit/hardware/imu/manager/test_lsm6dsox_manager.py +++ b/tests/unit/hardware/imu/manager/test_lsm6dsox_manager.py @@ -2,7 +2,7 @@ This module contains unit tests for the `LSM6DSOXManager` class, which manages the LSM6DSOX IMU. The tests cover initialization, successful data retrieval, -and error handling for acceleration, gyroscope, and temperature readings. +and error handling for acceleration, angular_velocityscope, and temperature readings. """ import math @@ -16,6 +16,8 @@ from pysquared.hardware.exception import HardwareInitializationError from pysquared.hardware.imu.manager.lsm6dsox import LSM6DSOXManager from pysquared.logger import Logger +from pysquared.sensor_reading.acceleration import Acceleration +from pysquared.sensor_reading.angular_velocity import AngularVelocity from pysquared.sensor_reading.error import SensorReadingUnknownError from pysquared.sensor_reading.temperature import Temperature @@ -107,7 +109,10 @@ def test_get_acceleration_success( imu_manager._imu.acceleration = expected_accel vector = imu_manager.get_acceleration() - assert vector == expected_accel + assert isinstance(vector, Acceleration) + assert vector.x == expected_accel[0] + assert vector.y == expected_accel[1] + assert vector.z == expected_accel[2] def test_get_acceleration_failure( @@ -132,22 +137,16 @@ def test_get_acceleration_failure( ) type(mock_imu_instance).acceleration = mock_accel_property - vector = imu_manager.get_acceleration() - - assert vector is None - assert mock_logger.error.call_count == 1 - call_args, _ = mock_logger.error.call_args - assert call_args[0] == "Error retrieving IMU acceleration sensor values" - assert isinstance(call_args[1], RuntimeError) - assert str(call_args[1]) == "Simulated retrieval error" + with pytest.raises(SensorReadingUnknownError): + imu_manager.get_acceleration() -def test_get_gyro_success( +def test_get_angular_velocity_success( mock_lsm6dsox: MagicMock, mock_i2c: MagicMock, mock_logger: MagicMock, ) -> None: - """Tests successful retrieval of the gyro vector. + """Tests successful retrieval of the angular_velocity vector. Args: mock_lsm6dsox: Mocked LSM6DSOX class. @@ -159,16 +158,19 @@ def test_get_gyro_success( expected_gyro = (0.1, 0.2, 0.3) imu_manager._imu.gyro = expected_gyro - vector = imu_manager.get_gyro_data() - assert vector == expected_gyro + vector = imu_manager.get_angular_velocity() + assert isinstance(vector, AngularVelocity) + assert vector.x == expected_gyro[0] + assert vector.y == expected_gyro[1] + assert vector.z == expected_gyro[2] -def test_get_gyro_failure( +def test_get_angular_velocity_failure( mock_lsm6dsox: MagicMock, mock_i2c: MagicMock, mock_logger: MagicMock, ) -> None: - """Tests handling of exceptions when retrieving the gyro vector. + """Tests handling of exceptions when retrieving the angular_velocity vector. Args: mock_lsm6dsox: Mocked LSM6DSOX class. @@ -179,19 +181,13 @@ def test_get_gyro_failure( mock_imu_instance = MagicMock(spec=LSM6DSOX) imu_manager._imu = mock_imu_instance - mock_gyro_property = PropertyMock( + mock_angular_velocity_property = PropertyMock( side_effect=RuntimeError("Simulated retrieval error") ) - type(mock_imu_instance).gyro = mock_gyro_property + type(mock_imu_instance).angular_velocity = mock_angular_velocity_property - vector = imu_manager.get_gyro_data() - - assert vector is None - assert mock_logger.error.call_count == 1 - call_args, _ = mock_logger.error.call_args - assert call_args[0] == "Error retrieving IMU gyro sensor values" - assert isinstance(call_args[1], RuntimeError) - assert str(call_args[1]) == "Simulated retrieval error" + with pytest.raises(SensorReadingUnknownError): + imu_manager.get_angular_velocity() def test_get_temperature_success( diff --git a/tests/unit/hardware/magnetometer/manager/test_lis2mdl_manager.py b/tests/unit/hardware/magnetometer/manager/test_lis2mdl_manager.py index a2eb197f..b5cb0a5f 100644 --- a/tests/unit/hardware/magnetometer/manager/test_lis2mdl_manager.py +++ b/tests/unit/hardware/magnetometer/manager/test_lis2mdl_manager.py @@ -6,13 +6,17 @@ """ from typing import Generator -from unittest.mock import MagicMock, PropertyMock, patch +from unittest.mock import MagicMock, patch import pytest from mocks.adafruit_lis2mdl.lis2mdl import LIS2MDL from pysquared.hardware.exception import HardwareInitializationError from pysquared.hardware.magnetometer.manager.lis2mdl import LIS2MDLManager +from pysquared.sensor_reading.error import ( + SensorReadingUnknownError, +) +from pysquared.sensor_reading.magnetic import Magnetic @pytest.fixture @@ -85,7 +89,7 @@ def test_create_magnetometer_failed( assert mock_i2c.call_count <= 3 -def test_get_vector_success( +def test_get_magnetic_field_success( mock_lis2mdl: MagicMock, mock_i2c: MagicMock, mock_logger: MagicMock, @@ -99,18 +103,29 @@ def test_get_vector_success( """ magnetometer = LIS2MDLManager(mock_logger, mock_i2c) magnetometer._magnetometer = MagicMock(spec=LIS2MDL) - magnetometer._magnetometer.magnetic = (1.0, 2.0, 3.0) - vector = magnetometer.get_vector() - assert vector == (1.0, 2.0, 3.0) + def mock_magnetic(): + """Mock magnetic field vector.""" + return (1.0, 2.0, 3.0) + magnetometer._magnetometer.magnetic = mock_magnetic() -def test_get_vector_failure( + # Run the async function + vector = magnetometer.get_magnetic_field() + + # Verify the result + assert isinstance(vector, Magnetic) + assert vector.x == 1.0 + assert vector.y == 2.0 + assert vector.z == 3.0 + + +def test_get_magnetic_field_unknown_error( mock_lis2mdl: MagicMock, mock_i2c: MagicMock, mock_logger: MagicMock, ) -> None: - """Tests handling of exceptions when retrieving the magnetic field vector. + """Tests handling of unknown errors when retrieving the magnetic field vector. Args: mock_lis2mdl: Mocked LIS2MDL class. @@ -118,20 +133,16 @@ def test_get_vector_failure( mock_logger: Mocked Logger instance. """ magnetometer = LIS2MDLManager(mock_logger, mock_i2c) + magnetometer._magnetometer = MagicMock(spec=LIS2MDL) + + # Patch wait_for to raise TimeoutError immediately + with patch("asyncio.wait_for", side_effect=ValueError): + # Set a dummy coroutine - it won't be used due to the patch + magnetometer._magnetometer.asyncio_magnetic = MagicMock() + + # Run the async function and expect SensorReadingUnknownError + with pytest.raises(SensorReadingUnknownError) as excinfo: + magnetometer.get_magnetic_field() - # Configure the mock to raise an exception when accessing the magnetic property - mock_mag_instance = MagicMock(spec=LIS2MDL) - magnetometer._magnetometer = mock_mag_instance - mock_magnetic_property = PropertyMock( - side_effect=RuntimeError("Simulated retrieval error") - ) - type(mock_mag_instance).magnetic = mock_magnetic_property - - vector = magnetometer.get_vector() - - assert vector is None - assert mock_logger.error.call_count == 1 - call_args, _ = mock_logger.error.call_args - assert call_args[0] == "Error retrieving magnetometer sensor values" - assert isinstance(call_args[1], RuntimeError) - assert str(call_args[1]) == "Simulated retrieval error" + # Verify the exception message + assert "Unknown error while reading magnetometer data" in str(excinfo.value) diff --git a/tests/unit/hardware/magnetorquer/manager/test_proves_v3_manager.py b/tests/unit/hardware/magnetorquer/manager/test_proves_v3_manager.py new file mode 100644 index 00000000..26e7760b --- /dev/null +++ b/tests/unit/hardware/magnetorquer/manager/test_proves_v3_manager.py @@ -0,0 +1,372 @@ +"""Unit tests for the ProvesV3Manager class. + +This module contains unit tests for the `ProvesV3Manager` class, which is responsible for +controlling magnetorquers on the Proves V3 hardware. The tests cover initialization, +dipole moment calculations, current limiting, and the magnetorquer protocol interface. +""" + +import math +from unittest.mock import patch + +import pytest + +from pysquared.hardware.magnetorquer.manager.proves_v3_manager import ProvesV3Manager +from pysquared.protos.magnetorquer import MagnetorquerProto + + +class TestProvesV3Manager: + """Test suite for the ProvesV3Manager class.""" + + @pytest.fixture + def manager(self) -> ProvesV3Manager: + """Create a ProvesV3Manager instance for testing.""" + return ProvesV3Manager() + + def test_initialization(self, manager: ProvesV3Manager) -> None: + """Test that the manager initializes correctly.""" + assert isinstance(manager, ProvesV3Manager) + assert isinstance(manager, MagnetorquerProto) + + def test_class_constants(self) -> None: + """Test that all class constants are set correctly.""" + # Coil voltage + assert ProvesV3Manager._coil_voltage == 3.3 + + # X and Y axis constants + assert ProvesV3Manager._coil_num_turns_x_y == 2 * 24 + assert ProvesV3Manager._coil_length_x_y == 0.053 + assert ProvesV3Manager._coil_width_x_y == 0.045 + assert ProvesV3Manager._coil_area_x_y == 0.053 * 0.045 + assert ProvesV3Manager._coil_resistance_x_y == 57.2 + assert ProvesV3Manager._coil_max_current_x_y == 3.3 / 57.2 + + # Z axis constants + assert ProvesV3Manager._coil_num_turns_z == 3 * 51 + assert ProvesV3Manager._coil_diameter_z == 0.05755 + expected_area_z = math.pi * (0.05755 / 2) ** 2 + assert abs(ProvesV3Manager._coil_area_z - expected_area_z) < 1e-10 + assert ProvesV3Manager._coil_resistance_z == 248.8 + assert ProvesV3Manager._coil_max_current_z == 3.3 / 248.8 + + def test_current_from_dipole_moment_zero(self, manager: ProvesV3Manager) -> None: + """Test current calculation with zero dipole moment.""" + dipole_moment = (0.0, 0.0, 0.0) + current = manager._current_from_dipole_moment(dipole_moment) + + assert current == (0.0, 0.0, 0.0) + + def test_current_from_dipole_moment_positive( + self, manager: ProvesV3Manager + ) -> None: + """Test current calculation with positive dipole moments.""" + dipole_moment = (1.0, 2.0, 3.0) + current = manager._current_from_dipole_moment(dipole_moment) + + # Calculate expected values + expected_x = 1.0 / ( + ProvesV3Manager._coil_num_turns_x_y * ProvesV3Manager._coil_area_x_y + ) + expected_y = 2.0 / ( + ProvesV3Manager._coil_num_turns_x_y * ProvesV3Manager._coil_area_x_y + ) + expected_z = 3.0 / ( + ProvesV3Manager._coil_num_turns_z * ProvesV3Manager._coil_area_z + ) + + assert abs(current[0] - expected_x) < 1e-10 + assert abs(current[1] - expected_y) < 1e-10 + assert abs(current[2] - expected_z) < 1e-10 + + def test_current_from_dipole_moment_negative( + self, manager: ProvesV3Manager + ) -> None: + """Test current calculation with negative dipole moments.""" + dipole_moment = (-1.0, -2.0, -3.0) + current = manager._current_from_dipole_moment(dipole_moment) + + # Calculate expected values + expected_x = -1.0 / ( + ProvesV3Manager._coil_num_turns_x_y * ProvesV3Manager._coil_area_x_y + ) + expected_y = -2.0 / ( + ProvesV3Manager._coil_num_turns_x_y * ProvesV3Manager._coil_area_x_y + ) + expected_z = -3.0 / ( + ProvesV3Manager._coil_num_turns_z * ProvesV3Manager._coil_area_z + ) + + assert abs(current[0] - expected_x) < 1e-10 + assert abs(current[1] - expected_y) < 1e-10 + assert abs(current[2] - expected_z) < 1e-10 + + def test_current_from_dipole_moment_mixed(self, manager: ProvesV3Manager) -> None: + """Test current calculation with mixed positive and negative dipole moments.""" + dipole_moment = (1.5, -2.5, 0.8) + current = manager._current_from_dipole_moment(dipole_moment) + + # Calculate expected values + expected_x = 1.5 / ( + ProvesV3Manager._coil_num_turns_x_y * ProvesV3Manager._coil_area_x_y + ) + expected_y = -2.5 / ( + ProvesV3Manager._coil_num_turns_x_y * ProvesV3Manager._coil_area_x_y + ) + expected_z = 0.8 / ( + ProvesV3Manager._coil_num_turns_z * ProvesV3Manager._coil_area_z + ) + + assert abs(current[0] - expected_x) < 1e-10 + assert abs(current[1] - expected_y) < 1e-10 + assert abs(current[2] - expected_z) < 1e-10 + + def test_limit_current_within_limits(self, manager: ProvesV3Manager) -> None: + """Test current limiting when currents are within limits.""" + # Use small currents that are well within the limits + current = (0.01, -0.02, 0.005) + limited = manager._limit_current(current) + + # Should return the same values since they're within limits + assert limited == current + + def test_limit_current_exceeds_x_axis_positive( + self, manager: ProvesV3Manager + ) -> None: + """Test current limiting when x-axis current exceeds positive limit.""" + max_current_x_y = ProvesV3Manager._coil_max_current_x_y + current = (max_current_x_y + 0.1, 0.01, 0.005) + limited = manager._limit_current(current) + + assert limited[0] == max_current_x_y # Limited to max + assert limited[1] == 0.01 # Unchanged + assert limited[2] == 0.005 # Unchanged + + def test_limit_current_exceeds_x_axis_negative( + self, manager: ProvesV3Manager + ) -> None: + """Test current limiting when x-axis current exceeds negative limit.""" + max_current_x_y = ProvesV3Manager._coil_max_current_x_y + current = (-(max_current_x_y + 0.1), 0.01, 0.005) + limited = manager._limit_current(current) + + assert limited[0] == -max_current_x_y # Limited to -max + assert limited[1] == 0.01 # Unchanged + assert limited[2] == 0.005 # Unchanged + + def test_limit_current_exceeds_y_axis_positive( + self, manager: ProvesV3Manager + ) -> None: + """Test current limiting when y-axis current exceeds positive limit.""" + max_current_x_y = ProvesV3Manager._coil_max_current_x_y + current = (0.01, max_current_x_y + 0.1, 0.005) + limited = manager._limit_current(current) + + assert limited[0] == 0.01 # Unchanged + assert limited[1] == max_current_x_y # Limited to max + assert limited[2] == 0.005 # Unchanged + + def test_limit_current_exceeds_y_axis_negative( + self, manager: ProvesV3Manager + ) -> None: + """Test current limiting when y-axis current exceeds negative limit.""" + max_current_x_y = ProvesV3Manager._coil_max_current_x_y + current = (0.01, -(max_current_x_y + 0.1), 0.005) + limited = manager._limit_current(current) + + assert limited[0] == 0.01 # Unchanged + assert limited[1] == -max_current_x_y # Limited to -max + assert limited[2] == 0.005 # Unchanged + + def test_limit_current_exceeds_z_axis_positive( + self, manager: ProvesV3Manager + ) -> None: + """Test current limiting when z-axis current exceeds positive limit.""" + max_current_z = ProvesV3Manager._coil_max_current_z + current = (0.01, 0.02, max_current_z + 0.1) + limited = manager._limit_current(current) + + assert limited[0] == 0.01 # Unchanged + assert limited[1] == 0.02 # Unchanged + assert limited[2] == max_current_z # Limited to max + + def test_limit_current_exceeds_z_axis_negative( + self, manager: ProvesV3Manager + ) -> None: + """Test current limiting when z-axis current exceeds negative limit.""" + max_current_z = ProvesV3Manager._coil_max_current_z + current = (0.01, 0.02, -(max_current_z + 0.1)) + limited = manager._limit_current(current) + + assert limited[0] == 0.01 # Unchanged + assert limited[1] == 0.02 # Unchanged + assert limited[2] == -max_current_z # Limited to -max + + def test_limit_current_all_axes_exceed(self, manager: ProvesV3Manager) -> None: + """Test current limiting when all axes exceed their limits.""" + max_current_x_y = ProvesV3Manager._coil_max_current_x_y + max_current_z = ProvesV3Manager._coil_max_current_z + + current = (max_current_x_y + 0.1, -(max_current_x_y + 0.2), max_current_z + 0.3) + limited = manager._limit_current(current) + + assert limited[0] == max_current_x_y + assert limited[1] == -max_current_x_y + assert limited[2] == max_current_z + + def test_limit_current_zero_currents(self, manager: ProvesV3Manager) -> None: + """Test current limiting with zero currents.""" + current = (0.0, 0.0, 0.0) + limited = manager._limit_current(current) + + assert limited == (0.0, 0.0, 0.0) + + def test_limit_current_exactly_at_limits(self, manager: ProvesV3Manager) -> None: + """Test current limiting when currents are exactly at the limits.""" + max_current_x_y = ProvesV3Manager._coil_max_current_x_y + max_current_z = ProvesV3Manager._coil_max_current_z + + current = (max_current_x_y, -max_current_x_y, max_current_z) + limited = manager._limit_current(current) + + # Should return the same values since they're exactly at the limits + assert limited == current + + def test_set_dipole_moment_calls_helper_methods( + self, manager: ProvesV3Manager + ) -> None: + """Test that set_dipole_moment calls the correct helper methods.""" + dipole_moment = (1.0, 2.0, 3.0) + + with ( + patch.object(manager, "_current_from_dipole_moment") as mock_current_calc, + patch.object(manager, "_limit_current") as mock_limit, + ): + # Set up return values + calculated_current = (0.1, 0.2, 0.05) + limited_current = (0.08, 0.15, 0.04) + mock_current_calc.return_value = calculated_current + mock_limit.return_value = limited_current + + manager.set_dipole_moment(dipole_moment) + + # Verify methods were called with correct arguments + mock_current_calc.assert_called_once_with(dipole_moment) + mock_limit.assert_called_once_with(calculated_current) + + def test_set_dipole_moment_integration(self, manager: ProvesV3Manager) -> None: + """Test set_dipole_moment with actual calculations (integration test).""" + # This test verifies the complete flow without mocking + dipole_moment = (0.001, -0.002, 0.0005) + + # This should not raise any exceptions + manager.set_dipole_moment(dipole_moment) + + def test_set_dipole_moment_large_values(self, manager: ProvesV3Manager) -> None: + """Test set_dipole_moment with large dipole moment values.""" + # Large values that would exceed current limits + dipole_moment = (100.0, -150.0, 50.0) + + # This should not raise any exceptions (current limiting should handle it) + manager.set_dipole_moment(dipole_moment) + + def test_magnetorquer_configuration_comments(self) -> None: + """Test that the magnetorquer configuration is documented correctly in comments.""" + # This test verifies the physical configuration described in the code comments + + # X and Y axis: 2 layers of 24 turns each + assert ProvesV3Manager._coil_num_turns_x_y == 48 + + # Z axis: 3 layers of 51 turns each + assert ProvesV3Manager._coil_num_turns_z == 153 + + def test_mathematical_relationships(self) -> None: + """Test that mathematical relationships between constants are correct.""" + # Test that area calculations are correct + expected_area_x_y = ( + ProvesV3Manager._coil_length_x_y * ProvesV3Manager._coil_width_x_y + ) + assert ProvesV3Manager._coil_area_x_y == expected_area_x_y + + expected_area_z = math.pi * (ProvesV3Manager._coil_diameter_z / 2) ** 2 + assert abs(ProvesV3Manager._coil_area_z - expected_area_z) < 1e-10 + + # Test that max current calculations are correct + expected_max_current_x_y = ( + ProvesV3Manager._coil_voltage / ProvesV3Manager._coil_resistance_x_y + ) + assert ( + abs(ProvesV3Manager._coil_max_current_x_y - expected_max_current_x_y) + < 1e-10 + ) + + expected_max_current_z = ( + ProvesV3Manager._coil_voltage / ProvesV3Manager._coil_resistance_z + ) + assert abs(ProvesV3Manager._coil_max_current_z - expected_max_current_z) < 1e-10 + + def test_edge_case_very_small_dipole_moments( + self, manager: ProvesV3Manager + ) -> None: + """Test with very small dipole moment values.""" + dipole_moment = (1e-10, -1e-10, 1e-10) + current = manager._current_from_dipole_moment(dipole_moment) + limited = manager._limit_current(current) + + # Should handle very small values without issues + assert all(abs(c) < 1e-5 for c in current) + assert limited == current # Should not be limited + + def test_edge_case_very_large_dipole_moments( + self, manager: ProvesV3Manager + ) -> None: + """Test with very large dipole moment values.""" + dipole_moment = (1e6, -1e6, 1e6) + current = manager._current_from_dipole_moment(dipole_moment) + limited = manager._limit_current(current) + + # Current should be very large + assert all(abs(c) > 1e3 for c in current) + + # But limited current should be within bounds + max_x_y = ProvesV3Manager._coil_max_current_x_y + max_z = ProvesV3Manager._coil_max_current_z + + assert abs(limited[0]) <= max_x_y + assert abs(limited[1]) <= max_x_y + assert abs(limited[2]) <= max_z + + @pytest.mark.parametrize( + "dipole_x,dipole_y,dipole_z", + [ + (1.0, 0.0, 0.0), # X-axis only + (0.0, 1.0, 0.0), # Y-axis only + (0.0, 0.0, 1.0), # Z-axis only + (1.0, 1.0, 1.0), # All axes equal + (-1.0, -1.0, -1.0), # All axes equal negative + ], + ) + def test_dipole_moment_parametrized( + self, + manager: ProvesV3Manager, + dipole_x: float, + dipole_y: float, + dipole_z: float, + ) -> None: + """Test dipole moment calculations with various parameter combinations.""" + dipole_moment = (dipole_x, dipole_y, dipole_z) + + # Should not raise any exceptions + current = manager._current_from_dipole_moment(dipole_moment) + limited = manager._limit_current(current) + manager.set_dipole_moment(dipole_moment) + + # Basic sanity checks + assert len(current) == 3 + assert len(limited) == 3 + + # Check that signs are preserved when within limits + if abs(current[0]) <= ProvesV3Manager._coil_max_current_x_y: + assert (current[0] >= 0) == (limited[0] >= 0) + if abs(current[1]) <= ProvesV3Manager._coil_max_current_x_y: + assert (current[1] >= 0) == (limited[1] >= 0) + if abs(current[2]) <= ProvesV3Manager._coil_max_current_z: + assert (current[2] >= 0) == (limited[2] >= 0) diff --git a/tests/unit/hardware/power_monitor/manager/test_ina219_manager.py b/tests/unit/hardware/power_monitor/manager/test_ina219_manager.py index 4d474e2c..72f3f959 100644 --- a/tests/unit/hardware/power_monitor/manager/test_ina219_manager.py +++ b/tests/unit/hardware/power_monitor/manager/test_ina219_manager.py @@ -13,6 +13,9 @@ from mocks.adafruit_ina219.ina219 import INA219 from pysquared.hardware.exception import HardwareInitializationError from pysquared.hardware.power_monitor.manager.ina219 import INA219Manager +from pysquared.sensor_reading.current import Current +from pysquared.sensor_reading.error import SensorReadingUnknownError +from pysquared.sensor_reading.voltage import Voltage address: int = 123 @@ -93,7 +96,8 @@ def test_get_bus_voltage_success(mock_ina219, mock_i2c, mock_logger): power_monitor._ina219.bus_voltage = 3.3 voltage = power_monitor.get_bus_voltage() - assert voltage == pytest.approx(3.3, rel=1e-6) + assert isinstance(voltage, Voltage) + assert voltage.value == pytest.approx(3.3, rel=1e-6) def test_get_bus_voltage_failure(mock_ina219, mock_i2c, mock_logger): @@ -114,8 +118,8 @@ def test_get_bus_voltage_failure(mock_ina219, mock_i2c, mock_logger): ) type(power_monitor._ina219).bus_voltage = mock_ina219_bus_voltage_property - voltage = power_monitor.get_bus_voltage() - assert voltage is None + with pytest.raises(SensorReadingUnknownError): + power_monitor.get_bus_voltage() def test_get_shunt_voltage_success(mock_ina219, mock_i2c, mock_logger): @@ -132,7 +136,8 @@ def test_get_shunt_voltage_success(mock_ina219, mock_i2c, mock_logger): power_monitor._ina219.shunt_voltage = 0.1 voltage = power_monitor.get_shunt_voltage() - assert voltage == pytest.approx(0.1, rel=1e-6) + assert isinstance(voltage, Voltage) + assert voltage.value == pytest.approx(0.1, rel=1e-6) def test_get_shunt_voltage_failure(mock_ina219, mock_i2c, mock_logger): @@ -153,8 +158,8 @@ def test_get_shunt_voltage_failure(mock_ina219, mock_i2c, mock_logger): ) type(power_monitor._ina219).shunt_voltage = mock_ina219_shunt_voltage_property - voltage = power_monitor.get_shunt_voltage() - assert voltage is None + with pytest.raises(SensorReadingUnknownError): + power_monitor.get_shunt_voltage() def test_get_current_success(mock_ina219, mock_i2c, mock_logger): @@ -171,7 +176,8 @@ def test_get_current_success(mock_ina219, mock_i2c, mock_logger): power_monitor._ina219.current = 0.5 current = power_monitor.get_current() - assert current == pytest.approx(0.5, rel=1e-6) + assert isinstance(current, Current) + assert current.value == pytest.approx(0.5, rel=1e-6) def test_get_current_failure(mock_ina219, mock_i2c, mock_logger): @@ -192,5 +198,5 @@ def test_get_current_failure(mock_ina219, mock_i2c, mock_logger): ) type(power_monitor._ina219).current = mock_ina219_current_property - current = power_monitor.get_current() - assert current is None + with pytest.raises(SensorReadingUnknownError): + power_monitor.get_current() diff --git a/tests/unit/hardware/radio/manager/test_base.py b/tests/unit/hardware/radio/manager/test_radio_base.py similarity index 100% rename from tests/unit/hardware/radio/manager/test_base.py rename to tests/unit/hardware/radio/manager/test_radio_base.py diff --git a/tests/unit/sensor_reading/test_acceleration.py b/tests/unit/sensor_reading/test_acceleration.py new file mode 100644 index 00000000..01904421 --- /dev/null +++ b/tests/unit/sensor_reading/test_acceleration.py @@ -0,0 +1,40 @@ +"""Unit tests for the Acceleration sensor reading class.""" + +from unittest.mock import patch + +from hypothesis import given +from hypothesis import strategies as st + +from pysquared.sensor_reading.acceleration import Acceleration + + +@given( + st.floats(allow_nan=False, allow_infinity=False), + st.floats(allow_nan=False, allow_infinity=False), + st.floats(allow_nan=False, allow_infinity=False), +) +def test_acceleration_fuzzed_values(x, y, z): + """Fuzz test Acceleration sensor reading with arbitrary float values.""" + reading = Acceleration(x, y, z) + assert reading.x == x + assert reading.y == y + assert reading.z == z + assert reading.value == (x, y, z) + assert reading.timestamp is not None + assert isinstance(reading.timestamp, (int, float)) + + result_dict = reading.to_dict() + assert isinstance(result_dict, dict) + assert "timestamp" in result_dict + assert "value" in result_dict + assert result_dict["timestamp"] == reading.timestamp + assert result_dict["value"] == (x, y, z) + + +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_acceleration_timestamp(ts): + """Test that different Acceleration readings have timestamps.""" + with patch("time.time", side_effect=[ts]): + reading1 = Acceleration(1.0, 2.0, 3.0) + + assert reading1.timestamp == ts diff --git a/tests/unit/sensor_reading/test_avg.py b/tests/unit/sensor_reading/test_avg.py new file mode 100644 index 00000000..22d3bd62 --- /dev/null +++ b/tests/unit/sensor_reading/test_avg.py @@ -0,0 +1,190 @@ +"""Unit tests for the avg_readings helper function.""" + +from unittest.mock import Mock + +import pytest + +from pysquared.sensor_reading.avg import avg_readings +from pysquared.sensor_reading.current import Current +from pysquared.sensor_reading.voltage import Voltage + + +def test_avg_readings_with_current(): + """Test avg_readings with Current sensor readings.""" + # Create mock function that returns Current readings + mock_func = Mock() + mock_func.side_effect = [ + Current(10.0), + Current(20.0), + Current(30.0), + Current(40.0), + Current(50.0), + ] + + result = avg_readings(mock_func, num_readings=5) + + assert result == 30.0 # Average of 10, 20, 30, 40, 50 + assert mock_func.call_count == 5 + + +def test_avg_readings_with_voltage(): + """Test avg_readings with Voltage sensor readings.""" + # Create mock function that returns Voltage readings + mock_func = Mock() + mock_func.side_effect = [ + Voltage(3.0), + Voltage(3.3), + Voltage(3.6), + Voltage(3.9), + Voltage(4.2), + ] + + result = avg_readings(mock_func, num_readings=5) + + assert result == 3.6 # Average of 3.0, 3.3, 3.6, 3.9, 4.2 + assert mock_func.call_count == 5 + + +def test_avg_readings_default_num_readings(): + """Test avg_readings with default number of readings (50).""" + mock_func = Mock() + mock_func.return_value = Current(5.0) + + result = avg_readings(mock_func) + + assert result == 5.0 + assert mock_func.call_count == 50 + + +def test_avg_readings_single_reading(): + """Test avg_readings with a single reading.""" + mock_func = Mock() + mock_func.return_value = Voltage(12.5) + + result = avg_readings(mock_func, num_readings=1) + + assert result == 12.5 + assert mock_func.call_count == 1 + + +def test_avg_readings_zero_values(): + """Test avg_readings with zero values.""" + mock_func = Mock() + mock_func.return_value = Current(0.0) + + result = avg_readings(mock_func, num_readings=10) + + assert result == 0.0 + assert mock_func.call_count == 10 + + +def test_avg_readings_negative_values(): + """Test avg_readings with negative values.""" + mock_func = Mock() + mock_func.side_effect = [ + Current(-5.0), + Current(-10.0), + Current(-15.0), + ] + + result = avg_readings(mock_func, num_readings=3) + + assert result == -10.0 # Average of -5, -10, -15 + assert mock_func.call_count == 3 + + +def test_avg_readings_mixed_values(): + """Test avg_readings with mixed positive and negative values.""" + mock_func = Mock() + mock_func.side_effect = [ + Voltage(-5.0), + Voltage(0.0), + Voltage(5.0), + Voltage(10.0), + ] + + result = avg_readings(mock_func, num_readings=4) + + assert result == 2.5 # Average of -5, 0, 5, 10 + assert mock_func.call_count == 4 + + +def test_avg_readings_precision(): + """Test avg_readings with high precision values.""" + mock_func = Mock() + mock_func.side_effect = [ + Current(1.111111), + Current(2.222222), + Current(3.333333), + ] + + result = avg_readings(mock_func, num_readings=3) + + expected = (1.111111 + 2.222222 + 3.333333) / 3 + assert abs(result - expected) < 1e-6 + assert mock_func.call_count == 3 + + +def test_avg_readings_function_exception(): + """Test avg_readings when the function raises an exception.""" + mock_func = Mock() + mock_func.__name__ = "mock_sensor_function" + mock_func.side_effect = Exception("Sensor failure") + + with pytest.raises(RuntimeError, match="Error retrieving reading from"): + avg_readings(mock_func, num_readings=5) + + +def test_avg_readings_function_exception_on_second_call(): + """Test avg_readings when the function raises an exception on a later call.""" + mock_func = Mock() + mock_func.__name__ = "mock_sensor_function" + mock_func.side_effect = [ + Current(10.0), + Current(20.0), + Exception("Sensor failure on third reading"), + ] + + with pytest.raises(RuntimeError, match="Error retrieving reading from"): + avg_readings(mock_func, num_readings=5) + + # Should have tried 3 times before failing + assert mock_func.call_count == 3 + + +def test_avg_readings_function_name_in_error(): + """Test that the function name appears in the error message.""" + + def test_sensor_func(): + """Mock sensor function that raises an error.""" + raise ValueError("Sensor error") + + with pytest.raises( + RuntimeError, match="Error retrieving reading from test_sensor_func" + ): + avg_readings(test_sensor_func, num_readings=1) + + +def test_avg_readings_large_number_of_readings(): + """Test avg_readings with a large number of readings.""" + mock_func = Mock() + mock_func.return_value = Current(1.0) + + result = avg_readings(mock_func, num_readings=1000) + + assert result == 1.0 + assert mock_func.call_count == 1000 + + +def test_avg_readings_various_reading_counts(): + """Test avg_readings with various reading counts.""" + test_cases = [1, 2, 5, 10, 25, 50, 100] + + for count in test_cases: + mock_func = Mock() + mock_func.return_value = Voltage(2.5) + + result = avg_readings(mock_func, num_readings=count) + + assert result == 2.5 + assert mock_func.call_count == count diff --git a/tests/unit/sensor_reading/test_current.py b/tests/unit/sensor_reading/test_current.py new file mode 100644 index 00000000..8b593af8 --- /dev/null +++ b/tests/unit/sensor_reading/test_current.py @@ -0,0 +1,33 @@ +"""Unit tests for the Current sensor reading class.""" + +from unittest.mock import patch + +from hypothesis import given +from hypothesis import strategies as st + +from pysquared.sensor_reading.current import Current + + +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_current_fuzzed_values(value): + """Fuzz test Current sensor reading with arbitrary float values.""" + reading = Current(value) + assert reading.value == value + assert reading.timestamp is not None + assert isinstance(reading.timestamp, (int, float)) + + result_dict = reading.to_dict() + assert isinstance(result_dict, dict) + assert "timestamp" in result_dict + assert "value" in result_dict + assert result_dict["timestamp"] == reading.timestamp + assert result_dict["value"] == value + + +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_current_timestamp(ts): + """Test that different Current readings have timestamps.""" + with patch("time.time", side_effect=[ts]): + reading1 = Current(150.5) + + assert reading1.timestamp == ts diff --git a/tests/unit/sensor_reading/test_error.py b/tests/unit/sensor_reading/test_error.py new file mode 100644 index 00000000..e56ce166 --- /dev/null +++ b/tests/unit/sensor_reading/test_error.py @@ -0,0 +1,112 @@ +"""Unit tests for the sensor reading error classes.""" + +import pytest + +from pysquared.sensor_reading.error import ( + SensorReadingError, + SensorReadingTimeoutError, + SensorReadingUnknownError, + SensorReadingValueError, +) + + +def test_sensor_reading_error_base(): + """Test that SensorReadingError can be raised and caught.""" + with pytest.raises(SensorReadingError): + raise SensorReadingError("Test error") + + +def test_sensor_reading_error_inheritance(): + """Test that all specific errors inherit from SensorReadingError.""" + assert issubclass(SensorReadingTimeoutError, SensorReadingError) + assert issubclass(SensorReadingValueError, SensorReadingError) + assert issubclass(SensorReadingUnknownError, SensorReadingError) + + +def test_sensor_reading_timeout_error_default_message(): + """Test SensorReadingTimeoutError with default message.""" + error = SensorReadingTimeoutError() + assert str(error) == "Sensor reading operation timed out." + + +def test_sensor_reading_timeout_error_custom_message(): + """Test SensorReadingTimeoutError with custom message.""" + custom_message = "Custom timeout message" + error = SensorReadingTimeoutError(custom_message) + assert str(error) == custom_message + + +def test_sensor_reading_timeout_error_raising(): + """Test that SensorReadingTimeoutError can be raised and caught.""" + with pytest.raises(SensorReadingTimeoutError) as exc_info: + raise SensorReadingTimeoutError("Timeout occurred") + + assert str(exc_info.value) == "Timeout occurred" + + +def test_sensor_reading_value_error_default_message(): + """Test SensorReadingValueError with default message.""" + error = SensorReadingValueError() + assert str(error) == "Sensor reading returned an invalid value." + + +def test_sensor_reading_value_error_custom_message(): + """Test SensorReadingValueError with custom message.""" + custom_message = "Invalid sensor value detected" + error = SensorReadingValueError(custom_message) + assert str(error) == custom_message + + +def test_sensor_reading_value_error_raising(): + """Test that SensorReadingValueError can be raised and caught.""" + with pytest.raises(SensorReadingValueError) as exc_info: + raise SensorReadingValueError("Value out of range") + + assert str(exc_info.value) == "Value out of range" + + +def test_sensor_reading_unknown_error_default_message(): + """Test SensorReadingUnknownError with default message.""" + error = SensorReadingUnknownError() + assert str(error) == "An unknown error occurred during sensor reading." + + +def test_sensor_reading_unknown_error_custom_message(): + """Test SensorReadingUnknownError with custom message.""" + custom_message = "Mysterious sensor failure" + error = SensorReadingUnknownError(custom_message) + assert str(error) == custom_message + + +def test_sensor_reading_unknown_error_raising(): + """Test that SensorReadingUnknownError can be raised and caught.""" + with pytest.raises(SensorReadingUnknownError) as exc_info: + raise SensorReadingUnknownError("Unknown failure mode") + + assert str(exc_info.value) == "Unknown failure mode" + + +def test_error_hierarchy_catching(): + """Test that specific errors can be caught by the base class.""" + # Test that specific errors can be caught as SensorReadingError + with pytest.raises(SensorReadingError): + raise SensorReadingTimeoutError("Timeout") + + with pytest.raises(SensorReadingError): + raise SensorReadingValueError("Invalid value") + + with pytest.raises(SensorReadingError): + raise SensorReadingUnknownError("Unknown error") + + +def test_multiple_error_types(): + """Test handling multiple error types in exception handling.""" + errors_to_test = [ + SensorReadingTimeoutError("Timeout"), + SensorReadingValueError("Bad value"), + SensorReadingUnknownError("Unknown"), + ] + + for error in errors_to_test: + with pytest.raises(SensorReadingError): + raise error diff --git a/tests/unit/sensor_reading/test_gyro.py b/tests/unit/sensor_reading/test_gyro.py new file mode 100644 index 00000000..f3471401 --- /dev/null +++ b/tests/unit/sensor_reading/test_gyro.py @@ -0,0 +1,40 @@ +"""Unit tests for the AngularVelocity sensor reading class.""" + +from unittest.mock import patch + +from hypothesis import given +from hypothesis import strategies as st + +from pysquared.sensor_reading.angular_velocity import AngularVelocity + + +@given( + st.floats(allow_nan=False, allow_infinity=False), + st.floats(allow_nan=False, allow_infinity=False), + st.floats(allow_nan=False, allow_infinity=False), +) +def test_angular_velocity_fuzzed_values(x, y, z): + """Fuzz test AngularVelocity sensor reading with arbitrary float values.""" + reading = AngularVelocity(x, y, z) + assert reading.x == x + assert reading.y == y + assert reading.z == z + assert reading.value == (x, y, z) + assert reading.timestamp is not None + assert isinstance(reading.timestamp, (int, float)) + + result_dict = reading.to_dict() + assert isinstance(result_dict, dict) + assert "timestamp" in result_dict + assert "value" in result_dict + assert result_dict["timestamp"] == reading.timestamp + assert result_dict["value"] == (x, y, z) + + +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_angular_velocity_timestamp(ts): + """Test that different AngularVelocity readings have timestamps.""" + with patch("time.time", side_effect=[ts]): + reading1 = AngularVelocity(1.0, 2.0, 3.0) + + assert reading1.timestamp == ts diff --git a/tests/unit/sensor_reading/test_light.py b/tests/unit/sensor_reading/test_light.py index 94e690c8..9cc504bd 100644 --- a/tests/unit/sensor_reading/test_light.py +++ b/tests/unit/sensor_reading/test_light.py @@ -1,47 +1,33 @@ """Unit tests for the Light sensor reading class.""" +from unittest.mock import patch + +from hypothesis import given +from hypothesis import strategies as st + from pysquared.sensor_reading.light import Light -def test_light_initialization(): - """Test that Light sensor reading initializes correctly.""" - value = 500.0 +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_light_fuzzed_values(value): + """Fuzz test Light sensor reading with arbitrary float values.""" reading = Light(value) - assert reading.value == value assert reading.timestamp is not None assert isinstance(reading.timestamp, (int, float)) + result_dict = reading.to_dict() + assert isinstance(result_dict, dict) + assert "timestamp" in result_dict + assert "value" in result_dict + assert result_dict["timestamp"] == reading.timestamp + assert result_dict["value"] == value -def test_light_with_zero_value(): - """Test Light sensor reading with zero value.""" - reading = Light(0.0) - - assert reading.value == 0.0 - - -def test_light_with_negative_value(): - """Test Light sensor reading with negative value.""" - reading = Light(-10.5) - - assert reading.value == -10.5 - - -def test_light_with_large_value(): - """Test Light sensor reading with large value.""" - large_value = 999999.9 - reading = Light(large_value) - - assert reading.value == large_value - - -def test_light_timestamp_uniqueness(): - """Test that different Light readings have different timestamps.""" - import time - reading1 = Light(100.0) - time.sleep(0.01) # Small delay to ensure different timestamps - reading2 = Light(200.0) +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_light_timestamp(ts): + """Test that different Light readings have timestamps.""" + with patch("time.time", side_effect=[ts]): + reading1 = Light(500.0) - assert reading1.timestamp != reading2.timestamp - assert reading2.timestamp > reading1.timestamp + assert reading1.timestamp == ts diff --git a/tests/unit/sensor_reading/test_lux.py b/tests/unit/sensor_reading/test_lux.py index c20bf771..cc009fcc 100644 --- a/tests/unit/sensor_reading/test_lux.py +++ b/tests/unit/sensor_reading/test_lux.py @@ -1,63 +1,33 @@ """Unit tests for the Lux sensor reading class.""" +from unittest.mock import patch + +from hypothesis import given +from hypothesis import strategies as st + from pysquared.sensor_reading.lux import Lux -def test_lux_initialization(): - """Test that Lux sensor reading initializes correctly.""" - value = 250.5 +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_lux_fuzzed_values(value): + """Fuzz test Lux sensor reading with arbitrary float values.""" reading = Lux(value) - assert reading.value == value assert reading.timestamp is not None assert isinstance(reading.timestamp, (int, float)) + result_dict = reading.to_dict() + assert isinstance(result_dict, dict) + assert "timestamp" in result_dict + assert "value" in result_dict + assert result_dict["timestamp"] == reading.timestamp + assert result_dict["value"] == value -def test_lux_with_zero_value(): - """Test Lux sensor reading with zero value.""" - reading = Lux(0.0) - - assert reading.value == 0.0 - - -def test_lux_with_negative_value(): - """Test Lux sensor reading with negative value (edge case).""" - reading = Lux(-5.0) - - assert reading.value == -5.0 - - -def test_lux_with_typical_indoor_value(): - """Test Lux sensor reading with typical indoor lighting value.""" - indoor_lux = 300.0 # Typical indoor lighting - reading = Lux(indoor_lux) - - assert reading.value == indoor_lux - - -def test_lux_with_typical_outdoor_value(): - """Test Lux sensor reading with typical outdoor lighting value.""" - outdoor_lux = 10000.0 # Typical daylight - reading = Lux(outdoor_lux) - - assert reading.value == outdoor_lux - - -def test_lux_with_very_high_value(): - """Test Lux sensor reading with very high value (direct sunlight).""" - sunlight_lux = 100000.0 # Direct sunlight - reading = Lux(sunlight_lux) - - assert reading.value == sunlight_lux - - -def test_lux_timestamp_uniqueness(): - """Test that different Lux readings have different timestamps.""" - import time - reading1 = Lux(100.0) - time.sleep(0.01) # Small delay to ensure different timestamps - reading2 = Lux(200.0) +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_lux_timestamp(ts): + """Test that different Lux readings have timestamps.""" + with patch("time.time", side_effect=[ts]): + reading1 = Lux(250.5) - assert reading1.timestamp != reading2.timestamp - assert reading2.timestamp > reading1.timestamp + assert reading1.timestamp == ts diff --git a/tests/unit/sensor_reading/test_magnetic.py b/tests/unit/sensor_reading/test_magnetic.py new file mode 100644 index 00000000..7bedb7c8 --- /dev/null +++ b/tests/unit/sensor_reading/test_magnetic.py @@ -0,0 +1,40 @@ +"""Unit tests for the Magnetic sensor reading class.""" + +from unittest.mock import patch + +from hypothesis import given +from hypothesis import strategies as st + +from pysquared.sensor_reading.magnetic import Magnetic + + +@given( + st.floats(allow_nan=False, allow_infinity=False), + st.floats(allow_nan=False, allow_infinity=False), + st.floats(allow_nan=False, allow_infinity=False), +) +def test_magnetic_fuzzed_values(x, y, z): + """Fuzz test Magnetic sensor reading with arbitrary float values.""" + reading = Magnetic(x, y, z) + assert reading.x == x + assert reading.y == y + assert reading.z == z + assert reading.value == (x, y, z) + assert reading.timestamp is not None + assert isinstance(reading.timestamp, (int, float)) + + result_dict = reading.to_dict() + assert isinstance(result_dict, dict) + assert "timestamp" in result_dict + assert "value" in result_dict + assert result_dict["timestamp"] == reading.timestamp + assert result_dict["value"] == (x, y, z) + + +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_magnetic_timestamp(ts): + """Test that different Magnetic readings have timestamps.""" + with patch("time.time", side_effect=[ts]): + reading1 = Magnetic(1.0, 2.0, 3.0) + + assert reading1.timestamp == ts diff --git a/tests/unit/sensor_reading/test_reading_base.py b/tests/unit/sensor_reading/test_reading_base.py new file mode 100644 index 00000000..36a23993 --- /dev/null +++ b/tests/unit/sensor_reading/test_reading_base.py @@ -0,0 +1,28 @@ +"""Unit tests for the base Reading class.""" + +from unittest.mock import patch + +import pytest +from hypothesis import given +from hypothesis import strategies as st + +from pysquared.sensor_reading.base import Reading + + +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_reading_timestamp(ts): + """Test that Reading timestamps work with different values.""" + with patch("time.time", side_effect=[ts]): + reading1 = Reading() + + assert reading1.timestamp == ts + + +def test_reading_value_not_implemented(): + """Test that Reading.value raises NotImplementedError when not overridden.""" + reading = Reading() + + with pytest.raises( + NotImplementedError, match="Subclasses must implement this method." + ): + _ = reading.value diff --git a/tests/unit/sensor_reading/test_temperature.py b/tests/unit/sensor_reading/test_temperature.py index c9fa869d..001f551e 100644 --- a/tests/unit/sensor_reading/test_temperature.py +++ b/tests/unit/sensor_reading/test_temperature.py @@ -1,58 +1,33 @@ """Unit tests for the Temperature sensor reading class.""" -import pytest +from unittest.mock import patch + +from hypothesis import given +from hypothesis import strategies as st from pysquared.sensor_reading.temperature import Temperature -def test_temperature_initialization(): - """Test that Temperature sensor reading initializes correctly.""" - value = 25.5 +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_temperature_fuzzed_values(value): + """Fuzz test Temperature sensor reading with arbitrary float values.""" reading = Temperature(value) - assert reading.value == value assert reading.timestamp is not None assert isinstance(reading.timestamp, (int, float)) - -def test_temperature_with_zero_value(): - """Test Temperature sensor reading with zero value.""" - reading = Temperature(0.0) - - assert reading.value == 0.0 - - -def test_temperature_with_positive_value(): - """Test Temperature sensor reading with high temperature value.""" - high_temp = 85.0 - reading = Temperature(high_temp) - - assert reading.value == high_temp - - -def test_temperature_with_negative_value(): - """Test Temperature sensor reading with very low temperature.""" - very_cold = -273.15 # Absolute zero - reading = Temperature(very_cold) - - assert reading.value == very_cold - - -def test_temperature_timestamp_uniqueness(): - """Test that different Temperature readings have different timestamps.""" - import time - - reading1 = Temperature(20.0) - time.sleep(0.01) # Small delay to ensure different timestamps - reading2 = Temperature(25.0) - - assert reading1.timestamp != reading2.timestamp - assert reading2.timestamp > reading1.timestamp + result_dict = reading.to_dict() + assert isinstance(result_dict, dict) + assert "timestamp" in result_dict + assert "value" in result_dict + assert result_dict["timestamp"] == reading.timestamp + assert result_dict["value"] == value -def test_temperature_precision(): - """Test Temperature sensor reading with high precision values.""" - precise_temp = 23.456789 - reading = Temperature(precise_temp) +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_temperature_timestamp(ts): + """Test that different Temperature readings have timestamps.""" + with patch("time.time", side_effect=[ts]): + reading1 = Temperature(25.0) - assert reading.value == pytest.approx(precise_temp, rel=1e-9) + assert reading1.timestamp == ts diff --git a/tests/unit/sensor_reading/test_voltage.py b/tests/unit/sensor_reading/test_voltage.py new file mode 100644 index 00000000..59cd4ba8 --- /dev/null +++ b/tests/unit/sensor_reading/test_voltage.py @@ -0,0 +1,33 @@ +"""Unit tests for the Voltage sensor reading class.""" + +from unittest.mock import patch + +from hypothesis import given +from hypothesis import strategies as st + +from pysquared.sensor_reading.voltage import Voltage + + +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_voltage_fuzzed_values(value): + """Fuzz test Voltage sensor reading with arbitrary float values.""" + reading = Voltage(value) + assert reading.value == value + assert reading.timestamp is not None + assert isinstance(reading.timestamp, (int, float)) + + result_dict = reading.to_dict() + assert isinstance(result_dict, dict) + assert "timestamp" in result_dict + assert "value" in result_dict + assert result_dict["timestamp"] == reading.timestamp + assert result_dict["value"] == value + + +@given(st.floats(allow_nan=False, allow_infinity=False)) +def test_voltage_timestamp(ts): + """Test that different Voltage readings have timestamps.""" + with patch("time.time", side_effect=[ts]): + reading1 = Voltage(3.3) + + assert reading1.timestamp == ts diff --git a/tests/unit/test_beacon.py b/tests/unit/test_beacon.py index 3806ec6d..f9775e05 100644 --- a/tests/unit/test_beacon.py +++ b/tests/unit/test_beacon.py @@ -25,7 +25,12 @@ from pysquared.protos.power_monitor import PowerMonitorProto from pysquared.protos.radio import RadioProto from pysquared.protos.temperature_sensor import TemperatureSensorProto +from pysquared.sensor_reading.acceleration import Acceleration +from pysquared.sensor_reading.angular_velocity import AngularVelocity +from pysquared.sensor_reading.avg import avg_readings +from pysquared.sensor_reading.current import Current from pysquared.sensor_reading.temperature import Temperature +from pysquared.sensor_reading.voltage import Voltage @pytest.fixture @@ -87,17 +92,17 @@ def get_name(self) -> str: class MockPowerMonitor(PowerMonitorProto): """Mocks the PowerMonitorProto for testing.""" - def get_current(self) -> float: + def get_current(self) -> Current: """Mocks the get_current method.""" - return 0.5 + return Current(0.5) - def get_bus_voltage(self) -> float: + def get_bus_voltage(self) -> Voltage: """Mocks the get_bus_voltage method.""" - return 3.3 + return Voltage(3.3) - def get_shunt_voltage(self) -> float: + def get_shunt_voltage(self) -> Voltage: """Mocks the get_shunt_voltage method.""" - return 0.1 + return Voltage(0.1) class MockTemperatureSensor(TemperatureSensorProto): @@ -111,13 +116,13 @@ def get_temperature(self) -> Temperature: class MockIMU(IMUProto): """Mocks the IMUProto for testing.""" - def get_gyro_data(self) -> tuple[float, float, float]: - """Mocks the get_gyro_data method.""" - return (0.1, 2.3, 4.5) + def get_angular_velocity(self) -> AngularVelocity: + """Mocks the get_angular_velocity method.""" + return AngularVelocity(0.1, 2.3, 4.5) - def get_acceleration(self) -> tuple[float, float, float]: + def get_acceleration(self) -> Acceleration: """Mocks the get_acceleration method.""" - return (5.4, 3.2, 1.0) + return Acceleration(5.4, 3.2, 1.0) def test_beacon_init(mock_logger, mock_packet_manager): @@ -228,54 +233,44 @@ def test_beacon_send_with_sensors( assert pytest.approx(d["MockPowerMonitor_4_shunt_voltage_avg"], 0.01) == 0.1 # temperature sensor - assert pytest.approx(d["MockTemperatureSensor_5_temperature"], 0.01) == 22.5 - assert d["MockTemperatureSensor_5_temperature_timestamp"] is not None + assert ( + pytest.approx(d["MockTemperatureSensor_5_temperature"]["value"], 0.01) == 22.5 + ) + assert d["MockTemperatureSensor_5_temperature"]["timestamp"] is not None # IMU sensor - assert pytest.approx(d["MockIMU_6_gyroscope"][0], 0.1) == 0.1 - assert pytest.approx(d["MockIMU_6_gyroscope"][1], 0.1) == 2.3 - assert pytest.approx(d["MockIMU_6_gyroscope"][2], 0.1) == 4.5 - assert pytest.approx(d["MockIMU_6_acceleration"][0], 0.1) == 5.4 - assert pytest.approx(d["MockIMU_6_acceleration"][1], 0.1) == 3.2 - assert pytest.approx(d["MockIMU_6_acceleration"][2], 0.1) == 1.0 - + assert pytest.approx(d["MockIMU_6_angular_velocityscope"]["value"][0], 0.1) == 0.1 + assert pytest.approx(d["MockIMU_6_angular_velocityscope"]["value"][1], 0.1) == 2.3 + assert pytest.approx(d["MockIMU_6_angular_velocityscope"]["value"][2], 0.1) == 4.5 + assert d["MockIMU_6_angular_velocityscope"]["timestamp"] is not None + assert pytest.approx(d["MockIMU_6_acceleration"]["value"][0], 0.1) == 5.4 + assert pytest.approx(d["MockIMU_6_acceleration"]["value"][1], 0.1) == 3.2 + assert pytest.approx(d["MockIMU_6_acceleration"]["value"][2], 0.1) == 1.0 + assert d["MockIMU_6_acceleration"]["timestamp"] is not None -def test_beacon_avg_readings(mock_logger, mock_packet_manager): - """Tests the avg_readings method in the context of the Beacon class. - Args: - mock_logger: Mocked Logger instance. - mock_packet_manager: Mocked PacketManager instance. - """ - beacon = Beacon(mock_logger, "test_beacon", mock_packet_manager, 0.0) +def test_avg_readings_function(): + """Tests the avg_readings standalone function.""" # Test with a function that returns consistent values def constant_func(): """Returns a constant value.""" - return 5.0 + return Voltage(5.0) - result = beacon.avg_readings(constant_func, num_readings=5) + result = avg_readings(constant_func, num_readings=5) assert pytest.approx(result, 0.01) == 5.0 - # Test with a function that returns None - def none_func(): - """Returns None to simulate a sensor failure.""" - return None + # Test with a function that raises an exception + def error_func(): + """Raises an exception to simulate a sensor failure.""" + raise Exception("Sensor error") - result = beacon.avg_readings(none_func) - assert result is None - mock_logger.warning.assert_called_once() + with pytest.raises(RuntimeError, match="Error retrieving reading from error_func"): + avg_readings(error_func) -def test_avg_readings_varying_values(mock_logger, mock_packet_manager): - """Tests avg_readings with values that vary. - - Args: - mock_logger: Mocked Logger instance. - mock_packet_manager: Mocked PacketManager instance. - """ - beacon = Beacon(mock_logger, "test_beacon", mock_packet_manager, 0.0) - +def test_avg_readings_varying_values(): + """Tests avg_readings with values that vary.""" # Create a simple counter function that returns incrementing values # Starting from 1 and incrementing by 1 each time values = list(range(1, 6)) # [1, 2, 3, 4, 5] @@ -288,8 +283,370 @@ def incrementing_func(): nonlocal read_count value = values[read_count % len(values)] read_count += 1 - return value + return Voltage(value) # Test with a specific number of readings that's a multiple of our pattern length - result = beacon.avg_readings(incrementing_func, num_readings=5) + result = avg_readings(incrementing_func, num_readings=5) assert result == expected_avg + + +@patch("pysquared.nvm.flag.microcontroller") +@patch("pysquared.nvm.counter.microcontroller") +def test_beacon_send_with_imu_acceleration_error( + mock_flag_microcontroller, + mock_counter_microcontroller, + mock_logger, + mock_packet_manager, +): + """Tests sending a beacon when IMU acceleration sensor fails. + + Args: + mock_flag_microcontroller: Mocked microcontroller for Flag. + mock_counter_microcontroller: Mocked microcontroller for Counter. + mock_logger: Mocked Logger instance. + mock_packet_manager: Mocked PacketManager instance. + """ + mock_flag_microcontroller.nvm = setup_datastore + mock_counter_microcontroller.nvm = setup_datastore + + imu = MockIMU() + # Mock the get_acceleration method to raise an exception + imu.get_acceleration = MagicMock( + side_effect=Exception("Acceleration sensor failure") + ) + + beacon = Beacon( + mock_logger, + "test_beacon", + mock_packet_manager, + 0, + imu, + ) + + _ = beacon.send() + + # Verify the error was logged + mock_logger.error.assert_called_with( + "Error retrieving acceleration", + imu.get_acceleration.side_effect, + sensor="MockIMU", + index=0, + ) + + # Verify beacon was still sent (despite the error) + mock_packet_manager.send.assert_called_once() + + +@patch("pysquared.nvm.flag.microcontroller") +@patch("pysquared.nvm.counter.microcontroller") +def test_beacon_send_with_imu_angular_velocity_error( + mock_flag_microcontroller, + mock_counter_microcontroller, + mock_logger, + mock_packet_manager, +): + """Tests sending a beacon when IMU angular_velocityscope sensor fails. + + Args: + mock_flag_microcontroller: Mocked microcontroller for Flag. + mock_counter_microcontroller: Mocked microcontroller for Counter. + mock_logger: Mocked Logger instance. + mock_packet_manager: Mocked PacketManager instance. + """ + mock_flag_microcontroller.nvm = setup_datastore + mock_counter_microcontroller.nvm = setup_datastore + + imu = MockIMU() + # Mock the get_angular_velocity method to raise an exception + imu.get_angular_velocity = MagicMock( + side_effect=Exception("AngularVelocityscope sensor failure") + ) + + beacon = Beacon( + mock_logger, + "test_beacon", + mock_packet_manager, + 0, + imu, + ) + + _ = beacon.send() + + # Verify the error was logged + mock_logger.error.assert_called_with( + "Error retrieving angular velocity", + imu.get_angular_velocity.side_effect, + sensor="MockIMU", + index=0, + ) + + # Verify beacon was still sent (despite the error) + mock_packet_manager.send.assert_called_once() + + +@patch("pysquared.nvm.flag.microcontroller") +@patch("pysquared.nvm.counter.microcontroller") +def test_beacon_send_with_power_monitor_current_error( + mock_flag_microcontroller, + mock_counter_microcontroller, + mock_logger, + mock_packet_manager, +): + """Tests sending a beacon when power monitor current sensor fails. + + Args: + mock_flag_microcontroller: Mocked microcontroller for Flag. + mock_counter_microcontroller: Mocked microcontroller for Counter. + mock_logger: Mocked Logger instance. + mock_packet_manager: Mocked PacketManager instance. + """ + mock_flag_microcontroller.nvm = setup_datastore + mock_counter_microcontroller.nvm = setup_datastore + + power_monitor = MockPowerMonitor() + # Mock the get_current method to raise an exception + power_monitor.get_current = MagicMock( + side_effect=Exception("Current sensor failure") + ) + power_monitor.get_current.__name__ = "get_current" + + beacon = Beacon( + mock_logger, + "test_beacon", + mock_packet_manager, + 0, + power_monitor, + ) + + _ = beacon.send() + + # Verify the error was logged + mock_logger.error.assert_called_with( + "Error retrieving current", + mock_logger.error.call_args[0][1], # The actual RuntimeError that was raised + sensor="MockPowerMonitor", + index=0, + ) + # Verify the exception is a RuntimeError from avg_readings + assert isinstance(mock_logger.error.call_args[0][1], RuntimeError) + assert "Error retrieving reading from get_current" in str( + mock_logger.error.call_args[0][1] + ) + + # Verify beacon was still sent (despite the error) + mock_packet_manager.send.assert_called_once() + + +@patch("pysquared.nvm.flag.microcontroller") +@patch("pysquared.nvm.counter.microcontroller") +def test_beacon_send_with_power_monitor_bus_voltage_error( + mock_flag_microcontroller, + mock_counter_microcontroller, + mock_logger, + mock_packet_manager, +): + """Tests sending a beacon when power monitor bus voltage sensor fails. + + Args: + mock_flag_microcontroller: Mocked microcontroller for Flag. + mock_counter_microcontroller: Mocked microcontroller for Counter. + mock_logger: Mocked Logger instance. + mock_packet_manager: Mocked PacketManager instance. + """ + mock_flag_microcontroller.nvm = setup_datastore + mock_counter_microcontroller.nvm = setup_datastore + + power_monitor = MockPowerMonitor() + # Mock the get_bus_voltage method to raise an exception + power_monitor.get_bus_voltage = MagicMock( + side_effect=Exception("Bus voltage sensor failure") + ) + power_monitor.get_bus_voltage.__name__ = "get_bus_voltage" + + beacon = Beacon( + mock_logger, + "test_beacon", + mock_packet_manager, + 0, + power_monitor, + ) + + _ = beacon.send() + + # Verify the error was logged + mock_logger.error.assert_called_with( + "Error retrieving bus voltage", + mock_logger.error.call_args[0][1], # The actual RuntimeError that was raised + sensor="MockPowerMonitor", + index=0, + ) + # Verify the exception is a RuntimeError from avg_readings + assert isinstance(mock_logger.error.call_args[0][1], RuntimeError) + assert "Error retrieving reading from get_bus_voltage" in str( + mock_logger.error.call_args[0][1] + ) + + # Verify beacon was still sent (despite the error) + mock_packet_manager.send.assert_called_once() + + +@patch("pysquared.nvm.flag.microcontroller") +@patch("pysquared.nvm.counter.microcontroller") +def test_beacon_send_with_power_monitor_shunt_voltage_error( + mock_flag_microcontroller, + mock_counter_microcontroller, + mock_logger, + mock_packet_manager, +): + """Tests sending a beacon when power monitor shunt voltage sensor fails. + + Args: + mock_flag_microcontroller: Mocked microcontroller for Flag. + mock_counter_microcontroller: Mocked microcontroller for Counter. + mock_logger: Mocked Logger instance. + mock_packet_manager: Mocked PacketManager instance. + """ + mock_flag_microcontroller.nvm = setup_datastore + mock_counter_microcontroller.nvm = setup_datastore + + power_monitor = MockPowerMonitor() + # Mock the get_shunt_voltage method to raise an exception + power_monitor.get_shunt_voltage = MagicMock( + side_effect=Exception("Shunt voltage sensor failure") + ) + power_monitor.get_shunt_voltage.__name__ = "get_shunt_voltage" + + beacon = Beacon( + mock_logger, + "test_beacon", + mock_packet_manager, + 0, + power_monitor, + ) + + _ = beacon.send() + + # Verify the error was logged + mock_logger.error.assert_called_with( + "Error retrieving shunt voltage", + mock_logger.error.call_args[0][1], # The actual RuntimeError that was raised + sensor="MockPowerMonitor", + index=0, + ) + # Verify the exception is a RuntimeError from avg_readings + assert isinstance(mock_logger.error.call_args[0][1], RuntimeError) + assert "Error retrieving reading from get_shunt_voltage" in str( + mock_logger.error.call_args[0][1] + ) + + # Verify beacon was still sent (despite the error) + mock_packet_manager.send.assert_called_once() + + +@patch("pysquared.nvm.flag.microcontroller") +@patch("pysquared.nvm.counter.microcontroller") +def test_beacon_send_with_temperature_sensor_error( + mock_flag_microcontroller, + mock_counter_microcontroller, + mock_logger, + mock_packet_manager, +): + """Tests sending a beacon when temperature sensor fails. + + Args: + mock_flag_microcontroller: Mocked microcontroller for Flag. + mock_counter_microcontroller: Mocked microcontroller for Counter. + mock_logger: Mocked Logger instance. + mock_packet_manager: Mocked PacketManager instance. + """ + mock_flag_microcontroller.nvm = setup_datastore + mock_counter_microcontroller.nvm = setup_datastore + + temp_sensor = MockTemperatureSensor() + # Mock the get_temperature method to raise an exception + temp_sensor.get_temperature = MagicMock( + side_effect=Exception("Temperature sensor failure") + ) + + beacon = Beacon( + mock_logger, + "test_beacon", + mock_packet_manager, + 0, + temp_sensor, + ) + + _ = beacon.send() + + # Verify the error was logged + mock_logger.error.assert_called_with( + "Error retrieving temperature", + temp_sensor.get_temperature.side_effect, + sensor="MockTemperatureSensor", + index=0, + ) + + # Verify beacon was still sent (despite the error) + mock_packet_manager.send.assert_called_once() + + +@patch("pysquared.nvm.flag.microcontroller") +@patch("pysquared.nvm.counter.microcontroller") +def test_beacon_send_with_multiple_sensor_errors( + mock_flag_microcontroller, + mock_counter_microcontroller, + mock_logger, + mock_packet_manager, +): + """Tests sending a beacon when multiple sensors fail. + + Args: + mock_flag_microcontroller: Mocked microcontroller for Flag. + mock_counter_microcontroller: Mocked microcontroller for Counter. + mock_logger: Mocked Logger instance. + mock_packet_manager: Mocked PacketManager instance. + """ + mock_flag_microcontroller.nvm = setup_datastore + mock_counter_microcontroller.nvm = setup_datastore + + imu = MockIMU() + power_monitor = MockPowerMonitor() + temp_sensor = MockTemperatureSensor() + + # Mock multiple methods to raise exceptions + imu.get_acceleration = MagicMock( + side_effect=Exception("Acceleration sensor failure") + ) + power_monitor.get_current = MagicMock( + side_effect=Exception("Current sensor failure") + ) + power_monitor.get_current.__name__ = "get_current" + temp_sensor.get_temperature = MagicMock( + side_effect=Exception("Temperature sensor failure") + ) + + beacon = Beacon( + mock_logger, + "test_beacon", + mock_packet_manager, + 0, + imu, + power_monitor, + temp_sensor, + ) + + _ = beacon.send() + + # Verify all errors were logged + assert mock_logger.error.call_count == 3 + + # Check that the correct error messages were logged + calls = mock_logger.error.call_args_list + error_messages = [call[0][0] for call in calls] + + assert "Error retrieving acceleration" in error_messages + assert "Error retrieving current" in error_messages + assert "Error retrieving temperature" in error_messages + + # Verify beacon was still sent (despite the errors) + mock_packet_manager.send.assert_called_once() diff --git a/tests/unit/test_detumble.py b/tests/unit/test_detumble.py deleted file mode 100644 index 2d5bd09e..00000000 --- a/tests/unit/test_detumble.py +++ /dev/null @@ -1,113 +0,0 @@ -"""Unit tests for the detumble module. - -This module contains unit tests for the `detumble` module, which provides -functions for spacecraft detumbling. The tests cover dot product, cross product, -and magnetorquer dipole calculations. -""" - -import pytest - -import pysquared.detumble as detumble - - -def test_dot_product(): - """Tests the dot_product function with positive values.""" - # dot_product is only ever called to give the square of mag_field - mag_field_vector = (30.0, 45.0, 60.0) - result = detumble.dot_product(mag_field_vector, mag_field_vector) - assert result == 6525.0 # 30.0*30.0 + 45.0*45.0 + 60.0*60.0 = 6525.0 - - -def test_dot_product_negatives(): - """Tests the dot_product function with negative vectors.""" - vector1 = (-1, -2, -3) - vector2 = (-4, -5, -6) - result = detumble.dot_product(vector1, vector2) - assert result == 32 # -1*-4 + -2*-5 + -3*-6 - - -def test_dot_product_large_val(): - """Tests the dot_product function with large value vectors.""" - vector1 = (1e6, 1e6, 1e6) - vector2 = (1e6, 1e6, 1e6) - result = detumble.dot_product(vector1, vector2) - assert result == 3e12 # 1e6*1e6 + 1e6*1e6 + 1e6*1e6 = 3e12 - - -def test_dot_product_zero(): - """Tests the dot_product function with zero values.""" - vector = (0.0, 0.0, 0.0) - result = detumble.dot_product(vector, vector) - assert result == 0.0 - - -def test_x_product(): - """Tests the x_product (cross product) function.""" - mag_field_vector = (30.0, 45.0, 60.0) - ang_vel_vector = (0.0, 0.02, 0.015) - expected_result = [-0.525, 0.45, 0.6] - # x_product takes in tuple arguments and returns a list value - actual_result = detumble.x_product( - mag_field_vector, ang_vel_vector - ) # cross product - assert pytest.approx(actual_result[0], 0.001) == expected_result[0] - assert pytest.approx(actual_result[1], 0.001) == expected_result[1] - assert pytest.approx(actual_result[2], 0.001) == expected_result[2] - # due to floating point arithmetic, accept answer within 5 places - - -def test_x_product_negatives(): - """Tests the x_product function with negative values.""" - mag_field_vector = (-30.0, -45.0, -60.0) - ang_vel_vector = (-0.02, -0.02, -0.015) - expected_result = [-0.525, -0.75, -0.3] - actual_result = detumble.x_product(mag_field_vector, ang_vel_vector) - assert pytest.approx(actual_result[0], 0.001) == expected_result[0] - assert pytest.approx(actual_result[1], 0.001) == expected_result[1] - assert pytest.approx(actual_result[2], 0.001) == expected_result[2] - - -def test_x_product_large_val(): - """Tests the x_product function with large values.""" - mag_field_vector = (1e6, 1e6, 1e6) - ang_vel_vector = (1e6, 1e6, 1e6) # cross product of parallel vector equals 0 - result = detumble.x_product(mag_field_vector, ang_vel_vector) - assert result == [0.0, 0.0, 0.0] - - -def test_x_product_zero(): - """Tests the x_product function with zero values.""" - mag_field_vector = (0.0, 0.0, 0.0) - ang_vel_vector = (0.0, 0.02, 0.015) - result = detumble.x_product(mag_field_vector, ang_vel_vector) - assert result == [0.0, 0.0, 0.0] - - -# Bigger context: magnetorquer_dipole() is called by do_detumble() in (FC board) functions.py & (Batt Board) battery_functions.py -# mag_field: mag. field strength at x, y, & z axis (tuple) (magnetometer reading) -# ang_vel: ang. vel. at x, y, z axis (tuple) (gyroscope reading) -def test_magnetorquer_dipole(): - """Tests the magnetorquer_dipole function with valid inputs.""" - mag_field = (30.0, -45.0, 60.0) - ang_vel = (0.0, 0.02, 0.015) - expected_result = [0.023211, -0.00557, -0.007426] - actual_result = detumble.magnetorquer_dipole(mag_field, ang_vel) - assert pytest.approx(actual_result[0], 0.001) == expected_result[0] - assert pytest.approx(actual_result[1], 0.001) == expected_result[1] - assert pytest.approx(actual_result[2], 0.001) == expected_result[2] - - -def test_magnetorquer_dipole_zero_mag_field(): - """Tests magnetorquer_dipole with a zero magnetic field, expecting ZeroDivisionError.""" - mag_field = (0.0, 0.0, 0.0) - ang_vel = (0.0, 0.02, 0.015) - with pytest.raises(ZeroDivisionError): - detumble.magnetorquer_dipole(mag_field, ang_vel) - - -def test_magnetorquer_dipole_zero_ang_vel(): - """Tests magnetorquer_dipole with zero angular velocity.""" - mag_field = (30.0, -45.0, 60.0) - ang_vel = (0.0, 0.0, 0.0) - result = detumble.magnetorquer_dipole(mag_field, ang_vel) - assert result == [0.0, 0.0, 0.0] diff --git a/tests/unit/test_power_health.py b/tests/unit/test_power_health.py index ae60d307..18487661 100644 --- a/tests/unit/test_power_health.py +++ b/tests/unit/test_power_health.py @@ -14,6 +14,8 @@ from pysquared.logger import Logger from pysquared.power_health import CRITICAL, DEGRADED, NOMINAL, UNKNOWN, PowerHealth from pysquared.protos.power_monitor import PowerMonitorProto +from pysquared.sensor_reading.current import Current +from pysquared.sensor_reading.voltage import Voltage @pytest.fixture @@ -36,7 +38,11 @@ def mock_config(): @pytest.fixture def mock_power_monitor(): """Mocks the PowerMonitorProto class.""" - return MagicMock(spec=PowerMonitorProto) + monitor = MagicMock(spec=PowerMonitorProto) + # Default mock return values as sensor reading objects + monitor.get_bus_voltage.return_value = Voltage(7.2) + monitor.get_current.return_value = Current(100.0) + return monitor @pytest.fixture @@ -72,13 +78,17 @@ def test_get_nominal_state(power_health): power_health: PowerHealth instance for testing. """ # Mock normal readings - power_health._power_monitor.get_bus_voltage.return_value = 7.2 # Normal voltage - power_health._power_monitor.get_current.return_value = 100.0 # Normal current + power_health._power_monitor.get_bus_voltage.return_value = Voltage( + 7.2 + ) # Normal voltage + power_health._power_monitor.get_current.return_value = Current( + 100.0 + ) # Normal current result = power_health.get() assert isinstance(result, NOMINAL) - power_health.logger.info.assert_called_with("Power health is NOMINAL") + power_health.logger.debug.assert_called_with("Power health is NOMINAL") def test_get_critical_state_low_voltage(power_health): @@ -88,17 +98,19 @@ def test_get_critical_state_low_voltage(power_health): power_health: PowerHealth instance for testing. """ # Mock critical voltage reading - power_health._power_monitor.get_bus_voltage.return_value = ( - 5.8 # Below critical (6.0) - ) - power_health._power_monitor.get_current.return_value = 100.0 + power_health._power_monitor.get_bus_voltage.return_value = Voltage( + 5.8 + ) # Below critical (6.0) + power_health._power_monitor.get_current.return_value = Current(100.0) result = power_health.get() assert isinstance(result, CRITICAL) - power_health.logger.warning.assert_called_with( - "CRITICAL: Battery voltage 5.8V is at or below critical threshold 6.0V" - ) + # Use any_order=True to handle call details, and check the values with pytest.approx for floating point precision + call_args = power_health.logger.warning.call_args + assert call_args[0] == ("Power is CRITICAL",) + assert call_args[1]["voltage"] == pytest.approx(5.8, rel=1e-6) + assert call_args[1]["threshold"] == 6.0 def test_get_critical_state_exactly_critical_voltage(power_health): @@ -108,14 +120,18 @@ def test_get_critical_state_exactly_critical_voltage(power_health): power_health: PowerHealth instance for testing. """ # Mock exactly critical voltage reading - power_health._power_monitor.get_bus_voltage.return_value = 6.0 # Exactly critical - power_health._power_monitor.get_current.return_value = 100.0 + power_health._power_monitor.get_bus_voltage.return_value = Voltage( + 6.0 + ) # Exactly critical + power_health._power_monitor.get_current.return_value = Current(100.0) result = power_health.get() assert isinstance(result, CRITICAL) power_health.logger.warning.assert_called_with( - "CRITICAL: Battery voltage 6.0V is at or below critical threshold 6.0V" + "Power is CRITICAL", + voltage=6.0, + threshold=6.0, ) @@ -126,17 +142,20 @@ def test_get_degraded_state_current_deviation(power_health): power_health: PowerHealth instance for testing. """ # Mock readings with current deviation - power_health._power_monitor.get_bus_voltage.return_value = 7.2 # Normal voltage - power_health._power_monitor.get_current.return_value = ( - 250.0 # Way above normal (100.0) - ) + power_health._power_monitor.get_bus_voltage.return_value = Voltage( + 7.2 + ) # Normal voltage + power_health._power_monitor.get_current.return_value = Current( + 250.0 + ) # Way above normal (100.0) result = power_health.get() assert isinstance(result, DEGRADED) - power_health.logger.info.assert_called_with( - "Power health is NOMINAL with minor deviations", - errors=["Current reading 250.0 is outside of normal range 100.0"], + power_health.logger.warning.assert_called_with( + "Power is DEGRADED: Current above threshold", + current=250.0, + threshold=100.0, ) @@ -146,18 +165,21 @@ def test_get_degraded_state_voltage_deviation(power_health): Args: power_health: PowerHealth instance for testing. """ - power_health._power_monitor.get_bus_voltage.return_value = ( - 6.8 # Below degraded threshold (7.0) but above critical (6.0) - ) - power_health._power_monitor.get_current.return_value = 100.0 # Normal current + power_health._power_monitor.get_bus_voltage.return_value = Voltage( + 6.8 + ) # Below degraded threshold (7.0) but above critical (6.0) + power_health._power_monitor.get_current.return_value = Current( + 100.0 + ) # Normal current result = power_health.get() assert isinstance(result, DEGRADED) - power_health.logger.info.assert_called_with( - "Power health is NOMINAL with minor deviations", - errors=["Bus voltage reading 6.8V is at or below degraded threshold 7.0V"], - ) + # Use pytest.approx for floating point precision + call_args = power_health.logger.warning.call_args + assert call_args[0] == ("Power is DEGRADED: Bus voltage below threshold",) + assert call_args[1]["voltage"] == pytest.approx(6.8, rel=1e-6) + assert call_args[1]["threshold"] == 7.0 def test_get_nominal_with_minor_voltage_deviation(power_health): @@ -166,114 +188,17 @@ def test_get_nominal_with_minor_voltage_deviation(power_health): Args: power_health: PowerHealth instance for testing. """ - power_health._power_monitor.get_bus_voltage.return_value = ( - 7.1 # Above degraded threshold (7.0) - ) - power_health._power_monitor.get_current.return_value = 100.0 # Normal current + power_health._power_monitor.get_bus_voltage.return_value = Voltage( + 7.1 + ) # Above degraded threshold (7.0) + power_health._power_monitor.get_current.return_value = Current( + 100.0 + ) # Normal current result = power_health.get() assert isinstance(result, NOMINAL) - power_health.logger.info.assert_called_with("Power health is NOMINAL") - - -def test_avg_reading_normal_operation(power_health): - """Tests _avg_reading() with normal sensor readings. - - Args: - power_health: PowerHealth instance for testing. - """ - mock_func = MagicMock(return_value=7.5) - - result = power_health._avg_reading(mock_func, num_readings=10) - - assert result == 7.5 - assert mock_func.call_count == 10 - - -def test_avg_reading_with_none_values(power_health): - """Tests _avg_reading() when sensor returns None. - - Args: - power_health: PowerHealth instance for testing. - """ - mock_func = MagicMock(return_value=None) - mock_func.__name__ = "test_sensor_function" - - result = power_health._avg_reading(mock_func, num_readings=5) - - assert result is None - assert mock_func.call_count == 1 - power_health.logger.warning.assert_called() - - -def test_avg_reading_with_varying_values(power_health): - """Tests _avg_reading() with varying sensor readings. - - Args: - power_health: PowerHealth instance for testing. - """ - mock_func = MagicMock(side_effect=[7.0, 7.2, 7.4, 7.6, 7.8]) - - result = power_health._avg_reading(mock_func, num_readings=5) - - expected_avg = (7.0 + 7.2 + 7.4 + 7.6 + 7.8) / 5 - assert result == pytest.approx(expected_avg, rel=1e-6) - assert mock_func.call_count == 5 - - -def test_avg_reading_default_num_readings(power_health): - """Tests _avg_reading() uses default of 50 readings. - - Args: - power_health: PowerHealth instance for testing. - """ - mock_func = MagicMock(return_value=7.0) - - result = power_health._avg_reading(mock_func) - - assert result == 7.0 - assert mock_func.call_count == 50 - - -def test_get_with_none_voltage_reading(power_health): - """Tests get() when voltage reading returns None. - - Args: - power_health: PowerHealth instance for testing. - """ - power_health._power_monitor.get_bus_voltage.return_value = None - power_health._power_monitor.get_current.return_value = 100.0 - - # Mock _avg_reading to return None for voltage - power_health._avg_reading = MagicMock(side_effect=[None, 100.0]) - - result = power_health.get() - - assert isinstance(result, UNKNOWN) - power_health.logger.warning.assert_called_with( - "Power monitor failed to provide bus voltage reading" - ) - - -def test_get_with_none_current_reading(power_health): - """Tests get() when current reading returns None. - - Args: - power_health: PowerHealth instance for testing. - """ - power_health._power_monitor.get_bus_voltage.return_value = 7.2 - power_health._power_monitor.get_current.return_value = None - - # Mock _avg_reading to return None for current - power_health._avg_reading = MagicMock(side_effect=[7.2, None]) - - result = power_health.get() - - assert isinstance(result, UNKNOWN) - power_health.logger.warning.assert_called_with( - "Power monitor failed to provide current reading" - ) + power_health.logger.debug.assert_called_with("Power health is NOMINAL") def test_get_with_exception_during_voltage_reading(power_health): @@ -282,17 +207,19 @@ def test_get_with_exception_during_voltage_reading(power_health): Args: power_health: PowerHealth instance for testing. """ - # Mock _avg_reading to raise an exception on first call (voltage) + # Mock the sensor method to raise an exception test_exception = RuntimeError("Sensor communication error") - power_health._avg_reading = MagicMock(side_effect=test_exception) + power_health._power_monitor.get_bus_voltage.side_effect = test_exception result = power_health.get() assert isinstance(result, UNKNOWN) - # Check that error was called with the correct message and exception as positional parameter - power_health.logger.error.assert_called_once_with( - "Exception occurred while reading from power monitor", test_exception - ) + # The error is now a RuntimeError from avg_readings about func.__name__ + # Check that error was called with error message and some exception + power_health.logger.error.assert_called_once() + call_args = power_health.logger.error.call_args + assert call_args[0][0] == "Error retrieving bus voltage" + assert isinstance(call_args[0][1], Exception) # Some exception was passed def test_get_with_exception_during_current_reading(power_health): @@ -301,17 +228,19 @@ def test_get_with_exception_during_current_reading(power_health): Args: power_health: PowerHealth instance for testing. """ - # Mock _avg_reading to return normal voltage, then raise exception for current + # Mock voltage to work normally but current to raise exception + power_health._power_monitor.get_bus_voltage.return_value = Voltage(7.2) test_exception = RuntimeError("Current sensor failed") - power_health._avg_reading = MagicMock(side_effect=[7.2, test_exception]) + power_health._power_monitor.get_current.side_effect = test_exception result = power_health.get() assert isinstance(result, UNKNOWN) - # Check that error was called with the correct message and exception as positional parameter - power_health.logger.error.assert_called_once_with( - "Exception occurred while reading from power monitor", test_exception - ) + # Check that error was called with error message and some exception + power_health.logger.error.assert_called_once() + call_args = power_health.logger.error.call_args + assert call_args[0][0] == "Error retrieving current" + assert isinstance(call_args[0][1], Exception) # Some exception was passed def test_get_with_sensor_method_exception(power_health): @@ -327,26 +256,11 @@ def test_get_with_sensor_method_exception(power_health): result = power_health.get() assert isinstance(result, UNKNOWN) - # Check that error was called with the correct message and exception as positional parameter - power_health.logger.error.assert_called_once_with( - "Exception occurred while reading from power monitor", test_exception - ) - - -def test_get_logs_sensor_debug_info(power_health): - """Tests that get() logs debug information about the sensor. - - Args: - power_health: PowerHealth instance for testing. - """ - power_health._power_monitor.get_bus_voltage.return_value = 7.2 - power_health._power_monitor.get_current.return_value = 100.0 - - power_health.get() - - power_health.logger.debug.assert_called_with( - "Power monitor: ", sensor=power_health._power_monitor - ) + # Check that error was called with error message and some exception + power_health.logger.error.assert_called_once() + call_args = power_health.logger.error.call_args + assert call_args[0][0] == "Error retrieving bus voltage" + assert isinstance(call_args[0][1], Exception) # Some exception was passed def test_degraded_vs_critical_voltage_boundaries(power_health): @@ -356,17 +270,18 @@ def test_degraded_vs_critical_voltage_boundaries(power_health): power_health: PowerHealth instance for testing. """ # Test voltage just above critical but below degraded - power_health._power_monitor.get_bus_voltage.return_value = ( - 6.5 # Above critical (6.0) but below degraded (7.0) - ) - power_health._power_monitor.get_current.return_value = 100.0 + power_health._power_monitor.get_bus_voltage.return_value = Voltage( + 6.5 + ) # Above critical (6.0) but below degraded (7.0) + power_health._power_monitor.get_current.return_value = Current(100.0) result = power_health.get() assert isinstance(result, DEGRADED) - power_health.logger.info.assert_called_with( - "Power health is NOMINAL with minor deviations", - errors=["Bus voltage reading 6.5V is at or below degraded threshold 7.0V"], + power_health.logger.warning.assert_called_with( + "Power is DEGRADED: Bus voltage below threshold", + voltage=6.5, + threshold=7.0, ) @@ -377,17 +292,18 @@ def test_current_deviation_threshold(power_health): power_health: PowerHealth instance for testing. """ # normal_charge_current = 100.0, so deviation = 150 > 100 should trigger error - power_health._power_monitor.get_bus_voltage.return_value = 7.2 - power_health._power_monitor.get_current.return_value = ( - 250.0 # deviation = 150 > 100 - ) + power_health._power_monitor.get_bus_voltage.return_value = Voltage(7.2) + power_health._power_monitor.get_current.return_value = Current( + 250.0 + ) # deviation = 150 > 100 result = power_health.get() assert isinstance(result, DEGRADED) - power_health.logger.info.assert_called_with( - "Power health is NOMINAL with minor deviations", - errors=["Current reading 250.0 is outside of normal range 100.0"], + power_health.logger.warning.assert_called_with( + "Power is DEGRADED: Current above threshold", + current=250.0, + threshold=100.0, ) @@ -397,17 +313,20 @@ def test_degraded_battery_voltage_threshold(power_health): Args: power_health: PowerHealth instance for testing. """ - power_health._power_monitor.get_bus_voltage.return_value = ( - 7.0 # Exactly at degraded threshold - ) - power_health._power_monitor.get_current.return_value = 100.0 # Normal current + power_health._power_monitor.get_bus_voltage.return_value = Voltage( + 7.0 + ) # Exactly at degraded threshold + power_health._power_monitor.get_current.return_value = Current( + 100.0 + ) # Normal current result = power_health.get() assert isinstance(result, DEGRADED) - power_health.logger.info.assert_called_with( - "Power health is NOMINAL with minor deviations", - errors=["Bus voltage reading 7.0V is at or below degraded threshold 7.0V"], + power_health.logger.warning.assert_called_with( + "Power is DEGRADED: Bus voltage below threshold", + voltage=7.0, + threshold=7.0, ) @@ -417,12 +336,14 @@ def test_voltage_just_above_degraded_threshold(power_health): Args: power_health: PowerHealth instance for testing. """ - power_health._power_monitor.get_bus_voltage.return_value = ( - 7.01 # Just above degraded threshold (7.0) - ) - power_health._power_monitor.get_current.return_value = 100.0 # Normal current + power_health._power_monitor.get_bus_voltage.return_value = Voltage( + 7.01 + ) # Just above degraded threshold (7.0) + power_health._power_monitor.get_current.return_value = Current( + 100.0 + ) # Normal current result = power_health.get() assert isinstance(result, NOMINAL) - power_health.logger.info.assert_called_with("Power health is NOMINAL") + power_health.logger.debug.assert_called_with("Power health is NOMINAL") diff --git a/uv.lock b/uv.lock index 63453b26..b08576f3 100644 --- a/uv.lock +++ b/uv.lock @@ -270,6 +270,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/22/74/07679c5b9f98a7cb0fc147b1ef1cc1853bc07a4eb9cb5731e24732c5f773/asyncio-3.4.3-py3-none-any.whl", hash = "sha256:c4d18b22701821de07bd6aea8b53d21449ec0ec5680645e5317062ea21817d2d", size = 101767, upload-time = "2015-03-10T14:05:10.959Z" }, ] +[[package]] +name = "attrs" +version = "25.3.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/5a/b0/1367933a8532ee6ff8d63537de4f1177af4bff9f3e829baf7331f595bb24/attrs-25.3.0.tar.gz", hash = "sha256:75d7cefc7fb576747b2c81b4442d4d4a1ce0900973527c011d1030fd3bf4af1b", size = 812032, upload-time = "2025-03-13T11:10:22.779Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/77/06/bb80f5f86020c4551da315d78b3ab75e8228f89f0162f2c3a819e407941a/attrs-25.3.0-py3-none-any.whl", hash = "sha256:427318ce031701fea540783410126f03899a97ffc6f61596ad581ac2e40e3bc3", size = 63815, upload-time = "2025-03-13T11:10:21.14Z" }, +] + [[package]] name = "babel" version = "2.17.0" @@ -511,6 +520,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/be/31/a76f4bfa885f93b8167cb4c85cf32b54d1f64384d0b897d45bc6d19b7b45/htmlmin2-0.1.13-py3-none-any.whl", hash = "sha256:75609f2a42e64f7ce57dbff28a39890363bde9e7e5885db633317efbdf8c79a2", size = 34486, upload-time = "2023-03-14T21:28:30.388Z" }, ] +[[package]] +name = "hypothesis" +version = "6.136.7" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "attrs" }, + { name = "sortedcontainers" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/2e/19/92369e1d5f2ead5f3f491d5453f28a04be82133886d3a7b46ebdcf7e109d/hypothesis-6.136.7.tar.gz", hash = "sha256:864c5cf7779adc58871ee51595bc724c496047c5bc45229e0baa950b103a73ea", size = 458037, upload-time = "2025-08-01T22:14:23.057Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/88/59/32813aa1a2715d7c531f87b46a04ba23d05d2bd4f7e21ecda8135100f8ce/hypothesis-6.136.7-py3-none-any.whl", hash = "sha256:12a8b76a5c453f8478d6bb4b5450824856af571c5e7821604a15aeaafa9beefd", size = 524917, upload-time = "2025-08-01T22:14:18.845Z" }, +] + [[package]] name = "identify" version = "2.6.6" @@ -1013,6 +1035,7 @@ dev = [ { name = "circuitpython-stubs" }, { name = "coverage" }, { name = "freezegun" }, + { name = "hypothesis" }, { name = "pre-commit" }, { name = "pyright", extra = ["nodejs"] }, { name = "pytest" }, @@ -1051,6 +1074,7 @@ dev = [ { name = "circuitpython-stubs", specifier = "==9.2.8" }, { name = "coverage", specifier = "==7.9.1" }, { name = "freezegun", specifier = ">=1.5.2" }, + { name = "hypothesis", specifier = "==6.136.7" }, { name = "pre-commit", specifier = "==4.2.0" }, { name = "pyright", extras = ["nodejs"], specifier = "==1.1.402" }, { name = "pytest", specifier = "==8.4.1" }, @@ -1172,6 +1196,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/04/be/d09147ad1ec7934636ad912901c5fd7667e1c858e19d355237db0d0cd5e4/smmap-5.0.2-py3-none-any.whl", hash = "sha256:b30115f0def7d7531d22a0fb6502488d879e75b260a9db4d0819cfb25403af5e", size = 24303, upload-time = "2025-01-02T07:14:38.724Z" }, ] +[[package]] +name = "sortedcontainers" +version = "2.4.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e8/c4/ba2f8066cceb6f23394729afe52f3bf7adec04bf9ed2c820b39e19299111/sortedcontainers-2.4.0.tar.gz", hash = "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88", size = 30594, upload-time = "2021-05-16T22:03:42.897Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/32/46/9cb0e58b2deb7f82b84065f37f3bffeb12413f947f9388e4cac22c4621ce/sortedcontainers-2.4.0-py2.py3-none-any.whl", hash = "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0", size = 29575, upload-time = "2021-05-16T22:03:41.177Z" }, +] + [[package]] name = "soupsieve" version = "2.7"