Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
abe8f9c
Initial plan
Copilot Oct 6, 2025
cd2a044
Implement HMAC command authentication with replay attack prevention
Copilot Oct 6, 2025
9d074fd
Fix typecheck errors: convert counter to int and use string for HMAC …
Copilot Oct 6, 2025
51b13f8
Merge branch 'main' into copilot/fix-7bfb9c49-466e-42db-a6be-dece956d…
Mikefly123 Oct 6, 2025
b4b7be8
Remove password fallback, add NVM counter storage, implement 16-bit c…
Copilot Oct 7, 2025
3095769
Add comprehensive HMAC integration tests between ground station and f…
Copilot Oct 7, 2025
3aae5f2
Appease Linter
Mikefly123 Oct 7, 2025
32e3f17
Add counter prompt, external HMAC library, and large message integrat…
Copilot Oct 7, 2025
d0a665e
Using Circuitpython Hmac library
ineskhou Oct 7, 2025
6dedc2f
Fix HMAC library imports to work with both CircuitPython and CPython …
Copilot Oct 7, 2025
26db833
Update hmac_auth.py
ineskhou Oct 8, 2025
91d72ed
Last COmmand Counter Default 1
ineskhou Oct 8, 2025
0497842
Update hmac_auth.py
ineskhou Oct 8, 2025
6f5638b
Added compare_digest bc circuitpython and also bc timerattack
ineskhou Oct 8, 2025
7b22d8c
Fix debugger
nateinaction Oct 9, 2025
9547082
show mock and lint
nateinaction Oct 9, 2025
183f6b1
Fix large_message test
nateinaction Oct 9, 2025
a69219d
Merge branch 'copilot/fix-7bfb9c49-466e-42db-a6be-dece956d6d8c' of gi…
ineskhou Oct 9, 2025
99694a9
coorected more tests (thx nate)
ineskhou Oct 9, 2025
76e657d
finished correcting tests for hmac library
ineskhou Oct 10, 2025
afd00a0
added functions for send counter (cdh) and compare sigest
ineskhou Oct 10, 2025
c2ccbac
fixed typecheck
ineskhou Oct 10, 2025
ba658ed
fixed typechek and tests bc of typecheck AHHH
ineskhou Oct 10, 2025
c15c753
remove prints
ineskhou Oct 10, 2025
828acc3
added ground station command counter
ineskhou Oct 10, 2025
7904625
fixed typecheck
ineskhou Oct 10, 2025
f0f398c
put launch back to b4
ineskhou Oct 10, 2025
6f823df
added joke configuation python
ineskhou Oct 10, 2025
7440fa1
updated counter
ineskhou Oct 10, 2025
a9fe3d1
correct format
ineskhou Oct 10, 2025
49d44c9
aknoagement
ineskhou Oct 10, 2025
46bf643
process counter better
ineskhou Oct 10, 2025
67fb3d2
finally udpated the tests
ineskhou Oct 11, 2025
9090797
fixed a test bc converting to string, added more debug messages
ineskhou Oct 14, 2025
ee24f3c
update loge
ineskhou Oct 14, 2025
56d8872
debugging listening
ineskhou Oct 14, 2025
9af4e74
appease linter
ineskhou Oct 14, 2025
3625263
truing to figure ouw where in authen it silcently failts
ineskhou Oct 14, 2025
5f16d89
truing to figur vighfg
ineskhou Oct 14, 2025
28b4b35
removing bytes just string for comparing
ineskhou Oct 14, 2025
d94f87b
seeing how the generation hapens
ineskhou Oct 14, 2025
d0991c5
linkt
ineskhou Oct 15, 2025
48a6d46
help debug
ineskhou Oct 18, 2025
1ad1770
decoding not
ineskhou Oct 29, 2025
5d70b55
new print without string
ineskhou Oct 29, 2025
7eed3e8
updated packet manager
ineskhou Oct 29, 2025
fc841bd
remove the eating of the responses
ineskhou Oct 29, 2025
d32bb27
fixing value rrror
ineskhou Oct 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions circuitpython-workspaces/flight-software/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies = [
"adafruit-circuitpython-ticks==1.1.1",
"adafruit-circuitpython-veml7700==2.1.4",
"adafruit-circuitpython-hashlib==1.4.19",
"circuitpython-hmac @ git+https://github.com/jimbobbennett/CircuitPython_HMAC.git",
"proves-circuitpython-sx126 @ git+https://github.com/proveskit/micropySX126X@1.0.0",
"proves-circuitpython-sx1280 @ git+https://github.com/proveskit/CircuitPython_SX1280@1.0.4",
]
Expand Down
157 changes: 146 additions & 11 deletions circuitpython-workspaces/flight-software/src/pysquared/cdh.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@
import traceback

import microcontroller
from circuitpython_hmac import HMAC

from .config.config import Config
from .config.jokes_config import JokesConfig
from .hardware.radio.packetizer.packet_manager import PacketManager
from .hmac_auth import HMACAuthenticator
from .logger import Logger
from .nvm.counter import Counter16

try:
from typing import Callable, Optional
except Exception:
pass


class CommandDataHandler:
Expand All @@ -33,6 +42,7 @@ class CommandDataHandler:
command_reset: str = "reset"
command_change_radio_modulation: str = "change_radio_modulation"
command_send_joke: str = "send_joke"
command_get_counter: str = "get_counter"

oscar_password: str = "Hello World!" # Default password for OSCAR commands

Expand All @@ -41,20 +51,32 @@ def __init__(
logger: Logger,
config: Config,
packet_manager: PacketManager,
jokes_config: JokesConfig,
last_command_counter: Optional[Counter16] = None,
send_delay: float = 0.2,
hmac_class: Callable = HMAC,
) -> None:
"""Initializes the CommandDataHandler.

Args:
logger: The logger to use.
config: The configuration to use.
packet_manager: The packet manager to use for sending and receiving data.
last_command_counter: NVM counter tracking the last valid command counter (16-bit).
send_delay: The delay between sending an acknowledgement and the response.
"""
self._log: Logger = logger
self._config: Config = config
self._jokes_config: JokesConfig = jokes_config
self._packet_manager: PacketManager = packet_manager
self._send_delay: float = send_delay
self._hmac_authenticator: HMACAuthenticator = HMACAuthenticator(
config.hmac_secret, hmac_class=hmac_class
)
if last_command_counter is not None:
self._last_command_counter: Counter16 = last_command_counter
else:
self._last_command_counter = Counter16(0)

def listen_for_commands(self, timeout: int) -> None:
"""Listens for commands from the radio and handles them.
Expand All @@ -66,14 +88,17 @@ def listen_for_commands(self, timeout: int) -> None:

json_bytes = self._packet_manager.listen(timeout)
if json_bytes is None:
self._log.debug("Nothing Found in the packet")
return
self._log.debug("Found stuff in the packet")

try:
json_str = json_bytes.decode("utf-8")
self._log.debug("json string in the message is", stingis=json_str)

msg: dict[str, str] = json.loads(json_str)

# Check for OSCAR password first
# Check for OSCAR password first (legacy authentication)
if msg.get("password") == self.oscar_password:
self._log.debug("OSCAR command received", msg=msg)
cmd = msg.get("command")
Expand All @@ -89,37 +114,126 @@ def listen_for_commands(self, timeout: int) -> None:
if isinstance(raw_args, list):
args: list[str] = raw_args

# Delay to give the ground station time to switch to listening mode
time.sleep(self._send_delay)
self._packet_manager.send_acknowledgement()

self.oscar_command(cmd, args)
return

# If message has password field, check it
if msg.get("password") != self._config.super_secret_code:
# If message has command field, get the command
cmd = msg.get("command")

if cmd is not None and cmd == self.command_get_counter:
self.send_counter()
return

# HMAC-based authentication (required for non-OSCAR commands)
hmac_value = str(msg.get("hmac"))
counter_raw = msg.get("counter")

self._log.debug(
"Current command, counter and hash",
command=cmd,
counter=counter_raw,
hasht=hmac_value,
)

# Require HMAC authentication
if hmac_value is None or counter_raw is None:
self._log.debug(
"Invalid password in message",
"Missing HMAC or counter in message",
msg=msg,
)
return

# Use HMAC authentication
# Convert counter to int
self._log.debug("The counter is", counter=counter_raw)

try:
counter: int = int(counter_raw)
except (ValueError, TypeError):
self._log.debug(
"Invalid counter in message",
counter=counter_raw,
)
return

# Validate counter is within 16-bit range
if counter < 0 or counter > 0xFFFF:
self._log.debug(
"Counter out of range",
counter=counter,
)
return

self._log.debug("counter validated")

# Extract message without HMAC for verification
msg_without_hmac = {k: v for k, v in msg.items() if k != "hmac"}
message_str = json.dumps(msg_without_hmac, separators=(",", ":"))

self._log.debug("messagestring is", msrg=message_str)
self._log.debug(
"hmac valid details", hmac=hmac_value, typeis=type(hmac_value)
)

# Verify HMAC
if not self._hmac_authenticator.verify_hmac(
message_str, counter, hmac_value
):
self._log.debug(
"Invalid HMAC in message",
msg=msg,
)
return
print("OUT")
self._log.debug("passed the authenticate compeint")
# Prevent replay attacks with wraparound handling
last_valid = self._last_command_counter.get()
self._log.debug("last valid is", lv=last_valid)

# Check if counter is valid considering 16-bit wraparound
# Accept if counter is greater, or if wraparound occurred
# (counter is much smaller, indicating it wrapped around)
counter_diff = (counter - last_valid) & 0xFFFF

# Valid if counter is within forward window (1 to 32768)
# This allows for wraparound while preventing replay attacks
if counter_diff == 0 or counter_diff > 0x8000:
self._log.debug(
"Replay attack detected - invalid counter",
counter=counter,
last_valid=last_valid,
diff=counter_diff,
)
return

# Update last valid counter in NVM
self._last_command_counter.set(counter)

self._log.debug(
"Comparing names",
name1=msg.get("name"),
nameconfig=self._config.cubesat_name,
)

# Verify satellite name
if msg.get("name") != self._config.cubesat_name:
self._log.debug(
"Satellite name mismatch in message",
msg=msg,
)
return

# If message has command field, execute the command
cmd = msg.get("command")
self._log.debug("Names are the same")

if cmd is None:
self._log.warning("No command found in message", msg=msg)
self._packet_manager.send(
f"No command found in message: {msg}".encode("utf-8")
)
return

self._log.debug("COmmand is not none")

args: list[str] = []
raw_args = msg.get("args")
if isinstance(raw_args, list):
Expand All @@ -131,6 +245,8 @@ def listen_for_commands(self, timeout: int) -> None:
time.sleep(self._send_delay)
self._packet_manager.send_acknowledgement()

self._log.debug("Sent Acknowledgement", cmd=cmd, args=args)

if cmd == self.command_reset:
self.reset()
elif cmd == self.command_change_radio_modulation:
Expand All @@ -154,10 +270,23 @@ def listen_for_commands(self, timeout: int) -> None:

def send_joke(self) -> None:
"""Sends a random joke from the config."""
joke = random.choice(self._config.jokes)
joke = random.choice(self._jokes_config.jokes)
self._log.info("Sending joke", joke=joke)
self._packet_manager.send(joke.encode("utf-8"))

def send_counter(self):
"""Send the counter down so the ground station knows how to authenticate"""

time.sleep(self._send_delay)
self._packet_manager.send_acknowledgement()

# Additional delay to ensure ACK packet transmission completes before sending counter
time.sleep(self._send_delay)

counter = str(self._last_command_counter.get())
self._log.info("Sending Counter", counter=counter)
self._packet_manager.send(counter.encode("utf-8"))

def change_radio_modulation(self, args: list[str]) -> None:
"""Changes the radio modulation.

Expand Down Expand Up @@ -191,6 +320,8 @@ def change_radio_modulation(self, args: list[str]) -> None:

def reset(self) -> None:
"""Resets the hardware."""
# Delay to give the ground station time to switch to listening mode

self._log.info("Resetting satellite")
self._packet_manager.send(data="Resetting satellite".encode("utf-8"))
microcontroller.on_next_reset(microcontroller.RunMode.NORMAL)
Expand All @@ -203,6 +334,10 @@ def oscar_command(self, command: str, args: list[str]) -> None:
command: The OSCAR command to execute.
args: A list of arguments for the command.
"""
time.sleep(self._send_delay)

self._packet_manager.send_acknowledgement()

if command == "ping":
self._log.info("OSCAR ping command received. Sending pong response.")
self._packet_manager.send(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ class Config:
critical_battery_voltage (float): Critical battery voltage.
reboot_time (int): Time before reboot in seconds.
turbo_clock (bool): Turbo clock enabled flag.
super_secret_code (str): Secret code for special operations.
super_secret_code (str): Secret code for special operations (deprecated).
repeat_code (str): Code for repeated operations.
hmac_secret (str): Shared secret for HMAC command authentication.
longest_allowable_sleep_time (int): Maximum allowable sleep time.
CONFIG_SCHEMA (dict): Validation schema for configuration keys.

Expand Down Expand Up @@ -114,6 +115,7 @@ def __init__(self, config_path: str) -> None:
self.turbo_clock: bool = json_data["turbo_clock"]
self.super_secret_code: str = json_data["super_secret_code"]
self.repeat_code: str = json_data["repeat_code"]
self.hmac_secret: str = json_data.get("hmac_secret", "default_hmac_secret")
self.longest_allowable_sleep_time: int = json_data[
"longest_allowable_sleep_time"
]
Expand All @@ -122,6 +124,7 @@ def __init__(self, config_path: str) -> None:
"cubesat_name": {"type": str, "min_length": 1, "max_length": 10},
"super_secret_code": {"type": bytes, "min": 1, "max": 24},
"repeat_code": {"type": bytes, "min": 1, "max": 4},
"hmac_secret": {"type": bytes, "min": 16, "max": 64},
"normal_charge_current": {"type": float, "min": 0.0, "max": 2000.0},
"normal_battery_voltage": {"type": float, "min": 6.0, "max": 8.4},
"degraded_battery_voltage": {"type": float, "min": 5.4, "max": 8.0},
Expand Down
Loading
Loading