-
Notifications
You must be signed in to change notification settings - Fork 0
Build cryptographic secret gateway service #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Build cryptographic secret gateway service #2
Conversation
Implement comprehensive crypto service with: - Cryptographically secure random secret generation (bytes, hex, urlsafe) - Fernet symmetric encryption/decryption with TTL support - SHA-256 hashing with salt and constant-time verification - PBKDF2 key derivation from passwords (480k iterations) - Key rotation support for encrypted data - Comprehensive test suite (41 tests, 100% pass rate) Security features: - Uses secrets module for CSPRNG - Fernet provides authenticated encryption (AES-128-CBC + HMAC) - Constant-time hash comparison to prevent timing attacks - OWASP-recommended PBKDF2 iterations Dependencies: - Add cryptography==42.0.5 - Add .gitignore for Python cache files
Implement comprehensive token management system with:
- Short-lived token issuance with configurable TTL (1s-3600s)
- In-memory token store with automatic expiry cleanup
- Thread-safe token operations with RLock
- Token validation with TTL remaining calculation
- Token revocation support
Token Models:
- TokenScope: Resource and action-based permissions
- Token: Ephemeral token with expiry tracking
- Pydantic schemas for API requests/responses
API Endpoints (/tokens):
- POST /issue: Issue new ephemeral token
- POST /validate: Validate token and check expiry
- DELETE /{token_id}: Revoke token
- GET /stats: Active token statistics
- POST /cleanup: Manual expired token cleanup
Features:
- Automatic background cleanup of expired tokens (60s interval)
- TTL validation (1-3600 seconds)
- Unique token IDs using cryptographically secure random generation
- Token metadata support for custom attributes
- Comprehensive test suite (50 tests, 100% pass rate)
Testing:
- 33 service/store tests (issuance, validation, expiry, concurrency)
- 17 API endpoint tests (full lifecycle workflows)
- Total: 92 tests passed (including Window 17 crypto tests)
Dependencies:
- Add pydantic-settings==2.2.1
- Add httpx==0.27.0 for test client
- Update config.py to use pydantic-settings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
You have reached your Codex rate limits. Please try again later. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a comprehensive cryptographic secret gateway service that adds token management and encryption capabilities to the existing application.
Key Changes
- Cryptographic Services: Implements
CryptoServicefor secure encryption/decryption, hashing, and secret generation using industry-standard libraries (Fernet, PBKDF2) - Token Management System: Adds
TokenServicewith ephemeral token issuance, validation, and revocation capabilities including automatic expiry and cleanup - RESTful API: Provides token management endpoints (
/tokens/issue,/tokens/validate,/tokens/{token_id},/tokens/stats) through FastAPI
Reviewed Changes
Copilot reviewed 11 out of 13 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
app/services/crypto/crypto_service.py |
Core cryptographic operations including Fernet encryption, SHA-256 hashing, PBKDF2 key derivation, and secure random generation |
app/services/crypto/token_service.py |
Token lifecycle management with configurable TTL, validation, and revocation logic |
app/services/crypto/token_store.py |
Thread-safe in-memory token storage with automatic cleanup timer and expiry handling |
app/services/crypto/token_models.py |
Pydantic models for token requests, responses, scopes, and validation schemas |
app/services/crypto/__init__.py |
Module exports for the crypto services package |
app/api/token_router.py |
FastAPI router defining token management endpoints with proper HTTP status codes |
app/main.py |
Integrates token router into the main FastAPI application |
app/config.py |
Updates to use pydantic-settings for proper settings management |
requirements.txt |
Adds dependencies: cryptography==42.0.5, httpx==0.27.0, pydantic-settings==2.2.1 |
tests/test_crypto_service.py |
Comprehensive tests covering encryption, hashing, key derivation, and edge cases (497 lines) |
tests/test_token_service.py |
Tests for token issuance, validation, revocation, store operations, and concurrency (573 lines) |
tests/test_token_api.py |
API endpoint tests covering complete token workflows and error cases (418 lines) |
.gitignore |
Adds Python-specific ignore patterns for build artifacts and caches |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @field_validator('ttl_seconds') | ||
| @classmethod | ||
| def validate_ttl(cls, v: int) -> int: | ||
| if v < 1: | ||
| raise ValueError("TTL must be at least 1 second") | ||
| if v > 3600: | ||
| raise ValueError("TTL cannot exceed 3600 seconds (1 hour)") | ||
| return v |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The validate_ttl validator is redundant since the ttl_seconds field already has ge=1 and le=3600 constraints defined in the Field definition (lines 24-29). Pydantic will automatically validate these constraints. Consider removing this validator to avoid duplication.
| @field_validator('ttl_seconds') | |
| @classmethod | |
| def validate_ttl(cls, v: int) -> int: | |
| if v < 1: | |
| raise ValueError("TTL must be at least 1 second") | |
| if v > 3600: | |
| raise ValueError("TTL cannot exceed 3600 seconds (1 hour)") | |
| return v |
| router = APIRouter(prefix="/tokens", tags=["tokens"]) | ||
|
|
||
| # Global token service instance | ||
| _token_service: TokenService = TokenService() |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a global singleton token service instance may cause issues in production deployments with multiple workers. Consider using FastAPI's dependency injection system to manage the TokenService lifecycle properly. This would allow for better testing, isolation between requests, and proper cleanup on application shutdown.
| # Clean up expired tokens first | ||
| self.cleanup_expired() |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The count() method calls cleanup_expired() on every invocation, which could be expensive if called frequently (e.g., in monitoring/metrics endpoints). Consider either: (1) removing the automatic cleanup from count() and relying on the scheduled cleanup, or (2) adding a cache/cooldown mechanism to avoid running cleanup too frequently.
| # Clean up expired tokens first | |
| self.cleanup_expired() |
| def shutdown(self) -> None: | ||
| """Shutdown the store and cancel cleanup timer""" | ||
| self._shutdown = True | ||
| if self._cleanup_timer: | ||
| self._cleanup_timer.cancel() | ||
| logger.info("Token store shutdown") | ||
|
|
||
| def __len__(self) -> int: | ||
| """Get count of active tokens""" | ||
| return self.count() | ||
|
|
||
| def __contains__(self, token_id: str) -> bool: | ||
| """Check if token exists and is not expired""" | ||
| return self.get(token_id) is not None |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The InMemoryTokenStore has a shutdown() method to clean up the background cleanup timer, but there's no application lifecycle event handler to call it. Consider adding a FastAPI shutdown event handler in app/main.py to properly shutdown the token store and cancel the cleanup timer when the application stops.
| """ | ||
| with self._lock: | ||
| self._store[token.token_id] = token | ||
| logger.debug(f"Stored token {token.token_id}, expires at {token.expires_at}") |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The debug log statement includes the full token_id, which is sensitive information. For security best practices, consider logging only a partial token ID (e.g., token.token_id[:8]...) to prevent token leakage in log files, similar to what's done in token_service.py line 111.
| logger.debug(f"Stored token {token.token_id}, expires at {token.expires_at}") | |
| logger.debug(f"Stored token {token.token_id[:8]}..., expires at {token.expires_at}") |
| Number of tokens removed | ||
| """ | ||
| with self._lock: | ||
| now = datetime.utcnow() |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
datetime.utcnow() is deprecated as of Python 3.12. Consider using datetime.now(timezone.utc) instead for timezone-aware datetime objects.
| import hashlib | ||
| import secrets | ||
| from typing import Optional, Tuple | ||
| from base64 import b64encode, b64decode |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import of 'b64decode' is not used.
| from base64 import b64encode, b64decode | |
| from base64 import b64encode |
| from app.services.crypto import CryptoService | ||
| from app.services.crypto.crypto_service import ( | ||
| CryptoError, | ||
| EncryptionError, |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import of 'EncryptionError' is not used.
| EncryptionError, |
| Token models and schemas for ephemeral token management | ||
| """ | ||
|
|
||
| from datetime import datetime, timedelta |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import of 'timedelta' is not used.
| from datetime import datetime, timedelta | |
| from datetime import datetime |
| """ | ||
|
|
||
| import threading | ||
| from datetime import datetime, timedelta |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import of 'timedelta' is not used.
| from datetime import datetime, timedelta | |
| from datetime import datetime |
No description provided.