diff --git a/.gitignore b/.gitignore index d267ce2..758aa26 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ video_cache yolo_cache ftp_incoming config/config.yaml +.coverage diff --git a/AGENTS.md b/AGENTS.md index 5e52d32..a37802e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -13,6 +13,7 @@ - **Preserve stack traces**: Custom errors must set `self.__cause__ = cause` to preserve original exception. - **Tests must use Given/When/Then comments**: Every test must include these comments and follow behavioral testing principles (see `TESTING.md`). - **Postgres for state**: Use `clip_states` table with `clip_id` (primary key) + `data` (jsonb) for evolvable schema. +- **DB migrations**: If you change schema/models, follow `docs/migrations.md` (PostgreSQL-first autogenerate, add SQLite compatibility, run migration smoke test). - **Pydantic everywhere**: Validate config, DB payloads, VLM outputs, and MQTT payloads with Pydantic models. - **Clarify before complexity**: Ask user for clarification when simpler design may exist. Don't proceed with complex workarounds. - **Product priorities**: Recording + uploading (P0) must work even if Postgres is down. Analysis/notifications are best-effort (P1). @@ -349,12 +350,34 @@ async def test_filter_stage_failure(): ```bash uv sync # Install dependencies make typecheck # Run mypy --strict (mandatory before commit) -make test # Run pytest -make check # Run both typecheck + test -make db-up # Start Postgres for development +make test # Run pytest (both SQLite and PostgreSQL) +make test-sqlite # Run pytest with SQLite only (fast, no PG required) +make check # Run lint + typecheck + test +make db # Start Postgres for development make db-migrate # Run Alembic migrations ``` +### Database Testing + +Tests run against **both SQLite and PostgreSQL** by default using parametrized fixtures. This ensures compatibility with both backends. + +**If PostgreSQL is unavailable or causing issues**, you can skip PostgreSQL tests: + +```bash +# Skip PostgreSQL tests, run only SQLite +SKIP_POSTGRES_TESTS=1 make test + +# Or use the dedicated target +make test-sqlite +``` + +| Variable | Default | Description | +|----------|---------|-------------| +| `SKIP_POSTGRES_TESTS` | `0` (off) | Set to `1` to skip PostgreSQL tests | +| `TEST_DB_DSN` | (from `.env`) | PostgreSQL connection string for tests | + +**Note:** CI always runs both backends. Only use `SKIP_POSTGRES_TESTS=1` for local development when PostgreSQL is unavailable. 
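+
+For example (illustrative commands; the test module and DSN below are placeholders, adjust to your setup):
+
+```bash
+# SQLite only, single test module (fast, no PostgreSQL needed)
+SKIP_POSTGRES_TESTS=1 uv run pytest tests/homesec/test_state_store.py -v
+
+# Point PostgreSQL-backed tests at a specific database
+TEST_DB_DSN=postgresql://homesec:homesec@localhost:5432/homesec make test
+```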
+ ### Pattern Usage Examples **Error-as-value:** `src/homesec/pipeline/core.py::_filter_stage()`, `_upload_stage()`, `_vlm_stage()` diff --git a/Makefile b/Makefile index 81ea992..8e258fd 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ SHELL := /bin/bash .SHELLFLAGS := -eu -o pipefail -c -.PHONY: help up down docker-build docker-push run db test coverage typecheck lint check db-migrate db-migration publish +.PHONY: help up down docker-build docker-push run db test test-sqlite coverage typecheck lint check db-migrate db-migration publish help: @echo "Targets:" @@ -15,7 +15,8 @@ help: @echo " Local dev:" @echo " make run Run HomeSec locally (requires Postgres)" @echo " make db Start just Postgres" - @echo " make test Run tests with coverage" + @echo " make test Run tests with coverage (SQLite + PostgreSQL)" + @echo " make test-sqlite Run tests with SQLite only (fast, no PG required)" @echo " make coverage Run tests and generate HTML coverage report" @echo " make typecheck Run mypy" @echo " make lint Run ruff linter" @@ -64,9 +65,14 @@ run: db: docker compose up -d postgres +# Run all tests (both SQLite and PostgreSQL backends) test: uv run pytest tests/homesec/ -v --cov=homesec --cov-report=term-missing +# Run tests with SQLite only (fast, no PostgreSQL required) +test-sqlite: + SKIP_POSTGRES_TESTS=1 uv run pytest tests/homesec/ -v --cov=homesec --cov-report=term-missing + coverage: uv run pytest tests/homesec/ -v --cov=homesec --cov-report=html --cov-report=xml @echo "Coverage report: htmlcov/index.html" @@ -86,14 +92,14 @@ check: lint typecheck test # Database db-migrate: - uv run --with alembic --with sqlalchemy --with asyncpg --with python-dotenv alembic -c alembic.ini upgrade head + uv run --with alembic --with sqlalchemy --with asyncpg --with aiosqlite --with python-dotenv alembic -c alembic.ini upgrade head db-migration: @if [ -z "$(m)" ]; then \ echo "Error: message required. Run: make db-migration m=\"your description\""; \ exit 1; \ fi - uv run --with alembic --with sqlalchemy --with asyncpg --with python-dotenv alembic -c alembic.ini revision --autogenerate -m "$(m)" + uv run --with alembic --with sqlalchemy --with asyncpg --with aiosqlite --with python-dotenv alembic -c alembic.ini revision --autogenerate -m "$(m)" # Release publish: check diff --git a/alembic/env.py b/alembic/env.py index 840837b..219b521 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -1,3 +1,8 @@ +"""Alembic migration environment configuration. + +Supports both PostgreSQL and SQLite through the database abstraction layer. 
+""" + from __future__ import annotations import asyncio @@ -11,7 +16,7 @@ load_dotenv() from alembic import context -from sqlalchemy import pool +from sqlalchemy import MetaData, pool from sqlalchemy.ext.asyncio import async_engine_from_config PROJECT_ROOT = Path(__file__).resolve().parents[1] @@ -19,9 +24,9 @@ if str(SRC_DIR) not in sys.path: sys.path.insert(0, str(SRC_DIR)) -from sqlalchemy import MetaData # noqa: E402 -from homesec.telemetry.db.log_table import metadata as telemetry_metadata # noqa: E402 +from homesec.db import DialectHelper # noqa: E402 from homesec.state.postgres import Base as StateBase # noqa: E402 +from homesec.telemetry.db.log_table import metadata as telemetry_metadata # noqa: E402 # Combine all metadata into one for alembic target_metadata = MetaData() @@ -33,23 +38,39 @@ config = context.config if config.config_file_name is not None: - fileConfig(config.config_file_name) + fileConfig(config.config_file_name, disable_existing_loggers=False) def _get_url() -> str: + """Get database URL from environment or config.""" url = os.getenv("DB_DSN") or os.getenv("DATABASE_URL") or config.get_main_option("sqlalchemy.url") if not url: raise RuntimeError("Missing DB_DSN (or DATABASE_URL) for alembic migration.") return url +def _normalize_url(url: str) -> str: + """Normalize URL to use appropriate async driver.""" + return DialectHelper.normalize_dsn(url) + + def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL and not an Engine, + though an Engine is acceptable here as well. By skipping the Engine + creation we don't even need a DBAPI to be available. + """ + url = _normalize_url(_get_url()) + context.configure( - url=_get_url(), + url=url, target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, compare_type=True, + # Enable batch mode for SQLite ALTER TABLE support + render_as_batch=True, ) with context.begin_transaction(): @@ -57,17 +78,34 @@ def run_migrations_offline() -> None: def do_run_migrations(connection) -> None: - context.configure(connection=connection, target_metadata=target_metadata, compare_type=True) + """Execute migrations with the given connection.""" + context.configure( + connection=connection, + target_metadata=target_metadata, + compare_type=True, + # Enable batch mode for SQLite ALTER TABLE support + render_as_batch=True, + ) with context.begin_transaction(): context.run_migrations() async def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine and associate + a connection with the context. 
+ """ + url = _normalize_url(_get_url()) configuration = config.get_section(config.config_ini_section, {}) - configuration["sqlalchemy.url"] = _get_url() + configuration["sqlalchemy.url"] = url - connectable = async_engine_from_config(configuration, prefix="sqlalchemy.", poolclass=pool.NullPool) + connectable = async_engine_from_config( + configuration, + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) async with connectable.connect() as connection: await connection.run_sync(do_run_migrations) diff --git a/alembic/versions/e6f25df0df90_initial.py b/alembic/versions/e6f25df0df90_initial.py index 426a342..e7cbffb 100644 --- a/alembic/versions/e6f25df0df90_initial.py +++ b/alembic/versions/e6f25df0df90_initial.py @@ -7,9 +7,10 @@ """ from typing import Sequence, Union -from alembic import op +from alembic import context, op import sqlalchemy as sa -from sqlalchemy.dialects import postgresql + +from homesec.db import JSONType # revision identifiers, used by Alembic. revision: str = 'e6f25df0df90' @@ -18,30 +19,62 @@ depends_on: Union[str, Sequence[str], None] = None +def _dialect_name() -> str: + return context.get_context().dialect.name + + +def _is_postgres() -> bool: + return _dialect_name() == "postgresql" + + def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### op.create_table('clip_states', sa.Column('clip_id', sa.Text(), nullable=False), - sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False), - sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False), - sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False), + sa.Column('data', JSONType(), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), sa.PrimaryKeyConstraint('clip_id') ) - op.create_index('idx_clip_states_camera', 'clip_states', [sa.literal_column("jsonb_extract_path_text(data, 'camera_name')")], unique=False) - op.create_index('idx_clip_states_status', 'clip_states', [sa.literal_column("jsonb_extract_path_text(data, 'status')")], unique=False) + if _is_postgres(): + op.create_index( + 'idx_clip_states_camera', + 'clip_states', + [sa.literal_column("jsonb_extract_path_text(data, 'camera_name')")], + unique=False, + ) + op.create_index( + 'idx_clip_states_status', + 'clip_states', + [sa.literal_column("jsonb_extract_path_text(data, 'status')")], + unique=False, + ) + else: + op.create_index( + 'idx_clip_states_camera', + 'clip_states', + [sa.literal_column("json_extract(data, '$.camera_name')")], + unique=False, + ) + op.create_index( + 'idx_clip_states_status', + 'clip_states', + [sa.literal_column("json_extract(data, '$.status')")], + unique=False, + ) op.create_table('logs', sa.Column('id', sa.BigInteger(), autoincrement=True, nullable=False), - sa.Column('ts', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False), - sa.Column('payload', postgresql.JSONB(astext_type=sa.Text()), nullable=False), + sa.Column('ts', sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column('payload', JSONType(), nullable=False), sa.PrimaryKeyConstraint('id') ) op.create_index('logs_ts_idx', 'logs', [sa.literal_column('ts DESC')], unique=False) op.create_table('clip_events', - sa.Column('id', sa.BigInteger(), autoincrement=True, nullable=False), + sa.Column('id', sa.Integer(), 
sa.Identity(), nullable=False), sa.Column('clip_id', sa.Text(), nullable=False), sa.Column('timestamp', sa.DateTime(timezone=True), nullable=False), sa.Column('event_type', sa.Text(), nullable=False), - sa.Column('event_data', postgresql.JSONB(astext_type=sa.Text()), nullable=False), + sa.Column('event_data', JSONType(), nullable=False), sa.ForeignKeyConstraint(['clip_id'], ['clip_states.clip_id'], ondelete='CASCADE'), sa.PrimaryKeyConstraint('id') ) diff --git a/docs/decisions/001-sqlite-support.md b/docs/decisions/001-sqlite-support.md new file mode 100644 index 0000000..6694f7b --- /dev/null +++ b/docs/decisions/001-sqlite-support.md @@ -0,0 +1,382 @@ +# ADR-001: Add SQLite Support for HomeSec + +**Status:** Implemented +**Date:** 2026-01-19 +**Authors:** Lev Neiman, Claude + +## Context + +HomeSec currently uses PostgreSQL as its only database backend. While PostgreSQL is excellent for production, it creates friction for: + +1. **Testing** - Requires a running PostgreSQL instance, slowing down development +2. **Local development** - Developers need Docker or local PostgreSQL setup +3. **Lightweight deployments** - Small installations don't need a full database server +4. **CI/CD** - PostgreSQL service adds complexity and boot time to CI pipelines + +### Current JSONB Usage + +The codebase uses PostgreSQL-specific `JSONB` type in three places: + +| Location | Column | Purpose | +|----------|--------|---------| +| `src/homesec/state/postgres.py` | `ClipState.data` | Stores entire clip workflow state | +| `src/homesec/state/postgres.py` | `ClipEvent.event_data` | Stores event lifecycle data | +| `src/homesec/telemetry/db/log_table.py` | `logs.payload` | Stores log payloads | + +### PostgreSQL-Specific Operations + +Current code uses these PostgreSQL-specific features: +- `JSONB` column type +- `jsonb_extract_path_text()` for JSON queries +- `pg_insert().on_conflict_do_update()` for upserts +- `func.make_interval()` for date arithmetic +- PostgreSQL-specific error detection for retries + +## Decision + +Add SQLite as an alternative database backend while: +1. **Maximizing code reuse** through a unified `SQLAlchemyStateStore` class +2. **Isolating dialect differences** in a `DialectHelper` class +3. **Using parametrized tests** to ensure both backends are tested identically +4. 
**Making PostgreSQL optional** for local development via environment variable + +## Architecture + +### Database Abstraction Layer + +``` +src/homesec/db/ +├── __init__.py # Package exports +├── types.py # JSONType (auto-adapts to dialect) +├── dialect.py # DialectHelper (encapsulates ALL dialect differences) +└── engine.py # Engine factory with dialect-appropriate config +``` + +### Key Abstraction: JSONType + +A custom SQLAlchemy type that automatically uses the appropriate JSON storage: + +```python +class JSONType(TypeDecorator): + """Database-agnostic JSON type.""" + impl = JSON + cache_ok = True + + def load_dialect_impl(self, dialect): + if dialect.name == "postgresql": + return dialect.type_descriptor(JSONB()) + return dialect.type_descriptor(JSON()) +``` + +### Key Abstraction: DialectHelper + +Encapsulates ALL dialect-specific operations in one place: + +| Method | PostgreSQL | SQLite | +|--------|-----------|--------| +| `json_path_text(col, *path)` | `jsonb_extract_path_text(col, 'key')` | `json_extract(col, '$.key')` | +| `upsert_statement()` | `postgresql.insert()` | `sqlite.insert()` | +| `is_retryable_error(exc)` | Check PostgreSQL SQLSTATEs | Check `SQLITE_BUSY/LOCKED` | +| `normalize_dsn(dsn)` | Add `+asyncpg` driver | Add `+aiosqlite` driver | +| `get_engine_kwargs()` | Pool size 5, overflow 0 | StaticPool or NullPool | + +### DRYness Principle + +**95% of code is dialect-agnostic.** Only `DialectHelper` (~80 lines) contains dialect-specific logic. + +| Approach | Dialect-Specific Code | Duplication | +|----------|----------------------|-------------| +| ❌ Separate PostgresStore + SQLiteStore | ~400 lines each | High (80%) | +| ❌ Base class + subclasses | ~50 base + ~30/subclass | Medium | +| ✅ **Single class + DialectHelper** | ~80 lines total | **None** | + +--- + +## Implementation Plan + +### Phase 1: Create Database Abstraction Layer + +#### Task 1.1: Create `src/homesec/db/__init__.py` +- [ ] Create package with exports + +#### Task 1.2: Create `src/homesec/db/types.py` +- [ ] Implement `JSONType` custom type +- [ ] Add docstrings explaining dialect behavior + +#### Task 1.3: Create `src/homesec/db/dialect.py` +- [ ] Implement `DialectHelper` class with: + - [ ] `json_path_text()` - JSON extraction + - [ ] `get_upsert_statement()` - Dialect-appropriate insert + - [ ] `is_retryable_error()` - Error classification + - [ ] `normalize_dsn()` - DSN normalization + - [ ] `get_engine_kwargs()` - Engine configuration + - [ ] `older_than_condition()` - Date arithmetic + +#### Task 1.4: Create `src/homesec/db/engine.py` +- [ ] Implement `create_async_engine_for_dsn()` factory +- [ ] Add `detect_dialect()` helper + +### Phase 2: Update Models + +#### Task 2.1: Update `src/homesec/state/postgres.py` models +- [ ] Replace `JSONB` import with `JSONType` +- [ ] Update `ClipState.data` column +- [ ] Update `ClipEvent.event_data` column +- [ ] Remove/update `__table_args__` functional indexes + +#### Task 2.2: Update `src/homesec/telemetry/db/log_table.py` +- [ ] Replace `JSONB` with `JSONType` + +### Phase 3: Refactor StateStore and EventStore + +#### Task 3.1: Refactor `PostgresStateStore` → `SQLAlchemyStateStore` +- [ ] Add `DialectHelper` as instance variable +- [ ] Replace `pg_insert` with dialect-agnostic upsert +- [ ] Replace `jsonb_extract_path_text` with `dialect.json_path_text()` +- [ ] Replace `make_interval` with `dialect.older_than_condition()` +- [ ] Update `is_retryable_pg_error()` to use `dialect.is_retryable_error()` +- [ ] Keep `PostgresStateStore` as alias for 
backwards compatibility + +#### Task 3.2: Refactor `PostgresEventStore` → `SQLAlchemyEventStore` +- [ ] Same changes as StateStore +- [ ] Keep `PostgresEventStore` as alias + +#### Task 3.3: Update `src/homesec/state/__init__.py` +- [ ] Export new class names +- [ ] Export aliases for backwards compatibility + +### Phase 4: Update Alembic + +#### Task 4.1: Update `alembic/env.py` +- [ ] Add `render_as_batch=True` for SQLite ALTER TABLE support +- [ ] Update imports to use new module paths + +#### Task 4.2: Create dialect-aware index creation +- [ ] Handle functional indexes differently per dialect +- [ ] Document migration strategy + +### Phase 5: Add Dependencies + +#### Task 5.1: Update `pyproject.toml` +- [ ] Add `aiosqlite>=0.19.0` to dependencies + +### Phase 6: Update Tests (Parametrized) + +#### Task 6.1: Update `tests/conftest.py` +- [ ] Add `pytest_addoption` for `--db-backend` flag +- [ ] Create parametrized `db_backend` fixture +- [ ] Create `db_dsn` fixture that yields DSN based on backend +- [ ] Add `SKIP_POSTGRES_TESTS` environment variable check +- [ ] SQLite tests always run (no external dependency) +- [ ] PostgreSQL tests skip if `SKIP_POSTGRES_TESTS=1` or no `TEST_DB_DSN` + +#### Task 6.2: Update `tests/homesec/test_state_store.py` +- [ ] Use parametrized `state_store` fixture +- [ ] Ensure all tests work with both backends +- [ ] Add backend-specific test markers if needed + +#### Task 6.3: Add dialect helper tests +- [ ] Test `normalize_dsn()` for both dialects +- [ ] Test `json_path_text()` output +- [ ] Test `is_retryable_error()` classification + +### Phase 7: Update CI and Makefile + +#### Task 7.1: Update `Makefile` +- [ ] Update `test` target to run both backends by default +- [ ] Add `test-sqlite` target for SQLite-only (fast) +- [ ] Add `test-postgres` target for PostgreSQL-only +- [ ] Update `check` target + +#### Task 7.2: Update `.github/workflows/ci.yml` +- [ ] Keep PostgreSQL service (already configured) +- [ ] Ensure tests run against both backends +- [ ] Set appropriate environment variables + +### Phase 8: Documentation and Cleanup + +#### Task 8.1: Update `.env.example` +- [ ] Add SQLite DSN examples +- [ ] Document `SKIP_POSTGRES_TESTS` variable + +#### Task 8.2: Update this ADR +- [ ] Mark tasks complete +- [ ] Document any deviations from plan +- [ ] Add lessons learned + +--- + +## Testing Strategy + +### Parametrized Tests + +All database tests run against **both SQLite and PostgreSQL** using pytest parametrization: + +```python +@pytest.fixture(params=["sqlite", "postgresql"]) +def db_backend(request): + """Parametrize tests to run against both database backends.""" + backend = request.param + + if backend == "postgresql": + # Skip PostgreSQL tests if disabled or unavailable + if os.environ.get("SKIP_POSTGRES_TESTS") == "1": + pytest.skip("PostgreSQL tests disabled via SKIP_POSTGRES_TESTS=1") + if not os.environ.get("TEST_DB_DSN"): + pytest.skip("TEST_DB_DSN not set, skipping PostgreSQL tests") + + return backend + +@pytest.fixture +def db_dsn(db_backend): + """Return appropriate DSN for the backend.""" + if db_backend == "sqlite": + return "sqlite+aiosqlite:///:memory:" + else: + return os.environ["TEST_DB_DSN"] +``` + +### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `SKIP_POSTGRES_TESTS` | `0` (off) | Set to `1` to skip PostgreSQL tests | +| `TEST_DB_DSN` | (from `.env`) | PostgreSQL DSN for tests | + +**Important:** `SKIP_POSTGRES_TESTS` defaults to OFF. 
Both backends are always tested unless explicitly disabled. This is documented in `AGENTS.md` for AI agents who may need to skip PostgreSQL if it's unavailable. + +### Test Execution Modes + +| Command | SQLite | PostgreSQL | Use Case | +|---------|--------|------------|----------| +| `make test` | ✅ | ✅ | **Default** - full test suite (both backends) | +| `make check` | ✅ | ✅ | **Default** - lint + typecheck + test (both backends) | +| `make test-sqlite` | ✅ | ❌ | Fast local dev when PG unavailable | +| `SKIP_POSTGRES_TESTS=1 make test` | ✅ | ❌ | Equivalent to `make test-sqlite` | + +### CI Behavior + +GitHub Actions CI **always** runs both backends: +1. Start PostgreSQL service (already configured in `.github/workflows/ci.yml`) +2. Run `make check` which tests against **both** SQLite and PostgreSQL +3. SQLite tests run in-memory (fast, no setup) +4. PostgreSQL tests run against the CI service + +**CI must never set `SKIP_POSTGRES_TESTS=1`.** Both backends must pass in CI. + +### Makefile Targets + +```makefile +# Run all tests (both backends) - DEFAULT +test: + uv run pytest tests/homesec/ -v --cov=homesec --cov-report=term-missing + +# Fast: SQLite only (no PostgreSQL required) +test-sqlite: + SKIP_POSTGRES_TESTS=1 uv run pytest tests/homesec/ -v + +# PostgreSQL only (requires TEST_DB_DSN) +test-postgres: + uv run pytest tests/homesec/ -v -k "postgresql" +``` + +--- + +## File Changes Summary + +| File | Action | Description | +|------|--------|-------------| +| `src/homesec/db/__init__.py` | Create | Package exports | +| `src/homesec/db/types.py` | Create | `JSONType` custom type | +| `src/homesec/db/dialect.py` | Create | `DialectHelper` class | +| `src/homesec/db/engine.py` | Create | Engine factory | +| `src/homesec/state/postgres.py` | Modify | Refactor to `SQLAlchemyStateStore` | +| `src/homesec/state/__init__.py` | Modify | Update exports | +| `src/homesec/telemetry/db/log_table.py` | Modify | Use `JSONType` | +| `alembic/env.py` | Modify | Add `render_as_batch=True` | +| `pyproject.toml` | Modify | Add `aiosqlite` dependency | +| `tests/conftest.py` | Modify | Add parametrized fixtures | +| `tests/homesec/test_state_store.py` | Modify | Use parametrized fixtures | +| `.env.example` | Modify | Add SQLite examples | +| `Makefile` | Modify | Add test targets | +| `.github/workflows/ci.yml` | Modify | Ensure both backends tested | + +--- + +## Consequences + +### Positive +- Tests run faster with in-memory SQLite +- Developers can work without PostgreSQL installed +- Lightweight deployments possible +- Single codebase, no duplication +- Comprehensive test coverage of both backends + +### Negative +- Slightly more complex codebase (DialectHelper abstraction) +- Need to maintain compatibility with two backends +- Some PostgreSQL-specific optimizations (GIN indexes) not available in SQLite + +### Neutral +- Migration strategy needs consideration for production (PostgreSQL recommended) +- SQLite suitable for single-user/embedded deployments only + +--- + +## Execution Log + +### 2026-01-19 - Implementation Complete + +**Phase 1: Database Abstraction Layer** +- [x] Created `src/homesec/db/__init__.py` with package exports +- [x] Created `src/homesec/db/types.py` with `JSONType` custom type +- [x] Created `src/homesec/db/dialect.py` with `DialectHelper` class +- [x] Created `src/homesec/db/engine.py` with engine factory + +**Phase 2: Model Updates** +- [x] Updated `ClipState.data` to use `JSONType` +- [x] Updated `ClipEvent.event_data` to use `JSONType` +- [x] Updated `ClipEvent.id` to 
use `Integer` with `Identity()` for SQLite compatibility +- [x] Updated `logs.payload` in telemetry to use `JSONType` + +**Phase 3: StateStore/EventStore Refactoring** +- [x] Refactored `PostgresStateStore` to `SQLAlchemyStateStore` +- [x] Refactored `PostgresEventStore` to `SQLAlchemyEventStore` +- [x] Added backwards compatibility aliases +- [x] Updated `src/homesec/state/__init__.py` exports + +**Phase 4: Alembic Updates** +- [x] Added `render_as_batch=True` for SQLite support +- [x] Updated to use `DialectHelper.normalize_dsn()` + +**Phase 5: Dependencies** +- [x] Added `aiosqlite>=0.19.0` to `pyproject.toml` + +**Phase 6: Tests** +- [x] Created parametrized `db_backend` and `db_dsn` fixtures +- [x] Created parametrized `state_store` fixture +- [x] Updated `test_state_store.py` to use parametrized fixtures +- [x] Added SQLite-specific tests (DSN normalization, dialect detection) + +**Phase 7: CI/Makefile** +- [x] Added `make test-sqlite` target +- [x] Updated `make test` and `make check` to run both backends +- [x] Added `aiosqlite` to `db-migrate` command + +**Deviations from Original Plan:** +- Used `Integer` with `Identity()` instead of `BigInteger` for `ClipEvent.id` because SQLite only supports autoincrement on `INTEGER PRIMARY KEY` +- Did not need to modify `.github/workflows/ci.yml` as existing PostgreSQL service works with parametrized tests + +**Test Results:** +- All 250 tests pass with SQLite (16 SQLite-specific, 234 others) +- PostgreSQL tests skip cleanly when `SKIP_POSTGRES_TESTS=1` +- Lint and typecheck pass + +--- + +## References + +- [SQLAlchemy TypeDecorator](https://docs.sqlalchemy.org/en/20/core/custom_types.html#sqlalchemy.types.TypeDecorator) +- [SQLAlchemy Dialect-Specific Types](https://docs.sqlalchemy.org/en/20/core/type_basics.html#dialect-specific-types) +- [pytest Parametrize](https://docs.pytest.org/en/stable/how-to/parametrize.html) +- [aiosqlite Documentation](https://aiosqlite.omnilib.dev/) diff --git a/docs/migrations.md b/docs/migrations.md new file mode 100644 index 0000000..dadff87 --- /dev/null +++ b/docs/migrations.md @@ -0,0 +1,159 @@ +HomeSec Dual-Backend Migrations (PostgreSQL + SQLite) +===================================================== + +This document explains how to generate, edit, and validate Alembic migrations +that must work for both PostgreSQL and SQLite. + + +Goals +----- +- Keep a single Alembic history that is valid on both backends. +- Autogenerate using PostgreSQL as the canonical source of truth. +- Manually adjust the migration for SQLite compatibility where needed. +- Validate with the migration smoke test and normal test suite. + + +Prerequisites +------------- +- PostgreSQL running locally (for autogenerate and validation). +- `DB_DSN` set for the backend you are targeting. +- `alembic/env.py` already normalizes DSNs and includes `src/` in `sys.path`. + +Examples: + +``` +export DB_DSN=postgresql://homesec:homesec@localhost:5432/homesec +export DB_DSN=sqlite+aiosqlite:///tmp/homesec.db +``` + + +Generate a New Migration (PostgreSQL First) +------------------------------------------- +1) Point Alembic at PostgreSQL (canonical schema). + +``` +export DB_DSN=postgresql://homesec:homesec@localhost:5432/homesec +``` + +2) Generate the migration. + +``` +make db-migration m="add foo columns" +``` + +3) Open the new file in `alembic/versions/` and review it carefully. 
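+
+To confirm the new revision landed on the expected head before editing it, the same
+`uv run --with ...` wrapper the Makefile uses can run other Alembic commands, for
+example (illustrative):
+
+```
+uv run --with alembic --with sqlalchemy --with asyncpg --with aiosqlite --with python-dotenv alembic -c alembic.ini heads
+uv run --with alembic --with sqlalchemy --with asyncpg --with aiosqlite --with python-dotenv alembic -c alembic.ini history
+```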
+ + +Edit the Migration for SQLite Compatibility +------------------------------------------- +### 1) Use database-agnostic types +- Prefer `JSONType` for JSON columns. +- Avoid `postgresql.JSONB` in migrations. + +Example: + +``` +from homesec.db import JSONType + +sa.Column("data", JSONType(), nullable=False) +``` + +### 2) Use portable defaults +- Prefer `sa.func.now()` over `sa.text("now()")`. + +Example: + +``` +sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()) +``` + +### 3) Use dialect-aware DDL for indexes and PG-only features +Some indexes or expressions are PostgreSQL-only. Use a dialect branch: + +``` +from alembic import context, op + +def _dialect_name() -> str: + return context.get_context().dialect.name + +def _is_postgres() -> bool: + return _dialect_name() == "postgresql" +``` + +Example: JSON path indexes + +``` +if _is_postgres(): + op.create_index( + "idx_clip_states_status", + "clip_states", + [sa.literal_column("jsonb_extract_path_text(data, 'status')")], + unique=False, + ) +else: + op.create_index( + "idx_clip_states_status", + "clip_states", + [sa.literal_column("json_extract(data, '$.status')")], + unique=False, + ) +``` + +SQLite relies on the JSON1 extension (enabled in standard Python builds). +If a custom SQLite build lacks JSON1, these indexes will fail. + +### 4) Autoincrement and identity columns +SQLite only auto-increments for `INTEGER PRIMARY KEY`. If a model uses +`Integer` + `Identity()` for cross-backend compatibility, make sure the +migration matches that. + +### 5) Use batch mode for SQLite ALTER TABLE +`alembic/env.py` enables `render_as_batch=True`, but for table alters +you should still use `op.batch_alter_table()` in migrations. + + +Run Migrations on Both Backends +------------------------------- +Run the migration on PostgreSQL: + +``` +export DB_DSN=postgresql://homesec:homesec@localhost:5432/homesec +make db-migrate +``` + +Run the migration on SQLite (file-based is recommended, not `:memory:`): + +``` +export DB_DSN=sqlite+aiosqlite:///tmp/homesec.db +make db-migrate +``` + + +Validate Against Models and Basic Queries +----------------------------------------- +The migration smoke test runs on both backends and verifies: +- Alembic upgrade works. +- Schema matches SQLAlchemy metadata. +- A basic state/event roundtrip succeeds. + +Run it via the normal test suite: + +``` +make test +``` + +If PostgreSQL is unavailable locally: + +``` +SKIP_POSTGRES_TESTS=1 make test-sqlite +``` + + +Checklist for Every Migration +----------------------------- +- [ ] Generate with PostgreSQL (`make db-migration`). +- [ ] Replace PG-only types with `JSONType` and portable defaults. +- [ ] Add dialect branches for PG-only indexes/functions. +- [ ] Use `Integer` + `Identity()` when models require it. +- [ ] Run `make db-migrate` with PostgreSQL and SQLite DSNs. +- [ ] Run `make test` (or at least `make test-sqlite`). diff --git a/pyproject.toml b/pyproject.toml index a9f7b0b..436feb6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ dependencies = [ "sqlalchemy>=2.0.0", "alembic>=1.13.0", "asyncpg>=0.29.0", + "aiosqlite>=0.19.0", "ultralytics>=8.3.226", "pyyaml>=6.0.3", "aiohttp>=3.13.2", diff --git a/src/homesec/db/__init__.py b/src/homesec/db/__init__.py new file mode 100644 index 0000000..60836f9 --- /dev/null +++ b/src/homesec/db/__init__.py @@ -0,0 +1,33 @@ +"""Database abstraction layer for SQLite and PostgreSQL support. 
+ +This package provides database-agnostic types and utilities that allow +the same SQLAlchemy models to work with both PostgreSQL and SQLite. + +Key components: +- JSONType: Custom type that uses JSONB for PostgreSQL, JSON for SQLite +- DialectHelper: Encapsulates all dialect-specific SQL operations +- create_async_engine_for_dsn: Factory for creating properly configured engines + +Example usage: + from homesec.db import JSONType, DialectHelper, create_async_engine_for_dsn + + # In models - JSONType auto-adapts to dialect + class MyModel(Base): + data: Mapped[dict] = mapped_column(JSONType, nullable=False) + + # In stores - use DialectHelper for dialect-specific operations + engine = create_async_engine_for_dsn(dsn) + dialect = DialectHelper.from_engine(engine) + json_expr = dialect.json_extract_text(MyModel.data, "status") +""" + +from homesec.db.dialect import DialectHelper +from homesec.db.engine import create_async_engine_for_dsn, detect_dialect +from homesec.db.types import JSONType + +__all__ = [ + "JSONType", + "DialectHelper", + "create_async_engine_for_dsn", + "detect_dialect", +] diff --git a/src/homesec/db/dialect.py b/src/homesec/db/dialect.py new file mode 100644 index 0000000..77230c5 --- /dev/null +++ b/src/homesec/db/dialect.py @@ -0,0 +1,413 @@ +"""Dialect-specific database operations. + +This module encapsulates ALL database dialect differences in a single place, +providing a clean abstraction for the rest of the codebase. Instead of +scattering if/else dialect checks throughout the code, all dialect-specific +logic is centralized here. + +The DialectHelper class provides methods for: +- JSON path extraction (JSONB operators vs json_extract) +- Date/time arithmetic (make_interval vs datetime modifiers) +- Upsert statements (dialect-specific INSERT ... ON CONFLICT) +- Retryable error detection (PostgreSQL vs SQLite error types) +- DSN normalization (adding appropriate async drivers) +- Engine configuration (pool settings per dialect) +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +from sqlalchemy import ColumnElement, func +from sqlalchemy.dialects import postgresql, sqlite +from sqlalchemy.exc import DBAPIError, OperationalError +from sqlalchemy.ext.asyncio import AsyncEngine +from sqlalchemy.pool import StaticPool + +if TYPE_CHECKING: + from sqlalchemy import Table + from sqlalchemy.dialects.postgresql import Insert as PgInsert + from sqlalchemy.dialects.sqlite import Insert as SqliteInsert + +logger = logging.getLogger(__name__) + +# PostgreSQL SQLSTATE codes that indicate transient/retryable errors +_RETRYABLE_PG_SQLSTATES = frozenset( + { + "08000", # connection_exception + "08003", # connection_does_not_exist + "08006", # connection_failure + "08007", # transaction_resolution_unknown + "08001", # sqlclient_unable_to_establish_sqlconnection + "08004", # sqlserver_rejected_establishment_of_sqlconnection + "40P01", # deadlock_detected + "40001", # serialization_failure + "53300", # too_many_connections + "57P01", # admin_shutdown + "57P02", # crash_shutdown + "57P03", # cannot_connect_now + } +) + +# SQLite error messages that indicate transient/retryable errors +_RETRYABLE_SQLITE_MESSAGES = frozenset( + { + "database is locked", + "database is busy", + } +) + + +class DialectHelper: + """Encapsulates all database dialect-specific operations. + + This class provides a unified interface for operations that differ + between PostgreSQL and SQLite, allowing the rest of the codebase + to remain dialect-agnostic. 
+ + Create an instance via the factory methods: + dialect = DialectHelper.from_engine(engine) + dialect = DialectHelper.from_dsn(dsn) + dialect = DialectHelper(dialect_name) # Direct construction + + Example usage: + # JSON path extraction + status_expr = dialect.json_extract_text(ClipState.data, "status") + query = select(ClipState).where(status_expr == "uploaded") + + # Upsert + stmt = dialect.insert(table).values(...) + stmt = dialect.on_conflict_do_update(stmt, ["id"], {"data": new_data}) + """ + + def __init__(self, dialect_name: str) -> None: + """Initialize with dialect name. + + Args: + dialect_name: Either "postgresql" or "sqlite" + + Raises: + ValueError: If dialect_name is not supported + """ + if dialect_name not in ("postgresql", "sqlite"): + raise ValueError(f"Unsupported dialect: {dialect_name}") + + self._dialect_name = dialect_name + self._is_postgres = dialect_name == "postgresql" + self._is_sqlite = dialect_name == "sqlite" + + @classmethod + def from_engine(cls, engine: AsyncEngine) -> DialectHelper: + """Create DialectHelper from an async engine. + + Args: + engine: SQLAlchemy async engine + + Returns: + DialectHelper configured for the engine's dialect + """ + return cls(engine.dialect.name) + + @classmethod + def from_dsn(cls, dsn: str) -> DialectHelper: + """Create DialectHelper by detecting dialect from DSN. + + Args: + dsn: Database connection string + + Returns: + DialectHelper configured for the DSN's dialect + + Raises: + ValueError: If DSN dialect cannot be detected + """ + dialect_name = detect_dialect_from_dsn(dsn) + return cls(dialect_name) + + @property + def dialect_name(self) -> str: + """Return the dialect name ("postgresql" or "sqlite").""" + return self._dialect_name + + @property + def is_postgres(self) -> bool: + """Return True if using PostgreSQL.""" + return self._is_postgres + + @property + def is_sqlite(self) -> bool: + """Return True if using SQLite.""" + return self._is_sqlite + + # ------------------------------------------------------------------------- + # JSON Operations + # ------------------------------------------------------------------------- + + def json_extract_text( + self, + column: ColumnElement[Any] | Any, # Accept InstrumentedAttribute too + *path: str, + ) -> ColumnElement[str]: + """Extract a text value from a JSON column. + + Creates a SQL expression that extracts a value from a JSON column + as text, using the appropriate syntax for the dialect. + + Args: + column: The JSON/JSONB column to extract from + *path: Path components to the desired value (e.g., "status" or "nested", "key") + + Returns: + SQL expression that evaluates to the extracted text value + + Example: + # PostgreSQL: jsonb_extract_path_text(data, 'status') + # SQLite: json_extract(data, '$.status') + status = dialect.json_extract_text(ClipState.data, "status") + """ + if not path: + raise ValueError("At least one path component is required") + + if self._is_postgres: + return func.jsonb_extract_path_text(column, *path) + else: + # SQLite uses JSON path notation: $.key.nested + json_path = "$." + ".".join(path) + return func.json_extract(column, json_path) + + # ------------------------------------------------------------------------- + # Date/Time Operations + # ------------------------------------------------------------------------- + + def timestamp_older_than( + self, + column: ColumnElement[Any] | Any, # Accept InstrumentedAttribute too + days: int, + ) -> ColumnElement[bool]: + """Create expression for 'column < now() - N days'. 
+ + Args: + column: Timestamp column to compare + days: Number of days in the past + + Returns: + SQL expression that is True if column is older than N days + + Example: + # PostgreSQL: created_at < now() - make_interval(days => 7) + # SQLite: created_at < datetime('now', '-7 days') + old_clips = dialect.timestamp_older_than(ClipState.created_at, 7) + """ + if days < 0: + raise ValueError("days must be non-negative") + + if self._is_postgres: + return column < func.now() - func.make_interval(days=days) + else: + # SQLite datetime modifier + return column < func.datetime("now", f"-{days} days") + + def current_timestamp(self) -> ColumnElement[Any]: + """Return expression for current timestamp. + + Returns: + SQL expression for the current timestamp + """ + if self._is_postgres: + return func.now() + else: + return func.datetime("now") + + # ------------------------------------------------------------------------- + # Upsert Operations + # ------------------------------------------------------------------------- + + def insert(self, table: Table) -> PgInsert | SqliteInsert: + """Create a dialect-specific INSERT statement. + + Args: + table: The table to insert into + + Returns: + Dialect-specific insert statement that supports on_conflict_do_update + """ + if self._is_postgres: + return postgresql.insert(table) + else: + return sqlite.insert(table) + + def on_conflict_do_update( + self, + stmt: PgInsert | SqliteInsert, + index_elements: list[str], + set_: dict[str, Any], + ) -> PgInsert | SqliteInsert: + """Add ON CONFLICT DO UPDATE clause to an insert statement. + + Args: + stmt: Insert statement from dialect.insert() + index_elements: Columns that form the unique constraint + set_: Dictionary of column -> value for the UPDATE + + Returns: + Insert statement with conflict handling added + + Example: + stmt = dialect.insert(table).values(id=1, data={"key": "value"}) + stmt = dialect.on_conflict_do_update( + stmt, + index_elements=["id"], + set_={"data": stmt.excluded.data, "updated_at": func.now()} + ) + """ + return stmt.on_conflict_do_update( + index_elements=index_elements, + set_=set_, + ) + + # ------------------------------------------------------------------------- + # Error Classification + # ------------------------------------------------------------------------- + + def is_retryable_error(self, exc: Exception) -> bool: + """Determine if an exception represents a transient, retryable error. + + Checks for connection errors, deadlocks, and other conditions that + might succeed on retry. 
+ + Args: + exc: The exception to classify + + Returns: + True if the error is likely transient and worth retrying + """ + # SQLAlchemy OperationalError is generally retryable + if isinstance(exc, OperationalError): + return True + + # Connection invalidated + if isinstance(exc, DBAPIError) and exc.connection_invalidated: + return True + + if self._is_postgres: + return self._is_retryable_postgres_error(exc) + else: + return self._is_retryable_sqlite_error(exc) + + def _is_retryable_postgres_error(self, exc: Exception) -> bool: + """Check PostgreSQL-specific error codes.""" + sqlstate = _extract_sqlstate(exc) + return sqlstate in _RETRYABLE_PG_SQLSTATES + + def _is_retryable_sqlite_error(self, exc: Exception) -> bool: + """Check SQLite-specific error messages.""" + # Check the exception chain for SQLite errors + current: BaseException | None = exc + while current is not None: + msg = str(current).lower() + for retryable_msg in _RETRYABLE_SQLITE_MESSAGES: + if retryable_msg in msg: + return True + current = current.__cause__ + return False + + # ------------------------------------------------------------------------- + # Engine Configuration + # ------------------------------------------------------------------------- + + def get_engine_kwargs(self, dsn: str | None = None) -> dict[str, Any]: + """Get dialect-appropriate engine configuration. + + Args: + dsn: Optional database connection string. Used to detect + in-memory SQLite for special pooling needs. + + Returns: + Dictionary of kwargs for create_async_engine() + """ + if self._is_postgres: + return { + "pool_size": 5, + "max_overflow": 0, + "pool_pre_ping": True, + } + # SQLite: simpler pooling, but in-memory needs a single shared connection. + engine_kwargs: dict[str, Any] = { + "pool_pre_ping": True, + } + if dsn is not None and self._is_sqlite and self._is_sqlite_memory_dsn(dsn): + engine_kwargs.update( + { + "poolclass": StaticPool, + "connect_args": {"check_same_thread": False}, + } + ) + return engine_kwargs + + @staticmethod + def _is_sqlite_memory_dsn(dsn: str) -> bool: + """Return True if the DSN points at an in-memory SQLite database.""" + return ":memory:" in dsn + + # ------------------------------------------------------------------------- + # DSN Normalization + # ------------------------------------------------------------------------- + + @staticmethod + def normalize_dsn(dsn: str) -> str: + """Normalize DSN to include the appropriate async driver. + + Ensures the DSN uses the correct async driver: + - PostgreSQL: asyncpg + - SQLite: aiosqlite + + Args: + dsn: Original database connection string + + Returns: + DSN with async driver specified + """ + # PostgreSQL normalization + if dsn.startswith("postgresql://") and "+asyncpg" not in dsn: + return dsn.replace("postgresql://", "postgresql+asyncpg://", 1) + if dsn.startswith("postgres://") and "+asyncpg" not in dsn: + return dsn.replace("postgres://", "postgresql+asyncpg://", 1) + + # SQLite normalization + if dsn.startswith("sqlite://") and "+aiosqlite" not in dsn: + return dsn.replace("sqlite://", "sqlite+aiosqlite://", 1) + + return dsn + + +def detect_dialect_from_dsn(dsn: str) -> str: + """Detect database dialect from a DSN string. 
+ + Args: + dsn: Database connection string + + Returns: + Dialect name ("postgresql" or "sqlite") + + Raises: + ValueError: If dialect cannot be detected from DSN + """ + dsn_lower = dsn.lower() + if dsn_lower.startswith(("postgresql://", "postgres://", "postgresql+asyncpg://")): + return "postgresql" + if dsn_lower.startswith(("sqlite://", "sqlite+aiosqlite://")): + return "sqlite" + raise ValueError(f"Cannot detect dialect from DSN: {dsn}") + + +def _extract_sqlstate(exc: BaseException) -> str | None: + """Extract PostgreSQL SQLSTATE code from an exception.""" + for candidate in (exc, getattr(exc, "orig", None), getattr(exc, "__cause__", None)): + if candidate is None: + continue + # Try different attribute names used by different drivers + sqlstate = getattr(candidate, "sqlstate", None) or getattr(candidate, "pgcode", None) + if sqlstate: + return str(sqlstate) + return None diff --git a/src/homesec/db/engine.py b/src/homesec/db/engine.py new file mode 100644 index 0000000..42221ba --- /dev/null +++ b/src/homesec/db/engine.py @@ -0,0 +1,79 @@ +"""Database engine factory and utilities. + +This module provides factory functions for creating properly configured +SQLAlchemy async engines for both PostgreSQL and SQLite. +""" + +from __future__ import annotations + +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine + +from homesec.db.dialect import DialectHelper, detect_dialect_from_dsn + + +def create_async_engine_for_dsn(dsn: str, **extra_kwargs: object) -> AsyncEngine: + """Create an async SQLAlchemy engine with dialect-appropriate configuration. + + This factory function: + 1. Normalizes the DSN to use the correct async driver + 2. Applies dialect-specific engine configuration (pool size, etc.) + 3. Allows overriding configuration via extra_kwargs + + Args: + dsn: Database connection string (PostgreSQL or SQLite) + **extra_kwargs: Additional kwargs passed to create_async_engine, + these override the dialect defaults + + Returns: + Configured AsyncEngine instance + + Raises: + ValueError: If DSN dialect is not supported + + Example: + # PostgreSQL + engine = create_async_engine_for_dsn( + "postgresql://user:pass@localhost/db" + ) + + # SQLite in-memory + engine = create_async_engine_for_dsn("sqlite:///:memory:") + + # Override defaults + engine = create_async_engine_for_dsn( + "postgresql://...", + pool_size=10, + echo=True, + ) + """ + # Detect dialect and get appropriate configuration + dialect_name = detect_dialect_from_dsn(dsn) + dialect = DialectHelper(dialect_name) + + # Normalize DSN to include async driver + normalized_dsn = dialect.normalize_dsn(dsn) + + # Get dialect-specific engine kwargs + engine_kwargs = dialect.get_engine_kwargs(normalized_dsn) + + # Allow caller to override defaults + engine_kwargs.update(extra_kwargs) + + return create_async_engine(normalized_dsn, **engine_kwargs) + + +def detect_dialect(dsn: str) -> str: + """Detect database dialect from a DSN string. + + Convenience re-export of detect_dialect_from_dsn for cleaner imports. + + Args: + dsn: Database connection string + + Returns: + Dialect name ("postgresql" or "sqlite") + + Raises: + ValueError: If dialect cannot be detected + """ + return detect_dialect_from_dsn(dsn) diff --git a/src/homesec/db/types.py b/src/homesec/db/types.py new file mode 100644 index 0000000..ff582d2 --- /dev/null +++ b/src/homesec/db/types.py @@ -0,0 +1,54 @@ +"""Database-agnostic type definitions. 
+ +This module provides custom SQLAlchemy types that automatically adapt +to the underlying database dialect, enabling the same model definitions +to work with both PostgreSQL and SQLite. +""" + +from __future__ import annotations + +from typing import Any + +from sqlalchemy import JSON +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.engine import Dialect +from sqlalchemy.types import TypeDecorator, TypeEngine + + +class JSONType(TypeDecorator[dict[str, Any]]): + """Database-agnostic JSON column type. + + Automatically uses the optimal JSON storage for each database: + - PostgreSQL: JSONB (binary format, indexable, efficient queries) + - SQLite: JSON (stored as TEXT, parsed by SQLAlchemy) + + This allows models to be defined once and work correctly with both + databases without any code changes. + + Example: + class ClipState(Base): + data: Mapped[dict[str, Any]] = mapped_column(JSONType, nullable=False) + + The type is determined at connection time based on the dialect, + so the same model class can be used with different databases. + """ + + impl = JSON + cache_ok = True + + def load_dialect_impl(self, dialect: Dialect) -> TypeEngine[dict[str, Any]]: + """Return the appropriate type implementation for the dialect. + + Called by SQLAlchemy when compiling SQL statements. Returns + JSONB for PostgreSQL (with its superior indexing and operators) + or standard JSON for other databases. + + Args: + dialect: The SQLAlchemy dialect being used. + + Returns: + TypeEngine appropriate for the dialect. + """ + if dialect.name == "postgresql": + return dialect.type_descriptor(JSONB()) + return dialect.type_descriptor(JSON()) diff --git a/src/homesec/state/__init__.py b/src/homesec/state/__init__.py index f222ac2..f3e0430 100644 --- a/src/homesec/state/__init__.py +++ b/src/homesec/state/__init__.py @@ -1,10 +1,31 @@ -"""State store implementations.""" +"""State store implementations. + +Provides StateStore and EventStore implementations that work with both +PostgreSQL and SQLite through the database abstraction layer. + +For backwards compatibility, PostgresStateStore and PostgresEventStore +are still exported and work as aliases to the unified implementations. +""" from homesec.state.postgres import ( NoopEventStore, NoopStateStore, + # Backwards compatibility aliases PostgresEventStore, PostgresStateStore, + # New unified implementations + SQLAlchemyEventStore, + SQLAlchemyStateStore, ) -__all__ = ["NoopEventStore", "NoopStateStore", "PostgresEventStore", "PostgresStateStore"] +__all__ = [ + # New unified implementations (preferred) + "SQLAlchemyStateStore", + "SQLAlchemyEventStore", + # Backwards compatibility (aliases to unified implementations) + "PostgresStateStore", + "PostgresEventStore", + # No-op implementations for graceful degradation + "NoopStateStore", + "NoopEventStore", +] diff --git a/src/homesec/state/postgres.py b/src/homesec/state/postgres.py index 7717e85..9054a54 100644 --- a/src/homesec/state/postgres.py +++ b/src/homesec/state/postgres.py @@ -1,30 +1,35 @@ -"""Postgres implementation of StateStore and EventStore.""" +"""SQLAlchemy implementation of StateStore and EventStore. + +Supports both PostgreSQL and SQLite through the database abstraction layer. +For backwards compatibility, PostgresStateStore and PostgresEventStore are +provided as aliases to the new unified implementations. 
+""" from __future__ import annotations import json import logging from datetime import datetime -from typing import Any, cast +from typing import TYPE_CHECKING, Any, cast from sqlalchemy import ( - BigInteger, DateTime, ForeignKey, + Identity, Index, + Integer, Table, Text, and_, func, + insert, or_, select, ) -from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy.dialects.postgresql import insert as pg_insert -from sqlalchemy.exc import DBAPIError, OperationalError -from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine +from sqlalchemy.ext.asyncio import AsyncEngine from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column +from homesec.db import DialectHelper, JSONType, create_async_engine_for_dsn from homesec.interfaces import EventStore, StateStore from homesec.models.clip import ClipStateData from homesec.models.events import ( @@ -50,8 +55,12 @@ ClipEvent as ClipEventModel, ) +if TYPE_CHECKING: + pass + logger = logging.getLogger(__name__) +# Event type name -> Pydantic model class mapping _EVENT_TYPE_MAP: dict[str, type[ClipEventModel]] = { "clip_recorded": ClipRecordedEvent, "clip_deleted": ClipDeletedEvent, @@ -72,17 +81,28 @@ } +# ============================================================================= +# SQLAlchemy Models +# ============================================================================= + + class Base(DeclarativeBase): + """Base class for all SQLAlchemy models.""" + pass class ClipState(Base): - """Current state snapshot (lightweight, fast queries).""" + """Current state snapshot (lightweight, fast queries). + + Uses JSONType which automatically adapts to JSONB for PostgreSQL + and JSON for SQLite. + """ __tablename__ = "clip_states" clip_id: Mapped[str] = mapped_column(Text, primary_key=True) - data: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False) + data: Mapped[dict[str, Any]] = mapped_column(JSONType, nullable=False) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), @@ -95,18 +115,24 @@ class ClipState(Base): nullable=False, ) - __table_args__ = ( - Index("idx_clip_states_status", func.jsonb_extract_path_text(data, "status")), - Index("idx_clip_states_camera", func.jsonb_extract_path_text(data, "camera_name")), - ) + # Note: Functional indexes on JSON fields are created via Alembic migrations + # using dialect-specific syntax (see alembic/versions/ for details). + # Simple column indexes can still be defined here if needed. class ClipEvent(Base): - """Event history (append-only audit log).""" + """Event history (append-only audit log). + + Uses JSONType which automatically adapts to JSONB for PostgreSQL + and JSON for SQLite. + """ __tablename__ = "clip_events" - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + # Use Integer with Identity() for cross-database autoincrement support. + # SQLite requires INTEGER PRIMARY KEY for autoincrement; BigInteger doesn't work. + # For most use cases, 32-bit Integer is sufficient for event IDs. 
+ id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True) clip_id: Mapped[str] = mapped_column( Text, ForeignKey("clip_states.clip_id", ondelete="CASCADE"), @@ -117,7 +143,7 @@ class ClipEvent(Base): nullable=False, ) event_type: Mapped[str] = mapped_column(Text, nullable=False) - event_data: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False) + event_data: Mapped[dict[str, Any]] = mapped_column(JSONType, nullable=False) __table_args__ = ( Index("idx_clip_events_clip_id", "clip_id"), @@ -127,79 +153,131 @@ class ClipEvent(Base): ) -def _normalize_async_dsn(dsn: str) -> str: - if "+asyncpg" in dsn: - return dsn - if dsn.startswith("postgresql://"): - return dsn.replace("postgresql://", "postgresql+asyncpg://", 1) - if dsn.startswith("postgres://"): - return dsn.replace("postgres://", "postgresql+asyncpg://", 1) - return dsn +# ============================================================================= +# JSON Parsing Utilities +# ============================================================================= + + +def _parse_json_payload(raw: object) -> dict[str, Any]: + """Parse JSON payload from SQLAlchemy into a dict. + + Handles different representations that may come from different + database drivers and configurations. + Args: + raw: Raw value from database (dict, str, or bytes) -class PostgresStateStore(StateStore): - """Postgres implementation of StateStore interface. + Returns: + Parsed dictionary - Implements graceful degradation: operations return None/False - instead of raising when DB is unavailable. + Raises: + TypeError: If raw is not a supported type + """ + match raw: + case dict(): + return cast(dict[str, Any], raw) + case str(): + return cast(dict[str, Any], json.loads(raw)) + case bytes() | bytearray(): + return cast(dict[str, Any], json.loads(raw.decode("utf-8"))) + case _: + raise TypeError(f"Unsupported JSON payload type: {type(raw).__name__}") + + +# ============================================================================= +# State Store Implementation +# ============================================================================= + + +class SQLAlchemyStateStore(StateStore): + """SQLAlchemy implementation of StateStore interface. + + Supports both PostgreSQL and SQLite through the DialectHelper abstraction. + Implements graceful degradation: operations return None/False instead of + raising when DB is unavailable. + + Example: + # PostgreSQL + store = SQLAlchemyStateStore("postgresql://user:pass@localhost/db") + + # SQLite (file-based) + store = SQLAlchemyStateStore("sqlite:///data/homesec.db") + + # SQLite (in-memory, for testing) + store = SQLAlchemyStateStore("sqlite:///:memory:") """ def __init__(self, dsn: str) -> None: """Initialize state store. Args: - dsn: Postgres connection string (e.g., "postgresql+asyncpg://user:pass@host/db") + dsn: Database connection string. Supported formats: + - PostgreSQL: postgresql://user:pass@host/db + - SQLite: sqlite:///path/to/db.sqlite or sqlite:///:memory: """ - self._dsn = _normalize_async_dsn(dsn) + self._dsn = dsn self._engine: AsyncEngine | None = None + self._dialect: DialectHelper | None = None + + @property + def dialect(self) -> DialectHelper | None: + """Return the dialect helper, or None if not initialized.""" + return self._dialect async def initialize(self) -> bool: - """Initialize connection pool. + """Initialize connection pool and dialect helper. - Note: Tables are created via alembic migrations, not here. + Note: Tables are created via Alembic migrations, not here. 
Returns: True if initialization succeeded, False otherwise """ try: - self._engine = create_async_engine( - self._dsn, - pool_pre_ping=True, - pool_size=5, - max_overflow=0, - ) + self._dialect = DialectHelper.from_dsn(self._dsn) + self._engine = create_async_engine_for_dsn(self._dsn) + # Verify connection works async with self._engine.connect() as conn: await conn.execute(select(1)) - logger.info("PostgresStateStore initialized successfully") + + logger.info( + "SQLAlchemyStateStore initialized successfully (dialect=%s)", + self._dialect.dialect_name, + ) return True except Exception as e: - logger.error("Failed to initialize PostgresStateStore: %s", e, exc_info=True) + logger.error("Failed to initialize SQLAlchemyStateStore: %s", e, exc_info=True) if self._engine is not None: await self._engine.dispose() self._engine = None + self._dialect = None return False async def upsert(self, clip_id: str, data: ClipStateData) -> None: """Insert or update clip state. + Uses dialect-specific upsert (INSERT ... ON CONFLICT DO UPDATE). Raises on execution errors so callers can retry/log appropriately. """ - if self._engine is None: + if self._engine is None or self._dialect is None: logger.warning("StateStore not initialized, skipping upsert for %s", clip_id) return json_data = data.model_dump(mode="json") table = cast(Table, ClipState.__table__) - stmt = pg_insert(table).values( + + # Use dialect-specific insert for upsert support + stmt = self._dialect.insert(table).values( clip_id=clip_id, data=json_data, updated_at=func.now(), ) - stmt = stmt.on_conflict_do_update( - index_elements=[table.c.clip_id], + stmt = self._dialect.on_conflict_do_update( + stmt, + index_elements=["clip_id"], set_={"data": stmt.excluded.data, "updated_at": func.now()}, ) + async with self._engine.begin() as conn: await conn.execute(stmt) @@ -218,11 +296,12 @@ async def get(self, clip_id: str) -> ClipStateData | None: select(ClipState.data).where(ClipState.clip_id == clip_id) ) raw = result.scalar_one_or_none() + if raw is None: return None # Parse JSON and validate with Pydantic - data_dict = self._parse_state_data(raw) + data_dict = _parse_json_payload(raw) return ClipStateData.model_validate(data_dict) except Exception as e: logger.error( @@ -248,7 +327,7 @@ async def list_candidate_clips_for_cleanup( Cursor is `(created_at, clip_id)` from the last row of the previous page. 
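+
+        Example (paging until exhausted; sketch, `store` is an initialized store):
+
+            cursor = None
+            while True:
+                batch = await store.list_candidate_clips_for_cleanup(
+                    older_than_days=30, camera_name=None, batch_size=100, cursor=cursor
+                )
+                if not batch:
+                    break
+                for clip_id, state, created_at in batch:
+                    ...  # decide whether the clip should be deleted
+                cursor = (created_at, clip_id)  # last row becomes the next cursor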
""" - if self._engine is None: + if self._engine is None or self._dialect is None: logger.warning("StateStore not initialized, returning empty cleanup candidate list") return [] @@ -257,15 +336,18 @@ async def list_candidate_clips_for_cleanup( if older_than_days is not None and older_than_days < 0: raise ValueError("older_than_days must be >= 0") - status_expr = func.jsonb_extract_path_text(ClipState.data, "status") - camera_expr = func.jsonb_extract_path_text(ClipState.data, "camera_name") + # Use dialect helper for JSON field extraction + status_expr = self._dialect.json_extract_text(ClipState.data, "status") + camera_expr = self._dialect.json_extract_text(ClipState.data, "camera_name") conditions = [or_(status_expr.is_(None), status_expr != "deleted")] + if camera_name is not None: conditions.append(camera_expr == camera_name) + if older_than_days is not None: conditions.append( - ClipState.created_at < func.now() - func.make_interval(days=int(older_than_days)) + self._dialect.timestamp_older_than(ClipState.created_at, older_than_days) ) if cursor is not None: @@ -292,20 +374,20 @@ async def list_candidate_clips_for_cleanup( rows = result.all() items: list[tuple[str, ClipStateData, datetime]] = [] - for clip_id, raw, created_at in rows: + for row_clip_id, raw, created_at in rows: try: - data_dict = self._parse_state_data(raw) + data_dict = _parse_json_payload(raw) state = ClipStateData.model_validate(data_dict) except Exception as exc: logger.warning( "Failed parsing clip state for cleanup: %s error=%s", - clip_id, + row_clip_id, exc, exc_info=True, ) continue - items.append((clip_id, state, created_at)) + items.append((row_clip_id, state, created_at)) return items @@ -331,81 +413,59 @@ async def shutdown(self, timeout: float | None = None) -> None: if self._engine is not None: await self._engine.dispose() self._engine = None - logger.info("PostgresStateStore closed") + self._dialect = None + logger.info("SQLAlchemyStateStore closed") - @staticmethod - def _parse_state_data(raw: object) -> dict[str, Any]: - """Parse JSONB payload from SQLAlchemy into a dict.""" - return _parse_jsonb_payload(raw) + def is_retryable_error(self, exc: Exception) -> bool: + """Check if an exception is a retryable database error. - def create_event_store(self) -> PostgresEventStore | NoopEventStore: - """Create a Postgres-backed event store or a no-op fallback.""" - if self._engine is None: + Delegates to DialectHelper for dialect-specific error classification. + """ + if self._dialect is None: + return False + return self._dialect.is_retryable_error(exc) + + def create_event_store(self) -> SQLAlchemyEventStore | NoopEventStore: + """Create an event store using the same engine, or a no-op fallback.""" + if self._engine is None or self._dialect is None: return NoopEventStore() - return PostgresEventStore(self._engine) + return SQLAlchemyEventStore(self._engine, self._dialect) + @staticmethod + def _parse_state_data(raw: object) -> dict[str, Any]: + """Parse JSON payload from SQLAlchemy into a dict. 
-def _parse_jsonb_payload(raw: object) -> dict[str, Any]: - """Parse JSONB payload from SQLAlchemy into a dict.""" - match raw: - case dict(): - return cast(dict[str, Any], raw) - case str(): - return cast(dict[str, Any], json.loads(raw)) - case bytes() | bytearray(): - return cast(dict[str, Any], json.loads(raw.decode("utf-8"))) - case _: - raise TypeError(f"Unsupported JSONB payload type: {type(raw).__name__}") - - -_RETRYABLE_SQLSTATES = { - "08000", # connection_exception - "08003", # connection_does_not_exist - "08006", # connection_failure - "08007", # transaction_resolution_unknown - "08001", # sqlclient_unable_to_establish_sqlconnection - "08004", # sqlserver_rejected_establishment_of_sqlconnection - "40P01", # deadlock_detected - "40001", # serialization_failure - "53300", # too_many_connections - "57P01", # admin_shutdown - "57P02", # crash_shutdown - "57P03", # cannot_connect_now -} + Provided for backwards compatibility with tests. + """ + return _parse_json_payload(raw) -def _extract_sqlstate(exc: BaseException) -> str | None: - for candidate in (exc, getattr(exc, "orig", None)): - if candidate is None: - continue - sqlstate = getattr(candidate, "sqlstate", None) or getattr(candidate, "pgcode", None) - if sqlstate: - return str(sqlstate) - return None +# ============================================================================= +# Event Store Implementation +# ============================================================================= -def is_retryable_pg_error(exc: Exception) -> bool: - """Return True if the exception is likely a transient Postgres error.""" - if isinstance(exc, OperationalError): - return True - if isinstance(exc, DBAPIError) and exc.connection_invalidated: - return True - sqlstate = _extract_sqlstate(exc) - return sqlstate in _RETRYABLE_SQLSTATES +class SQLAlchemyEventStore(EventStore): + """SQLAlchemy implementation of EventStore interface. + Supports both PostgreSQL and SQLite through the DialectHelper abstraction. + """ -class PostgresEventStore(EventStore): - """Postgres implementation of EventStore interface.""" + def __init__(self, engine: AsyncEngine, dialect: DialectHelper) -> None: + """Initialize with shared engine and dialect from StateStore. 
- def __init__(self, engine: AsyncEngine) -> None: - """Initialize with shared engine from StateStore.""" + Args: + engine: SQLAlchemy async engine + dialect: DialectHelper for dialect-specific operations + """ self._engine = engine + self._dialect = dialect async def append(self, event: ClipLifecycleEvent) -> None: """Append a single event.""" try: async with self._engine.begin() as conn: - table = cast(Any, ClipEvent.__table__) + table = cast(Table, ClipEvent.__table__) payload = { "clip_id": event.clip_id, "timestamp": event.timestamp, @@ -415,7 +475,8 @@ async def append(self, event: ClipLifecycleEvent) -> None: exclude={"id", "event_type"}, ), } - await conn.execute(pg_insert(table), [payload]) + # Use standard insert - no conflict handling needed for append-only log + await conn.execute(insert(table).values(**payload)) except Exception as e: logger.error("Failed to append event: %s", e, exc_info=e) raise @@ -440,7 +501,7 @@ async def get_events( events: list[ClipLifecycleEvent] = [] for event_id, event_type, event_data in rows: - event_dict = _parse_jsonb_payload(event_data) + event_dict = _parse_json_payload(event_data) event_dict.setdefault("event_type", event_type) event_dict["id"] = event_id event_cls = _EVENT_TYPE_MAP.get(event_type) @@ -456,10 +517,16 @@ async def get_events( return [] +# ============================================================================= +# No-op Implementations (for graceful degradation) +# ============================================================================= + + class NoopEventStore(EventStore): - """Event store that drops events (used when Postgres is unavailable).""" + """Event store that drops events (used when database is unavailable).""" async def append(self, event: ClipLifecycleEvent) -> None: + """No-op: event is dropped.""" return async def get_events( @@ -467,6 +534,7 @@ async def get_events( clip_id: str, after_id: int | None = None, ) -> list[ClipLifecycleEvent]: + """No-op: returns empty list.""" return [] @@ -474,9 +542,11 @@ class NoopStateStore(StateStore): """State store that drops writes and returns no data.""" async def upsert(self, clip_id: str, data: ClipStateData) -> None: + """No-op: data is dropped.""" return async def get(self, clip_id: str) -> ClipStateData | None: + """No-op: returns None.""" return None async def list_candidate_clips_for_cleanup( @@ -487,6 +557,7 @@ async def list_candidate_clips_for_cleanup( batch_size: int, cursor: tuple[datetime, str] | None = None, ) -> list[tuple[str, ClipStateData, datetime]]: + """No-op: returns empty list.""" _ = older_than_days _ = camera_name _ = batch_size @@ -494,7 +565,38 @@ async def list_candidate_clips_for_cleanup( return [] async def shutdown(self, timeout: float | None = None) -> None: + """No-op.""" return async def ping(self) -> bool: + """No-op: always returns False.""" return False + + +# ============================================================================= +# Backwards Compatibility +# ============================================================================= + +# Aliases for backwards compatibility with existing code +PostgresStateStore = SQLAlchemyStateStore +PostgresEventStore = SQLAlchemyEventStore + + +def is_retryable_pg_error(exc: Exception) -> bool: + """Return True if the exception is likely a transient database error. + + This function is provided for backwards compatibility. New code should + use SQLAlchemyStateStore.is_retryable_error() or DialectHelper.is_retryable_error(). 
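+
+    Typical retry check (sketch; names are illustrative):
+
+        try:
+            await state_store.upsert(clip_id, state)
+        except Exception as exc:
+            if not is_retryable_pg_error(exc):
+                raise
+            ...  # transient error: caller may retry with backoff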
+ """ + # Create a PostgreSQL dialect helper for backwards-compatible behavior + dialect = DialectHelper("postgresql") + return dialect.is_retryable_error(exc) + + +def _normalize_async_dsn(dsn: str) -> str: + """Normalize DSN to include async driver. + + This function is provided for backwards compatibility. New code should + use DialectHelper.normalize_dsn(). + """ + return DialectHelper.normalize_dsn(dsn) diff --git a/src/homesec/telemetry/db/log_table.py b/src/homesec/telemetry/db/log_table.py index cdff007..8a788dc 100644 --- a/src/homesec/telemetry/db/log_table.py +++ b/src/homesec/telemetry/db/log_table.py @@ -1,7 +1,14 @@ +"""Telemetry logs table definition. + +Uses JSONType for database-agnostic JSON storage (JSONB for PostgreSQL, +JSON for SQLite). +""" + from __future__ import annotations from sqlalchemy import BigInteger, Column, DateTime, Index, MetaData, Table, func -from sqlalchemy.dialects.postgresql import JSONB + +from homesec.db import JSONType metadata = MetaData() @@ -10,7 +17,7 @@ metadata, Column("id", BigInteger, primary_key=True, autoincrement=True), Column("ts", DateTime(timezone=True), server_default=func.now(), nullable=False), - Column("payload", JSONB, nullable=False), + Column("payload", JSONType, nullable=False), ) Index("logs_ts_idx", logs.c.ts.desc()) diff --git a/tests/homesec/conftest.py b/tests/homesec/conftest.py index 66b9a1d..70985ee 100644 --- a/tests/homesec/conftest.py +++ b/tests/homesec/conftest.py @@ -1,8 +1,17 @@ -"""Shared pytest fixtures for HomeSec tests.""" +"""Shared pytest fixtures for HomeSec tests. +Provides parametrized database fixtures that run tests against both +SQLite and PostgreSQL backends. +""" + +from __future__ import annotations + +import os import sys +from collections.abc import AsyncGenerator from datetime import datetime from pathlib import Path +from typing import TYPE_CHECKING # Add src to sys.path for imports src_path = Path(__file__).parent.parent.parent / "src" @@ -20,6 +29,171 @@ MockVLM, ) +if TYPE_CHECKING: + from homesec.state.postgres import SQLAlchemyStateStore + +# ============================================================================= +# Database Backend Configuration +# ============================================================================= + +# Default PostgreSQL DSN for local Docker (matches docker-compose.postgres.yml) +DEFAULT_PG_DSN = "postgresql://homesec:homesec@localhost:5432/homesec" + + +def _should_skip_postgres() -> bool: + """Check if PostgreSQL tests should be skipped.""" + return os.environ.get("SKIP_POSTGRES_TESTS", "0") == "1" + + +def _get_postgres_dsn() -> str | None: + """Get PostgreSQL DSN from environment, or None if unavailable.""" + if _should_skip_postgres(): + return None + return os.environ.get("TEST_DB_DSN", DEFAULT_PG_DSN) + + +# ============================================================================= +# Parametrized Database Fixtures +# ============================================================================= + + +@pytest.fixture(params=["sqlite", "postgresql"]) +def db_backend(request: pytest.FixtureRequest) -> str: + """Parametrize tests to run against both database backends. + + Tests are run against both SQLite (in-memory) and PostgreSQL. + PostgreSQL tests can be skipped by setting SKIP_POSTGRES_TESTS=1. 
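+    Because this fixture is parametrized, every fixture or test that depends on it
+    (directly, or indirectly via `db_dsn` / `state_store`) runs once per backend.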
+ + Returns: + Database backend name ("sqlite" or "postgresql") + """ + backend = request.param + + if backend == "postgresql": + if _should_skip_postgres(): + pytest.skip("PostgreSQL tests disabled via SKIP_POSTGRES_TESTS=1") + + pg_dsn = _get_postgres_dsn() + if pg_dsn is None: + pytest.skip("TEST_DB_DSN not set, skipping PostgreSQL tests") + + return backend + + +@pytest.fixture +def db_dsn(db_backend: str) -> str: + """Return appropriate DSN for the database backend. + + Args: + db_backend: Either "sqlite" or "postgresql" + + Returns: + Database connection string + """ + if db_backend == "sqlite": + # In-memory SQLite for fast testing + return "sqlite+aiosqlite:///:memory:" + else: + pg_dsn = _get_postgres_dsn() + assert pg_dsn is not None, "PostgreSQL DSN should be available" + return pg_dsn + + +@pytest.fixture +def db_dsn_for_tests(db_backend: str, db_dsn: str, tmp_path: Path) -> str: + """Return the DSN used by fixtures that need a consistent database.""" + if db_backend == "sqlite": + return f"sqlite+aiosqlite:///{tmp_path / 'tests.db'}" + return db_dsn + + +@pytest.fixture +async def state_store(db_dsn: str) -> AsyncGenerator[SQLAlchemyStateStore, None]: + """Create and initialize a state store for testing. + + Works with both SQLite and PostgreSQL via the parametrized db_dsn fixture. + Creates fresh tables for each test and cleans up afterward. + + Yields: + Initialized SQLAlchemyStateStore instance + """ + from sqlalchemy import delete + + from homesec.state.postgres import Base, ClipEvent, ClipState, SQLAlchemyStateStore + + store = SQLAlchemyStateStore(db_dsn) + initialized = await store.initialize() + assert initialized, f"Failed to initialize state store with {db_dsn}" + + # Create fresh tables for the test + if store._engine is not None: + async with store._engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await conn.run_sync(Base.metadata.create_all) + + yield store + + # Cleanup: delete test data + if store._engine is not None: + async with store._engine.begin() as conn: + # Delete in correct order due to foreign key constraint + await conn.execute(delete(ClipEvent).where(ClipEvent.clip_id.like("test%"))) + await conn.execute(delete(ClipState).where(ClipState.clip_id.like("test%"))) + + await store.shutdown() + + +# ============================================================================= +# Legacy Fixtures (for backwards compatibility) +# ============================================================================= + + +@pytest.fixture +def postgres_dsn() -> str: + """Return test Postgres DSN (requires local DB running). + + This fixture is provided for backwards compatibility. + New tests should use the parametrized db_dsn fixture instead. + """ + dsn = _get_postgres_dsn() + if dsn is None: + pytest.skip("PostgreSQL not available") + return dsn + + +@pytest.fixture +async def clean_test_db(db_dsn_for_tests: str) -> AsyncGenerator[None, None]: + """Clean up test data after each test. + + This fixture is provided for backwards compatibility. + New tests should use the parametrized state_store fixture instead. 
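+
+    Preferred pattern (sketch; `make_state()` is an illustrative helper):
+
+        async def test_roundtrip(state_store: SQLAlchemyStateStore) -> None:
+            await state_store.upsert("test-roundtrip", make_state())
+            assert await state_store.get("test-roundtrip") is not None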
+ """ + from sqlalchemy import delete + + from homesec.state.postgres import Base, ClipEvent, ClipState, SQLAlchemyStateStore + + store = SQLAlchemyStateStore(db_dsn_for_tests) + await store.initialize() + if store._engine is not None: + async with store._engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await conn.run_sync(Base.metadata.create_all) + + yield # Run the test first + + # Cleanup after test + if store._engine: + async with store._engine.begin() as conn: + # Delete in correct order due to foreign key + await conn.execute(delete(ClipEvent).where(ClipEvent.clip_id.like("test-%"))) + await conn.execute(delete(ClipState).where(ClipState.clip_id.like("test-%"))) + await store.shutdown() + + +# ============================================================================= +# Mock Fixtures +# ============================================================================= + @pytest.fixture def mock_filter() -> MockFilter: @@ -64,36 +238,3 @@ def sample_clip() -> Clip: duration_s=10.0, source_type="rtsp", ) - - -@pytest.fixture -def postgres_dsn() -> str: - """Return test Postgres DSN (requires local DB running).""" - import os - - return os.getenv("TEST_DB_DSN", "postgresql://homesec:homesec@localhost:5432/homesec") - - -@pytest.fixture -async def clean_test_db(postgres_dsn: str) -> None: - """Clean up test data after each test.""" - from sqlalchemy import delete - - from homesec.state.postgres import Base, ClipEvent, ClipState, PostgresStateStore - - store = PostgresStateStore(postgres_dsn) - await store.initialize() - if store._engine is not None: - async with store._engine.begin() as conn: - await conn.run_sync(Base.metadata.drop_all) - await conn.run_sync(Base.metadata.create_all) - - yield # Run the test first - - # Cleanup after test - if store._engine: - async with store._engine.begin() as conn: - # Delete in correct order due to foreign key - await conn.execute(delete(ClipEvent).where(ClipEvent.clip_id.like("test-%"))) - await conn.execute(delete(ClipState).where(ClipState.clip_id.like("test-%"))) - await store.shutdown() diff --git a/tests/homesec/test_cleanup_clips.py b/tests/homesec/test_cleanup_clips.py index 3a54218..49271d4 100644 --- a/tests/homesec/test_cleanup_clips.py +++ b/tests/homesec/test_cleanup_clips.py @@ -87,9 +87,19 @@ def _write_cleanup_config(path: Path, *, dsn: str, storage_root: Path) -> None: path.write_text(yaml.safe_dump(config, sort_keys=False)) +@pytest.fixture +def db_dsn_for_tests(db_backend: str, db_dsn: str, tmp_path: Path) -> str: + if db_backend == "sqlite": + return f"sqlite+aiosqlite:///{tmp_path / 'cleanup.db'}" + return db_dsn + + @pytest.mark.asyncio async def test_cleanup_deletes_empty_clips( - postgres_dsn: str, tmp_path: Path, clean_test_db: None, monkeypatch: pytest.MonkeyPatch + db_dsn_for_tests: str, + tmp_path: Path, + clean_test_db: None, + monkeypatch: pytest.MonkeyPatch, ) -> None: """Cleanup removes clips that still have no detections.""" # Given a clip state with empty filter result and local + storage copies @@ -101,7 +111,7 @@ async def test_cleanup_deletes_empty_clips( dest_path = f"clips/front/{clip_id}.mp4" upload = await storage.put_file(local_path, dest_path) - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() empty_filter = FilterResult( detected_classes=[], @@ -120,7 +130,7 @@ async def test_cleanup_deletes_empty_clips( await state_store.upsert(clip_id, state) config_path = tmp_path / "config.yaml" - 
_write_cleanup_config(config_path, dsn=postgres_dsn, storage_root=storage_root) + _write_cleanup_config(config_path, dsn=db_dsn_for_tests, storage_root=storage_root) filter_plugin = _TestFilter(detect_on=set()) monkeypatch.setattr( @@ -155,7 +165,10 @@ async def test_cleanup_deletes_empty_clips( @pytest.mark.asyncio async def test_cleanup_marks_false_negatives( - postgres_dsn: str, tmp_path: Path, clean_test_db: None, monkeypatch: pytest.MonkeyPatch + db_dsn_for_tests: str, + tmp_path: Path, + clean_test_db: None, + monkeypatch: pytest.MonkeyPatch, ) -> None: """Cleanup should recheck clips and skip delete when detections appear.""" # Given a clip state that is empty on first pass @@ -167,7 +180,7 @@ async def test_cleanup_marks_false_negatives( dest_path = f"clips/front/{clip_id}.mp4" upload = await storage.put_file(local_path, dest_path) - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() empty_filter = FilterResult( detected_classes=[], @@ -186,7 +199,7 @@ async def test_cleanup_marks_false_negatives( await state_store.upsert(clip_id, state) config_path = tmp_path / "config.yaml" - _write_cleanup_config(config_path, dsn=postgres_dsn, storage_root=storage_root) + _write_cleanup_config(config_path, dsn=db_dsn_for_tests, storage_root=storage_root) filter_plugin = _TestFilter(detect_on={"detect"}) monkeypatch.setattr( diff --git a/tests/homesec/test_clip_repository.py b/tests/homesec/test_clip_repository.py index 4a8f2ff..551af54 100644 --- a/tests/homesec/test_clip_repository.py +++ b/tests/homesec/test_clip_repository.py @@ -19,9 +19,9 @@ @pytest.mark.asyncio -async def test_initialize_clip(postgres_dsn: str, tmp_path: Path, clean_test_db: None) -> None: +async def test_initialize_clip(db_dsn_for_tests: str, tmp_path: Path, clean_test_db: None) -> None: # Given: A repository with state and event stores - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) @@ -56,10 +56,10 @@ async def test_initialize_clip(postgres_dsn: str, tmp_path: Path, clean_test_db: @pytest.mark.asyncio async def test_record_upload_completed( - postgres_dsn: str, tmp_path: Path, clean_test_db: None + db_dsn_for_tests: str, tmp_path: Path, clean_test_db: None ) -> None: # Given: A clip that's been initialized - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) @@ -102,10 +102,10 @@ async def test_record_upload_completed( @pytest.mark.asyncio async def test_record_filter_completed( - postgres_dsn: str, tmp_path: Path, clean_test_db: None + db_dsn_for_tests: str, tmp_path: Path, clean_test_db: None ) -> None: # Given: A clip that's been initialized - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) @@ -149,12 +149,12 @@ async def test_record_filter_completed( @pytest.mark.asyncio async def test_record_clip_rechecked_updates_state_and_event( - postgres_dsn: str, + db_dsn_for_tests: str, tmp_path: Path, clean_test_db: None, ) -> None: # Given: A clip initialized in the repository - state_store = 
PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) @@ -210,9 +210,11 @@ async def test_record_clip_rechecked_updates_state_and_event( @pytest.mark.asyncio -async def test_record_vlm_completed(postgres_dsn: str, tmp_path: Path, clean_test_db: None) -> None: +async def test_record_vlm_completed( + db_dsn_for_tests: str, tmp_path: Path, clean_test_db: None +) -> None: # Given: A clip that's been initialized - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) @@ -267,10 +269,10 @@ async def test_record_vlm_completed(postgres_dsn: str, tmp_path: Path, clean_tes @pytest.mark.asyncio async def test_record_notification_sent( - postgres_dsn: str, tmp_path: Path, clean_test_db: None + db_dsn_for_tests: str, tmp_path: Path, clean_test_db: None ) -> None: # Given: A clip that's been analyzed - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) diff --git a/tests/homesec/test_event_store.py b/tests/homesec/test_event_store.py index b68cf46..4f6a4ea 100644 --- a/tests/homesec/test_event_store.py +++ b/tests/homesec/test_event_store.py @@ -17,9 +17,9 @@ @pytest.mark.asyncio -async def test_append_and_get_events(postgres_dsn: str, clean_test_db: None) -> None: +async def test_append_and_get_events(db_dsn_for_tests: str, clean_test_db: None) -> None: # Given: A state store and event store with initialized tables - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) @@ -82,11 +82,11 @@ async def test_append_and_get_events(postgres_dsn: str, clean_test_db: None) -> @pytest.mark.asyncio async def test_append_and_get_clip_deleted_event( - postgres_dsn: str, + db_dsn_for_tests: str, clean_test_db: None, ) -> None: # Given: A state store and event store with initialized tables - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) @@ -129,9 +129,9 @@ async def test_append_and_get_clip_deleted_event( @pytest.mark.asyncio -async def test_get_events_after_id(postgres_dsn: str, clean_test_db: None) -> None: +async def test_get_events_after_id(db_dsn_for_tests: str, clean_test_db: None) -> None: # Given: A clip with multiple events - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) @@ -180,9 +180,9 @@ async def test_get_events_after_id(postgres_dsn: str, clean_test_db: None) -> No @pytest.mark.asyncio -async def test_events_for_nonexistent_clip(postgres_dsn: str) -> None: +async def test_events_for_nonexistent_clip(db_dsn_for_tests: str) -> None: # Given: An initialized event store - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) 
await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) diff --git a/tests/homesec/test_local_folder_deduplication.py b/tests/homesec/test_local_folder_deduplication.py index d215b3a..6e7ae68 100644 --- a/tests/homesec/test_local_folder_deduplication.py +++ b/tests/homesec/test_local_folder_deduplication.py @@ -16,10 +16,12 @@ @pytest.mark.asyncio -async def test_new_file_is_emitted(tmp_path: Path, postgres_dsn: str, clean_test_db: None) -> None: +async def test_new_file_is_emitted( + tmp_path: Path, db_dsn_for_tests: str, clean_test_db: None +) -> None: """Test that a new file is detected and emitted.""" # Given: LocalFolderSource with state_store - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() config = LocalFolderSourceConfig( @@ -52,11 +54,11 @@ async def test_new_file_is_emitted(tmp_path: Path, postgres_dsn: str, clean_test @pytest.mark.asyncio async def test_file_already_in_cache_not_emitted( - tmp_path: Path, postgres_dsn: str, clean_test_db: None + tmp_path: Path, db_dsn_for_tests: str, clean_test_db: None ) -> None: """Test that a file already in the in-memory cache is not reprocessed.""" # Given: LocalFolderSource with a file already in cache - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() config = LocalFolderSourceConfig( @@ -95,11 +97,11 @@ async def test_file_already_in_cache_not_emitted( @pytest.mark.asyncio async def test_file_already_processed_not_emitted( - tmp_path: Path, postgres_dsn: str, clean_test_db: None + tmp_path: Path, db_dsn_for_tests: str, clean_test_db: None ) -> None: """Test that a file with existing clip_state is not reprocessed.""" # Given: LocalFolderSource with state_store - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() # Create a clip_state for a file BEFORE starting source @@ -145,11 +147,11 @@ async def test_file_already_processed_not_emitted( @pytest.mark.asyncio async def test_deleted_file_tombstone_prevents_reprocessing( - tmp_path: Path, postgres_dsn: str, clean_test_db: None + tmp_path: Path, db_dsn_for_tests: str, clean_test_db: None ) -> None: """Test that a file with status='deleted' (tombstone) is not reprocessed.""" # Given: LocalFolderSource with state_store - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() # Create a tombstone (status='deleted') for a file @@ -190,11 +192,11 @@ async def test_deleted_file_tombstone_prevents_reprocessing( @pytest.mark.asyncio async def test_old_mtime_new_file_is_emitted( - tmp_path: Path, postgres_dsn: str, clean_test_db: None + tmp_path: Path, db_dsn_for_tests: str, clean_test_db: None ) -> None: """Test that a new file with an old mtime IS emitted (no watermark bug).""" # Given: LocalFolderSource with state_store - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() config = LocalFolderSourceConfig( @@ -234,11 +236,11 @@ async def test_old_mtime_new_file_is_emitted( @pytest.mark.asyncio async def test_cache_eviction_with_db_check_prevents_reprocessing( - tmp_path: Path, postgres_dsn: str, clean_test_db: None + tmp_path: Path, db_dsn_for_tests: str, clean_test_db: None ) -> None: """Test that evicted files from cache are not 
reprocessed (DB check prevents it).""" # Given: LocalFolderSource with state_store and small cache size - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() config = LocalFolderSourceConfig( @@ -350,11 +352,11 @@ async def test_no_state_store_falls_back_to_cache_only(tmp_path: Path) -> None: @pytest.mark.asyncio async def test_multiple_files_some_seen_some_new( - tmp_path: Path, postgres_dsn: str, clean_test_db: None + tmp_path: Path, db_dsn_for_tests: str, clean_test_db: None ) -> None: """Test mixed scenario: some files new, some already processed.""" # Given: LocalFolderSource with state_store - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() # Create clip_states for some files diff --git a/tests/homesec/test_migrations.py b/tests/homesec/test_migrations.py new file mode 100644 index 0000000..3dfb935 --- /dev/null +++ b/tests/homesec/test_migrations.py @@ -0,0 +1,163 @@ +"""Smoke tests for Alembic migrations on SQLite and PostgreSQL.""" + +from __future__ import annotations + +import asyncio +from datetime import datetime +from pathlib import Path + +import pytest +import sqlalchemy as sa +from alembic import command +from alembic.autogenerate import compare_metadata +from alembic.config import Config +from alembic.runtime.migration import MigrationContext +from sqlalchemy import MetaData + +from homesec.db import create_async_engine_for_dsn +from homesec.models.clip import ClipStateData +from homesec.models.events import ClipRecordedEvent +from homesec.state.postgres import Base as StateBase +from homesec.state.postgres import SQLAlchemyStateStore +from homesec.telemetry.db.log_table import metadata as telemetry_metadata + +_TARGET_METADATA = MetaData() +for table in telemetry_metadata.tables.values(): + table.to_metadata(_TARGET_METADATA) +for table in StateBase.metadata.tables.values(): + table.to_metadata(_TARGET_METADATA) + +_SKIP_INDEX_NAMES = {"idx_clip_states_camera", "idx_clip_states_status"} + + +def _is_ignored_diff(diff: object) -> bool: + candidate = diff + if isinstance(diff, list) and len(diff) == 1 and isinstance(diff[0], tuple): + candidate = diff[0] + if not isinstance(candidate, tuple) or not candidate: + return False + if candidate[0] != "modify_default": + return False + _, _, table_name, column_name, *_ = candidate + return table_name == "clip_events" and column_name == "id" + + +def _include_object( + obj: object, name: str | None, type_: str, reflected: bool, compare_to: object +) -> bool: + _ = obj + _ = reflected + _ = compare_to + if type_ == "table" and name == "alembic_version": + return False + if type_ == "index" and name is not None and name.startswith("sqlite_autoindex"): + return False + if type_ == "index" and name in _SKIP_INDEX_NAMES: + return False + return True + + +@pytest.fixture +def migration_db_dsn(db_backend: str, db_dsn: str, tmp_path: Path) -> str: + """Return a DSN suitable for Alembic migrations.""" + if db_backend == "sqlite": + return f"sqlite+aiosqlite:///{tmp_path / 'alembic_test.db'}" + return db_dsn + + +def _load_alembic_config() -> Config: + root = Path(__file__).resolve().parents[2] + config = Config(str(root / "alembic.ini")) + config.set_main_option("script_location", str(root / "alembic")) + return config + + +async def _reset_database(dsn: str) -> None: + engine = create_async_engine_for_dsn(dsn) + try: + async with engine.begin() as conn: + + def _drop(sync_conn) 
-> None: + _TARGET_METADATA.drop_all(bind=sync_conn, checkfirst=True) + sync_conn.execute(sa.text("DROP TABLE IF EXISTS alembic_version")) + + await conn.run_sync(_drop) + finally: + await engine.dispose() + + +async def _assert_schema_matches_models(dsn: str) -> None: + engine = create_async_engine_for_dsn(dsn) + diffs: list[object] = [] + try: + async with engine.connect() as conn: + + def _compare(sync_conn) -> list[object]: + context = MigrationContext.configure( + connection=sync_conn, + opts={ + "compare_type": True, + "compare_server_default": False, + "include_object": _include_object, + }, + ) + return compare_metadata(context, _TARGET_METADATA) + + diffs = await conn.run_sync(_compare) + finally: + await engine.dispose() + + diffs = [diff for diff in diffs if not _is_ignored_diff(diff)] + assert diffs == [] + + +async def _assert_basic_roundtrip(dsn: str) -> None: + store = SQLAlchemyStateStore(dsn) + try: + initialized = await store.initialize() + assert initialized is True + + clip_id = "test-migration-001" + state = ClipStateData( + camera_name="front_door", + status="queued_local", + local_path="/tmp/test.mp4", + ) + await store.upsert(clip_id, state) + + loaded = await store.get(clip_id) + assert loaded is not None + assert loaded.camera_name == "front_door" + + event_store = store.create_event_store() + event = ClipRecordedEvent( + clip_id=clip_id, + timestamp=datetime.now(), + camera_name="front_door", + duration_s=10.0, + source_type="test", + ) + await event_store.append(event) + + events = await event_store.get_events(clip_id) + assert len(events) == 1 + assert events[0].event_type == "clip_recorded" + finally: + await store.shutdown() + + +def test_alembic_migrations_match_models( + migration_db_dsn: str, monkeypatch: pytest.MonkeyPatch +) -> None: + """Run Alembic migrations and verify model compatibility.""" + # Given: A clean database and alembic configuration + monkeypatch.setenv("DB_DSN", migration_db_dsn) + config = _load_alembic_config() + asyncio.run(_reset_database(migration_db_dsn)) + + # When: Running migrations to head + command.upgrade(config, "head") + + # Then: Schema matches models and basic queries succeed + asyncio.run(_assert_schema_matches_models(migration_db_dsn)) + asyncio.run(_assert_basic_roundtrip(migration_db_dsn)) diff --git a/tests/homesec/test_pipeline_events.py b/tests/homesec/test_pipeline_events.py index 8796b74..5dea1a1 100644 --- a/tests/homesec/test_pipeline_events.py +++ b/tests/homesec/test_pipeline_events.py @@ -102,10 +102,10 @@ def make_clip(tmp_path: Path, clip_id: str) -> Clip: @pytest.mark.asyncio async def test_pipeline_emits_success_events( - postgres_dsn: str, tmp_path: Path, clean_test_db: None + db_dsn_for_tests: str, tmp_path: Path, clean_test_db: None ) -> None: # Given: A real Postgres event store and a pipeline with successful mocks - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) @@ -172,10 +172,10 @@ async def test_pipeline_emits_success_events( @pytest.mark.asyncio async def test_pipeline_emits_notification_events_per_notifier( - postgres_dsn: str, tmp_path: Path, clean_test_db: None + db_dsn_for_tests: str, tmp_path: Path, clean_test_db: None ) -> None: # Given: A real Postgres event store and two notifier entries - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await 
state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) @@ -223,10 +223,10 @@ async def test_pipeline_emits_notification_events_per_notifier( @pytest.mark.asyncio async def test_pipeline_emits_vlm_skipped_event( - postgres_dsn: str, tmp_path: Path, clean_test_db: None + db_dsn_for_tests: str, tmp_path: Path, clean_test_db: None ) -> None: # Given: A clip with non-trigger classes and notify_on_motion disabled - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) @@ -268,10 +268,10 @@ async def test_pipeline_emits_vlm_skipped_event( @pytest.mark.asyncio async def test_pipeline_emits_upload_failed_event( - postgres_dsn: str, tmp_path: Path, clean_test_db: None + db_dsn_for_tests: str, tmp_path: Path, clean_test_db: None ) -> None: # Given: A pipeline where upload fails but filter/VLM succeed - state_store = PostgresStateStore(postgres_dsn) + state_store = PostgresStateStore(db_dsn_for_tests) await state_store.initialize() event_store = state_store.create_event_store() assert isinstance(event_store, PostgresEventStore) diff --git a/tests/homesec/test_state_store.py b/tests/homesec/test_state_store.py index 306b36f..f5bde8b 100644 --- a/tests/homesec/test_state_store.py +++ b/tests/homesec/test_state_store.py @@ -1,134 +1,170 @@ -"""Tests for PostgresStateStore.""" +"""Tests for SQLAlchemyStateStore. -import os +These tests run against both SQLite and PostgreSQL backends via parametrized +fixtures. Use SKIP_POSTGRES_TESTS=1 to run only SQLite tests locally. +""" + +from __future__ import annotations import pytest -from sqlalchemy import delete +from homesec.db import DialectHelper from homesec.models.clip import ClipStateData from homesec.models.filter import FilterResult -from homesec.state import PostgresStateStore -from homesec.state.postgres import Base, ClipState, _normalize_async_dsn - -# Default DSN for local Docker Postgres (matches docker-compose.postgres.yml) -DEFAULT_DSN = "postgresql://homesec:homesec@localhost:5432/homesec" - - -def get_test_dsn() -> str: - """Get test database DSN from environment or use default.""" - return os.environ.get("TEST_DB_DSN", DEFAULT_DSN) - - -@pytest.fixture -async def state_store() -> PostgresStateStore: - """Create and initialize a PostgresStateStore for testing.""" - dsn = get_test_dsn() - assert dsn is not None - store = PostgresStateStore(dsn) - initialized = await store.initialize() - assert initialized, "Failed to initialize state store" - if store._engine is not None: - async with store._engine.begin() as conn: - await conn.run_sync(Base.metadata.drop_all) - await conn.run_sync(Base.metadata.create_all) - yield store - # Cleanup: drop test data - if store._engine is not None: - async with store._engine.begin() as conn: - await conn.execute(delete(ClipState).where(ClipState.clip_id.like("test_%"))) - await store.shutdown() +from homesec.state import SQLAlchemyStateStore +from homesec.state.postgres import _normalize_async_dsn, _parse_json_payload +# ============================================================================= +# Unit Tests (no database required) +# ============================================================================= -def sample_state(clip_id: str = "test_clip_001") -> ClipStateData: - """Create a sample ClipStateData for testing.""" - return ClipStateData( - 
camera_name="front_door", - status="queued_local", - local_path="/tmp/test.mp4", - ) + +def test_parse_json_payload_accepts_dict() -> None: + """Test JSON parsing handles dicts.""" + # Given: A dict payload + payload = {"status": "uploaded", "camera_name": "front_door"} + + # When: Parsing the payload + result = _parse_json_payload(payload) + + # Then: Dict is returned as-is + assert result == payload -def test_parse_state_data_accepts_dict_and_str() -> None: - """Test JSONB parsing handles dicts and JSON strings.""" - # Given a clip state dict and JSON string - state = sample_state() - raw_dict = state.model_dump() - raw_str = state.model_dump_json() +def test_parse_json_payload_accepts_str() -> None: + """Test JSON parsing handles JSON strings.""" + # Given: A JSON string + json_str = '{"status": "uploaded", "camera_name": "front_door"}' - # When parsing raw JSONB payloads - parsed_dict = PostgresStateStore._parse_state_data(raw_dict) - parsed_str = PostgresStateStore._parse_state_data(raw_str) + # When: Parsing the JSON string + result = _parse_json_payload(json_str) - # Then both parse to dicts - assert parsed_dict == raw_dict - assert parsed_str == raw_dict - ClipStateData.model_validate(parsed_str) + # Then: String is parsed to dict + assert result == {"status": "uploaded", "camera_name": "front_door"} -def test_parse_state_data_accepts_bytes() -> None: - """Test JSONB parsing handles bytes payloads.""" - # Given a JSON payload as bytes - state = sample_state() - raw_bytes = state.model_dump_json().encode("utf-8") +def test_parse_json_payload_accepts_bytes() -> None: + """Test JSON parsing handles bytes payloads.""" + # Given: A JSON payload as bytes + json_bytes = b'{"status": "uploaded", "camera_name": "front_door"}' - # When parsing bytes - parsed = PostgresStateStore._parse_state_data(raw_bytes) + # When: Parsing bytes + result = _parse_json_payload(json_bytes) - # Then bytes parse to dict - assert parsed["camera_name"] == "front_door" - ClipStateData.model_validate(parsed) + # Then: Bytes are parsed to dict + assert result == {"status": "uploaded", "camera_name": "front_door"} -def test_normalize_async_dsn() -> None: - """Test DSN normalization adds asyncpg driver.""" - # Given different Postgres DSN formats +def test_normalize_async_dsn_postgresql() -> None: + """Test DSN normalization adds asyncpg driver for PostgreSQL.""" + # Given: Different Postgres DSN formats dsn_plain = "postgresql://user:pass@localhost/db" dsn_short = "postgres://user:pass@localhost/db" dsn_async = "postgresql+asyncpg://user:pass@localhost/db" - # When normalizing DSNs + # When: Normalizing DSNs norm_plain = _normalize_async_dsn(dsn_plain) norm_short = _normalize_async_dsn(dsn_short) norm_async = _normalize_async_dsn(dsn_async) - # Then asyncpg is used - assert norm_plain.startswith("postgresql+asyncpg://") - assert norm_short.startswith("postgresql+asyncpg://") + # Then: asyncpg is used + assert norm_plain == "postgresql+asyncpg://user:pass@localhost/db" + assert norm_short == "postgresql+asyncpg://user:pass@localhost/db" + assert norm_async == dsn_async + + +def test_normalize_async_dsn_sqlite() -> None: + """Test DSN normalization adds aiosqlite driver for SQLite.""" + # Given: Different SQLite DSN formats + dsn_plain = "sqlite:///test.db" + dsn_memory = "sqlite:///:memory:" + dsn_async = "sqlite+aiosqlite:///:memory:" + + # When: Normalizing DSNs + norm_plain = _normalize_async_dsn(dsn_plain) + norm_memory = _normalize_async_dsn(dsn_memory) + norm_async = _normalize_async_dsn(dsn_async) + + # Then: 
aiosqlite is used + assert norm_plain == "sqlite+aiosqlite:///test.db" + assert norm_memory == "sqlite+aiosqlite:///:memory:" assert norm_async == dsn_async +def test_dialect_helper_detects_postgresql() -> None: + """Test DialectHelper correctly detects PostgreSQL dialect.""" + # Given: A PostgreSQL DSN + dsn = "postgresql://user:pass@localhost/db" + + # When: Creating DialectHelper + helper = DialectHelper.from_dsn(dsn) + + # Then: Dialect is PostgreSQL + assert helper.dialect_name == "postgresql" + assert helper.is_postgres is True + assert helper.is_sqlite is False + + +def test_dialect_helper_detects_sqlite() -> None: + """Test DialectHelper correctly detects SQLite dialect.""" + # Given: A SQLite DSN + dsn = "sqlite:///:memory:" + + # When: Creating DialectHelper + helper = DialectHelper.from_dsn(dsn) + + # Then: Dialect is SQLite + assert helper.dialect_name == "sqlite" + assert helper.is_postgres is False + assert helper.is_sqlite is True + + +# ============================================================================= +# Integration Tests (parametrized for both backends) +# ============================================================================= + + +def sample_state(clip_id: str = "test_clip_001") -> ClipStateData: + """Create a sample ClipStateData for testing.""" + return ClipStateData( + camera_name="front_door", + status="queued_local", + local_path="/tmp/test.mp4", + ) + + @pytest.mark.asyncio -async def test_upsert_and_get_roundtrip(state_store: PostgresStateStore) -> None: +async def test_upsert_and_get_roundtrip(state_store: SQLAlchemyStateStore) -> None: """Test that upsert and get work correctly.""" - # Given a clip state + # Given: A clip state clip_id = "test_roundtrip_001" state = sample_state(clip_id) - # When upserting and fetching + # When: Upserting and fetching await state_store.upsert(clip_id, state) retrieved = await state_store.get(clip_id) - # Then the roundtrip returns the state + # Then: The roundtrip returns the state assert retrieved is not None assert retrieved.camera_name == "front_door" assert retrieved.status == "queued_local" @pytest.mark.asyncio -async def test_upsert_updates_existing(state_store: PostgresStateStore) -> None: +async def test_upsert_updates_existing(state_store: SQLAlchemyStateStore) -> None: """Test that upsert updates existing records.""" - # Given an existing clip state + # Given: An existing clip state clip_id = "test_update_001" state = sample_state(clip_id) - # When inserting and updating + # When: Inserting and updating await state_store.upsert(clip_id, state) state.status = "uploaded" state.storage_uri = "dropbox:/front_door/test.mp4" await state_store.upsert(clip_id, state) - # Then the updated fields are persisted + # Then: The updated fields are persisted retrieved = await state_store.get(clip_id) assert retrieved is not None assert retrieved.status == "uploaded" @@ -136,37 +172,43 @@ async def test_upsert_updates_existing(state_store: PostgresStateStore) -> None: @pytest.mark.asyncio -async def test_get_returns_none_for_missing(state_store: PostgresStateStore) -> None: +async def test_get_returns_none_for_missing(state_store: SQLAlchemyStateStore) -> None: """Test that get returns None for non-existent clip_id.""" - # Given a missing clip id - # When retrieving a missing clip id + # Given: A missing clip id + # When: Retrieving a missing clip id result = await state_store.get("test_nonexistent_999") - # Then None is returned + + # Then: None is returned assert result is None @pytest.mark.asyncio -async def 
test_ping_returns_true(state_store: PostgresStateStore) -> None: +async def test_ping_returns_true(state_store: SQLAlchemyStateStore) -> None: """Test that ping returns True when connected.""" - # Given an initialized store - # When ping is called + # Given: An initialized store + # When: Ping is called result = await state_store.ping() - # Then ping is True + + # Then: Ping is True assert result is True @pytest.mark.asyncio -async def test_graceful_degradation_uninitialized() -> None: +async def test_graceful_degradation_uninitialized(db_backend: str) -> None: """Test graceful degradation when store is not initialized.""" - # Given an uninitialized store - store = PostgresStateStore("postgresql://invalid:5432/nonexistent") - - # When operations are called + # Given: An uninitialized store with invalid DSN + if db_backend == "postgresql": + store = SQLAlchemyStateStore("postgresql://invalid:5432/nonexistent") + else: + # For SQLite, use a path that doesn't exist and can't be created + store = SQLAlchemyStateStore("sqlite:////nonexistent/path/db.sqlite") + + # When: Operations are called without initialization await store.upsert("test_fail", sample_state()) result = await store.get("test_fail") ping = await store.ping() - # Then operations degrade gracefully + # Then: Operations degrade gracefully assert result is None assert ping is False @@ -174,20 +216,25 @@ async def test_graceful_degradation_uninitialized() -> None: @pytest.mark.asyncio -async def test_initialize_returns_false_on_bad_dsn() -> None: +async def test_initialize_returns_false_on_bad_dsn(db_backend: str) -> None: """Test that initialize returns False for invalid DSN.""" - # Given an invalid DSN - store = PostgresStateStore("postgresql://invalid:5432/nonexistent") - # When initializing + # Given: An invalid DSN + if db_backend == "postgresql": + store = SQLAlchemyStateStore("postgresql://invalid:5432/nonexistent") + else: + store = SQLAlchemyStateStore("sqlite:////nonexistent/path/db.sqlite") + + # When: Initializing result = await store.initialize() - # Then initialization fails + + # Then: Initialization fails assert result is False await store.shutdown() @pytest.mark.asyncio async def test_list_candidate_clips_for_cleanup_skips_deleted_and_filters_camera( - state_store: PostgresStateStore, + state_store: SQLAlchemyStateStore, ) -> None: """Cleanup listing should skip deleted clips and respect camera filter.""" # Given: Three clip states (one deleted) @@ -257,3 +304,47 @@ async def test_list_candidate_clips_for_cleanup_skips_deleted_and_filters_camera # Then: Only that camera's non-deleted rows are returned assert ids_front == {"test-cleanup-a"} + + +@pytest.mark.asyncio +async def test_event_store_append_and_get(state_store: SQLAlchemyStateStore) -> None: + """Test that events can be appended and retrieved.""" + from datetime import datetime, timezone + + from homesec.models.events import ClipRecordedEvent + + # Given: A clip state and an event + clip_id = "test_events_001" + state = sample_state(clip_id) + await state_store.upsert(clip_id, state) + + event_store = state_store.create_event_store() + event = ClipRecordedEvent( + clip_id=clip_id, + timestamp=datetime.now(timezone.utc), + camera_name="front_door", + local_path="/tmp/test.mp4", + duration_s=10.5, + source_type="rtsp", + ) + + # When: Appending and retrieving the event + await event_store.append(event) + events = await event_store.get_events(clip_id) + + # Then: The event is retrieved + assert len(events) == 1 + assert events[0].clip_id == clip_id + assert 
events[0].event_type == "clip_recorded" + + +@pytest.mark.asyncio +async def test_dialect_is_set_after_initialization(state_store: SQLAlchemyStateStore) -> None: + """Test that dialect helper is properly set after initialization.""" + # Given: An initialized state store + # When: Checking the dialect + dialect = state_store.dialect + + # Then: Dialect is set and valid + assert dialect is not None + assert dialect.dialect_name in ("postgresql", "sqlite") diff --git a/uv.lock b/uv.lock index 5061bec..0bfdb6b 100644 --- a/uv.lock +++ b/uv.lock @@ -152,6 +152,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fb/76/641ae371508676492379f16e2fa48f4e2c11741bd63c48be4b12a6b09cba/aiosignal-1.4.0-py3-none-any.whl", hash = "sha256:053243f8b92b990551949e63930a839ff0cf0b0ebbe0597b0f3fb19e1a0fe82e", size = 7490, upload-time = "2025-07-03T22:54:42.156Z" }, ] +[[package]] +name = "aiosqlite" +version = "0.22.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/4e/8a/64761f4005f17809769d23e518d915db74e6310474e733e3593cfc854ef1/aiosqlite-0.22.1.tar.gz", hash = "sha256:043e0bd78d32888c0a9ca90fc788b38796843360c855a7262a532813133a0650", size = 14821, upload-time = "2025-12-23T19:25:43.997Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/00/b7/e3bf5133d697a08128598c8d0abc5e16377b51465a33756de24fa7dee953/aiosqlite-0.22.1-py3-none-any.whl", hash = "sha256:21c002eb13823fad740196c5a2e9d8e62f6243bd9e7e4a1f87fb5e44ecb4fceb", size = 17405, upload-time = "2025-12-23T19:25:42.139Z" }, +] + [[package]] name = "alembic" version = "1.17.2" @@ -1125,10 +1134,11 @@ wheels = [ [[package]] name = "homesec" -version = "1.1.0" +version = "1.1.1" source = { editable = "." } dependencies = [ { name = "aiohttp" }, + { name = "aiosqlite" }, { name = "alembic" }, { name = "anyio" }, { name = "asyncpg" }, @@ -1161,6 +1171,7 @@ dev = [ [package.metadata] requires-dist = [ { name = "aiohttp", specifier = ">=3.13.2" }, + { name = "aiosqlite", specifier = ">=0.19.0" }, { name = "alembic", specifier = ">=1.13.0" }, { name = "anyio", specifier = ">=4.0.0" }, { name = "asyncpg", specifier = ">=0.29.0" },