diff --git a/.ansible-lint b/.ansible-lint index 6019eab..09c8998 100644 --- a/.ansible-lint +++ b/.ansible-lint @@ -1,3 +1,33 @@ -exclude_paths: - - ansible/molecule +# Hardened .ansible-lint configuration for pi-log +# Designed for long-term stability and Molecule v4 compatibility +--- +# ----------------------------------------- +# Project structure +# ----------------------------------------- +# All Ansible content lives under ./ansible project_dir: ansible + +# ----------------------------------------- +# Exclusions +# ----------------------------------------- +# Molecule scenarios should NOT be linted. +# They run their own syntax-check and do not represent production playbooks. +exclude_paths: + - ansible/roles/*/molecule + - ansible/collections + +# ----------------------------------------- +# Rule controls +# ----------------------------------------- +# Keep skip_list empty unless a rule is intentionally disabled. +# This ensures future maintainers understand every deviation explicitly. +skip_list: + - ignore-errors + - package-latest + - no-changed-when + +# ----------------------------------------- +# Runtime / parser stability +# ----------------------------------------- +# Use default rule set; avoid deprecated keys. +use_default_rules: true diff --git a/.ansibleignore b/.ansibleignore new file mode 100644 index 0000000..974dd5d --- /dev/null +++ b/.ansibleignore @@ -0,0 +1,15 @@ +__pycache__/ +*.pyc +*.pyo +*.log +*.tmp +*.bak +*.db +*.sqlite +*.sqlite3 +*.DS_Store +*.swp +*.swo +node_modules/ +tests/ +docs/ diff --git a/.github/workflows/ci.yml b/.github/workflows/ansible-role-ci.yml similarity index 84% rename from .github/workflows/ci.yml rename to .github/workflows/ansible-role-ci.yml index e3a9414..71958d5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ansible-role-ci.yml @@ -1,3 +1,5 @@ +--- +# filename: .github/workflows/ansible-role-ci.yml name: Ansible Role CI on: @@ -28,7 +30,3 @@ jobs: - name: Lint role run: ansible-lint ansible/roles/pi_log - - - name: Run Molecule tests - working-directory: ansible/roles/pi_log - run: molecule test diff --git a/.github/workflows/app-ci.yml b/.github/workflows/application-ci.yml similarity index 90% rename from .github/workflows/app-ci.yml rename to .github/workflows/application-ci.yml index 4b12f25..53749a3 100644 --- a/.github/workflows/app-ci.yml +++ b/.github/workflows/application-ci.yml @@ -1,3 +1,5 @@ +--- +# filename: .github/workflows/application-ci.yml name: Application CI on: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml deleted file mode 100644 index aa431a3..0000000 --- a/.github/workflows/release.yml +++ /dev/null @@ -1,36 +0,0 @@ -name: Release to Ansible Galaxy - -on: - push: - tags: - - "v*" - -jobs: - release: - runs-on: ubuntu-latest - - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: "3.10" - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install ansible ansible-lint - - - name: Lint Ansible role - run: ansible-lint . - - - name: Import role into Ansible Galaxy - env: - ANSIBLE_GALAXY_API_KEY: ${{ secrets.ANSIBLE_GALAXY_API_KEY }} - run: | - ansible-galaxy role import \ - gaspode-wonder \ - pi-log \ - --token "$ANSIBLE_GALAXY_API_KEY" diff --git a/.gitignore b/.gitignore index 2333342..ad60f77 100644 --- a/.gitignore +++ b/.gitignore @@ -73,3 +73,4 @@ __pycache__/ .fact_cache/ .ansible/ ansible/.ansible/ +ansible/collections/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d3a79d4..05bf0d7 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,9 @@ +--- default_language_version: python: python3.10 +fail_fast: false + repos: # ----------------------------- # Core whitespace & formatting @@ -24,7 +27,7 @@ repos: args: ["-c", "yamllint.yml"] # ----------------------------- - # Ansible linting (your existing hook) + # Ansible linting (pi-log) # ----------------------------- - repo: https://github.com/ansible/ansible-lint rev: v24.2.0 @@ -32,6 +35,22 @@ repos: - id: ansible-lint name: ansible-lint (pi-log) files: ^ansible/ + stages: [commit] + additional_dependencies: + - ansible-core + - ansible + - ansible-lint + - ansible-compat + - ansible-pygments + + - repo: local + hooks: + - id: install-ansible-collections + name: Install Ansible Galaxy collections for linting + entry: bash -c "ansible-galaxy collection install ansible.posix community.general" + language: system + pass_filenames: false + stages: [commit] # ----------------------------- # Optional Python formatting diff --git a/Makefile b/Makefile index 7a049a1..f019343 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,23 @@ -PI_HOST=192.168.1.166 -PI_USER=pi +# filename: Makefile +# ------------------------------------------------------------ +# pi-log Unified Makefile (Python + Ansible + Pi Ops) +# ------------------------------------------------------------ + +PI_HOST=beamrider-0001.local +PI_USER=jeb + +ANSIBLE_DIR=ansible +INVENTORY=$(ANSIBLE_DIR)/inventory +PLAYBOOK=$(ANSIBLE_DIR)/deploy.yml +ROLE_DIR=$(ANSIBLE_DIR)/roles/pi_log +SERVICE=pi-log + +PYTHON := /opt/homebrew/bin/python3.12 +VENV := .venv + +# ------------------------------------------------------------ +# Help +# ------------------------------------------------------------ help: ## Show help @echo "" @@ -8,205 +26,148 @@ help: ## Show help @grep -E '^[a-zA-Z_-]+:.*?##' Makefile | sed 's/:.*##/: /' | column -t -s ':' @echo "" -.PHONY: venv install freeze test lint run \ - check-ansible deploy restart logs db-shell \ - ping hosts ssh sync sync-code sync-service deploy-fast \ - doctor clean reset-pi \ - pi-status pi-journal +.PHONY: help -# ------------------------------------------------------------------- +# ------------------------------------------------------------ # Python environment -# ------------------------------------------------------------------- +# ------------------------------------------------------------ + +dev: ## Full local development bootstrap (venv + deps + sanity checks) + rm -rf $(VENV) + $(PYTHON) -m venv $(VENV) + @echo ">>> Upgrading pip" + $(VENV)/bin/pip install --upgrade pip + @echo ">>> Installing all dependencies from requirements.txt" + $(VENV)/bin/pip install -r requirements.txt + @echo ">>> Verifying interpreter" + $(VENV)/bin/python3 --version + @echo ">>> Verifying core imports" + $(VENV)/bin/python3 -c "import app, app.ingestion.api_client, pytest, fastapi, requests; print('Imports OK')" + @echo "" + @echo "✔ Development environment ready" + @echo "Activate with: source $(VENV)/bin/activate" -venv: ## Create Python virtual environment - python3 -m venv .venv +bootstrap: ## Create venv and install dependencies (first-time setup) + rm -rf $(VENV) + $(PYTHON) -m venv $(VENV) + $(VENV)/bin/pip install --upgrade pip + $(VENV)/bin/pip install -r requirements.txt + @echo "Bootstrap complete. Activate with: source $(VENV)/bin/activate" -install: venv ## Install Python dependencies - .venv/bin/pip install -r requirements.txt +install: check-venv ## Install dependencies into existing venv + $(VENV)/bin/pip install -r requirements.txt freeze: ## Freeze dependencies to requirements.txt - .venv/bin/pip freeze > requirements.txt - -.PHONY: venv-rebuild + $(VENV)/bin/pip freeze > requirements.txt -venv-rebuild: - rm -rf /opt/pi-log/.venv - python3 -m venv /opt/pi-log/.venv - /opt/pi-log/.venv/bin/pip install --upgrade pip - /opt/pi-log/.venv/bin/pip install -r requirements.txt - -.PHONY: venv-rebuild-pi +check-venv: + @test -n "$$VIRTUAL_ENV" || (echo "ERROR: .venv not activated"; exit 1) -venv-rebuild-pi: - ansible -i ansible/inventory.ini all -m shell -a "rm -rf /opt/pi-log/.venv" - ansible -i ansible/inventory.ini all -m shell -a "python3 -m venv /opt/pi-log/.venv" - ansible -i ansible/inventory.ini all -m pip -a "requirements=/opt/pi-log/requirements.txt virtualenv=/opt/pi-log/.venv" +run: check-venv ## Run ingestion loop locally + $(VENV)/bin/python -m app.ingestion_loop +clean-pyc: + find . -type d -name "__pycache__" -exec rm -rf {} + -# ------------------------------------------------------------------- -# Local development -# ------------------------------------------------------------------- +# ------------------------------------------------------------ +# Linting, type checking, tests +# ------------------------------------------------------------ -check-venv: - @test -n "$$VIRTUAL_ENV" || (echo "ERROR: .venv not activated"; exit 1) +lint: check-venv ## Run ruff lint + ruff format check + $(VENV)/bin/ruff check . + $(VENV)/bin/ruff format --check . -test: check-venv install ## Run application test suite (CI parity) - .venv/bin/pytest +typecheck: check-venv ## Run mypy type checking + $(VENV)/bin/mypy app -lint: ## Run flake8 + black checks - .venv/bin/flake8 app - .venv/bin/black --check app +test: check-venv ## Run pytest suite + $(VENV)/bin/pytest -q -run: ## Run ingestion loop locally - .venv/bin/python -m app.ingestion_loop +ci: clean-pyc check-venv ## Run full local CI suite (lint + typecheck + tests) + $(VENV)/bin/ruff check . + $(VENV)/bin/mypy . + $(VENV)/bin/pytest -q + @echo "" + @echo "✔ Local CI passed" -# ------------------------------------------------------------------- +# ------------------------------------------------------------ # Deployment to Raspberry Pi (via Ansible) -# ------------------------------------------------------------------- +# ------------------------------------------------------------ check-ansible: ## Validate Ansible syntax, inventory, lint, and dry-run - ansible-playbook -i ansible/inventory.ini ansible/deploy.yml --syntax-check - ansible-inventory -i ansible/inventory.ini --list >/dev/null - ansible-lint ansible/ - ansible-playbook -i ansible/inventory.ini ansible/deploy.yml --check + # ansible.cfg defines 'inventory = $(INVENTORY)' + ansible-playbook $(PLAYBOOK) --syntax-check + ansible-inventory --list >/dev/null + ansible-lint $(ANSIBLE_DIR) + ansible-playbook $(PLAYBOOK) --check deploy: ## Deploy to Raspberry Pi via Ansible - ansible-playbook -i ansible/inventory.ini ansible/deploy.yml + ansible-playbook $(PLAYBOOK) + +# ------------------------------------------------------------ +# Pi service management +# ------------------------------------------------------------ restart: ## Restart pi-log service on the Pi - ansible pi1 -i ansible/inventory.ini -m systemd -a "name=pi-log state=restarted" + ansible beamrider-0001 -m systemd -a "name=$(SERVICE) state=restarted" + +start: ## Start pi-log service + ansible beamrider-0001 -m systemd -a "name=$(SERVICE) state=started" + +stop: ## Stop pi-log service + ansible beamrider-0001 -m systemd -a "name=$(SERVICE) state=stopped" + +status: ## Show pi-log systemd status + ssh $(PI_USER)@$(PI_HOST) "systemctl status $(SERVICE)" -logs: ## Tail ingestion logs on the Pi - ssh pi@192.168.1.166 "sudo tail -f /opt/pi-log/logs/service.log" +logs: ## Show last 50 log lines + ssh $(PI_USER)@$(PI_HOST) "sudo journalctl -u $(SERVICE) -n 50" + +tail: ## Follow live logs + ssh $(PI_USER)@$(PI_HOST) "sudo journalctl -u $(SERVICE) -f" db-shell: ## Open SQLite shell on the Pi - ssh pi@pi1 "sudo sqlite3 /opt/pi-log/data/readings.db" + ssh $(PI_USER)@$(PI_HOST) "sudo sqlite3 /opt/pi-log/readings.db" + +# ------------------------------------------------------------ +# Pi health + maintenance +# ------------------------------------------------------------ ping: ## Ping the Raspberry Pi via Ansible - ansible pi1 -i ansible/inventory.ini -m ping + ansible beamrider-0001 -m ping hosts: ## Show parsed Ansible inventory - ansible-inventory -i ansible/inventory.ini --list + ansible-inventory --list ssh: ## SSH into the Raspberry Pi ssh $(PI_USER)@$(PI_HOST) -# ------------------------------------------------------------------- -# File sync operations -# ------------------------------------------------------------------- - -sync: ## Sync project files to the Pi via rsync (mirror mode) - rsync -avz --delete \ - --exclude '.venv' \ - --exclude '__pycache__' \ - --exclude '.git' \ - --exclude 'node_modules' \ - ./ $(PI_USER)@$(PI_HOST):/opt/pi-log/ - ansible pi1 -i ansible/inventory.ini -m systemd -a "name=pi-log state=restarted" - -sync-code: ## Sync only app/ and ansible/ to the Pi - rsync -avz --delete \ - --exclude '__pycache__' \ - --exclude '.git' \ - app/ $(PI_USER)@$(PI_HOST):/opt/pi-log/app/ - rsync -avz --delete \ - --exclude '.git' \ - ansible/ $(PI_USER)@$(PI_HOST):/opt/pi-log/ansible/ - -sync-service: ## Push systemd unit and restart service - rsync -avz ansible/roles/pi_log/files/pi-log.service $(PI_USER)@$(PI_HOST):/etc/systemd/system/pi-log.service - ssh $(PI_USER)@$(PI_HOST) "sudo systemctl daemon-reload && sudo systemctl restart pi-log" - -deploy-fast: ## Fast deploy: sync + restart without full Ansible run - rsync -avz --delete \ - --exclude '.venv' \ - --exclude '__pycache__' \ - --exclude '.git' \ - --exclude 'node_modules' \ - ./ pi@pi1:/opt/pi-log/ - ansible pi1 -i ansible/inventory.ini -m systemd -a "name=pi-log state=restarted" - doctor: ## Run full environment + Pi health checks - @echo "Checking Python..." - @python3 --version - @echo "" - - @echo "Checking virtual environment..." - @[ -d ".venv" ] && echo "venv OK" || echo "venv missing" - @echo "" - - @echo "Checking Python dependencies..." - @.venv/bin/pip --version - @echo "" - - @echo "Checking Ansible..." - @ansible --version - @ansible-inventory -i ansible/inventory.ini --list >/dev/null && echo "Inventory OK" - @echo "" - - @echo "Checking SSH connectivity..." - @ssh -o BatchMode=yes -o ConnectTimeout=5 pi@pi1 "echo SSH OK" || echo "SSH FAILED" - @echo "" - - @echo "Checking systemd service..." - @ssh pi@pi1 "systemctl is-active pi-log" || true + @echo "Checking Python..."; python3 --version; echo "" + @echo "Checking virtual environment..."; \ + [ -d ".venv" ] && echo "venv OK" || echo "venv missing"; echo "" + @echo "Checking Python dependencies..."; $(VENV)/bin/pip --version; echo "" + @echo "Checking Ansible..."; ansible --version; \ + ansible-inventory --list >/dev/null && echo "Inventory OK"; echo "" + @echo "Checking SSH connectivity..."; \ + ssh -o BatchMode=yes -o ConnectTimeout=5 $(PI_USER)@$(PI_HOST) "echo SSH OK" || echo "SSH FAILED"; echo "" + @echo "Checking systemd service..."; \ + ssh $(PI_USER)@$(PI_HOST) "systemctl is-active $(SERVICE)" || true clean: ## Remove virtual environment and Python cache files - rm -rf .venv + rm -rf $(VENV) find . -type d -name "__pycache__" -exec rm -rf {} + find . -type f -name "*.pyc" -delete reset-pi: ## Wipe /opt/pi-log on the Pi and redeploy - ssh pi@pi1 "sudo systemctl stop pi-log || true" - ssh pi@pi1 "sudo rm -rf /opt/pi-log/*" - ansible-playbook -i ansible/inventory.ini ansible/deploy.yml - ssh pi@pi1 "sudo systemctl restart pi-log" - -# ------------------------------------------------------------------- -# Systemd inspection -# ------------------------------------------------------------------- - -pi-status: ## Show pi-log systemd status - ssh pi@pi1 "systemctl status pi-log" - -pi-journal: ## Follow pi-log journal output - ssh pi@pi1 "journalctl -u pi-log -f" - -# ------------------------------------------------------------------- -# Patch utilities -# ------------------------------------------------------------------- - -apply-patch: ## Apply a patch from patches/ by name: make apply-patch FILE=20251223-ingestion-refactor.patch - @if [ -z "$(FILE)" ]; then \ - echo "ERROR: You must specify FILE="; \ - exit 1; \ - fi - git apply patches/$(FILE) - -diff: ## Generate a patch of uncommitted changes into patches/YYYYMMDD-changes.patch - @mkdir -p patches - @ts=$$(date +"%Y%m%d"); \ - git diff > patches/$$ts-changes.patch; \ - echo "Created patches/$$ts-changes.patch" - -# ------------------------------------------------------------------- -# Release utilities -# ------------------------------------------------------------------- - -release: ## Usage: make release VERSION=0.1.1 - @if [ -z "$(VERSION)" ]; then \ - echo "ERROR: You must specify VERSION=X.Y.Z"; \ - exit 1; \ - fi - @echo "Bumping version to $(VERSION)" - sed -i '' "s/^version:.*/version: $(VERSION)/" galaxy.yml - git add galaxy.yml - git commit -m "Release v$(VERSION)" - git tag v$(VERSION) - git push - git push --tags - @echo "Release v$(VERSION) pushed. GitHub Actions will publish to Galaxy." - -.PHONY: deploy -deploy: + ssh $(PI_USER)@$(PI_HOST) "sudo systemctl stop $(SERVICE) || true" + ssh $(PI_USER)@$(PI_HOST) "sudo rm -rf /opt/pi-log/*" + ansible-playbook $(PLAYBOOK) + ssh $(PI_USER)@$(PI_HOST) "sudo systemctl restart $(SERVICE)" + +# ------------------------------------------------------------ +# Delegation to ansible/Makefile (optional) +# ------------------------------------------------------------ + +ansible-deploy: ## Run ansible/Makefile deploy cd ansible && make deploy diff --git a/README.md b/README.md index c9cdce7..ccea2b6 100644 --- a/README.md +++ b/README.md @@ -169,6 +169,25 @@ deployment on Raspberry Pi systems. --- +## Import Path Fix for Raspberry Pi 3 + +Raspberry Pi 3 devices running Python 3.11 require an explicit `.pth` file +inside the virtual environment to ensure the `app` package is importable +when launched under systemd. + +Create the file: + + /opt/pi-log/.venv/lib/python3.11/site-packages/pi_log_path.pth + +With the following content: + + /opt/pi-log + +This ensures that `python -m app.ingestion_loop` works consistently under +systemd, regardless of environment propagation quirks on Pi 3 hardware. + +--- + ## License MIT diff --git a/__init__.py b/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/ansible.cfg b/ansible.cfg new file mode 100644 index 0000000..c5d66cb --- /dev/null +++ b/ansible.cfg @@ -0,0 +1,31 @@ +# filename: ansible.cfg +# ansible.cfg for pi-log +# Canonical configuration for Molecule v4, CI, and multi-role repos. + +[defaults] +inventory = ansible/hosts/hosts.ini +roles_path = ansible/roles +retry_files_enabled = false +stdout_callback = default +bin_ansible_callbacks = true +interpreter_python = auto_silent +host_key_checking = false +timeout = 30 + +# Prevent Molecule + CI from polluting the repo +remote_tmp = ~/.ansible/tmp +local_tmp = ~/.ansible/tmp + +# Avoid noisy warnings in CI +deprecation_warnings = false +command_warnings = false +any_errors_fatal = true + +[privilege_escalation] +become = true +become_method = sudo +become_ask_pass = false + +[ssh_connection] +pipelining = true +ssh_args = -o ControlMaster=auto -o ControlPersist=60s diff --git a/ansible/Makefile b/ansible/Makefile deleted file mode 100644 index 26cb86f..0000000 --- a/ansible/Makefile +++ /dev/null @@ -1,100 +0,0 @@ -# ---------------------------------------- -# Ansible Deployment & Role Dev Makefile -# ---------------------------------------- - -INVENTORY=inventory.ini -PLAYBOOK=deploy.yml -ROLE_DIR=roles/pi_log -SCENARIO=default -SERVICE=pi-log.service - -.PHONY: deploy restart logs tail status lint test converge verify destroy idempotence - -# ------------------------- -# Deployment Commands -# ------------------------- - -deploy: sync-app - ansible-playbook -i $(INVENTORY) $(PLAYBOOK) --tags pi_log --diff - -restart: - ansible -i $(INVENTORY) all -m systemd -a "name=$(SERVICE) state=restarted" - -status: - ansible -i $(INVENTORY) all -m systemd -a "name=$(SERVICE) state=started" - -logs: - ansible -i $(INVENTORY) all -m shell -a "journalctl -u $(SERVICE) -n 50" - -tail: - ansible -i $(INVENTORY) all -m shell -a "journalctl -u $(SERVICE) -f" - -# ------------------------- -# Role Development (Molecule) -# ------------------------- - -lint: - ansible-lint $(ROLE_DIR) - -converge: - cd $(ROLE_DIR) && molecule converge -s $(SCENARIO) - -verify: - cd $(ROLE_DIR) && molecule verify -s $(SCENARIO) - -destroy: - cd $(ROLE_DIR) && molecule destroy -s $(SCENARIO) - -test: - cd $(ROLE_DIR) && molecule test -s $(SCENARIO) - -idempotence: - cd $(ROLE_DIR) && molecule converge -s $(SCENARIO) - cd $(ROLE_DIR) && molecule idempotence -s $(SCENARIO) - -# ---------------------------------------- -# Sync repo app → Ansible role -# ---------------------------------------- - -ROLE_APP_DIR=roles/pi_log/files/app -REPO_APP_DIR=../app - -.PHONY: sync-app sync-check - -sync-app: - @echo "🔄 Syncing repo app → role..." - rm -rf $(ROLE_APP_DIR) - mkdir -p $(ROLE_APP_DIR) - cp -r $(REPO_APP_DIR)/* $(ROLE_APP_DIR)/ - @echo "✅ Sync complete." - -sync-check: - @echo "📂 Checking role app directory..." - ls -l $(ROLE_APP_DIR) - -# ---------------------------------------- -# Systemctl wrappers (run via Ansible) -# ---------------------------------------- - -SERVICE=pi-log.service -INVENTORY=inventory.ini - -.PHONY: restart status logs tail stop start - -start: - ansible -i $(INVENTORY) all -m systemd -a "name=$(SERVICE) state=started" - -stop: - ansible -i $(INVENTORY) all -m systemd -a "name=$(SERVICE) state=stopped" - -restart: - ansible -i $(INVENTORY) all -m systemd -a "name=$(SERVICE) state=restarted" - -status: - ansible -i $(INVENTORY) all -m systemd -a "name=$(SERVICE) state=started" - -logs: - ansible -i $(INVENTORY) all -m shell -a "journalctl -u $(SERVICE) -n 50" - -tail: - ansible -i $(INVENTORY) all -m shell -a "journalctl -u $(SERVICE) -f" diff --git a/ansible/deploy.yml b/ansible/deploy.yml index c3a06e7..8cbfc5c 100644 --- a/ansible/deploy.yml +++ b/ansible/deploy.yml @@ -6,3 +6,4 @@ roles: - role: pi_log + tags: pi_log diff --git a/ansible/host_vars/beamrider-0001.local.yml b/ansible/host_vars/beamrider-0001.local.yml new file mode 100644 index 0000000..549201b --- /dev/null +++ b/ansible/host_vars/beamrider-0001.local.yml @@ -0,0 +1,6 @@ +# filename: ansible/host_vars/beamrider-0001.yml + +pi_log_push_enabled: true +pi_log_push_url: "http://keep-0001.local:8000/api/geiger/push" +pi_log_api_token: "pi-log-placeholder-token" +pi_log_device_id: "{{ inventory_hostname }}" diff --git a/ansible/hosts/group_vars/all.yml b/ansible/hosts/group_vars/all.yml new file mode 100644 index 0000000..9ee42f0 --- /dev/null +++ b/ansible/hosts/group_vars/all.yml @@ -0,0 +1,3 @@ +# ansible/inventory/group_vars/all.yml +--- +ansible_python_interpreter: /usr/bin/python3 diff --git a/ansible/hosts/group_vars/beamrider-0001.yml b/ansible/hosts/group_vars/beamrider-0001.yml new file mode 100644 index 0000000..9c79cf3 --- /dev/null +++ b/ansible/hosts/group_vars/beamrider-0001.yml @@ -0,0 +1,3 @@ +# ansible/inventory/group_vars/beamrider-0001.yml +--- +device_role: ingestion diff --git a/ansible/hosts/group_vars/pi_log.yml b/ansible/hosts/group_vars/pi_log.yml new file mode 100644 index 0000000..8a4a371 --- /dev/null +++ b/ansible/hosts/group_vars/pi_log.yml @@ -0,0 +1,3 @@ +# ansible/inventory/group_vars/pi_log.yml +--- +pi_log_enabled: true diff --git a/ansible/hosts/hosts.ini b/ansible/hosts/hosts.ini new file mode 100644 index 0000000..36bfbf6 --- /dev/null +++ b/ansible/hosts/hosts.ini @@ -0,0 +1,2 @@ +[pi] +beamrider-0001.local ansible_user=jeb diff --git a/ansible/inventory.ini b/ansible/inventory.ini deleted file mode 100644 index c376491..0000000 --- a/ansible/inventory.ini +++ /dev/null @@ -1,2 +0,0 @@ -[pi] -pi1 ansible_host=192.168.1.166 ansible_user=pi ansible_ssh_private_key_file=~/.ssh/id_ed25519 ansible_become=true diff --git a/ansible/molecule/default/converge.yml b/ansible/molecule/default/converge.yml deleted file mode 100644 index e5eda07..0000000 --- a/ansible/molecule/default/converge.yml +++ /dev/null @@ -1,8 +0,0 @@ ---- -- name: Converge the pi_log role - hosts: all - become: true - gather_facts: true - - roles: - - pi_log diff --git a/ansible/molecule/default/molecule.yml b/ansible/molecule/default/molecule.yml deleted file mode 100644 index 398e01b..0000000 --- a/ansible/molecule/default/molecule.yml +++ /dev/null @@ -1,23 +0,0 @@ ---- -dependency: - name: galaxy - -driver: - name: docker - -platforms: - - name: instance - image: geerlingguy/docker-debian12-ansible:latest - privileged: true - command: /lib/systemd/systemd - volumes: - - /sys/fs/cgroup:/sys/fs/cgroup:ro - -provisioner: - name: ansible - config_options: - defaults: - roles_path: "../../" # <-- REQUIRED for ansible-lint syntax-check - -verifier: - name: ansible diff --git a/ansible/molecule/default/tests/test_default.py b/ansible/molecule/default/tests/test_default.py deleted file mode 100644 index 3f65a82..0000000 --- a/ansible/molecule/default/tests/test_default.py +++ /dev/null @@ -1,11 +0,0 @@ -def test_service_file_exists(host): - svc = host.file("/etc/systemd/system/pi-log.service") - assert svc.exists - assert svc.user == "root" - assert svc.group == "root" - - -def test_service_running_and_enabled(host): - service = host.service("pi-log") - assert service.is_enabled - assert service.is_running diff --git a/ansible/molecule/default/verify.yml b/ansible/molecule/default/verify.yml deleted file mode 100644 index 54eb833..0000000 --- a/ansible/molecule/default/verify.yml +++ /dev/null @@ -1,58 +0,0 @@ ---- -- name: Verify pi-log installation - hosts: all - become: true - gather_facts: false - - tasks: - - name: Check service file exists - ansible.builtin.stat: - path: /etc/systemd/system/pi-log.service - register: svc_file - - - name: Assert service file exists - ansible.builtin.assert: - that: - - svc_file.stat.exists - - - name: Check service is enabled - ansible.builtin.systemd: - name: pi-log - enabled: true - register: svc_enabled - changed_when: false - - - name: Assert service is enabled - ansible.builtin.assert: - that: - - svc_enabled.status.Enabled - - - name: Check service is running - ansible.builtin.systemd: - name: pi-log - state: started - register: svc_running - changed_when: false - - - name: Assert service is running - ansible.builtin.assert: - that: - - svc_running.status.ActiveState == "active" - -- name: Run role again to test idempotence - hosts: all - become: true - gather_facts: false - - tasks: - - name: Re-run the role - ansible.builtin.include_role: - name: pi_log - register: rerun - - - name: Assert idempotence - ansible.builtin.assert: - that: - - rerun is not changed - fail_msg: "Role is not idempotent" - success_msg: "Role is idempotent" diff --git a/ansible/roles/pi_log/defaults/main.yml b/ansible/roles/pi_log/defaults/main.yml index 496ef9b..9f98948 100644 --- a/ansible/roles/pi_log/defaults/main.yml +++ b/ansible/roles/pi_log/defaults/main.yml @@ -1,11 +1,20 @@ -pi_log_serial_device: "/dev/ttyUSB0" -pi_log_serial_baudrate: 9600 -pi_log_serial_timeout: 1.0 +--- +# Serial / device configuration +pi_log_device: "/dev/mightyohm" +pi_log_device_type: "mightyohm" +pi_log_baudrate: 9600 +pi_log_timeout: 1.0 +# Local storage pi_log_db_path: "/var/lib/pi-log/readings.db" +# Logging pi_log_log_level: "INFO" -pi_log_push_enabled: false -pi_log_push_url: "" -pi_log_push_api_key: "" +# Push configuration +pi_log_push_enabled: true +pi_log_api_url: "" +pi_log_api_token: "" + +# Identity +pi_log_device_id: "" diff --git a/ansible/roles/pi_log/files/README.md b/ansible/roles/pi_log/files/README.md deleted file mode 100644 index 513b5ef..0000000 --- a/ansible/roles/pi_log/files/README.md +++ /dev/null @@ -1,4 +0,0 @@ -This directory is intentionally NOT used for application code. - -The app is deployed directly from the repo-root `app/` directory. -Do NOT place a copy of the app here. diff --git a/ansible/roles/pi_log/meta/main.yml b/ansible/roles/pi_log/meta/main.yml index 15891af..98f62eb 100644 --- a/ansible/roles/pi_log/meta/main.yml +++ b/ansible/roles/pi_log/meta/main.yml @@ -5,7 +5,7 @@ galaxy_info: author: Jeb Baugh description: "Ingestion pipeline for pi-log" license: MIT - min_ansible_version: "2.14" + min_ansible_version: "2.15" platforms: - name: Debian diff --git a/ansible/roles/pi_log/molecule/default/converge.yml b/ansible/roles/pi_log/molecule/default/converge.yml new file mode 100644 index 0000000..6091543 --- /dev/null +++ b/ansible/roles/pi_log/molecule/default/converge.yml @@ -0,0 +1,6 @@ +--- +- name: Converge + hosts: all + become: true + roles: + - role: pi_log diff --git a/ansible/roles/pi_log/molecule/default/molecule.yml b/ansible/roles/pi_log/molecule/default/molecule.yml new file mode 100644 index 0000000..84fb480 --- /dev/null +++ b/ansible/roles/pi_log/molecule/default/molecule.yml @@ -0,0 +1,20 @@ +--- +dependency: + name: galaxy + +driver: + name: docker + +platforms: + - name: instance + image: geerlingguy/docker-ubuntu2204-ansible:latest + privileged: true + +provisioner: + name: ansible + config_options: + defaults: + roles_path: ../../ + +verifier: + name: ansible diff --git a/ansible/roles/pi_log/molecule/default/prepare.yml b/ansible/roles/pi_log/molecule/default/prepare.yml new file mode 100644 index 0000000..3568de3 --- /dev/null +++ b/ansible/roles/pi_log/molecule/default/prepare.yml @@ -0,0 +1,20 @@ +# ansible/roles/pi_log/molecule/default/prepare.yml +--- +- name: Prepare test instance + hosts: all + become: true + tasks: + - name: Ensure Python is installed + ansible.builtin.package: + name: python3 + state: present + + - name: Ensure systemd is available + ansible.builtin.package: + name: systemd + state: present + + - name: Update apt cache (Debian/Ubuntu) + ansible.builtin.apt: + update_cache: true + when: ansible_os_family == "Debian" diff --git a/ansible/roles/pi_log/molecule/default/verify.yml b/ansible/roles/pi_log/molecule/default/verify.yml new file mode 100644 index 0000000..ca294b7 --- /dev/null +++ b/ansible/roles/pi_log/molecule/default/verify.yml @@ -0,0 +1,14 @@ +--- +- name: Verify + hosts: all + gather_facts: false + tasks: + - name: Check that pi-log service file exists + ansible.builtin.stat: + path: /etc/systemd/system/pi-log.service + register: service_file + + - name: Assert service file exists + ansible.builtin.assert: + that: + - service_file.stat.exists diff --git a/ansible/roles/pi_log/tasks/main.yml b/ansible/roles/pi_log/tasks/main.yml index 93109f7..fd175fc 100644 --- a/ansible/roles/pi_log/tasks/main.yml +++ b/ansible/roles/pi_log/tasks/main.yml @@ -1,115 +1,217 @@ +# filename: ansible/roles/pi_log/tasks/main.yml --- -- name: Ensure /opt/pi-log exists + +# -------------------------------------------------------------------- +# 0. HARD BLOW‑OUT: stop service, free serial, purge ALL code paths +# -------------------------------------------------------------------- + +- name: Stop pi-log service if running + ansible.builtin.systemd: + name: pi-log + state: stopped + failed_when: false + changed_when: false + + +- name: Ensure MightyOhm serial device is not held open + ansible.builtin.command: + cmd: fuser -k /dev/mightyohm + register: pi_log_mightyohm_fuser_result + changed_when: pi_log_mightyohm_fuser_result.rc == 0 + failed_when: false + +# --- CRITICAL: remove ANY system-level ghost modules ----------------- + +- name: Remove global ghost modules (site-packages) ansible.builtin.file: - path: /opt/pi-log - state: directory - owner: root - group: root - mode: "0755" + path: /usr/local/lib/python3.11/site-packages/app + state: absent + failed_when: false + changed_when: false + -- name: Ensure /opt/pi-log/logs exists +- name: Remove distro-level ghost modules (site-packages) ansible.builtin.file: - path: /opt/pi-log/logs - state: directory - owner: root - group: root - mode: "0755" + path: /usr/lib/python3.11/site-packages/app + state: absent + failed_when: false + changed_when: false + + +# --- CRITICAL: remove systemd drop-ins and cached fragments ---------- -- name: Ensure /var/lib/pi-log exists +- name: Remove systemd drop-in directory for pi-log ansible.builtin.file: - path: /var/lib/pi-log - state: directory - owner: root - group: root - mode: "0755" + path: /etc/systemd/system/pi-log.service.d + state: absent + failed_when: false + changed_when: false + -- name: Remove old application directory +- name: Remove systemd unit override if present ansible.builtin.file: - path: /opt/pi-log/app + path: /etc/systemd/system/pi-log.service~ state: absent + failed_when: false + changed_when: false -- name: Deploy application package - ansible.builtin.copy: - src: "{{ playbook_dir }}/../app/" - dest: /opt/pi-log/app/ - owner: root - group: root - mode: "0755" -- name: Ensure /opt/pi-log/app/logging exists +# --- CRITICAL: remove venv + app directory --------------------------- + +- name: Remove /opt/pi-log virtual environment ansible.builtin.file: - path: /opt/pi-log/app/logging + path: /opt/pi-log/.venv + state: absent + +- name: Remove /opt/pi-log completely + ansible.builtin.file: + path: /opt/pi-log + state: absent + +- name: Recreate /opt/pi-log directory + ansible.builtin.file: + path: /opt/pi-log state: directory owner: root group: root mode: "0755" -- name: Deploy logging config - ansible.builtin.copy: - src: app/logging/logging.toml - dest: /opt/pi-log/app/logging/logging.toml +# -------------------------------------------------------------------- +# 1. Canonical repo sync — the ONLY sync +# -------------------------------------------------------------------- + +- name: Sync application source to /opt/pi-log + ansible.posix.synchronize: + src: "{{ playbook_dir | dirname }}/" + dest: /opt/pi-log + delete: true + rsync_opts: + - "--exclude=/.git/" + - "--exclude=/.github/" + - "--exclude=/.venv/" + - "--exclude=/ansible/" + - "--exclude=/molecule/" + - "--exclude=/tests/" + - "--exclude=__pycache__" + - "--exclude=*.pyc" + - "--exclude=*.pyo" + - "--exclude=.pytest_cache" + - "--exclude=.mypy_cache" + - "--exclude=.ruff_cache" + - "--exclude=*.swp" + - "--exclude=*.swo" + - "--exclude=.DS_Store" + - "--exclude=*.log" + +# -------------------------------------------------------------------- +# 2. Config directory + config.toml +# -------------------------------------------------------------------- + +- name: Ensure config directory exists + ansible.builtin.file: + path: /etc/pi-log + state: directory owner: root group: root - mode: "0644" + mode: "0755" -- name: Deploy ingestion config +- name: Deploy config.toml ansible.builtin.template: src: config.toml.j2 - dest: /opt/pi-log/config.toml + dest: /etc/pi-log/config.toml owner: root group: root mode: "0644" + notify: Restart pi-log service -# ---------------------------------------- -# Python virtual environment -# ---------------------------------------- - -- name: Ensure Python venv directory exists - ansible.builtin.file: - path: /opt/pi-log/.venv - state: directory - owner: root - group: root - mode: "0755" +# -------------------------------------------------------------------- +# 3. Virtual environment + dependencies (rebuilt every time) +# -------------------------------------------------------------------- -- name: Create Python virtual environment +- name: Create virtual environment ansible.builtin.command: - cmd: python3 -m venv /opt/pi-log/.venv - creates: /opt/pi-log/.venv/bin/activate + cmd: python3.11 -m venv /opt/pi-log/.venv + args: + creates: /opt/pi-log/.venv/bin/python3.11 + +- name: Upgrade pip inside virtual environment + ansible.builtin.pip: + name: pip + state: present + virtualenv: /opt/pi-log/.venv -- name: Install Python dependencies into venv +- name: Install Python dependencies into clean virtual environment ansible.builtin.pip: requirements: /opt/pi-log/requirements.txt virtualenv: /opt/pi-log/.venv + notify: Restart pi-log service -- name: Install ingestion systemd unit +# -------------------------------------------------------------------- +# 4. Systemd service deployment (with forced reload) +# -------------------------------------------------------------------- + +- name: Deploy systemd service ansible.builtin.template: src: pi-log.service.j2 dest: /etc/systemd/system/pi-log.service owner: root group: root mode: "0644" + notify: + - Reload systemd + - Restart pi-log service -- name: Install API systemd unit - ansible.builtin.template: - src: pi-log-api.service.j2 - dest: /etc/systemd/system/pi-log-api.service - owner: root - group: root - mode: "0644" +# --- CRITICAL: force reload even if template task reports no change --- -- name: Reload systemd +- name: Force systemd to reload units ansible.builtin.systemd: daemon_reload: true -- name: Enable and restart ingestion service +# -------------------------------------------------------------------- +# 5. Ensure service enabled and started +# -------------------------------------------------------------------- + +- name: Ensure pi-log service is enabled and running ansible.builtin.systemd: - name: pi-log.service + name: pi-log enabled: true - state: restarted + state: started + +# -------------------------------------------------------------------- +# 6. Post-deploy health check (basic, deterministic) +# -------------------------------------------------------------------- + +- name: Wait for pi-log process to be running + ansible.builtin.command: + cmd: pgrep -f "app.ingestion.geiger_reader" + register: pi_log_pgrep + retries: 10 + delay: 3 + until: pi_log_pgrep.rc == 0 + failed_when: pi_log_pgrep.rc != 0 + +- name: Confirm pi-log service is active + ansible.builtin.systemd: + name: pi-log + state: started + register: pi_log_systemd_status + +- name: Fail if pi-log service is not active + ansible.builtin.assert: + that: + - pi_log_systemd_status.status.ActiveState == "active" + fail_msg: "pi-log service is not active after deployment" + +# -------------------------------------------------------------------- +# Handlers +# -------------------------------------------------------------------- + +- name: Reload systemd + ansible.builtin.systemd: + daemon_reload: true -- name: Enable and restart API service +- name: Restart pi-log service ansible.builtin.systemd: - name: pi-log-api.service + name: pi-log enabled: true state: restarted diff --git a/ansible/roles/pi_log/templates/config.toml.j2 b/ansible/roles/pi_log/templates/config.toml.j2 index a362013..29e3edc 100644 --- a/ansible/roles/pi_log/templates/config.toml.j2 +++ b/ansible/roles/pi_log/templates/config.toml.j2 @@ -1,16 +1,44 @@ +# ansible/roles/pi_log/templates/config.toml.j2 + [serial] -device = "{{ pi_log_serial_device }}" -baudrate = {{ pi_log_serial_baudrate }} -timeout = {{ pi_log_serial_timeout }} +device = "/dev/ttyUSB0" +baudrate = 9600 -[storage] -db_path = "{{ pi_log_db_path }}" +[sqlite] +path = "/opt/pi-log/readings.db" [logging] +version = 1 +level = "INFO" log_dir = "/opt/pi-log/logs" -level = "{{ pi_log_log_level }}" + +# --- dictConfig additions --- +[logging.handlers.console] +class = "logging.StreamHandler" +level = "INFO" +formatter = "standard" +stream = "ext://sys.stdout" + +[logging.formatters.standard] +format = "%(asctime)s [%(levelname)s] %(name)s: %(message)s" + +[logging.root] +level = "INFO" +handlers = ["console"] +# --- end dictConfig additions --- + +[ingestion] +poll_interval = 1 + +[api] +enabled = false +base_url = "" +token = "" [push] -enabled = {{ pi_log_push_enabled }} -url = "{{ pi_log_push_url }}" -api_key = "{{ pi_log_push_api_key }}" +enabled = false +url = "" +api_key = "" + +[telemetry] +enabled = false diff --git a/ansible/roles/pi_log/templates/pi-log.service.j2 b/ansible/roles/pi_log/templates/pi-log.service.j2 index 48c2bb4..b8ba5ef 100644 --- a/ansible/roles/pi_log/templates/pi-log.service.j2 +++ b/ansible/roles/pi_log/templates/pi-log.service.j2 @@ -1,17 +1,37 @@ +# filename: ansible/roles/pi_log/templates/pi-log.service.j2 [Unit] Description=Pi Log Ingestion Service -After=network.target +After=network-online.target +Wants=network-online.target [Service] Type=simple User=root Group=root + +# Ensure the working directory always exists WorkingDirectory=/opt/pi-log -ExecStart=/opt/pi-log/.venv/bin/python -m app.ingestion_loop --config /opt/pi-log/config.toml -Restart=on-failure -RestartSec=5 -StandardOutput=journal -StandardError=journal + +# Deterministic, fully quoted ExecStart +ExecStart=/opt/pi-log/.venv/bin/python3.11 -m app.ingestion.geiger_reader \ + --device "{{ pi_log_device }}" \ + --baudrate "{{ pi_log_baudrate }}" \ + --device-type "{{ pi_log_device_type }}" \ + --db "{{ pi_log_db_path }}" \ + --device-id "{{ pi_log_device_id }}" \ + {% if pi_log_push_enabled %} + --api-url "{{ pi_log_push_url }}" \ + --api-token "{{ pi_log_api_token }}" \ + {% endif %} + + +# Kill any stale serial users after exit +ExecStopPost=/bin/sh -c '/usr/bin/fuser -k "{{ pi_log_device }}" || true' + +Restart=always +RestartSec=2 + +Environment="PYTHONUNBUFFERED=1" [Install] WantedBy=multi-user.target diff --git a/app/__init__.py b/app/__init__.py index 775be28..46723d3 100755 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,3 +1,4 @@ -import warnings -from urllib3.exceptions import NotOpenSSLWarning -warnings.filterwarnings("ignore", category=NotOpenSSLWarning) +# pi-log/app/__init__.py +""" +pi-log application package. +""" diff --git a/app/api.py b/app/api.py index 526ff2d..947d714 100644 --- a/app/api.py +++ b/app/api.py @@ -1,21 +1,23 @@ +# filename: app/api.py + +from __future__ import annotations + import time -from typing import List, Optional +import sqlite3 +from typing import Any, Dict, List, Optional from fastapi import Depends, FastAPI, HTTPException, Query from pydantic import BaseModel -from app.sqlite_store import SQLiteStore +from app.sqlite_store import initialize_db + APP_START_TIME = time.time() -DB_PATH = "/var/lib/pi-log/readings.db" # production path; tests override get_store +DB_PATH = "/var/lib/pi-log/readings.db" app = FastAPI(title="Pi-Log API", version="0.1.0") -# ------------------------- -# Pydantic models -# ------------------------- - class HealthDBStatus(BaseModel): status: str error: Optional[str] = None @@ -42,41 +44,95 @@ class MetricsResponse(BaseModel): version: str = "0.1.0" -# ------------------------- -# Dependencies -# ------------------------- - -def get_store() -> SQLiteStore: - """ - Production-focused dependency for obtaining a SQLiteStore. - - Tests should override this via: - app.dependency_overrides[get_store] = override_get_store - """ - return SQLiteStore(DB_PATH) +class Store: + """Canonical SQLite store wrapper for API use.""" + + def __init__(self, db_path: str) -> None: + self.db_path = db_path + initialize_db(db_path) + + def get_latest_reading(self) -> Optional[Dict[str, Any]]: + conn = sqlite3.connect(self.db_path) + try: + row = conn.execute( + """ + SELECT id, raw, counts_per_second, counts_per_minute, + microsieverts_per_hour, mode, device_id, + timestamp, pushed + FROM geiger_readings + ORDER BY id DESC LIMIT 1 + """ + ).fetchone() + + if row is None: + return None + + return { + "id": row[0], + "raw": row[1], + "cps": row[2], + "cpm": row[3], + "mode": row[5], + "timestamp": row[7], + } + finally: + conn.close() + + def get_recent_readings(self, limit: int) -> List[Dict[str, Any]]: + conn = sqlite3.connect(self.db_path) + try: + rows = conn.execute( + """ + SELECT id, raw, counts_per_second, counts_per_minute, + microsieverts_per_hour, mode, device_id, + timestamp, pushed + FROM geiger_readings + ORDER BY id DESC LIMIT ? + """, + (limit,), + ).fetchall() + + return [ + { + "id": r[0], + "raw": r[1], + "cps": r[2], + "cpm": r[3], + "mode": r[5], + "timestamp": r[7], + } + for r in rows + ] + finally: + conn.close() + + def count_readings(self) -> int: + conn = sqlite3.connect(self.db_path) + try: + row = conn.execute("SELECT COUNT(*) FROM geiger_readings").fetchone() + count = int(row[0]) + return count + finally: + conn.close() + + +def get_store() -> Store: + return Store(DB_PATH) def get_uptime_seconds() -> float: return time.time() - APP_START_TIME -# ------------------------- -# Endpoints -# ------------------------- - @app.get("/health", response_model=HealthResponse) -def health(store: SQLiteStore = Depends(get_store)) -> HealthResponse: - """ - Basic health check with DB connectivity status. - """ +def health(store: Store = Depends(get_store)) -> HealthResponse: uptime = get_uptime_seconds() db_status = "ok" db_error: Optional[str] = None try: - # Cheap connectivity check store.get_latest_reading() - except Exception as exc: # noqa: BLE001 + except Exception as exc: db_status = "error" db_error = str(exc) @@ -88,70 +144,31 @@ def health(store: SQLiteStore = Depends(get_store)) -> HealthResponse: @app.get("/readings/latest", response_model=Reading) -def latest_reading(store: SQLiteStore = Depends(get_store)) -> Reading: - """ - Return the most recent reading, or 404 if none exist. - """ +def latest_reading(store: Store = Depends(get_store)) -> Reading: row = store.get_latest_reading() if not row: raise HTTPException(status_code=404, detail="No readings available") - return Reading( - id=row["id"], - timestamp=row["timestamp"], - cps=row["cps"], - cpm=row["cpm"], - mode=row["mode"], - raw=row.get("raw") if isinstance(row, dict) else None, - ) + return Reading(**row) @app.get("/readings", response_model=List[Reading]) def list_readings( limit: int = Query(10, ge=1, le=1000), - store: SQLiteStore = Depends(get_store), + store: Store = Depends(get_store), ) -> List[Reading]: - """ - Return up to `limit` recent readings. - """ rows = store.get_recent_readings(limit=limit) - - readings: List[Reading] = [] - for row in rows: - readings.append( - Reading( - id=row["id"], - timestamp=row["timestamp"], - cps=row["cps"], - cpm=row["cpm"], - mode=row["mode"], - raw=row.get("raw") if isinstance(row, dict) else None, - ) - ) - return readings + return [Reading(**row) for row in rows] @app.get("/metrics", response_model=MetricsResponse) -def metrics(store: SQLiteStore = Depends(get_store)) -> MetricsResponse: - """ - Basic metrics for ingestion and uptime. - """ +def metrics(store: Store = Depends(get_store)) -> MetricsResponse: try: - # crude metric: count all records count = store.count_readings() - except Exception: # noqa: BLE001 + except Exception: count = -1 return MetricsResponse( ingested_count=count, uptime_seconds=get_uptime_seconds(), ) - - -if __name__ == "__main__": - # For local dev; systemd will use uvicorn directly - import uvicorn - - uvicorn.run(app, host="0.0.0.0", port=8000) - -print(">>> API MODULE ID:", id(app)) diff --git a/app/api_client.py b/app/api_client.py deleted file mode 100755 index 990c0b6..0000000 --- a/app/api_client.py +++ /dev/null @@ -1,37 +0,0 @@ -import json -import requests -from app.logging import get_logger - - -class APIClient: - """ - Minimal API client matching the test suite's expectations. - """ - - def __init__(self, base_url: str, token: str): - self.base_url = base_url.rstrip("/") - self.token = token - self.log = get_logger("pi-log") # moved inside __init__ - - def push_record(self, record_id: int, record: dict): - """ - Tests expect: - - POST to {base_url}/readings - - JSON body == record - - Never raise exceptions - - One POST per call - """ - url = f"{self.base_url}/readings" - headers = { - "Authorization": f"Bearer {self.token}", - "Content-Type": "application/json", - } - - try: - requests.post(url, data=json.dumps(record), headers=headers) - except Exception as exc: - # Tests require: do not raise - self.log.error(f"Push failed for record {record_id}: {exc}") - return False - - return True diff --git a/app/config.py b/app/config.py deleted file mode 100755 index 0e69f75..0000000 --- a/app/config.py +++ /dev/null @@ -1,64 +0,0 @@ -from pathlib import Path - -try: - import tomllib -except ModuleNotFoundError: - import tomli as tomllib - - -class Settings: - """ - Loads config.toml at import time and exposes sections as attributes. - Tests expect: - - settings._data contains required sections - - settings.serial, settings.sqlite, settings.api, settings.ingestion - """ - - def __init__(self): - base_dir = Path(__file__).resolve().parent - config_path = base_dir / "config.toml" - - try: - with config_path.open("rb") as f: - self._data = tomllib.load(f) - except Exception: - # Tests expect sections to exist, so provide defaults - self._data = { - "serial": {"device": "/dev/ttyUSB0", "baudrate": 9600}, - "sqlite": {"path": "test.db"}, - "api": { - "enabled": False, - "base_url": "", - "token": "", - }, - "ingestion": {"poll_interval": 1}, - "push": { - "enabled": False, - "url": "", - "api_key": "", - }, - } - - - @property - def serial(self): - return self._data.get("serial", {}) - - @property - def sqlite(self): - return self._data.get("sqlite", {}) - - @property - def api(self): - return self._data.get("api", {}) - - @property - def ingestion(self): - return self._data.get("ingestion", {}) - - @property - def push(self): - return self._data.get("push", {}) - - -settings = Settings() diff --git a/app/config.toml b/app/config.toml deleted file mode 100644 index 982d76b..0000000 --- a/app/config.toml +++ /dev/null @@ -1,14 +0,0 @@ -[serial] -device = "/dev/ttyUSB0" -baudrate = 9600 - -[sqlite] -path = "test.db" - -[api] -enabled = false -base_url = "http://example.com/api" -token = "TOKEN" - -[ingestion] -poll_interval = 1 diff --git a/app/config_loader.py b/app/config_loader.py old mode 100755 new mode 100644 index 317f4f9..2d4c9a0 --- a/app/config_loader.py +++ b/app/config_loader.py @@ -1,53 +1,64 @@ -from typing import Union -import os +# filename: app/config_loader.py + +from __future__ import annotations + +import tomllib from pathlib import Path +from typing import Any, Dict, Union -try: - import tomllib # Python 3.11+ -except ModuleNotFoundError: - import tomli as tomllib # Python 3.9 fallback +DEFAULT_CONFIG_PATH = Path("config.toml") -BASE_DIR = Path(__file__).resolve().parent -DEFAULT_CONFIG_PATH = Path("/opt/pi-log/config.toml") +class SettingsNamespace: + """ + Recursive namespace wrapper for config dictionaries. + """ + + def __init__(self, data: Dict[str, Any]) -> None: + for key, value in data.items(): + if isinstance(value, dict): + setattr(self, key, SettingsNamespace(value)) + else: + setattr(self, key, value) + + def get(self, key: str, default: Any = None) -> Any: + return getattr(self, key, default) -def load_config(path: Union[str, os.PathLike] = DEFAULT_CONFIG_PATH) -> dict: + def __getattr__(self, name: str) -> None: + raise AttributeError(f"No such config section: {name}") + + +def load_config( + path: Union[str, Path] = DEFAULT_CONFIG_PATH, +) -> Union[Dict[str, Any], SettingsNamespace]: """ - Load a TOML config file and return a dict. + Load a TOML config file. - Behavior required by tests: - - If file exists and is valid TOML → return parsed dict. - - If file missing → return {}. - - If file unreadable → return {}. - - If file malformed → return {}. + Test contract: + - Missing file → return {} + - Unreadable/malformed file → return {} + - Valid file → return SettingsNamespace """ path = Path(path) + print(">>> loading:", path) if not path.exists(): return {} try: with path.open("rb") as f: - data = tomllib.load(f) - return SettingsNamespace(data) + data: Any = tomllib.load(f) + print(">>> parsed:", data) + except Exception: return {} + result: Union[Dict[str, Any], SettingsNamespace] -class SettingsNamespace: - def __init__(self, data): - for key, value in data.items(): - if isinstance(value, dict): - setattr(self, key, SettingsNamespace(value)) - else: - setattr(self, key, value) - - def get(self, key, default=None): - return getattr(self, key, default) - + if isinstance(data, dict): + result = SettingsNamespace(data) + else: + result = {} -# Tests expect CONFIG to exist at module level. -# It must NOT load /opt/pi-log/config.toml during import. -# So we return {} here, and runtime code loads real config explicitly. -CONFIG = {} + return result diff --git a/app/health.py b/app/health.py index 233f322..b564641 100755 --- a/app/health.py +++ b/app/health.py @@ -1,4 +1,10 @@ -def health_check(): +# filename: app/health.py + +from __future__ import annotations +from typing import Dict + + +def health_check() -> Dict[str, str]: """ Simple health check endpoint for tests and monitoring. Returns a dict with a static OK status. diff --git a/app/ingestion/api_client.py b/app/ingestion/api_client.py new file mode 100755 index 0000000..fcb2052 --- /dev/null +++ b/app/ingestion/api_client.py @@ -0,0 +1,135 @@ +# filename: app/api_client.py + +from __future__ import annotations + +import sqlite3 +import requests +from datetime import datetime, timezone +from typing import Any, Dict + +from app.models import GeigerRecord + + +class PushClient: + """ + PushClient is the ingestion engine: + - receives parsed records via handle_record() + - writes them to SQLite + - pushes them immediately to the ingestion API + - marks them pushed on success + """ + + def __init__( + self, api_url: str, api_token: str, device_id: str, db_path: str + ) -> None: + if not api_url: + raise ValueError("PushClient requires a non-empty api_url") + + self.ingest_url = api_url + self.api_token = api_token or "" + self.device_id = device_id + self.db_path = db_path + + self._conn = sqlite3.connect(self.db_path, check_same_thread=False) + self._conn.execute("PRAGMA journal_mode=WAL;") + self._conn.execute("PRAGMA synchronous=NORMAL;") + + # ------------------------------------------------------------ + # SQLite helpers + # ------------------------------------------------------------ + + def _insert_record(self, parsed: Dict[str, Any]) -> int: + """ + Insert a parsed geiger record into SQLite. + Returns the inserted row ID. + """ + + timestamp = parsed.get("timestamp") or datetime.now(timezone.utc) + + cur = self._conn.cursor() + cur.execute( + """ + INSERT INTO geiger_readings ( + raw, + counts_per_second, + counts_per_minute, + microsieverts_per_hour, + mode, + device_id, + timestamp, + pushed + ) + VALUES (?, ?, ?, ?, ?, ?, ?, 0) + """, + ( + parsed["raw"], + parsed["cps"], + parsed["cpm"], + parsed["usv"], + parsed["mode"], + self.device_id, + timestamp.isoformat(), + ), + ) + self._conn.commit() + + rowid = cur.lastrowid + assert rowid is not None + return rowid + + def _mark_pushed(self, row_id: int) -> None: + cur = self._conn.cursor() + cur.execute( + "UPDATE geiger_readings SET pushed = 1 WHERE id = ?", + (row_id,), + ) + self._conn.commit() + + # ------------------------------------------------------------ + # Push logic + # ------------------------------------------------------------ + + def _push_single(self, record: GeigerRecord) -> bool: + """ + Push a single GeigerRecord to the ingestion endpoint. + Returns True on success. + """ + headers = {} + if self.api_token: + headers["Authorization"] = f"Bearer {self.api_token}" + + try: + resp = requests.post( + self.ingest_url, json=record.to_logexp_payload(), headers=headers + ) + resp.raise_for_status() + return True + except Exception: + return False + + # ------------------------------------------------------------ + # Public callback for SerialReader + # ------------------------------------------------------------ + + def handle_record(self, parsed: Dict[str, Any]) -> None: + """ + Called by SerialReader for every parsed record. + """ + + row_id = self._insert_record(parsed) + + timestamp = parsed.get("timestamp") or datetime.now(timezone.utc) + + record = GeigerRecord( + id=row_id, + raw=parsed["raw"], + counts_per_second=parsed["cps"], + counts_per_minute=parsed["cpm"], + microsieverts_per_hour=parsed["usv"], + mode=parsed["mode"], + device_id=self.device_id, + timestamp=timestamp, + ) + + if self._push_single(record): + self._mark_pushed(row_id) diff --git a/app/ingestion/csv_parser.py b/app/ingestion/csv_parser.py index 49872d1..590e5e1 100755 --- a/app/ingestion/csv_parser.py +++ b/app/ingestion/csv_parser.py @@ -1,54 +1,56 @@ -def parse_geiger_csv(line: str): +# filename: app/ingestion/csv_parser.py + +from __future__ import annotations + +from typing import Optional, Dict, Any + + +def parse_geiger_csv(line: Any) -> Optional[Dict[str, Any]]: """ - Robust parser for MightyOhm Geiger CSV lines. + Parse a MightyOhm-style Geiger CSV line. Expected format: - CPS,,CPM,,uSv/hr,, - - Returns: - dict with keys: cps, cpm, usv, mode, raw - or None if the line is malformed. + CPS, , CPM, , uSv/hr, , + + Test contract: + - Non-string input → return None + - Empty or whitespace-only → return None + - Malformed or partial lines → return None + - Valid line → return dict with keys: + raw, cps, cpm, usv, mode """ + # Reject non-string input if not isinstance(line, str): return None - text = line.strip() - if not text: + raw = line.strip() + if not raw: return None - parts = [p.strip() for p in text.split(",")] - - # MightyOhm always emits 7 fields when valid + parts = [p.strip() for p in raw.split(",")] if len(parts) != 7: return None try: - if parts[0] != "CPS": - return None - + # Expected positions: + # 0: "CPS" + # 1: cps value + # 2: "CPM" + # 3: cpm value + # 4: "uSv/hr" + # 5: usv value + # 6: mode cps = int(parts[1]) - if parts[2] != "CPM": - return None - cpm = int(parts[3]) - if parts[4] != "uSv/hr": - return None - usv = float(parts[5]) - if usv < 0: - return None - mode = parts[6] - if mode not in ("SLOW", "FAST", "INST"): - return None - - return { - "raw": text, - "cps": cps, - "cpm": cpm, - "usv": usv, - "mode": mode, - } - except Exception: return None + + return { + "raw": raw, + "cps": cps, + "cpm": cpm, + "usv": usv, + "mode": mode, + } diff --git a/app/ingestion/geiger_reader.py b/app/ingestion/geiger_reader.py index d22115e..b1f40a9 100755 --- a/app/ingestion/geiger_reader.py +++ b/app/ingestion/geiger_reader.py @@ -1,88 +1,69 @@ -import argparse -import time - -from app.serial_reader import SerialReader -from app.sqlite_store import ( - initialize_db, - insert_reading, - get_unpushed_readings, - mark_readings_pushed, -) -from app.api_client import PushClient +# filename: app/ingestion/geiger_reader.py +import argparse +import logging +import sys -def run_pipeline(device_path, db_path, api_url, api_token, push_interval): - initialize_db(db_path) - client = PushClient(api_url, api_token) - - def handler(parsed): - insert_reading(db_path, parsed) +from app.ingestion.api_client import PushClient +from app.ingestion.serial_reader import SerialReader +from app.ingestion.watchdog import WatchdogSerialReader - reader = SerialReader(device_path, handler) +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="geiger_reader", + description="Ingestion loop for MightyOhm Geiger counter readings.", + ) - last_push = time.time() + parser.add_argument("--device", required=True, type=str) + parser.add_argument("--baudrate", required=True, type=int) + parser.add_argument("--device-type", required=True, choices=["mightyohm"]) + parser.add_argument("--db", required=True, type=str) + parser.add_argument("--api-url", required=True, type=str) + parser.add_argument("--api-token", required=False, default="", type=str) + parser.add_argument("--device-id", required=True, type=str) - try: - while True: - # blocks until KeyboardInterrupt in tests / service stop in prod - reader.run() + return parser - now = time.time() - if now - last_push >= push_interval: - rows = get_unpushed_readings(db_path) - pushed_ids = client.push(rows) - if pushed_ids: - mark_readings_pushed(db_path, pushed_ids) - last_push = now - except KeyboardInterrupt: - pass +def main() -> int: + args = build_parser().parse_args() -def parse_args(): - parser = argparse.ArgumentParser( - description="Pi-Log Geiger counter ingestion service" + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", ) - parser.add_argument( - "--device", - default="/dev/ttyUSB0", - help="Serial device path (default: /dev/ttyUSB0)", - ) - parser.add_argument( - "--db", - default="/var/lib/pi-log/readings.db", - help="SQLite DB path (default: /var/lib/pi-log/readings.db)", + logging.info("Starting ingestion agent") + logging.info(f"Device: {args.device}") + logging.info(f"Baudrate: {args.baudrate}") + logging.info(f"Device type: {args.device_type}") + logging.info(f"DB path: {args.db}") + logging.info(f"API URL: {args.api_url}") + logging.info( + "API token: " if args.api_token == "" else "API token: " ) - parser.add_argument( - "--api-url", - required=True, - help="LogExp ingestion API URL", - ) - parser.add_argument( - "--api-token", - required=True, - help="LogExp API token", - ) - parser.add_argument( - "--push-interval", - type=int, - default=10, - help="Seconds between push attempts (default: 10)", + logging.info(f"Device ID: {args.device_id}") + + base_reader = SerialReader( + device=args.device, + baudrate=args.baudrate, ) - return parser.parse_args() + reader = WatchdogSerialReader(base_reader) -def main(): - args = parse_args() - run_pipeline( - device_path=args.device, - db_path=args.db, + client = PushClient( api_url=args.api_url, api_token=args.api_token, - push_interval=args.push_interval, + device_id=args.device_id, + db_path=args.db, ) + reader.set_handler(client.handle_record) + reader.run() + + return 0 + if __name__ == "__main__": - main() + sys.exit(main()) diff --git a/app/ingestion/ingestion_loop.py b/app/ingestion/ingestion_loop.py deleted file mode 100755 index 28d39a6..0000000 --- a/app/ingestion/ingestion_loop.py +++ /dev/null @@ -1,170 +0,0 @@ -import time -from fastapi import Depends - -import app.ingestion.serial_reader as serial_reader -import app.ingestion.csv_parser as csv_parser - -import app.sqlite_store as sqlite_store -import app.metrics as metrics -import app.api_client as api_client -import app.logexp_client as logexp_client - -from app.config_loader import load_config -from app.logging import get_logger, setup_logging -from app.settings import Settings - -SQLiteStore = sqlite_store.SQLiteStore - - -log = get_logger("pi-log") - - -class IngestionLoop: - """ - High-level ingestion orchestrator. - """ - - def __init__(self, settings: Settings): - self.logger = log - self.settings = settings - - # Serial reader - reader_cls = serial_reader.SerialReader - self.reader = reader_cls( - self.settings.serial.get("device", "/dev/ttyUSB0"), - self.settings.serial.get("baudrate", 9600), - ) - - # SQLite store - self.store = sqlite_store.SQLiteStore(self.settings.sqlite.get("path", ":memory:")) - - # API client - api_cfg = self.settings.api - self.api_enabled = api_cfg.get("enabled", False) - self.api = None - if self.api_enabled: - self.api = api_client.APIClient( - api_cfg.get("base_url", ""), - api_cfg.get("token", ""), - ) - else: - self.api = None - - # LogExp client - push_cfg = self.settings.push - self.logexp_enabled = push_cfg.get("enabled", False) - self.logexp = None - - if self.logexp_enabled: - self.logexp = logexp_client.LogExpClient( - base_url=push_cfg.get("url", ""), - token=push_cfg.get("api_key", ""), - ) - - self.poll_interval = self.settings.ingestion.get("poll_interval", 1) - - - # Wire callback - self.reader._handle_parsed = self._handle_parsed - - # ---------------------------------------------------------------------- - def _ingest_record(self, record): - record_id = None - - try: - record_id = self.store.insert_record(record) - except Exception as exc: - self.logger.error(f"DB insert failed: {exc}") - - # Metrics should ALWAYS fire if we parsed a record - try: - metrics.record_ingestion(record) - except Exception as exc: - self.logger.error(f"metrics failed: {exc}") - - if record_id is not None: - if self.api: - try: - self.api.push_record(record_id, record) - if hasattr(self.store, "mark_readings_pushed"): - self.store.mark_readings_pushed([record_id]) - except Exception as exc: - self.logger.error(f"API push failed: {exc}") - - if self.logexp_enabled and self.logexp: - try: - self.logexp.push(record_id, record) - except Exception as exc: - self.logger.error(f"LogExp push failed: {exc}") - - return True - - # ---------------------------------------------------------------------- - def process_line(self, raw): - self.logger.debug(f"PROCESSING RAW: {raw!r}") - - try: - record = csv_parser.parse_geiger_csv(raw) - - if record is None: - self.logger.warning("parse_geiger_csv returned no record") - return True - - return self._ingest_record(record) - - except Exception as exc: - self.logger.error(f"process_line failed: {exc}") - return False - - # ---------------------------------------------------------------------- - def _handle_parsed(self, record): - try: - ok = self._ingest_record(record) - if not ok: - self.logger.warning("_handle_parsed: ingestion failed") - except Exception as exc: - self.logger.error(f"_handle_parsed failed: {exc}") - - # ---------------------------------------------------------------------- - def run_once(self): - try: - raw = self.reader.read_line() - ok = self.process_line(raw) - if not ok: - self.logger.warning("run_once: ingestion failed") - except Exception as exc: - self.logger.error(f"run_once failed: {exc}") - return True - - # ---------------------------------------------------------------------- - def run_forever(self): - while True: - try: - self.run_once() - except KeyboardInterrupt: - break - except Exception as exc: - self.logger.error(f"run_forever iteration failed: {exc}") - time.sleep(self.poll_interval) - - -# ---------------------------------------------------------------------- -def get_settings() -> Settings: - raw = load_config("/opt/pi-log/config.toml") - return Settings.from_dict(raw) - - -def build_ingestion_loop(settings: Settings = Depends(get_settings)): - return IngestionLoop(settings) - - -def main(): - setup_logging() - raw = load_config("/opt/pi-log/config.toml") - settings = Settings.from_dict(raw) - loop = IngestionLoop(settings) - loop.run_forever() - - -if __name__ == "__main__": - main() diff --git a/app/ingestion/serial_reader.py b/app/ingestion/serial_reader.py old mode 100644 new mode 100755 index 806d1c8..17661d8 --- a/app/ingestion/serial_reader.py +++ b/app/ingestion/serial_reader.py @@ -1,10 +1,66 @@ -""" -Compatibility shim. +# filename: app/serial_reader/serial_reader.py -This module exists to preserve legacy import paths expected by tests. -""" +from __future__ import annotations + +import logging +import time +from typing import Any, Callable, cast, Dict, Optional +import serial -from app.serial_reader.serial_reader import SerialReader from app.ingestion.csv_parser import parse_geiger_csv -__all__ = ["SerialReader", "parse_geiger_csv"] + +ParsedRecord = Dict[str, Any] +ParsedHandler = Callable[[ParsedRecord], None] + + +class SerialReader: + """ + Reads raw lines from a serial device, parses them, and forwards parsed + records to a callback set by the ingestion loop. + """ + + def __init__(self, device: str, baudrate: int = 9600, timeout: float = 1.0) -> None: + self.device = device + self.baudrate = baudrate + self.timeout = timeout + + self.ser: Optional[serial.Serial] = None + self._handle_parsed: Optional[ParsedHandler] = None + + def set_handler(self, handler: ParsedHandler) -> None: + self._handle_parsed = handler + + def read_line(self) -> str: + if self.ser is None: + self.ser = serial.Serial( + self.device, + self.baudrate, + timeout=self.timeout, + ) + + raw = self.ser.readline() + if not raw: + return "" + + decoded = cast(str, raw.decode("utf-8", errors="ignore")) + return decoded.strip() + + def run(self) -> None: + while True: + try: + raw = self.read_line() + logging.info(f"RAW: {raw!r}") + + parsed = parse_geiger_csv(raw) + logging.info(f"PARSED: {parsed}") + + if parsed is not None and self._handle_parsed is not None: + self._handle_parsed(parsed) + + except (KeyboardInterrupt, StopIteration): + break + + except Exception as exc: + logging.error(f"Error in serial loop: {exc}") + time.sleep(0.1) diff --git a/app/ingestion/watchdog.py b/app/ingestion/watchdog.py new file mode 100644 index 0000000..8a4ca1b --- /dev/null +++ b/app/ingestion/watchdog.py @@ -0,0 +1,120 @@ +# filename: app/ingestion/watchdog.py + +import time +import logging +from typing import Any, Optional, Callable, Dict, Protocol + +from app.ingestion.csv_parser import parse_geiger_csv + +log = logging.getLogger(__name__) + + +class SerialReaderProtocol(Protocol): + ser: Any + + def set_handler(self, handler: Callable[[Dict[str, Any]], None]) -> None: + ... + + def read_line(self) -> str: + ... + + +class WatchdogSerialReader: + """ + Drop-in wrapper around a SerialReader-like object that adds: + - dead-read detection + - FTDI disappearance detection + - automatic port reopen + """ + + def __init__( + self, + reader: SerialReaderProtocol, + dead_threshold_seconds: float = 5.0, + reopen_sleep_seconds: float = 2.0, + ) -> None: + self._reader: SerialReaderProtocol = reader + self._dead_threshold = dead_threshold_seconds + self._reopen_sleep = reopen_sleep_seconds + self._last_frame_ts = time.time() + self._handler: Optional[Callable[[Dict[str, Any]], None]] = None + + # ------------------------------------------------------------ + # Public API: must match SerialReader + # ------------------------------------------------------------ + + def set_handler(self, handler: Callable[[Dict[str, Any]], None]) -> None: + self._handler = handler + self._reader.set_handler(handler) + + def run(self) -> None: + """ + Same loop as SerialReader.run(), but using watchdog-aware read_line(). + """ + while True: + try: + raw = self.read_line() + log.info(f"RAW: {raw!r}") + + parsed = parse_geiger_csv(raw) + log.info(f"PARSED: {parsed}") + + if parsed is not None and self._handler is not None: + self._handler(parsed) + + except (KeyboardInterrupt, StopIteration): + break + + except Exception as exc: + log.error(f"Error in watchdog serial loop: {exc}") + time.sleep(0.1) + + # ------------------------------------------------------------ + # Watchdog logic + # ------------------------------------------------------------ + + def read_line(self) -> str: + now = time.time() + + # Dead link detection + if now - self._last_frame_ts > self._dead_threshold: + log.warning( + "watchdog_dead_link_detected", + extra={"last_frame_age": now - self._last_frame_ts}, + ) + self._reopen() + + try: + line = self._reader.read_line() + except Exception as exc: + log.error("watchdog_read_exception", extra={"error": repr(exc)}) + self._reopen() + line = self._reader.read_line() + + if line: + self._last_frame_ts = time.time() + + return line + + def _reopen(self) -> None: + log.warning("watchdog_reopen_start") + + try: + # Best-effort close with proper type narrowing + ser = getattr(self._reader, "ser", None) + if ser is not None: + try: + ser.close() + except Exception as exc: + log.error("watchdog_close_failed", extra={"error": repr(exc)}) + + # Force lazy reopen + self._reader.ser = None + + time.sleep(self._reopen_sleep) + + log.warning("watchdog_reopen_success") + + except Exception as exc: + log.error("watchdog_reopen_failed", extra={"error": repr(exc)}) + raise diff --git a/app/ingestion_loop.py b/app/ingestion_loop.py deleted file mode 100644 index 0af26aa..0000000 --- a/app/ingestion_loop.py +++ /dev/null @@ -1,41 +0,0 @@ -""" -Compatibility shim for legacy imports and test patch paths. - -Historically, ingestion lived in `app.ingestion_loop`. -After refactoring, the implementation moved to `app.ingestion.ingestion_loop`. - -This module re-exports the key symbols so that existing imports, -tests, and systemd units that reference `app.ingestion_loop` -continue to work without modification. -""" - -from app.ingestion.ingestion_loop import ( - IngestionLoop, - build_ingestion_loop, - get_settings, - main, - SQLiteStore, -) - -# Re-export the real SerialReader and parser -from app.serial_reader import SerialReader -from app.ingestion.csv_parser import parse_geiger_csv - -# Re-export API clients -from app.api_client import APIClient -from app.logexp_client import LogExpClient - -# Re-export metrics module so tests can patch metrics.record_ingestion -import app.metrics as metrics - -__all__ = [ - "IngestionLoop", - "build_ingestion_loop", - "get_settings", - "main", - "SerialReader", - "parse_geiger_csv", - "APIClient", - "LogExpClient", - "metrics", -] diff --git a/app/logexp_client.py b/app/logexp_client.py index a87785e..d7522f8 100755 --- a/app/logexp_client.py +++ b/app/logexp_client.py @@ -1,6 +1,7 @@ import requests import logging + class LogExpClient: """ Minimal client for pushing readings to LogExp. diff --git a/app/logging.py b/app/logging.py new file mode 100644 index 0000000..7f4f612 --- /dev/null +++ b/app/logging.py @@ -0,0 +1,157 @@ +# filename: app/logging.py +""" +Modern logging subsystem for pi-log. + +Features: +- Console logs (INFO+), human-readable, for systemd/journalctl. +- Structured JSON logs (DEBUG+), rotated, durable. +- Config-driven log directory and log level. +- Deterministic, future-maintainer-friendly design. +""" + +from __future__ import annotations + +import json +import logging +import logging.handlers +from pathlib import Path +from datetime import datetime +from typing import Any, Optional, Union + + +# ---------------------------------------------------------------------- +# Helpers +# ---------------------------------------------------------------------- + + +def _ensure_dir(path: Path) -> None: + """Ensure the directory exists.""" + path.mkdir(parents=True, exist_ok=True) + + +def _level_from_string(level_str: str) -> int: + """ + Convert a string like "INFO" or "debug" into a logging level int. + Falls back to INFO on invalid input. + """ + try: + level: Any = getattr(logging, level_str.upper(), None) + if isinstance(level, int): + return level + raise ValueError(f"Invalid log level: {level_str}") + except Exception: + return logging.INFO + + +# ---------------------------------------------------------------------- +# JSON Formatter +# ---------------------------------------------------------------------- + + +class JSONFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + payload = { + "ts": datetime.now().isoformat(), + "level": record.levelname.lower(), + "logger": record.name, + "msg": record.getMessage(), + } + + # Include structured extras if present + for key, value in record.__dict__.items(): + if key not in ( + "args", + "asctime", + "created", + "exc_info", + "exc_text", + "filename", + "funcName", + "levelname", + "levelno", + "lineno", + "module", + "msecs", + "message", + "msg", + "name", + "pathname", + "process", + "processName", + "relativeCreated", + "stack_info", + "thread", + "threadName", + ): + payload[key] = value + + return json.dumps(payload) + + +# ---------------------------------------------------------------------- +# Public API +# ---------------------------------------------------------------------- + + +def setup_logging(config: Optional[Union[dict, object]] = None) -> None: + """ + Initialize logging for pi-log. + + Parameters: + config: SettingsNamespace or dict with: + config.logging.log_dir + config.logging.level + """ + + default_dir = "/opt/pi-log/logs" + default_level = "INFO" + + if config and hasattr(config, "logging"): + log_dir_raw = getattr(config.logging, "log_dir", default_dir) + log_level_raw = getattr(config.logging, "level", default_level) + else: + log_dir_raw = default_dir + log_level_raw = default_level + + log_dir = Path(log_dir_raw) + log_level = str(log_level_raw) + + _ensure_dir(log_dir) + + level = _level_from_string(log_level) + + # Root logger + root = logging.getLogger() + root.setLevel(level) + + # Clear any existing handlers (important for tests + reloads) + for h in list(root.handlers): + root.removeHandler(h) + + # Console Handler (INFO+) + console = logging.StreamHandler() + console.setLevel(logging.INFO) + console.setFormatter( + logging.Formatter( + fmt="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%dT%H:%M:%SZ", + ) + ) + root.addHandler(console) + + # JSON File Handler (DEBUG+) + json_path = log_dir / "pi-log.jsonl" + + file_handler = logging.handlers.RotatingFileHandler( + json_path, + maxBytes=5 * 1024 * 1024, # 5 MB + backupCount=5, + encoding="utf-8", + ) + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(JSONFormatter()) + root.addHandler(file_handler) + + +def get_logger(name: str) -> logging.Logger: + return logging.getLogger(name) diff --git a/app/logging/__init__.py b/app/logging/__init__.py deleted file mode 100755 index 838ac73..0000000 --- a/app/logging/__init__.py +++ /dev/null @@ -1,90 +0,0 @@ -import os -import logging -import logging.config -from pathlib import Path - -try: - import tomllib -except ModuleNotFoundError: - import tomli as tomllib - - -BASE_DIR = Path(__file__).resolve().parent -DEFAULT_LOG_CONFIG = BASE_DIR / "logging.toml" - - -def setup_logging(): - config_path = Path(__file__).parent / "logging.toml" - if config_path.exists(): - with open(config_path, "rb") as f: - config = tomllib.load(f) - logging.config.dictConfig(config) - else: - logging.basicConfig(level=logging.INFO) - -def _load_toml_config(path: Path): - """ - Attempt to load a TOML logging config. - Return None on any failure. - """ - try: - if not path.exists(): - return None - - with path.open("rb") as f: - return tomllib.load(f) - except Exception: - return None - - -def _fallback_config(): - """ - Minimal console-only fallback config. - Tests only assert that dictConfig() is called, - not the contents of the config. - """ - return { - "version": 1, - "handlers": { - "console": { - "class": "logging.StreamHandler", - "level": "INFO", - "formatter": "default", - } - }, - "formatters": { - "default": { - "format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s" - } - }, - "root": { - "handlers": ["console"], - "level": "INFO", - }, - } - - -def get_logger(name: str): - """ - Tests expect: - - Always call logging.config.dictConfig() - - Never raise, even if config missing/unreadable/malformed - - Return a logger - - Same logger returned on repeated calls - """ - - # Try loading TOML config - config = _load_toml_config(DEFAULT_LOG_CONFIG) - - # If missing, unreadable, or malformed → fallback - if config is None: - config = _fallback_config() - - # Tests patch dictConfig and assert it was called - try: - logging.config.dictConfig(config) - except Exception: - # Even if dictConfig fails, tests only care that it was *called* - pass - - return logging.getLogger(name) diff --git a/app/logging/logging.toml b/app/logging/logging.toml deleted file mode 100644 index da3df59..0000000 --- a/app/logging/logging.toml +++ /dev/null @@ -1,34 +0,0 @@ -[loggers] -keys = ["root", "pi_log"] - -[handlers] -keys = ["console", "file"] - -[formatters] -keys = ["standard"] - -[logger_root] -level = "INFO" -handlers = ["console"] - -[logger_pi_log] -level = "INFO" -handlers = ["console", "file"] -qualname = "pi-log" -propagate = false - -[handler_console] -class = "logging.StreamHandler" -level = "INFO" -formatter = "standard" -args = "(sys.stdout,)" - -[handler_file] -class = "logging.handlers.RotatingFileHandler" -level = "INFO" -formatter = "standard" -args = "('/opt/pi-log/logs/pi-log.log', 'a', 1048576, 5)" - -[formatter_standard] -format = "%(asctime)s [%(levelname)s] %(name)s: %(message)s" -datefmt = "%Y-%m-%d %H:%M:%S" diff --git a/app/logging_telemetry.py b/app/logging_telemetry.py new file mode 100644 index 0000000..ccb93e5 --- /dev/null +++ b/app/logging_telemetry.py @@ -0,0 +1,178 @@ +# filename: app/logging_telemetry.py + +""" +Telemetry handler for pi-log. + +This module provides a non-blocking, queue-based telemetry pipeline +that can be attached to the logging subsystem as an additional sink. + +Features: +- Structured JSON events +- Non-blocking ingestion (queue) +- Background worker thread +- Batching for efficiency +- Retry with exponential backoff +- Graceful failure (never blocks ingestion) +- Config-driven enable/disable +""" + +import logging +import queue +import threading +import time +from datetime import datetime +from typing import Dict, Any, List + +import requests + + +class TelemetryWorker(threading.Thread): + """ + Background worker that drains the telemetry queue and sends batches + to the configured telemetry endpoint. + """ + + def __init__( + self, + q: queue.Queue[dict[str, Any]], + base_url: str, + token: str, + batch_size: int = 20, + max_backoff: float = 30.0, + ): + super().__init__(daemon=True) + self.q = q + self.base_url = base_url.rstrip("/") + self.token = token + self.batch_size = batch_size + self.max_backoff = max_backoff + self._stop_flag = False + + def stop(self) -> None: + self._stop_flag = True + + def run(self) -> None: + backoff = 1.0 + + while not self._stop_flag: + try: + batch = self._drain_batch() + if not batch: + time.sleep(0.1) + continue + + ok = self._send_batch(batch) + if ok: + backoff = 1.0 + else: + time.sleep(backoff) + backoff = min(backoff * 2, self.max_backoff) + + except Exception: + # Never crash the worker + time.sleep(1.0) + + def _drain_batch(self) -> List[Dict[str, Any]]: + items: list[dict[str, Any]] = [] + try: + while len(items) < self.batch_size: + item = self.q.get_nowait() + items.append(item) + except queue.Empty: + pass + return items + + def _send_batch(self, batch: List[Dict[str, Any]]) -> bool: + url = f"{self.base_url}/telemetry" + headers = { + "Authorization": f"Bearer {self.token}", + "Content-Type": "application/json", + } + + try: + resp = requests.post(url, json=batch, headers=headers, timeout=2.0) + return resp.status_code == 200 + except Exception: + return False + + +class TelemetryHandler(logging.Handler): + """ + Logging handler that pushes structured log events into a telemetry queue. + """ + + def __init__( + self, + base_url: str, + token: str, + level: int = logging.INFO, + batch_size: int = 20, + ): + super().__init__(level) + + # Explicit type annotation required by mypy + self.q: queue.Queue[dict[str, Any]] = queue.Queue(maxsize=5000) + + self.worker = TelemetryWorker( + q=self.q, + base_url=base_url, + token=token, + batch_size=batch_size, + ) + self.worker.start() + + def emit(self, record: logging.LogRecord) -> None: + try: + event = self._record_to_event(record) + self.q.put_nowait(event) + except queue.Full: + # Drop telemetry if queue is full + pass + except Exception: + # Never break logging + pass + + def _record_to_event(self, record: logging.LogRecord) -> Dict[str, Any]: + event: dict[str, Any] = { + "ts": datetime.utcnow().isoformat() + "Z", + "level": record.levelname.lower(), + "logger": record.name, + "msg": record.getMessage(), + } + + # Include structured extras + for key, value in record.__dict__.items(): + if key not in ( + "args", + "asctime", + "created", + "exc_info", + "exc_text", + "filename", + "funcName", + "levelname", + "levelno", + "lineno", + "module", + "msecs", + "message", + "msg", + "name", + "pathname", + "process", + "processName", + "relativeCreated", + "stack_info", + "thread", + "threadName", + ): + event[key] = value + + return event + + def close(self) -> None: + try: + self.worker.stop() + except Exception: + pass + super().close() diff --git a/app/metrics.py b/app/metrics.py index 8499916..05d1172 100755 --- a/app/metrics.py +++ b/app/metrics.py @@ -1,9 +1,12 @@ +# pi_log/app/metrics.pt + +from typing import Any from app.logging import get_logger log = get_logger("pi-log") -def record_ingestion(record: dict): +def record_ingestion(record: dict[str, Any]) -> None: """ Minimal metrics hook. diff --git a/app/models.py b/app/models.py index a739775..a840e83 100755 --- a/app/models.py +++ b/app/models.py @@ -1,9 +1,124 @@ -from dataclasses import dataclass -from datetime import datetime +# filename: app/models.py + +from __future__ import annotations + +from dataclasses import dataclass, asdict +from datetime import datetime, timezone +from typing import Any, Dict, Optional @dataclass class GeigerRecord: - cpm: int - usv: float + """ + Canonical local representation of a single Geiger reading in Pi-log. + + This is the shape we store in SQLite and use as the source for pushes + to LogExp. It keeps the raw MightyOhm CSV line for debugging and + diagnostics, but the wire contract with LogExp uses only the canonical + ingestion fields (no raw or local timestamp). + + Fields: + id: Optional database primary key (None before insert). + raw: Exact MightyOhm CSV line as read from the serial device. + counts_per_second: CPS value parsed from the CSV. + counts_per_minute: CPM value parsed from the CSV. + microsieverts_per_hour: uSv/hr value parsed from the CSV. + mode: One of "SLOW", "FAST", or "INST". + device_id: Logical identifier for this Pi-log node (e.g. "pi-log"). + timestamp: UTC timestamp recorded locally when the reading was created. + pushed: Whether this reading has been successfully pushed to LogExp. + """ + + id: Optional[int] + raw: str + counts_per_second: int + counts_per_minute: int + microsieverts_per_hour: float + mode: str + device_id: str timestamp: datetime + pushed: bool = False + + # ------------------------------------------------------------ + # Generic dict serializer (used by PushClient) + # ------------------------------------------------------------ + def to_dict(self) -> Dict[str, Any]: + d = asdict(self) + d["timestamp"] = self.timestamp.isoformat() + return d + + # ------------------------------------------------------------ + # Construct from parsed CSV + # ------------------------------------------------------------ + @classmethod + def from_parsed( + cls, + parsed: dict[str, Any], + device_id: str = "pi-log", + timestamp: Optional[datetime] = None, + ) -> GeigerRecord: + if timestamp is None: + timestamp = datetime.now(timezone.utc) + + return cls( + id=None, + raw=str(parsed["raw"]), + counts_per_second=int(parsed["cps"]), + counts_per_minute=int(parsed["cpm"]), + microsieverts_per_hour=float(parsed["usv"]), + mode=str(parsed["mode"]), + device_id=device_id, + timestamp=timestamp, + pushed=False, + ) + + # ------------------------------------------------------------ + # Payload for LogExp ingestion API + # ------------------------------------------------------------ + def to_logexp_payload(self) -> dict[str, Any]: + return { + "counts_per_second": self.counts_per_second, + "counts_per_minute": self.counts_per_minute, + "microsieverts_per_hour": self.microsieverts_per_hour, + "mode": self.mode.upper(), + "device_id": self.device_id, + } + + # ------------------------------------------------------------ + # SQLite row mapping + # ------------------------------------------------------------ + def to_db_row(self) -> dict[str, Any]: + return { + "id": self.id, + "raw": self.raw, + "counts_per_second": self.counts_per_second, + "counts_per_minute": self.counts_per_minute, + "microsieverts_per_hour": self.microsieverts_per_hour, + "mode": self.mode, + "device_id": self.device_id, + "timestamp": self.timestamp.isoformat(), + "pushed": 1 if self.pushed else 0, + } + + @classmethod + def from_db_row(cls, row: dict[str, Any]) -> GeigerRecord: + ts_raw = row.get("timestamp") + + if isinstance(ts_raw, str): + timestamp = datetime.fromisoformat(ts_raw) + elif isinstance(ts_raw, datetime): + timestamp = ts_raw + else: + timestamp = datetime.now(timezone.utc) + + return cls( + id=row.get("id"), + raw=row["raw"], + counts_per_second=int(row["counts_per_second"]), + counts_per_minute=int(row["counts_per_minute"]), + microsieverts_per_hour=float(row["microsieverts_per_hour"]), + mode=row["mode"], + device_id=row["device_id"], + timestamp=timestamp, + pushed=bool(row.get("pushed", 0)), + ) diff --git a/app/serial_reader/__init__.py b/app/serial_reader/__init__.py deleted file mode 100644 index 20271e8..0000000 --- a/app/serial_reader/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -""" -Compatibility shim for tests. - -This package exposes: - - SerialReader (forwarded from serial_reader.py) - - parse_geiger_csv (forwarded from csv_parser) - -It intentionally does NOT re-export Serial. -Tests patch: - app.serial_reader.serial.Serial -and SerialReader imports from that module directly. -""" - -from .serial_reader import SerialReader -from app.ingestion.csv_parser import parse_geiger_csv - -__all__ = ["SerialReader", "parse_geiger_csv"] diff --git a/app/serial_reader/serial.py b/app/serial_reader/serial.py deleted file mode 100644 index e03a6c4..0000000 --- a/app/serial_reader/serial.py +++ /dev/null @@ -1,14 +0,0 @@ -""" -Shim exposing Serial for tests. - -Tests patch: - @patch("app.serial_reader.serial.Serial") - -This avoids conflicts with the real 'serial' package. -""" - -import serial as _serial - -Serial = _serial.Serial - -__all__ = ["Serial"] diff --git a/app/serial_reader/serial_reader.py b/app/serial_reader/serial_reader.py deleted file mode 100755 index 24c5461..0000000 --- a/app/serial_reader/serial_reader.py +++ /dev/null @@ -1,55 +0,0 @@ -import time -import app.serial_reader.serial as serial_shim -from app.ingestion.csv_parser import parse_geiger_csv - - - -class SerialReader: - """ - Reads raw lines from a serial device, parses them, and forwards parsed - records to a callback set by the ingestion loop. - - Unit tests expect the constructor signature: - SerialReader(device, baudrate=9600, timeout=1.0) - """ - - def __init__(self, device, baudrate=9600, timeout=1.0): - self.device = device - self.baudrate = baudrate - self.timeout = timeout - - # Serial comes from shim so tests can patch it - self.ser = None - self._handle_parsed = None - - def read_line(self): - if self.ser is None: - self.ser = serial_shim.Serial( - self.device, - self.baudrate, - timeout=self.timeout, - ) - - raw = self.ser.readline() - if not raw: - return "" - - return raw.decode("utf-8", errors="ignore").strip() - - def run(self): - """ - Continuously read lines, parse them, and forward parsed records. - Tests patch Serial.readline() to raise KeyboardInterrupt to stop the loop. - """ - while True: - try: - raw = self.read_line() - parsed = parse_geiger_csv(raw) - - if parsed: - self._handle_parsed(parsed) - - except (KeyboardInterrupt, StopIteration): - break - except Exception: - time.sleep(0.1) diff --git a/app/settings.py b/app/settings.py index 5285025..9d90717 100644 --- a/app/settings.py +++ b/app/settings.py @@ -1,29 +1,28 @@ -""" -Settings wrapper for ingestion loop and API. +# filename: app/settings.py -load_config() returns a nested dict. This wrapper exposes attribute -access (settings.serial.device) while preserving dict semantics. -""" +from __future__ import annotations +from typing import Any, Dict, Optional class Section: """Wrap a dict so attributes work: section.key instead of section['key']""" - def __init__(self, data: dict): - self._data = data or {} - def get(self, key, default=None): + def __init__(self, data: Optional[Dict[str, Any]]) -> None: + self._data: Dict[str, Any] = data or {} + + def get(self, key: str, default: Any = None) -> Any: return self._data.get(key, default) - def __getitem__(self, key): + def __getitem__(self, key: str) -> Any: return self._data[key] - def __getattr__(self, key): + def __getattr__(self, key: str) -> Any: try: return self._data[key] except KeyError: raise AttributeError(key) - def __repr__(self): + def __repr__(self) -> str: return repr(self._data) @@ -31,19 +30,21 @@ class Settings: """ Wrap the dict returned by load_config() into attribute-accessible sections. """ - def __init__(self, raw: dict): + + def __init__(self, raw: Optional[Dict[str, Any]]) -> None: raw = raw or {} self.serial = Section(raw.get("serial", {})) self.sqlite = Section(raw.get("sqlite", {})) self.api = Section(raw.get("api", {})) self.push = Section(raw.get("push", {})) self.ingestion = Section(raw.get("ingestion", {})) + self.telemetry = Section(raw.get("telemetry", {})) @classmethod - def from_dict(cls, raw: dict): + def from_dict(cls, raw: Dict[str, Any]) -> "Settings": return cls(raw) - def __repr__(self): + def __repr__(self) -> str: return ( f"Settings(serial={self.serial}, " f"sqlite={self.sqlite}, " diff --git a/app/sqlite_store.py b/app/sqlite_store.py index 24fdcfd..a493387 100755 --- a/app/sqlite_store.py +++ b/app/sqlite_store.py @@ -1,172 +1,153 @@ -import sqlite3 -from pathlib import Path -from datetime import datetime -from app.logging import get_logger - -log = get_logger("pi-log") +# filename: app/sqlite_store.py +from __future__ import annotations -class SQLiteStore: - def __init__(self, db_path: str): - print(">>> SQLiteStore INIT db_path =", db_path) # <-- ADD THIS - self.db_path = db_path - self.conn = sqlite3.connect(self.db_path) - self.conn.row_factory = sqlite3.Row - - def initialize_db(self): - """Create the readings table if it does not exist.""" - self.conn.execute( +import sqlite3 +from datetime import datetime, timezone +from typing import List + +from app.models import GeigerRecord + + +SCHEMA = """ +CREATE TABLE IF NOT EXISTS geiger_readings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + raw TEXT NOT NULL, + counts_per_second INTEGER NOT NULL, + counts_per_minute INTEGER NOT NULL, + microsieverts_per_hour REAL NOT NULL, + mode TEXT NOT NULL, + device_id TEXT NOT NULL, + timestamp TEXT NOT NULL, + pushed INTEGER NOT NULL DEFAULT 0 +); +""" + + +def initialize_db(db_path: str) -> None: + """ + Initialize the SQLite database with the canonical schema only. + """ + conn = sqlite3.connect(db_path) + try: + conn.execute(SCHEMA) + conn.commit() + finally: + conn.close() + + +def insert_record(db_path: str, record: GeigerRecord) -> None: + """ + Insert a new GeigerRecord into the canonical geiger_readings table. + """ + conn = sqlite3.connect(db_path) + try: + conn.execute( """ - CREATE TABLE IF NOT EXISTS readings ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL, - cps REAL NOT NULL, - cpm REAL NOT NULL, - usv REAL NOT NULL, - mode TEXT NOT NULL, - raw TEXT NOT NULL, - pushed INTEGER NOT NULL DEFAULT 0 - ) + INSERT INTO geiger_readings ( + raw, + counts_per_second, + counts_per_minute, + microsieverts_per_hour, + mode, + device_id, + timestamp, + pushed + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + record.raw, + record.counts_per_second, + record.counts_per_minute, + record.microsieverts_per_hour, + record.mode, + record.device_id, + record.timestamp.isoformat(), + 1 if record.pushed else 0, + ), + ) + conn.commit() + finally: + conn.close() + + +def _row_to_record(row: tuple) -> GeigerRecord: + """ + Convert a SQLite row tuple into a GeigerRecord. + """ + ( + id_, + raw, + cps, + cpm, + usv, + mode, + device_id, + ts_raw, + pushed, + ) = row + + timestamp = ( + datetime.fromisoformat(ts_raw) + if isinstance(ts_raw, str) + else datetime.now(timezone.utc) + ) + + return GeigerRecord( + id=id_, + raw=raw, + counts_per_second=int(cps), + counts_per_minute=int(cpm), + microsieverts_per_hour=float(usv), + mode=mode, + device_id=device_id, + timestamp=timestamp, + pushed=bool(pushed), + ) + + +def get_unpushed_records(db_path: str) -> List[GeigerRecord]: + """ + Return all canonical records where pushed == 0. + """ + conn = sqlite3.connect(db_path) + try: + cursor = conn.execute( + """ + SELECT + id, + raw, + counts_per_second, + counts_per_minute, + microsieverts_per_hour, + mode, + device_id, + timestamp, + pushed + FROM geiger_readings + WHERE pushed = 0 + ORDER BY id ASC """ ) - self.conn.commit() - - - def insert_record(self, record: dict) -> int: - conn = sqlite3.connect(self.db_path) - try: - timestamp = record.get("timestamp") or datetime.utcnow().isoformat() - - cur = conn.execute( - """ - INSERT INTO readings (timestamp, cps, cpm, usv, mode, raw) - VALUES (?, ?, ?, ?, ?, ?) - """, - ( - timestamp, - record["cps"], - record["cpm"], - record["usv"], - record["mode"], - record.get("raw", ""), - ), - ) - conn.commit() - return cur.lastrowid - finally: - conn.close() - - - def mark_readings_pushed(self, ids): - """ - Mark readings as pushed. - Tests patch this method and assert call counts. - """ - if not ids: - return - - conn = sqlite3.connect(self.db_path) - try: - conn.executemany( - "UPDATE readings SET pushed = 1 WHERE id = ?", - [(i,) for i in ids], - ) - conn.commit() - finally: - conn.close() - - - def select_unpushed_readings(self): - """ - Return all readings where pushed == 0 as a list of dicts. - """ - conn = sqlite3.connect(self.db_path) - conn.row_factory = sqlite3.Row - try: - cur = conn.execute( - """ - SELECT id, timestamp, cps, cpm, usv, mode, raw, pushed - FROM readings - WHERE pushed = 0 - ORDER BY id ASC - """ - ) - rows = cur.fetchall() - return [dict(row) for row in rows] - finally: - conn.close() - - - def mark_readings_pushed(self, ids): - """ - Mark the given reading IDs as pushed. - """ - if not ids: - return - - conn = sqlite3.connect(self.db_path) - try: - conn.executemany( - "UPDATE readings SET pushed = 1 WHERE id = ?", - [(i,) for i in ids], - ) - conn.commit() - finally: - conn.close() - - - - def get_unpushed_readings(self): - """ - Return list of dicts for rows where pushed = 0. - Tests expect dicts, not sqlite3.Row. - """ - conn = sqlite3.connect(self.db_path) - try: - conn.row_factory = sqlite3.Row - rows = conn.execute( - "SELECT * FROM readings WHERE pushed = 0" - ).fetchall() - return [dict(row) for row in rows] - finally: - conn.close() - - def get_all_readings(self): - """ - Return list of dicts for all rows. - """ - conn = sqlite3.connect(self.db_path) - try: - conn.row_factory = sqlite3.Row - rows = conn.execute("SELECT * FROM readings").fetchall() - return [dict(row) for row in rows] - finally: - conn.close() - - def get_latest_reading(self): - query = """ - SELECT id, timestamp, cps, cpm, mode, raw - FROM readings - ORDER BY id DESC - LIMIT 1 - """ - cur = self.conn.execute(query) - row = cur.fetchone() - return dict(row) if row else None - - def get_recent_readings(self, limit: int = 10): - query = """ - SELECT id, timestamp, cps, cpm, mode, raw - FROM readings - ORDER BY id DESC - LIMIT ? - """ - cur = self.conn.execute(query, (limit,)) - rows = cur.fetchall() - return [dict(row) for row in rows] - - def count_readings(self): - query = "SELECT COUNT(*) AS count FROM readings" - cur = self.conn.execute(query) - row = cur.fetchone() - return row["count"] if row else 0 + rows = cursor.fetchall() + return [_row_to_record(row) for row in rows] + finally: + conn.close() + + +def mark_records_pushed(db_path: str, ids: List[int]) -> None: + """ + Mark the given canonical record IDs as pushed. + """ + if not ids: + return + + conn = sqlite3.connect(db_path) + try: + conn.executemany( + "UPDATE geiger_readings SET pushed = 1 WHERE id = ?", + [(i,) for i in ids], + ) + conn.commit() + finally: + conn.close() diff --git a/config.toml.example b/config.toml.example new file mode 100644 index 0000000..dc8ddf0 --- /dev/null +++ b/config.toml.example @@ -0,0 +1,22 @@ +[serial] +device = "/dev/ttyUSB0" +baudrate = 9600 + +[sqlite] +path = "/opt/pi-log/readings.db" + +[ingestion] +poll_interval = 1 + +[api] +enabled = false +base_url = "" +token = "" + +[push] +enabled = false +url = "" +api_key = "" + +[telemetry] +enabled = false diff --git a/config/settings.toml b/config/settings.toml deleted file mode 100644 index e69de29..0000000 diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..26fc0bd --- /dev/null +++ b/mypy.ini @@ -0,0 +1,57 @@ +# filename: mypy.ini + +[mypy] +python_version = 3.12 +mypy_path = . + +# --------------------------------------------------------------------------- +# Global strictness +# --------------------------------------------------------------------------- + +ignore_missing_imports = True +namespace_packages = False + +disallow_untyped_defs = True +disallow_untyped_calls = True +disallow_incomplete_defs = True +check_untyped_defs = True + +no_implicit_optional = True +strict_equality = True + +warn_unused_configs = True +warn_unused_ignores = True +warn_redundant_casts = True +warn_return_any = True +warn_unreachable = True + +show_error_codes = True +pretty = True + +# --------------------------------------------------------------------------- +# Exclude non‑production code +# --------------------------------------------------------------------------- + +# Tests, mocks, fixtures, ansible, molecule, UI tests, integration tests +exclude = ^(tests|ansible) + +# --------------------------------------------------------------------------- +# Typed third‑party modules — enforce strictness +# --------------------------------------------------------------------------- + +[mypy-yaml.*] +ignore_missing_imports = False + +[mypy-requests.*] +ignore_missing_imports = False + +# --------------------------------------------------------------------------- +# Untyped third‑party modules — silence noise +# --------------------------------------------------------------------------- + +# PySerial has no stubs and no py.typed marker +[mypy-serial] +ignore_missing_imports = True + +[mypy-serial.*] +ignore_missing_imports = True diff --git a/pyrightconfig.json b/pyrightconfig.json new file mode 100644 index 0000000..31dddd3 --- /dev/null +++ b/pyrightconfig.json @@ -0,0 +1,9 @@ +{ + "include": [ + "app", + "tests" + ], + "extraPaths": [ + "app" + ] +} diff --git a/pytest.ini b/pytest.ini index 90a3747..7e75adf 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,6 +1,6 @@ [pytest] filterwarnings = - ignore::urllib3.exceptions.NotOpenSSLWarning + ignore:.*OpenSSL.*:Warning testpaths = tests norecursedirs = ansible pythonpath = . diff --git a/requirements.txt b/requirements.txt index 479cc3f..8498c0f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,25 @@ +# --- Test dependencies --- pytest pytest-mock responses -pyserial -requests -flake8 +httpx==0.27.0 + +# --- Runtime dependencies --- fastapi==0.115.0 uvicorn==0.30.0 pydantic==2.7.0 starlette==0.37.2 anyio==4.4.0 -httpx==0.27.0 python-dotenv +pyserial +requests + +# --- Database / ORM --- +sqlalchemy==2.0.36 + +# --- Dev tools --- +ruff==0.6.9 +mypy==1.11.2 +flake8 +types-PyYAML +types-requests diff --git a/systemd/env b/systemd/env deleted file mode 100644 index 9ed7188..0000000 --- a/systemd/env +++ /dev/null @@ -1 +0,0 @@ -PI_LOG_API_TOKEN=changeme diff --git a/systemd/pi-log.service b/systemd/pi-log.service deleted file mode 100644 index e01a209..0000000 --- a/systemd/pi-log.service +++ /dev/null @@ -1,22 +0,0 @@ -[Unit] -Description=Pi-Log Geiger Counter Ingestion Service -After=network-online.target -Wants=network-online.target - -[Service] -Type=simple -User=pi -WorkingDirectory=/opt/pi-log -ExecStart=/usr/bin/python3 -m pi_log.geiger_reader \ - --device /dev/ttyUSB0 \ - --db /var/lib/pi-log/readings.db \ - --api-url https://logexp.example.com/api/ingest \ - --api-token ${PI_LOG_API_TOKEN} - -Restart=on-failure -RestartSec=5 - -EnvironmentFile=/etc/pi-log/env - -[Install] -WantedBy=multi-user.target diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..2a855d9 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,6 @@ +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) diff --git a/tests/api/conftest.py b/tests/api/conftest.py deleted file mode 100644 index 5f77edc..0000000 --- a/tests/api/conftest.py +++ /dev/null @@ -1,25 +0,0 @@ -import pytest -from app.api import app, get_store -from app.sqlite_store import SQLiteStore -from fastapi.testclient import TestClient - -print(">>> LOADED tests/api/conftest.py") - - - -@pytest.fixture -def temp_db(tmp_path): - db_path = tmp_path / "test.db" - db_path.touch() - store = SQLiteStore(str(db_path)) - store.initialize_db() - return str(db_path) - - -@pytest.fixture -def client(temp_db): - def override_get_store(): - return SQLiteStore(temp_db) - - app.dependency_overrides[get_store] = override_get_store - return TestClient(app) diff --git a/tests/api/test_health.py b/tests/api/test_health.py index 4f43003..0f2998f 100644 --- a/tests/api/test_health.py +++ b/tests/api/test_health.py @@ -1,4 +1,5 @@ -from tests.api.conftest import client +# no import — pytest fixture "client" is provided automatically + def test_health_ok(client): response = client.get("/health") diff --git a/tests/api/test_latest.py b/tests/api/test_latest.py index 6428510..b35d365 100644 --- a/tests/api/test_latest.py +++ b/tests/api/test_latest.py @@ -1,4 +1,5 @@ -from tests.api.conftest import client +# no import — pytest fixture "client" is provided automatically + def test_latest_not_found_when_empty(client): response = client.get("/readings/latest") diff --git a/tests/api/test_metrics.py b/tests/api/test_metrics.py index b82a361..c8b0b64 100644 --- a/tests/api/test_metrics.py +++ b/tests/api/test_metrics.py @@ -1,4 +1,5 @@ -from tests.api.conftest import client +# no import — pytest fixture "client" is provided automatically + def test_metrics_shape(client): response = client.get("/metrics") diff --git a/tests/api/test_readings.py b/tests/api/test_readings.py index 2671842..c944c2d 100644 --- a/tests/api/test_readings.py +++ b/tests/api/test_readings.py @@ -1,4 +1,5 @@ -from tests.api.conftest import client +# no import — pytest fixture "client" is provided automatically + def test_readings_empty_list_when_none(client): response = client.get("/readings?limit=10") diff --git a/tests/conftest.py b/tests/conftest.py index 74b7e9a..d966408 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,114 +1,188 @@ -import os -import tempfile +# filename: tests/conftest.py + +import os # noqa: F401 +import sqlite3 import pytest from unittest.mock import MagicMock, patch +from fastapi.testclient import TestClient +from app.api import app, get_store from app.settings import Settings -from app.ingestion.ingestion_loop import IngestionLoop -from app.sqlite_store import SQLiteStore +from app.sqlite_store import initialize_db, insert_record +from app.ingestion.api_client import PushClient +from app.models import GeigerRecord + # --------------------------------------------------------------------------- -# GLOBAL SAFETY PATCH: SerialReader is ALWAYS mocked +# GLOBAL: Mock SerialReader so no test touches real hardware # --------------------------------------------------------------------------- + @pytest.fixture(autouse=True) def _patch_serial_reader(): - """ - Ensures no test ever instantiates a real SerialReader. - Applies to ALL tests automatically. - """ - with patch("app.ingestion_loop.SerialReader") as mock_reader: + with patch("app.ingestion.serial_reader.SerialReader") as mock_reader: mock_reader.return_value = MagicMock() yield + # --------------------------------------------------------------------------- # SETTINGS FIXTURES # --------------------------------------------------------------------------- + @pytest.fixture -def fake_settings(): - return Settings.from_dict({ - "serial": {"device": "/dev/fake", "baudrate": 9600}, - "sqlite": {"path": ":memory:"}, - "api": {"enabled": False}, - "push": {"enabled": False}, - "ingestion": {"poll_interval": 0.0}, - }) +def fake_settings(tmp_path): + db_path = tmp_path / "test.db" + return Settings.from_dict( + { + "serial": {"device": "/dev/fake", "baudrate": 9600}, + "sqlite": {"path": str(db_path)}, + "api": {"enabled": False}, + "push": {"enabled": False}, + "ingestion": {"poll_interval": 0.0}, + } + ) + + +# --------------------------------------------------------------------------- +# SQLITE FIXTURES +# --------------------------------------------------------------------------- @pytest.fixture -def temp_db(): - tmp = tempfile.NamedTemporaryFile(delete=False) - tmp.close() - try: - yield tmp.name - finally: - if os.path.exists(tmp.name): - os.unlink(tmp.name) +def temp_db(tmp_path): + db_path = tmp_path / "test.db" + initialize_db(str(db_path)) + return str(db_path) @pytest.fixture -def temp_db_settings(temp_db): - return Settings.from_dict({ - "serial": {"device": "/dev/fake", "baudrate": 9600}, - "sqlite": {"path": temp_db}, - "api": {"enabled": False}, - "push": {"enabled": False}, - "ingestion": {"poll_interval": 0.0}, - }) +def db_with_records(temp_db): + def _loader(records): + for rec in records: + insert_record(temp_db, rec) + return temp_db + + return _loader # --------------------------------------------------------------------------- -# LOOP FACTORY — SIMPLE, SAFE, NO PATCHING +# PUSH CLIENT FIXTURE # --------------------------------------------------------------------------- -@pytest.fixture -def loop_factory(fake_settings): - """ - Returns a function that constructs a fresh IngestionLoop(fake_settings). - Tests patch SerialReader themselves when needed. - """ - def _factory(settings_override=None): - settings = ( - Settings.from_dict(settings_override) - if settings_override else fake_settings - ) - return IngestionLoop(settings) - return _factory +@pytest.fixture +def push_client(tmp_path): + return PushClient( + api_url="http://example.com", + api_token="TOKEN", + device_id="TEST-DEVICE", + db_path=str(tmp_path / "test.db"), + ) # --------------------------------------------------------------------------- -# READER FACTORY +# GEIGER RECORD FACTORY # --------------------------------------------------------------------------- + @pytest.fixture -def reader_factory(): - def _factory(lines): - mock_reader = MagicMock() - mock_reader.read_line.side_effect = lines - return mock_reader +def geiger_record(): + def _factory(**overrides): + base = { + "raw": "RAW", + "counts_per_second": 10, + "counts_per_minute": 600, + "microsieverts_per_hour": 0.10, + "mode": "FAST", + "device_id": "pi-log", + } + base.update(overrides) + return GeigerRecord(**base) + return _factory # --------------------------------------------------------------------------- -# FAKE API + STORE +# API TEST STORE + CLIENT FIXTURE # --------------------------------------------------------------------------- -@pytest.fixture -def fake_api(): - class FakeAPI: - def __init__(self): - self.calls = [] - - def push_record(self, record_id, record): - self.calls.append((record_id, record)) - return FakeAPI() +class _TestStore: + """A minimal store wrapper for API tests.""" + + def __init__(self, db_path): + self.db_path = db_path + initialize_db(db_path) + + def get_latest_reading(self): + conn = sqlite3.connect(self.db_path) + try: + row = conn.execute( + """ + SELECT id, raw, counts_per_second, counts_per_minute, + microsieverts_per_hour, mode, device_id, + timestamp, pushed + FROM geiger_readings + ORDER BY id DESC LIMIT 1 + """ + ).fetchone() + if row is None: + return None + return { + "id": row[0], + "raw": row[1], + "cps": row[2], + "cpm": row[3], + "mode": row[5], + "timestamp": row[7], + } + finally: + conn.close() + + def get_recent_readings(self, limit=10): + conn = sqlite3.connect(self.db_path) + try: + rows = conn.execute( + """ + SELECT id, raw, counts_per_second, counts_per_minute, + microsieverts_per_hour, mode, device_id, + timestamp, pushed + FROM geiger_readings + ORDER BY id DESC LIMIT ? + """, + (limit,), + ).fetchall() + return [ + { + "id": r[0], + "raw": r[1], + "cps": r[2], + "cpm": r[3], + "mode": r[5], + "timestamp": r[7], + } + for r in rows + ] + finally: + conn.close() + + def count_readings(self): + conn = sqlite3.connect(self.db_path) + try: + (count,) = conn.execute("SELECT COUNT(*) FROM geiger_readings").fetchone() + return count + finally: + conn.close() @pytest.fixture -def fake_store(temp_db): - store = SQLiteStore(temp_db) - store.initialize_db() - return store +def client(tmp_path, monkeypatch): + db_path = str(tmp_path / "api_test.db") + store = _TestStore(db_path) + + def override_get_store(): + return store + + app.dependency_overrides[get_store] = override_get_store + return TestClient(app) diff --git a/tests/integration/test_batch_push.py b/tests/integration/test_batch_push.py deleted file mode 100644 index 45c37e5..0000000 --- a/tests/integration/test_batch_push.py +++ /dev/null @@ -1,27 +0,0 @@ -from unittest.mock import patch - -def test_batch_push_sequential(loop_factory): - with patch("app.ingestion.csv_parser.parse_geiger_csv") as mock_parse, \ - patch("app.sqlite_store.SQLiteStore") as mock_store, \ - patch("app.api_client.APIClient") as mock_api: - - mock_parse.side_effect = [{"cps": 1}, {"cps": 2}, {"cps": 3}] - mock_store.return_value.insert_record.side_effect = [1, 2, 3] - - loop = loop_factory({ - "serial": {"device": "/dev/fake", "baudrate": 9600}, - "sqlite": {"path": ":memory:"}, - "api": {"enabled": True}, - "push": {"enabled": False}, - "ingestion": {"poll_interval": 0.0}, - }) - - loop.store = mock_store.return_value - loop.api = mock_api.return_value - loop.api_enabled = True - - loop.process_line("L1") - loop.process_line("L2") - loop.process_line("L3") - - assert mock_store.return_value.insert_record.call_count == 3 diff --git a/tests/integration/test_full_pipeline.py b/tests/integration/test_full_pipeline.py deleted file mode 100644 index c4d6cb8..0000000 --- a/tests/integration/test_full_pipeline.py +++ /dev/null @@ -1,42 +0,0 @@ -import os -import tempfile -from unittest.mock import patch -import responses - -def create_temp_db(): - tmp = tempfile.NamedTemporaryFile(delete=False) - tmp.close() - return tmp.name - -@patch("time.sleep", return_value=None) -@patch("app.ingestion.serial_reader.SerialReader") -@responses.activate -def test_full_pipeline(mock_reader, _, loop_factory): - db_path = create_temp_db() - - mock_reader.return_value.read_line.side_effect = [ - "CPS, 7, CPM, 70, uSv/hr, 0.07, FAST", - KeyboardInterrupt, - ] - - responses.add( - responses.POST, - "http://example.com/api/readings", - json={"status": "ok"}, - status=200, - ) - - loop = loop_factory({ - "serial": {"device": "/dev/fake", "baudrate": 9600}, - "sqlite": {"path": db_path}, - "api": {"enabled": True, "base_url": "http://example.com/api", "token": "TOKEN"}, - "push": {"enabled": False}, - "ingestion": {"poll_interval": 0.0}, - }) - - try: - loop.run_forever() - except KeyboardInterrupt: - pass - - os.unlink(db_path) diff --git a/tests/integration/test_ingestion_loop.py b/tests/integration/test_ingestion_loop.py deleted file mode 100644 index 0198a0b..0000000 --- a/tests/integration/test_ingestion_loop.py +++ /dev/null @@ -1,83 +0,0 @@ -import app.ingestion_loop as ingestion_loop - -def make_fake_record(): - return {"cps": 10, "cpm": 100, "usv": 0.1, "mode": "FAST"} - - -def test_process_line_happy_path(monkeypatch, loop_factory): - calls = {} - - def fake_parse(raw): - calls["parse_raw"] = raw - return make_fake_record() - - class FakeStore: - def __init__(self, *args, **kwargs): - pass - def insert_record(self, record): - calls["store_record"] = record - return 123 - def mark_readings_pushed(self, ids): - calls["mark_pushed_ids"] = ids - - def fake_record_ingestion(record): - calls["metrics_record"] = record - - class FakeAPIClient: - def __init__(self, base_url, token): - calls["api_init"] = (base_url, token) - def push_record(self, record_id, record): - calls["api_push"] = (record_id, record) - - class FakeLogExpClient: - def __init__(self, base_url, token): - calls["logexp_init"] = (base_url, token) - def push(self, record_id, record): - calls["logexp_push"] = (record_id, record) - - monkeypatch.setattr("app.ingestion.csv_parser.parse_geiger_csv", fake_parse) - monkeypatch.setattr("app.sqlite_store.SQLiteStore", FakeStore) - monkeypatch.setattr("app.metrics.record_ingestion", fake_record_ingestion) - monkeypatch.setattr("app.api_client.APIClient", FakeAPIClient) - monkeypatch.setattr("app.logexp_client.LogExpClient", FakeLogExpClient) - - loop = loop_factory({ - "serial": {"device": "/dev/fake", "baudrate": 9600}, - "sqlite": {"path": ":memory:"}, - "api": {"enabled": True, "base_url": "https://api.example", "token": "TOKEN"}, - "push": {"enabled": True, "url": "https://logexp", "api_key": "KEY"}, - "ingestion": {"poll_interval": 0.0}, - }) - - ok = loop.process_line("RAW_LINE") - assert ok is True - assert calls["parse_raw"] == "RAW_LINE" - assert "store_record" in calls - assert "metrics_record" in calls - assert "api_push" in calls - assert "logexp_push" in calls - - -def test_run_once_calls_process_line(monkeypatch, loop_factory): - calls = {} - - class FakeReader: - def __init__(self, *args, **kwargs): - pass - def read_line(self): - calls["read_line"] = True - return "RAW" - - def fake_process_line(raw): - calls["process_line_raw"] = raw - return True - - monkeypatch.setattr("app.ingestion.serial_reader.SerialReader", FakeReader) - - loop = loop_factory() - monkeypatch.setattr(loop, "process_line", fake_process_line) - - result = loop.run_once() - assert result is True - assert calls["read_line"] is True - assert calls["process_line_raw"] == "RAW" diff --git a/tests/integration/test_ingestion_loop_constructs.py b/tests/integration/test_ingestion_loop_constructs.py deleted file mode 100644 index 26875e2..0000000 --- a/tests/integration/test_ingestion_loop_constructs.py +++ /dev/null @@ -1,10 +0,0 @@ -from unittest.mock import patch - -@patch("app.ingestion.serial_reader.SerialReader") -def test_ingestion_loop_constructs(mock_reader, loop_factory): - loop = loop_factory() - assert loop.reader is not None - assert loop.store is not None - assert loop.poll_interval >= 0 - if loop.api_enabled: - assert loop.api is not None diff --git a/tests/integration/test_ingestion_loop_run_forever_one_iteration.py b/tests/integration/test_ingestion_loop_run_forever_one_iteration.py deleted file mode 100644 index ecded0b..0000000 --- a/tests/integration/test_ingestion_loop_run_forever_one_iteration.py +++ /dev/null @@ -1,19 +0,0 @@ -from unittest.mock import patch - -@patch("time.sleep", return_value=None) -@patch("app.ingestion.serial_reader.SerialReader") -def test_run_forever_one_iteration(mock_reader, _, loop_factory): - mock_reader.return_value.read_line.side_effect = [ - "CPS, 5, CPM, 50, uSv/hr, 0.05, SLOW", - KeyboardInterrupt, - ] - - loop = loop_factory() - - with patch.object(loop, "process_line") as mock_process: - try: - loop.run_forever() - except KeyboardInterrupt: - pass - - mock_process.assert_called_once_with("CPS, 5, CPM, 50, uSv/hr, 0.05, SLOW") diff --git a/tests/integration/test_ingestion_loop_with_mock_serial_and_mock_api.py b/tests/integration/test_ingestion_loop_with_mock_serial_and_mock_api.py deleted file mode 100644 index 1601230..0000000 --- a/tests/integration/test_ingestion_loop_with_mock_serial_and_mock_api.py +++ /dev/null @@ -1,38 +0,0 @@ -from unittest.mock import patch -import responses - -@patch("time.sleep", return_value=None) -@patch("app.ingestion.serial_reader.SerialReader") -@patch("app.sqlite_store.SQLiteStore") -@responses.activate -def test_ingestion_loop_full_pipeline(mock_store, mock_reader, _, loop_factory): - mock_reader.return_value.read_line.side_effect = [ - "CPS, 9, CPM, 90, uSv/hr, 0.09, FAST", - KeyboardInterrupt, - ] - - mock_store.return_value.insert_record.return_value = 1 - - responses.add( - responses.POST, - "http://example.com/api/readings", - json={"status": "ok"}, - status=200, - ) - - loop = loop_factory({ - "serial": {"device": "/dev/fake", "baudrate": 9600}, - "sqlite": {"path": ":memory:"}, - "api": {"enabled": True, "base_url": "http://example.com/api", "token": "TOKEN"}, - "push": {"enabled": False}, - "ingestion": {"poll_interval": 0.0}, - }) - - loop.store = mock_store.return_value - - try: - loop.run_forever() - except KeyboardInterrupt: - pass - - mock_store.return_value.insert_record.assert_called_once() diff --git a/tests/integration/test_ingestion_with_mock_serial.py b/tests/integration/test_ingestion_with_mock_serial.py index e9c2875..dcf3dfb 100644 --- a/tests/integration/test_ingestion_with_mock_serial.py +++ b/tests/integration/test_ingestion_with_mock_serial.py @@ -1,9 +1,16 @@ +# filename: tests/integration/test_ingestion_with_mock_serial.py + from unittest.mock import patch, MagicMock -from app.serial_reader import SerialReader -from app.ingestion.csv_parser import parse_geiger_csv +from datetime import datetime, timezone + +from app.ingestion.serial_reader import SerialReader +from app.ingestion.csv_parser import parse_geiger_csv # noqa: F401 +from app.sqlite_store import insert_record, get_unpushed_records +from app.models import GeigerRecord -@patch("app.serial_reader.serial.Serial") -def test_serial_to_parser_to_storage(mock_serial, fake_store): + +@patch("app.ingestion.serial_reader.serial.Serial") +def test_serial_to_parser_to_storage(mock_serial, temp_db): mock_port = MagicMock() mock_port.readline.side_effect = [ b"CPS, 9, CPM, 90, uSv/hr, 0.09, FAST\n", @@ -11,14 +18,35 @@ def test_serial_to_parser_to_storage(mock_serial, fake_store): ] mock_serial.return_value = mock_port - store = fake_store + calls = [] + + def fake_handler(parsed_dict): + calls.append(parsed_dict) - def fake_handler(): - raw = mock_port.readline().decode("utf-8").strip() - parsed = parse_geiger_csv(raw) - if parsed: - store.insert_record(parsed) + record = GeigerRecord( + id=None, + raw=parsed_dict.get("raw", ""), + counts_per_second=parsed_dict["cps"], + counts_per_minute=parsed_dict["cpm"], + microsieverts_per_hour=parsed_dict["usv"], + mode=parsed_dict["mode"], + device_id="test-device", + timestamp=datetime.now(timezone.utc), + pushed=False, + ) - with patch.object(SerialReader, "run", side_effect=fake_handler): - reader = SerialReader("/dev/ttyUSB0") + insert_record(temp_db, record) + + reader = SerialReader("/dev/ttyUSB0") + + with patch.object(reader, "_handle_parsed", side_effect=fake_handler): reader.run() + + # Validate handler was called + assert len(calls) == 1 + assert calls[0]["cps"] == 9 + + # Validate record was stored + stored = get_unpushed_records(temp_db) + assert len(stored) == 1 + assert stored[0].counts_per_second == 9 diff --git a/tests/integration/test_metrics_integration.py b/tests/integration/test_metrics_integration.py deleted file mode 100644 index 4f16a9e..0000000 --- a/tests/integration/test_metrics_integration.py +++ /dev/null @@ -1,17 +0,0 @@ -from unittest.mock import patch - -import app.ingestion_loop as ingestion_loop - - -@patch("app.metrics.record_ingestion") -@patch("app.ingestion.csv_parser.parse_geiger_csv") -@patch("app.sqlite_store.SQLiteStore") -def test_metrics_recorded(mock_store, mock_parse, mock_record, loop_factory): - mock_parse.return_value = {"cps": 10} - mock_store.return_value.insert_record.return_value = 1 - - loop = loop_factory() - - loop.process_line("RAW_LINE") - - mock_record.assert_called_once_with({"cps": 10}) diff --git a/tests/integration/test_retry_logic.py b/tests/integration/test_retry_logic.py deleted file mode 100644 index cd0735a..0000000 --- a/tests/integration/test_retry_logic.py +++ /dev/null @@ -1,33 +0,0 @@ -from unittest.mock import patch - -import app.ingestion_loop as ingestion_loop - - -@patch("app.ingestion.csv_parser.parse_geiger_csv") -@patch("app.sqlite_store.SQLiteStore") -@patch("app.api_client.APIClient") -def test_retry_logic_api_failure(mock_api, mock_store, mock_parse, loop_factory): - # Parser returns valid reading - mock_parse.return_value = {"cps": 10} - - # DB insert returns ID - mock_store.return_value.insert_record.return_value = 1 - - # API always fails - mock_api_instance = mock_api.return_value - mock_api_instance.push_record.side_effect = Exception("network down") - - loop = loop_factory({ - "serial": {"device": "/dev/fake", "baudrate": 9600}, - "sqlite": {"path": ":memory:"}, - "api": {"enabled": True, "base_url": "http://example.com", "token": "TOKEN"}, - "push": {"enabled": False}, - "ingestion": {"poll_interval": 0.0}, - }) - - loop.store = mock_store.return_value - loop.api = mock_api_instance - loop.api_enabled = True - - ok = loop.process_line("RAW_LINE") - assert ok is True # ingestion shouldn't blow up even if API fails diff --git a/tests/integration/test_serial_to_parser_to_storage.py b/tests/integration/test_serial_to_parser_to_storage.py new file mode 100644 index 0000000..e5174f7 --- /dev/null +++ b/tests/integration/test_serial_to_parser_to_storage.py @@ -0,0 +1,30 @@ +# pi-log/tests/integration/test_serial_to_parser_to_storage.py + +from unittest.mock import patch, MagicMock +from app.ingestion.serial_reader import SerialReader +from app.ingestion.csv_parser import parse_geiger_csv # noqa: F401 +from app.sqlite_store import insert_record + + +@patch("app.ingestion.serial_reader.serial.Serial") +def test_serial_to_parser_to_storage(mock_serial, temp_db): + mock_port = MagicMock() + mock_port.readline.side_effect = [ + b"CPS, 9, CPM, 90, uSv/hr, 0.09, FAST\n", + KeyboardInterrupt, + ] + mock_serial.return_value = mock_port + + calls = [] + + def fake_handler(parsed): + calls.append(parsed) + insert_record(temp_db, parsed) + + reader = SerialReader("/dev/ttyUSB0") + + with patch.object(reader, "_handle_parsed", side_effect=fake_handler): + reader.run() + + assert len(calls) == 1 + assert calls[0]["cps"] == 9 diff --git a/tests/unit/test_config_loader.py b/tests/unit/test_config_loader.py index 5bc810e..3ec3a48 100644 --- a/tests/unit/test_config_loader.py +++ b/tests/unit/test_config_loader.py @@ -1,31 +1,43 @@ -from app.config import settings +# tests/unit/test_config_loader.py +import textwrap +from pathlib import Path # noqa: F401 -def test_config_has_required_sections(): - assert "serial" in settings._data - assert "sqlite" in settings._data - assert "api" in settings._data - assert "ingestion" in settings._data +from app.config_loader import load_config, SettingsNamespace +from app.settings import Settings -def test_config_serial_section(): - serial = settings.serial - assert "device" in serial - assert "baudrate" in serial +def test_load_config_missing_file_returns_empty_dict(tmp_path): + missing = tmp_path / "does_not_exist.toml" + result = load_config(missing) + assert result == {} -def test_config_sqlite_section(): - sqlite_cfg = settings.sqlite - assert "path" in sqlite_cfg +def test_load_config_valid_toml(tmp_path): + config_path = tmp_path / "config.toml" + config_path.write_text( + textwrap.dedent(""" + [serial] + device = "/dev/ttyUSB0" + baudrate = 9600 + """) + ) + result = load_config(config_path) + assert isinstance(result, SettingsNamespace) + assert result.serial.device == "/dev/ttyUSB0" + assert result.serial.baudrate == 9600 -def test_config_api_section(): - api = settings.api - assert "enabled" in api - assert "base_url" in api - assert "token" in api +def test_settings_from_dict(): + raw = { + "serial": {"device": "/dev/ttyUSB0", "baudrate": 9600}, + "sqlite": {"path": "test.db"}, + "ingestion": {"poll_interval": 1}, + } -def test_config_ingestion_section(): - ingestion = settings.ingestion - assert "poll_interval" in ingestion + settings = Settings.from_dict(raw) + + assert settings.serial.device == "/dev/ttyUSB0" + assert settings.sqlite.path == "test.db" + assert settings.ingestion.poll_interval == 1 diff --git a/tests/unit/test_health_checks.py b/tests/unit/test_health_checks.py index cd1d402..e398e3a 100644 --- a/tests/unit/test_health_checks.py +++ b/tests/unit/test_health_checks.py @@ -1,4 +1,4 @@ -from unittest.mock import patch +from unittest.mock import patch # noqa: F401 from app.health import health_check diff --git a/tests/unit/test_ingestion_loop_batch_push.py b/tests/unit/test_ingestion_loop_batch_push.py deleted file mode 100644 index faf4f94..0000000 --- a/tests/unit/test_ingestion_loop_batch_push.py +++ /dev/null @@ -1,35 +0,0 @@ -from unittest.mock import patch - - -@patch("app.ingestion.csv_parser.parse_geiger_csv") -@patch("app.sqlite_store.SQLiteStore") -@patch("app.api_client.APIClient") -def test_ingestion_loop_pushes_multiple_records(mock_api, mock_store, mock_parse, loop_factory): - # Mock parser output for multiple lines - mock_parse.side_effect = [ - {"cps": 1}, - {"cps": 2}, - {"cps": 3}, - ] - - # Mock DB insert IDs - mock_store.return_value.insert_record.side_effect = [1, 2, 3] - - loop = loop_factory({ - "serial": {"device": "/dev/fake", "baudrate": 9600}, - "sqlite": {"path": ":memory:"}, - "api": {"enabled": True}, - "push": {"enabled": False}, - "ingestion": {"poll_interval": 0.0}, - }) - - loop.store = mock_store.return_value - loop.api = mock_api.return_value - loop.api_enabled = True - - loop.process_line("L1") - loop.process_line("L2") - loop.process_line("L3") - - assert mock_store.return_value.insert_record.call_count == 3 - assert mock_api.return_value.push_record.call_count == 3 diff --git a/tests/unit/test_ingestion_loop_process_line.py b/tests/unit/test_ingestion_loop_process_line.py deleted file mode 100644 index f78cd2c..0000000 --- a/tests/unit/test_ingestion_loop_process_line.py +++ /dev/null @@ -1,30 +0,0 @@ -from unittest.mock import patch - - -@patch("app.ingestion.csv_parser.parse_geiger_csv") -@patch("app.sqlite_store.SQLiteStore") -@patch("app.api_client.APIClient") -def test_process_line_stores_and_pushes(mock_api, mock_store, mock_parse, loop_factory): - # Mock parser output - mock_parse.return_value = {"cps": 10, "cpm": 100, "usv": 0.1, "mode": "FAST"} - - # Mock store behavior - mock_store.return_value.insert_record.return_value = 1 - - loop = loop_factory({ - "serial": {"device": "/dev/fake", "baudrate": 9600}, - "sqlite": {"path": ":memory:"}, - "api": {"enabled": True, "base_url": "https://api.example", "token": "TOKEN"}, - "push": {"enabled": False}, - "ingestion": {"poll_interval": 0.0}, - }) - - loop.store = mock_store.return_value - loop.api = mock_api.return_value - loop.api_enabled = True - - ok = loop.process_line("RAW_LINE") - - assert ok is True - mock_store.return_value.insert_record.assert_called_once() - mock_api.return_value.push_record.assert_called_once() diff --git a/tests/unit/test_ingestion_loop_retry.py b/tests/unit/test_ingestion_loop_retry.py deleted file mode 100644 index 8240727..0000000 --- a/tests/unit/test_ingestion_loop_retry.py +++ /dev/null @@ -1,31 +0,0 @@ -from unittest.mock import patch - - -@patch("app.ingestion.csv_parser.parse_geiger_csv") -@patch("app.sqlite_store.SQLiteStore") -@patch("app.api_client.APIClient") -def test_ingestion_loop_handles_push_failure(mock_api, mock_store, mock_parse, loop_factory): - # Mock parser output - mock_parse.return_value = {"cps": 10} - - # Mock DB insert - mock_store.return_value.insert_record.return_value = 1 - - # Mock API failure - mock_api_instance = mock_api.return_value - mock_api_instance.push_record.side_effect = Exception("network error") - - loop = loop_factory({ - "serial": {"device": "/dev/fake", "baudrate": 9600}, - "sqlite": {"path": ":memory:"}, - "api": {"enabled": True, "base_url": "https://api.example", "token": "TOKEN"}, - "push": {"enabled": False}, - "ingestion": {"poll_interval": 0.0}, - }) - - loop.store = mock_store.return_value - loop.api = mock_api_instance - loop.api_enabled = True - - ok = loop.process_line("RAW_LINE") - assert ok is True # ingestion should log and continue, not crash diff --git a/tests/unit/test_logging_loader.py b/tests/unit/test_logging_loader.py index 6712e0d..1e675f6 100644 --- a/tests/unit/test_logging_loader.py +++ b/tests/unit/test_logging_loader.py @@ -1,35 +1,24 @@ -from unittest.mock import patch, MagicMock -from app.logging import get_logger +# filename: tests/unit/test_logging_loader.py +from unittest.mock import patch +from app.logging import get_logger, setup_logging -def test_logger_initializes_without_error(): + +@patch("logging.handlers.RotatingFileHandler._open", return_value=None) +@patch("app.logging.Path.mkdir") +def test_logger_initializes_without_error(mock_mkdir, mock_open): + setup_logging() log = get_logger("test_logger") log.info("test message") assert log is not None + assert mock_mkdir.called -def test_logger_multiple_calls_return_logger(): +@patch("logging.handlers.RotatingFileHandler._open", return_value=None) +@patch("app.logging.Path.mkdir") +def test_logger_multiple_calls_return_same_logger(mock_mkdir, mock_open): + setup_logging() log1 = get_logger("test_logger") log2 = get_logger("test_logger") assert log1 is log2 - - -@patch("app.logging.logging.config.dictConfig") -@patch("app.logging.os.path.exists") -def test_logging_fallback_when_log_directory_missing(mock_exists, mock_dict_config): - """ - If the logging config references a file handler whose directory does not exist, - the loader should fall back to console logging and not raise an exception. - """ - - # Simulate missing directory for file handler - mock_exists.return_value = False - - # Call logger initialization - log = get_logger("fallback_test_logger") - - # Should still return a logger object - assert log is not None - - # dictConfig should still be called (with fallback config) - assert mock_dict_config.called + assert mock_mkdir.called diff --git a/tests/unit/test_logging_loader_malformed_toml.py b/tests/unit/test_logging_loader_malformed_toml.py index 86a6a85..ec9f94a 100644 --- a/tests/unit/test_logging_loader_malformed_toml.py +++ b/tests/unit/test_logging_loader_malformed_toml.py @@ -1,16 +1,17 @@ +# filename: tests/unit/test_logging_loader_malformed_toml.py + from unittest.mock import patch, mock_open -from app.logging import get_logger +from app.logging import get_logger, setup_logging -@patch("app.logging.os.path.exists", return_value=True) +@patch("logging.handlers.RotatingFileHandler._open", return_value=None) +@patch("app.logging.Path.mkdir") @patch("app.logging.open", new_callable=mock_open, read_data="not: valid: toml") -@patch("app.logging.tomllib.load", side_effect=Exception("malformed TOML")) -@patch("app.logging.logging.config.dictConfig") -def test_logging_fallback_when_config_malformed(mock_dict_config, _mock_toml, _mock_open, _mock_exists): - """ - If logging.toml exists but contains invalid TOML, the loader should fall back - to console-only logging and not raise an exception. - """ +def test_logging_fallback_when_config_malformed( + _mock_open_file, mock_mkdir, mock_open_handler +): + setup_logging() log = get_logger("malformed_config_test") + log.info("test message") assert log is not None - assert mock_dict_config.called + assert mock_mkdir.called diff --git a/tests/unit/test_logging_loader_missing_directory.py b/tests/unit/test_logging_loader_missing_directory.py index 9849249..1e675f6 100644 --- a/tests/unit/test_logging_loader_missing_directory.py +++ b/tests/unit/test_logging_loader_missing_directory.py @@ -1,14 +1,24 @@ +# filename: tests/unit/test_logging_loader.py + from unittest.mock import patch -from app.logging import get_logger +from app.logging import get_logger, setup_logging -@patch("app.logging.os.path.exists", return_value=False) -@patch("app.logging.logging.config.dictConfig") -def test_logging_fallback_when_log_directory_missing(mock_dict_config, _): - """ - If the logging config references a file handler whose directory does not exist, - the loader should fall back to console logging and not raise an exception. - """ - log = get_logger("fallback_test_logger") +@patch("logging.handlers.RotatingFileHandler._open", return_value=None) +@patch("app.logging.Path.mkdir") +def test_logger_initializes_without_error(mock_mkdir, mock_open): + setup_logging() + log = get_logger("test_logger") + log.info("test message") assert log is not None - assert mock_dict_config.called + assert mock_mkdir.called + + +@patch("logging.handlers.RotatingFileHandler._open", return_value=None) +@patch("app.logging.Path.mkdir") +def test_logger_multiple_calls_return_same_logger(mock_mkdir, mock_open): + setup_logging() + log1 = get_logger("test_logger") + log2 = get_logger("test_logger") + assert log1 is log2 + assert mock_mkdir.called diff --git a/tests/unit/test_logging_loader_missing_file.py b/tests/unit/test_logging_loader_missing_file.py index 44333e3..6540c76 100644 --- a/tests/unit/test_logging_loader_missing_file.py +++ b/tests/unit/test_logging_loader_missing_file.py @@ -1,14 +1,19 @@ +# filename: tests/unit/test_logging_loader_missing_file.py + from unittest.mock import patch -from app.logging import get_logger +from app.logging import get_logger, setup_logging -@patch("app.logging.os.path.exists", return_value=False) -@patch("app.logging.logging.config.dictConfig") -def test_logging_fallback_when_config_missing(mock_dict_config, _): +@patch("logging.handlers.RotatingFileHandler._open", return_value=None) +@patch("app.logging.Path.mkdir") +def test_logging_initialization_when_config_missing(mock_mkdir, mock_open_handler): """ - If logging.toml does not exist, the loader should fall back to - console-only logging and not raise an exception. + The new logging system does not load or check for any external config file. + Logging must initialize cleanly and produce a usable logger. """ + setup_logging() log = get_logger("missing_config_test") + log.info("test message") + assert log is not None - assert mock_dict_config.called + assert mock_mkdir.called diff --git a/tests/unit/test_logging_loader_unreadable_file.py b/tests/unit/test_logging_loader_unreadable_file.py index 1bac1c9..71cbd46 100644 --- a/tests/unit/test_logging_loader_unreadable_file.py +++ b/tests/unit/test_logging_loader_unreadable_file.py @@ -1,15 +1,19 @@ +# filename: tests/unit/test_logging_loader_unreadable_file.py + from unittest.mock import patch -from app.logging import get_logger +from app.logging import get_logger, setup_logging -@patch("app.logging.os.path.exists", return_value=True) -@patch("app.logging.open", side_effect=OSError("cannot read file")) -@patch("app.logging.logging.config.dictConfig") -def test_logging_fallback_when_config_unreadable(mock_dict_config, _mock_open, _mock_exists): +@patch("logging.handlers.RotatingFileHandler._open", return_value=None) +@patch("app.logging.Path.mkdir") +def test_logging_initialization_when_config_unreadable(mock_mkdir, mock_open_handler): """ - If logging.toml exists but cannot be read, the loader should fall back - to console-only logging and not raise an exception. + The new logging system does not read config files at all. + Even if a file were unreadable, logging must still initialize cleanly. """ + setup_logging() log = get_logger("unreadable_config_test") + log.info("test message") + assert log is not None - assert mock_dict_config.called + assert mock_mkdir.called diff --git a/tests/unit/test_push_client.py b/tests/unit/test_push_client.py deleted file mode 100644 index 5df7640..0000000 --- a/tests/unit/test_push_client.py +++ /dev/null @@ -1,67 +0,0 @@ -import json -import responses -from app.api_client import APIClient - - -@responses.activate -def test_push_success_calls_api_for_each_row(): - client = APIClient("http://example.com/api", "TOKEN") - - responses.add( - responses.POST, - "http://example.com/api/readings", - json={"status": "ok"}, - status=200, - ) - - rows = [ - {"id": 1, "cps": 10, "cpm": 100, "usv": 0.10, "mode": "SLOW"}, - {"id": 2, "cps": 20, "cpm": 200, "usv": 0.20, "mode": "FAST"}, - ] - - for row in rows: - client.push_record(row["id"], row) - - assert len(responses.calls) == 2 - assert responses.calls[0].request.url == "http://example.com/api/readings" - assert responses.calls[1].request.url == "http://example.com/api/readings" - - -@responses.activate -def test_push_failure_does_not_raise(): - client = APIClient("http://example.com/api", "TOKEN") - - responses.add( - responses.POST, - "http://example.com/api/readings", - status=500, - ) - - row = {"id": 1, "cps": 10, "cpm": 100, "usv": 0.10, "mode": "SLOW"} - - # Should not raise an exception - client.push_record(row["id"], row) - - assert len(responses.calls) == 1 - - -@responses.activate -def test_payload_structure(): - client = APIClient("http://example.com/api", "TOKEN") - - responses.add( - responses.POST, - "http://example.com/api/readings", - json={"status": "ok"}, - status=200, - ) - - row = {"id": 1, "cps": 5, "cpm": 50, "usv": 0.05, "mode": "INST"} - - client.push_record(row["id"], row) - - sent = json.loads(responses.calls[0].request.body) - - assert sent["id"] == 1 - assert sent["cps"] == 5 - assert sent["mode"] == "INST" diff --git a/tests/unit/test_serial_parser_storage.py b/tests/unit/test_serial_parser_storage.py deleted file mode 100644 index f0c20b5..0000000 --- a/tests/unit/test_serial_parser_storage.py +++ /dev/null @@ -1,38 +0,0 @@ -from unittest.mock import patch, MagicMock -import sqlite3 - -from app.serial_reader import SerialReader -from app.ingestion.csv_parser import parse_geiger_csv -from app.sqlite_store import SQLiteStore - - -@patch("app.serial_reader.serial.Serial") -def test_serial_to_parser_to_storage(mock_serial, temp_db): - # Mock the serial port to return one valid line, then stop - mock_port = MagicMock() - mock_port.readline.side_effect = [ - b"CPS, 9, CPM, 90, uSv/hr, 0.09, FAST\n", - KeyboardInterrupt, # stop the reader loop - ] - mock_serial.return_value = mock_port - - store = SQLiteStore(temp_db) - store.initialize_db() - - def fake_handler(): - raw = mock_port.readline().decode("utf-8").strip() - parsed = parse_geiger_csv(raw) - if parsed: - store.insert_record(parsed) - - with patch.object(SerialReader, "run", side_effect=fake_handler): - reader = SerialReader("/dev/ttyUSB0") - reader.run() - - conn = sqlite3.connect(temp_db) - cur = conn.cursor() - cur.execute("SELECT COUNT(*) FROM readings") - count = cur.fetchone()[0] - conn.close() - - assert count == 1 diff --git a/tests/unit/test_serial_reader.py b/tests/unit/test_serial_reader.py index 684b1a5..044e63e 100644 --- a/tests/unit/test_serial_reader.py +++ b/tests/unit/test_serial_reader.py @@ -1,11 +1,13 @@ -import pytest +# filename: tests/unit/test_serial_reader.py + +import pytest # noqa: F401 from unittest.mock import MagicMock, patch -from app.serial_reader import SerialReader +from app.ingestion.serial_reader import SerialReader -@patch("app.serial_reader.parse_geiger_csv") -@patch("app.serial_reader.serial.Serial") +@patch("app.ingestion.serial_reader.parse_geiger_csv") +@patch("app.ingestion.serial_reader.serial.Serial") def test_serial_reader_reads_lines(mock_serial, mock_parse): # Mock serial port returning two valid lines then stopping mock_port = MagicMock() @@ -34,8 +36,8 @@ def test_serial_reader_reads_lines(mock_serial, mock_parse): assert mock_handler.call_args_list[1].args[0]["cps"] == 20 -@patch("app.serial_reader.parse_geiger_csv") -@patch("app.serial_reader.serial.Serial") +@patch("app.ingestion.serial_reader.parse_geiger_csv") +@patch("app.ingestion.serial_reader.serial.Serial") def test_serial_reader_skips_malformed_lines(mock_serial, mock_parse): mock_port = MagicMock() mock_port.readline.side_effect = [ diff --git a/tests/unit/test_storage.py b/tests/unit/test_storage.py deleted file mode 100644 index 0057094..0000000 --- a/tests/unit/test_storage.py +++ /dev/null @@ -1,55 +0,0 @@ -import sqlite3 - -from app.sqlite_store import SQLiteStore - - -def test_initialize_db_creates_schema(temp_db): - store = SQLiteStore(temp_db) - store.initialize_db() - - conn = sqlite3.connect(temp_db) - cur = conn.cursor() - cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='readings'") - result = cur.fetchone() - conn.close() - - assert result is not None - - -def test_insert_and_select_unpushed(fake_store): - store = fake_store - - reading = { - "cps": 10, - "cpm": 100, - "usv": 0.10, - "mode": "SLOW", - } - - record_id = store.insert_record(reading) - assert isinstance(record_id, int) - - unpushed = store.select_unpushed_readings() - assert len(unpushed) == 1 - assert unpushed[0]["id"] == record_id - - -def test_mark_readings_pushed(fake_store): - store = fake_store - - reading = { - "cps": 5, - "cpm": 50, - "usv": 0.05, - "mode": "FAST", - } - - record_id = store.insert_record(reading) - - unpushed_before = store.select_unpushed_readings() - assert any(r["id"] == record_id for r in unpushed_before) - - store.mark_readings_pushed([record_id]) - - unpushed_after = store.select_unpushed_readings() - assert all(r["id"] != record_id for r in unpushed_after) diff --git a/tests/unit/test_storage_push.py b/tests/unit/test_storage_push.py deleted file mode 100644 index 19ef171..0000000 --- a/tests/unit/test_storage_push.py +++ /dev/null @@ -1,32 +0,0 @@ -import responses - -from app.sqlite_store import SQLiteStore -from app.api_client import APIClient - - -@responses.activate -def test_storage_to_push_to_storage(fake_store): - store = fake_store - - # Insert a reading - reading_id = store.insert_record({"cps": 9, "cpm": 90, "usv": 0.09, "mode": "FAST"}) - - # Mock push endpoint - responses.add( - responses.POST, - "http://example.com/api/readings", - json={"status": "ok"}, - status=200, - ) - - client = APIClient("http://example.com/api", "TOKEN") - - # Push unpushed readings - unpushed = store.select_unpushed_readings() - for r in unpushed: - client.push_record(r["id"], r) - store.mark_readings_pushed([r["id"]]) - - # Ensure reading is no longer unpushed - remaining = store.select_unpushed_readings() - assert all(r["id"] != reading_id for r in remaining) diff --git a/tests/unit/test_watchdog_serial_reader.py b/tests/unit/test_watchdog_serial_reader.py new file mode 100644 index 0000000..8dba4fc --- /dev/null +++ b/tests/unit/test_watchdog_serial_reader.py @@ -0,0 +1,95 @@ +# filename: tests/unit/test_watchdog_serial_reader.py + +import time +import pytest + +from app.ingestion.watchdog import WatchdogSerialReader + + +class MockSerial: + def __init__(self): + self.closed = False + + def close(self): + self.closed = True + + +class MockReader: + def __init__(self, lines=None, raise_on_call=False): + self.lines = lines or [] + self.raise_on_call = raise_on_call + self.calls = 0 + self.handler = None + self.ser = MockSerial() + + def set_handler(self, handler): + self.handler = handler + + def read_line(self): + self.calls += 1 + if self.raise_on_call: + raise RuntimeError("boom") + if self.lines: + return self.lines.pop(0) + return "" + + + +def test_watchdog_proxies_set_handler(): + mock = MockReader() + wd = WatchdogSerialReader(mock) + + def handler(_): + pass + + wd.set_handler(handler) + + assert mock.handler is handler + + +def test_watchdog_updates_last_frame_timestamp_on_data(): + mock = MockReader(lines=["abc"]) + wd = WatchdogSerialReader(mock) + + before = wd._last_frame_ts + time.sleep(0.01) + + line = wd.read_line() + + assert line == "abc" + assert wd._last_frame_ts > before + + +def test_watchdog_triggers_reopen_on_exception(monkeypatch): + mock = MockReader(raise_on_call=True) + wd = WatchdogSerialReader(mock) + + reopened = {"called": False} + + def fake_reopen(): + reopened["called"] = True + + monkeypatch.setattr(wd, "_reopen", fake_reopen) + + # read_line should catch the exception and call _reopen() + with pytest.raises(RuntimeError): + # second call after reopen will still raise, so we expect the error + wd.read_line() + + assert reopened["called"] is True + + +def test_watchdog_triggers_reopen_on_dead_link(monkeypatch): + mock = MockReader(lines=[""]) + wd = WatchdogSerialReader(mock, dead_threshold_seconds=0.0) + + reopened = {"called": False} + + def fake_reopen(): + reopened["called"] = True + + monkeypatch.setattr(wd, "_reopen", fake_reopen) + + wd.read_line() + + assert reopened["called"] is True diff --git a/yamllint.yml b/yamllint.yml index aa8237b..30261a5 100644 --- a/yamllint.yml +++ b/yamllint.yml @@ -1,12 +1,29 @@ +# yamllint configuration for pi-log +# Hardened for CI, Ansible, and long-term maintainability. +--- extends: default rules: + indentation: + spaces: 2 + indent-sequences: true + check-multi-line-strings: false + line-length: max: 120 - level: warning + allow-non-breakable-words: true truthy: - allowed-values: ["true", "false", "yes", "no"] + allowed-values: ["true", "false"] + + trailing-spaces: enable + + empty-lines: + max: 2 - brackets: - max-spaces-inside: 0 + # Disable rules that conflict with Ansible idioms + braces: disable + brackets: disable + colons: disable + commas: disable + comments-indentation: disable