diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4932edc..6af4dd1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -73,7 +73,7 @@ jobs: run: alembic upgrade head && alembic check - name: Upload coverage to Codecov - uses: codecov/codecov-action@v6 + uses: codecov/codecov-action@57e3a136b779b570ffcdbf80b3bdc90e7fab3de2 # v6 with: files: server/coverage.xml flags: backend diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 245611d..ce624f5 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -187,7 +187,7 @@ jobs: EOF - name: Create release - uses: softprops/action-gh-release@v2 + uses: softprops/action-gh-release@153bb8e04406b158c6c84fc1615b65b24149a1fe # v2 with: tag_name: ${{ github.ref_name }} name: "WrzDJ ${{ github.ref_name }}" diff --git a/server/alembic/versions/033_add_user_token_version.py b/server/alembic/versions/033_add_user_token_version.py new file mode 100644 index 0000000..8ece383 --- /dev/null +++ b/server/alembic/versions/033_add_user_token_version.py @@ -0,0 +1,27 @@ +"""Add token_version column to users table. + +SECURITY (CRIT-2): enables JWT revocation. Every JWT carries a `tv` claim +that must match the user's token_version. Bumping the version (via logout +or admin action) invalidates all outstanding tokens for that user. + +Revision ID: 033 +Revises: 032 +""" + +import sqlalchemy as sa + +from alembic import op + +revision = "033" +down_revision = "032" + + +def upgrade() -> None: + op.add_column( + "users", + sa.Column("token_version", sa.Integer(), nullable=False, server_default="0"), + ) + + +def downgrade() -> None: + op.drop_column("users", "token_version") diff --git a/server/app/api/auth.py b/server/app/api/auth.py index d4dbbbd..c9b2069 100644 --- a/server/app/api/auth.py +++ b/server/app/api/auth.py @@ -66,10 +66,27 @@ def login( if settings.is_lockout_enabled: lockout_manager.record_success(client_ip, username) - access_token = create_access_token(data={"sub": user.username}) + access_token = create_access_token(data={"sub": user.username, "tv": user.token_version}) return Token(access_token=access_token) +@router.post("/logout", response_model=StatusMessageResponse) +@limiter.limit("30/minute") +def logout( + request: Request, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +) -> StatusMessageResponse: + """Invalidate all outstanding JWTs for the current user. + + SECURITY (CRIT-2): bumps token_version so every previously-issued JWT + for this user fails the version check in get_current_user. + """ + current_user.token_version += 1 + db.commit() + return StatusMessageResponse(status="ok", message="Logged out") + + @router.get("/me", response_model=UserOut) @limiter.limit("60/minute") def get_me(request: Request, current_user: User = Depends(get_current_user)) -> User: diff --git a/server/app/api/deps.py b/server/app/api/deps.py index ab04ffe..6160c98 100644 --- a/server/app/api/deps.py +++ b/server/app/api/deps.py @@ -36,6 +36,9 @@ def get_current_user(db: Session = Depends(get_db), token: str = Depends(oauth2_ raise credentials_exception if not user.is_active: raise HTTPException(status_code=400, detail="Inactive user") + # CRIT-2: reject tokens whose version doesn't match the user's current version + if token_data.token_version != user.token_version: + raise credentials_exception return user diff --git a/server/app/api/kiosk.py b/server/app/api/kiosk.py index d69ee47..1ea9f50 100644 --- a/server/app/api/kiosk.py +++ b/server/app/api/kiosk.py @@ -45,6 +45,19 @@ def _resolve_event_name(db: Session, event_code: str | None) -> str | None: return event.name if event else None +def _assert_caller_owns_event(event: Event, user: User) -> None: + """Enforce that the caller owns the target event (or is an admin). + + SECURITY (CRIT-3, CRIT-4): before this check, any DJ could pair or + reassign a kiosk to an event owned by another DJ by supplying the + victim's event code. See docs/security/audit-2026-04-08.md. + """ + if user.role == "admin": + return + if event.created_by_user_id != user.id: + raise HTTPException(status_code=403, detail="You do not own this event") + + # ── Public endpoints ─────────────────────────────────────────────────── @@ -134,6 +147,9 @@ def complete_kiosk_pairing( if not event: raise HTTPException(status_code=404, detail="Event not found") + # CRIT-3: caller must own the target event (or be admin) + _assert_caller_owns_event(event, current_user) + try: complete_pairing(db, kiosk, event.code, current_user.id) except ValueError as e: @@ -204,6 +220,9 @@ def assign_kiosk( if not event: raise HTTPException(status_code=404, detail="Event not found") + # CRIT-4: caller must own the target event (or be admin) + _assert_caller_owns_event(event, current_user) + assign_kiosk_event(db, kiosk, event.code) return KioskOut( id=kiosk.id, diff --git a/server/app/api/sse.py b/server/app/api/sse.py index 7d4ec22..aa6e3ca 100644 --- a/server/app/api/sse.py +++ b/server/app/api/sse.py @@ -5,9 +5,13 @@ import logging from typing import Any -from fastapi import APIRouter, Request +from fastapi import APIRouter, Depends, HTTPException, Request +from sqlalchemy.orm import Session from sse_starlette.sse import EventSourceResponse +from app.api.deps import get_db +from app.core.rate_limit import limiter +from app.services.event import EventLookupResult, get_event_by_code_with_status from app.services.event_bus import get_event_bus logger = logging.getLogger(__name__) @@ -46,9 +50,19 @@ async def _event_generator( @router.get("/events/{code}/stream") -async def event_stream(code: str, request: Request) -> EventSourceResponse: +@limiter.limit("10/minute") +async def event_stream( + code: str, + request: Request, + db: Session = Depends(get_db), +) -> EventSourceResponse: """Public SSE endpoint for real-time event updates. + SECURITY (CRIT-5): rate-limited and existence-checked. Before this fix, + the endpoint had no rate limit and no existence check, allowing + unauthenticated DoS (unlimited long-lived connections exhausting FDs) + and passive eavesdropping via 6-char event-code brute force. + Event types: - request_created: New request submitted - request_status_changed: Request status update @@ -56,8 +70,16 @@ async def event_stream(code: str, request: Request) -> EventSourceResponse: - requests_bulk_update: Batch accept/reject - bridge_status_changed: Bridge connect/disconnect """ + event, result = get_event_by_code_with_status(db, code) + if result == EventLookupResult.NOT_FOUND: + raise HTTPException(status_code=404, detail="Event not found") + if result == EventLookupResult.ARCHIVED: + raise HTTPException(status_code=410, detail="Event has been archived") + if result == EventLookupResult.EXPIRED: + raise HTTPException(status_code=410, detail="Event has expired") + return EventSourceResponse( - _event_generator(request, code), + _event_generator(request, event.code), media_type="text/event-stream", headers={"X-Accel-Buffering": "no"}, ) diff --git a/server/app/models/user.py b/server/app/models/user.py index d0178ba..61e3933 100644 --- a/server/app/models/user.py +++ b/server/app/models/user.py @@ -2,7 +2,7 @@ from datetime import datetime from enum import Enum -from sqlalchemy import Boolean, DateTime, String, Text +from sqlalchemy import Boolean, DateTime, Integer, String, Text from sqlalchemy.orm import Mapped, mapped_column, relationship from app.core.encryption import EncryptedText @@ -27,6 +27,12 @@ class User(Base): email: Mapped[str | None] = mapped_column(String(255), unique=True, nullable=True, index=True) created_at: Mapped[datetime] = mapped_column(DateTime, default=utcnow) + # SECURITY (CRIT-2): JWT revocation — bumped on logout or admin force-revoke. + # Every JWT carries a `tv` claim that must match this value on decode. + token_version: Mapped[int] = mapped_column( + Integer, nullable=False, default=0, server_default="0" + ) + # Tidal OAuth tokens (encrypted at rest via Fernet) tidal_access_token: Mapped[str | None] = mapped_column(EncryptedText, nullable=True) tidal_refresh_token: Mapped[str | None] = mapped_column(EncryptedText, nullable=True) diff --git a/server/app/schemas/auth.py b/server/app/schemas/auth.py index b1901b4..0f4db60 100644 --- a/server/app/schemas/auth.py +++ b/server/app/schemas/auth.py @@ -8,3 +8,4 @@ class Token(BaseModel): class TokenData(BaseModel): username: str | None = None + token_version: int = 0 diff --git a/server/app/services/auth.py b/server/app/services/auth.py index a6a0af7..f67f719 100644 --- a/server/app/services/auth.py +++ b/server/app/services/auth.py @@ -10,6 +10,11 @@ settings = get_settings() +# SECURITY (CRIT-1): JWT algorithm is a security invariant and must NEVER be +# sourced from config. Hardcoding prevents an `alg=none` confusion attack via +# env-var manipulation. See docs/security/audit-2026-04-08.md CRIT-1. +_JWT_ALGORITHM = "HS256" + def verify_password(plain_password: str, hashed_password: str) -> bool: return bcrypt.checkpw(plain_password.encode("utf-8"), hashed_password.encode("utf-8")) @@ -26,17 +31,20 @@ def create_access_token(data: dict, expires_delta: timedelta | None = None) -> s else: expire = datetime.now(UTC) + timedelta(minutes=settings.jwt_expire_minutes) to_encode.update({"exp": expire}) - encoded_jwt = jwt.encode(to_encode, settings.jwt_secret, algorithm=settings.jwt_algorithm) + encoded_jwt = jwt.encode(to_encode, settings.jwt_secret, algorithm=_JWT_ALGORITHM) return encoded_jwt def decode_token(token: str) -> TokenData | None: try: - payload = jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_algorithm]) + payload = jwt.decode(token, settings.jwt_secret, algorithms=[_JWT_ALGORITHM]) username: str = payload.get("sub") if username is None: return None - return TokenData(username=username) + # CRIT-2: reject tokens without the tv claim (legacy pre-fix tokens) + if "tv" not in payload: + return None + return TokenData(username=username, token_version=payload["tv"]) except jwt.PyJWTError: return None diff --git a/server/tests/test_auth_jwt_algorithm.py b/server/tests/test_auth_jwt_algorithm.py new file mode 100644 index 0000000..ab1d8c7 --- /dev/null +++ b/server/tests/test_auth_jwt_algorithm.py @@ -0,0 +1,51 @@ +"""TDD guard for CRIT-1 — JWT algorithm must be hardcoded, not config-sourced. + +An operator (or attacker with env write access) setting JWT_ALGORITHM=none +must NOT silently disable signature verification. The accepted-algorithm list +is a security invariant and must never come from config. +""" + +import jwt + +from app.core.config import get_settings +from app.services.auth import create_access_token, decode_token + +settings = get_settings() + + +class TestJwtAlgorithmHardcoded: + """CRIT-1 guard: decode only accepts HS256, regardless of settings.""" + + def test_decode_rejects_none_algorithm(self): + """A token signed with alg=none must be rejected.""" + unsigned = jwt.encode({"sub": "attacker"}, "", algorithm="none") + assert decode_token(unsigned) is None + + def test_decode_rejects_hs512_token(self): + """Only HS256 is accepted. An HS512 token (even forged with the + real secret) must be rejected — the algorithm whitelist is the + security boundary, not the secret.""" + token = jwt.encode({"sub": "attacker"}, settings.jwt_secret, algorithm="HS512") + assert decode_token(token) is None + + def test_encode_uses_hs256_regardless_of_setting(self, monkeypatch): + """Even if settings.jwt_algorithm is mutated at runtime to an + insecure value, encode must still emit HS256.""" + monkeypatch.setattr(settings, "jwt_algorithm", "none", raising=False) + token = create_access_token({"sub": "alice", "tv": 0}) + header = jwt.get_unverified_header(token) + assert header["alg"] == "HS256" + + def test_decode_accepts_valid_hs256(self): + """Sanity: a legitimate HS256 token still decodes.""" + token = create_access_token({"sub": "alice", "tv": 0}) + td = decode_token(token) + assert td is not None + assert td.username == "alice" + + def test_decode_rejects_none_alg_even_if_setting_mutated(self, monkeypatch): + """Even if an attacker could flip settings.jwt_algorithm to 'none' + at runtime, the decode path must still reject unsigned tokens.""" + monkeypatch.setattr(settings, "jwt_algorithm", "none", raising=False) + unsigned = jwt.encode({"sub": "attacker"}, "", algorithm="none") + assert decode_token(unsigned) is None diff --git a/server/tests/test_auth_jwt_revocation.py b/server/tests/test_auth_jwt_revocation.py new file mode 100644 index 0000000..b560527 --- /dev/null +++ b/server/tests/test_auth_jwt_revocation.py @@ -0,0 +1,136 @@ +"""TDD guard for CRIT-2 — JWT revocation via token_version. + +Before the fix, JWTs had no revocation mechanism. A stolen token was +valid for the full 24-hour TTL, regardless of password change, logout, +or role demotion. No jti, no token_version, no deny-list. + +The fix embeds a `tv` (token_version) claim in every JWT. On decode, +the claim is compared against the user's `token_version` column. A +POST /api/auth/logout bumps the counter, invalidating all outstanding +tokens for that user. + +See docs/security/audit-2026-04-08.md CRIT-2. +""" + +import jwt +from fastapi.testclient import TestClient +from sqlalchemy.orm import Session + +from app.core.config import get_settings +from app.models.user import User +from app.services.auth import _JWT_ALGORITHM + +settings = get_settings() + + +class TestJwtTokenVersion: + """CRIT-2 guard: JWTs must be revocable via token_version.""" + + def test_fresh_login_includes_token_version_claim( + self, + client: TestClient, + test_user: User, + ): + """Every JWT must carry a 'tv' claim matching the user's token_version.""" + resp = client.post( + "/api/auth/login", + data={"username": "testuser", "password": "testpassword123"}, + ) + assert resp.status_code == 200 + token = resp.json()["access_token"] + claims = jwt.decode(token, settings.jwt_secret, algorithms=[_JWT_ALGORITHM]) + assert "tv" in claims, "JWT must include 'tv' (token_version) claim" + assert claims["tv"] == 0 + + def test_logout_invalidates_existing_token( + self, + client: TestClient, + test_user: User, + ): + """After calling /api/auth/logout, that same token must be rejected.""" + login = client.post( + "/api/auth/login", + data={"username": "testuser", "password": "testpassword123"}, + ) + token = login.json()["access_token"] + headers = {"Authorization": f"Bearer {token}"} + + # Token works before logout + assert client.get("/api/auth/me", headers=headers).status_code == 200 + + # Logout bumps token_version + resp = client.post("/api/auth/logout", headers=headers) + assert resp.status_code == 200 + + # Same token no longer works + assert client.get("/api/auth/me", headers=headers).status_code == 401 + + def test_old_token_rejected_after_db_version_bump( + self, + client: TestClient, + test_user: User, + db: Session, + ): + """Server-side bump of token_version (e.g., admin force-logout) + must invalidate all outstanding tokens for that user.""" + login = client.post( + "/api/auth/login", + data={"username": "testuser", "password": "testpassword123"}, + ) + token = login.json()["access_token"] + headers = {"Authorization": f"Bearer {token}"} + + # Bump directly in DB (simulates admin action) + user = db.query(User).filter(User.username == "testuser").first() + user.token_version += 1 + db.commit() + + resp = client.get("/api/auth/me", headers=headers) + assert resp.status_code == 401 + + def test_token_without_tv_claim_rejected( + self, + client: TestClient, + test_user: User, + ): + """Legacy tokens (without tv claim) from a pre-fix build + must be rejected after deployment.""" + legacy = jwt.encode( + {"sub": "testuser"}, + settings.jwt_secret, + algorithm=_JWT_ALGORITHM, + ) + resp = client.get( + "/api/auth/me", + headers={"Authorization": f"Bearer {legacy}"}, + ) + assert resp.status_code == 401 + + def test_new_login_after_logout_works( + self, + client: TestClient, + test_user: User, + ): + """After logout, a fresh login must produce a valid token.""" + login1 = client.post( + "/api/auth/login", + data={"username": "testuser", "password": "testpassword123"}, + ) + headers1 = {"Authorization": f"Bearer {login1.json()['access_token']}"} + + # Logout first session + client.post("/api/auth/logout", headers=headers1) + + # Login again + login2 = client.post( + "/api/auth/login", + data={"username": "testuser", "password": "testpassword123"}, + ) + assert login2.status_code == 200 + headers2 = {"Authorization": f"Bearer {login2.json()['access_token']}"} + + # New token works + assert client.get("/api/auth/me", headers=headers2).status_code == 200 + + # Old token still doesn't + assert client.get("/api/auth/me", headers=headers1).status_code == 401 diff --git a/server/tests/test_kiosk_api.py b/server/tests/test_kiosk_api.py index 69a7310..bb0223e 100644 --- a/server/tests/test_kiosk_api.py +++ b/server/tests/test_kiosk_api.py @@ -420,3 +420,118 @@ def test_403_non_owner( def test_404_nonexistent(self, client: TestClient, auth_headers: dict): resp = client.delete("/api/kiosk/99999", headers=auth_headers) assert resp.status_code == 404 + + +class TestKioskIdor: + """TDD guard for CRIT-3 and CRIT-4 — kiosk pairing/reassignment IDOR. + + Before the fix, any authenticated DJ could pair or reassign a kiosk + to an event owned by another DJ, simply by knowing (or brute-forcing) + the target event code. The fix enforces that the caller owns the + target event (or is an admin). + + See docs/security/audit-2026-04-08.md CRIT-3 and CRIT-4. + """ + + @staticmethod + def _make_victim_event(db: Session, owner: User, code: str = "VICTIM") -> Event: + evt = Event( + code=code, + name="Victim Event", + created_by_user_id=owner.id, + expires_at=utcnow() + timedelta(hours=6), + ) + db.add(evt) + db.commit() + db.refresh(evt) + return evt + + def test_complete_pairing_rejects_non_owned_event( + self, + client: TestClient, + db: Session, + auth_headers: dict, + admin_user: User, + ): + """CRIT-3: a DJ must not be able to pair a kiosk to someone else's event.""" + victim_event = self._make_victim_event(db, admin_user) + kiosk = create_kiosk(db) + + resp = client.post( + f"/api/kiosk/pair/{kiosk.pair_code}/complete", + json={"event_code": victim_event.code}, + headers=auth_headers, # test_user, NOT the victim (admin_user) + ) + assert resp.status_code == 403 + # Kiosk must still be in 'pairing' state — not silently paired + db.refresh(kiosk) + assert kiosk.status == "pairing" + assert kiosk.event_code is None + + def test_assign_kiosk_rejects_non_owned_event( + self, + client: TestClient, + db: Session, + auth_headers: dict, + test_user: User, + test_event: Event, + admin_user: User, + ): + """CRIT-4: a DJ must not be able to reassign their own kiosk to someone else's event.""" + kiosk = create_kiosk(db) + complete_pairing(db, kiosk, test_event.code, test_user.id) + victim_event = self._make_victim_event(db, admin_user) + + resp = client.patch( + f"/api/kiosk/{kiosk.id}/assign", + json={"event_code": victim_event.code}, + headers=auth_headers, + ) + assert resp.status_code == 403 + # Kiosk still pointing at the original event + db.refresh(kiosk) + assert kiosk.event_code == test_event.code + + def test_admin_can_pair_kiosk_to_any_event( + self, + client: TestClient, + db: Session, + admin_headers: dict, + test_user: User, + test_event: Event, + ): + """Admin bypass must still work — admins can manage any event's kiosks.""" + kiosk = create_kiosk(db) + resp = client.post( + f"/api/kiosk/pair/{kiosk.pair_code}/complete", + json={"event_code": test_event.code}, # owned by test_user, not admin + headers=admin_headers, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "active" + assert data["event_code"] == test_event.code + + def test_admin_can_reassign_any_kiosk_to_any_event( + self, + client: TestClient, + db: Session, + admin_headers: dict, + admin_user: User, + test_user: User, + test_event: Event, + ): + """Admin bypass for reassignment.""" + # Admin owns a kiosk (paired by admin to their own event) + admin_event = self._make_victim_event(db, admin_user, code="ADMIN1") + kiosk = create_kiosk(db) + complete_pairing(db, kiosk, admin_event.code, admin_user.id) + + # Admin reassigns their kiosk to test_user's event + resp = client.patch( + f"/api/kiosk/{kiosk.id}/assign", + json={"event_code": test_event.code}, + headers=admin_headers, + ) + assert resp.status_code == 200 + assert resp.json()["event_code"] == test_event.code diff --git a/server/tests/test_sse_security.py b/server/tests/test_sse_security.py new file mode 100644 index 0000000..b4eb6ee --- /dev/null +++ b/server/tests/test_sse_security.py @@ -0,0 +1,130 @@ +"""TDD guard for CRIT-5 — SSE stream must validate event existence +and rate-limit connection opens. + +Before the fix, GET /api/public/events/{code}/stream had no auth, +no existence check, and no rate limit. An unauthenticated attacker +could open unlimited long-lived SSE connections (DoS) or brute-force +6-char event codes to passively eavesdrop on real-time events. + +See docs/security/audit-2026-04-08.md CRIT-5. +""" + +from datetime import timedelta + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy.orm import Session + +from app.core.time import utcnow +from app.models.event import Event +from app.models.user import User + + +class TestSseExistenceCheck: + """CRIT-5 guard: existence check must run before the async generator.""" + + def test_sse_returns_404_for_unknown_event(self, client: TestClient): + """An unknown event code must return 404 immediately, + not silently subscribe to a ghost event bus channel.""" + resp = client.get( + "/api/public/events/NONEXIST/stream", + headers={"Accept": "text/event-stream"}, + ) + assert resp.status_code == 404 + + def test_sse_returns_404_for_archived_event( + self, + client: TestClient, + db: Session, + test_user: User, + ): + """Archived events must not be streamable.""" + evt = Event( + code="ARCHIV", + name="Archived", + created_by_user_id=test_user.id, + expires_at=utcnow() + timedelta(hours=6), + archived_at=utcnow(), + ) + db.add(evt) + db.commit() + + resp = client.get( + f"/api/public/events/{evt.code}/stream", + headers={"Accept": "text/event-stream"}, + ) + assert resp.status_code in (404, 410) + + def test_sse_returns_404_for_expired_event( + self, + client: TestClient, + db: Session, + test_user: User, + ): + """Expired events must not be streamable.""" + evt = Event( + code="EXPIRD", + name="Expired", + created_by_user_id=test_user.id, + expires_at=utcnow() - timedelta(hours=1), + ) + db.add(evt) + db.commit() + + resp = client.get( + f"/api/public/events/{evt.code}/stream", + headers={"Accept": "text/event-stream"}, + ) + assert resp.status_code in (404, 410) + + +class TestSseRateLimit: + """CRIT-5 guard: rate-limit must fire on connection open.""" + + @pytest.fixture(autouse=True) + def _enable_rate_limit(self, monkeypatch): + """Force-enable the rate limiter for this test class. + + Rate limiting is disabled by default in dev/tests via + settings.is_rate_limit_enabled, so we monkeypatch the limiter + instance's enabled attribute directly and reset its storage. + """ + from app.core.rate_limit import limiter + + original_enabled = limiter.enabled + limiter.enabled = True + try: + # Clear any residual rate-limit state from prior tests + limiter.reset() + except Exception: + pass + yield + limiter.enabled = original_enabled + try: + limiter.reset() + except Exception: + pass + + def test_sse_rate_limited_per_ip( + self, + client: TestClient, + ): + """Rapidly opening >10 SSE connections per minute must return 429 + on at least one of the excess attempts. + + Uses a nonexistent event code: the rate-limit decorator fires + BEFORE the route body, so the first 10 requests return 404 and + subsequent requests return 429. This avoids opening real SSE + streams (which would hang the TestClient).""" + responses = [] + for _ in range(15): + r = client.get( + "/api/public/events/NOTEXIST/stream", + headers={"Accept": "text/event-stream"}, + ) + responses.append(r.status_code) + assert 429 in responses, ( + f"Expected at least one 429 among 15 rapid requests, got {responses}" + ) + # Sanity: early requests should be 404 (existence check runs after limit) + assert 404 in responses diff --git a/server/tests/test_workflow_pins.py b/server/tests/test_workflow_pins.py new file mode 100644 index 0000000..3654f2b --- /dev/null +++ b/server/tests/test_workflow_pins.py @@ -0,0 +1,60 @@ +"""TDD guard for CRIT-6 — third-party GitHub Actions must be pinned to +40-character commit SHAs, not floating tags. + +A compromised tag (see the tj-actions/changed-files attack, March 2025) +could exfiltrate GITHUB_TOKEN, WINGET_PAT, and inject malicious code +into bridge-app installers shipped to DJs. The release.yml workflow has +`contents: write` and publishes binaries directly. + +This guardrail runs on every CI run and treats any third-party `uses:` +without a 40-char SHA as a test failure. First-party actions (owned by +GitHub, e.g. actions/checkout, github/codeql-action) are exempt. + +See docs/security/audit-2026-04-08.md CRIT-6. +""" + +import re +from pathlib import Path + +import pytest +import yaml + +WORKFLOW_DIR = Path(__file__).resolve().parents[2] / ".github" / "workflows" +SHA_PATTERN = re.compile(r"^[0-9a-f]{40}$") + +# First-party actions (owned by GitHub) — exempt from SHA pinning +FIRST_PARTY_PREFIXES = ("actions/", "github/") + + +def _iter_uses(workflow: dict): + """Yield every `uses:` string from a parsed workflow.""" + for job in (workflow.get("jobs") or {}).values(): + for step in job.get("steps") or []: + uses = step.get("uses") + if uses: + yield uses + + +@pytest.mark.parametrize( + "workflow_path", + sorted(WORKFLOW_DIR.glob("*.yml")), + ids=lambda p: p.name, +) +def test_third_party_actions_pinned_to_sha(workflow_path: Path): + """Every third-party action must use a full 40-char commit SHA, e.g. + softprops/action-gh-release@de2c0eb89ae2a093876385947365aca7b0e5f844 # v2.1.0 + """ + data = yaml.safe_load(workflow_path.read_text()) + offenders = [] + for uses in _iter_uses(data): + if uses.startswith("./") or uses.startswith("docker://"): + continue + action, _, ref = uses.partition("@") + if any(action.startswith(p) for p in FIRST_PARTY_PREFIXES): + continue + if not SHA_PATTERN.match(ref): + offenders.append(f"{action}@{ref}") + assert not offenders, ( + f"{workflow_path.name}: third-party actions must be pinned to 40-char SHA, " + f"found: {offenders}" + )