Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__pycache__/
*.pyc
.pytest_cache/
136 changes: 136 additions & 0 deletions app/api/token_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""
API Router for SecretGateway Token Management
"""

from fastapi import APIRouter, HTTPException, status
from typing import Dict
import logging

from app.services.crypto import (
TokenService,
TokenIssuanceRequest,
TokenIssuanceResponse,
TokenValidationRequest,
TokenValidationResponse
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/tokens", tags=["tokens"])

# Global token service instance
_token_service: TokenService = TokenService()
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a global singleton token service instance may cause issues in production deployments with multiple workers. Consider using FastAPI's dependency injection system to manage the TokenService lifecycle properly. This would allow for better testing, isolation between requests, and proper cleanup on application shutdown.

Copilot uses AI. Check for mistakes.


def get_token_service() -> TokenService:
"""Get the global token service instance"""
return _token_service


@router.post(
"/issue",
response_model=TokenIssuanceResponse,
status_code=status.HTTP_201_CREATED,
summary="Issue ephemeral token",
description="Issue a short-lived token with specified scope and TTL"
)
async def issue_token(request: TokenIssuanceRequest) -> TokenIssuanceResponse:
"""
Issue a new ephemeral token

- **scope**: Token scope with resource and actions
- **ttl_seconds**: Time-to-live (1-3600 seconds)
- **metadata**: Optional metadata dictionary

Returns the token ID and expiration details.
"""
try:
response = _token_service.issue_token_from_request(request)
logger.info(f"Token issued: {response.token_id[:8]}... for {request.scope.resource}")
return response
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
except Exception as e:
logger.error(f"Token issuance failed: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to issue token"
)


@router.post(
"/validate",
response_model=TokenValidationResponse,
summary="Validate token",
description="Validate a token and check if it's still valid"
)
async def validate_token(request: TokenValidationRequest) -> TokenValidationResponse:
"""
Validate a token

- **token_id**: Token identifier to validate

Returns validation result with token details if valid.
"""
response = _token_service.validate_token(request.token_id)
return response


@router.delete(
"/{token_id}",
status_code=status.HTTP_204_NO_CONTENT,
summary="Revoke token",
description="Revoke (delete) a token before it expires"
)
async def revoke_token(token_id: str) -> None:
"""
Revoke a token

- **token_id**: Token identifier to revoke

Returns 204 No Content on success, 404 if token not found.
"""
revoked = _token_service.revoke_token(token_id)
if not revoked:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Token not found"
)


@router.get(
"/stats",
response_model=Dict[str, int],
summary="Get token statistics",
description="Get statistics about active tokens"
)
async def get_token_stats() -> Dict[str, int]:
"""
Get token statistics

Returns count of active tokens.
"""
return {
"active_tokens": _token_service.get_active_token_count()
}


@router.post(
"/cleanup",
response_model=Dict[str, int],
summary="Cleanup expired tokens",
description="Manually trigger cleanup of expired tokens"
)
async def cleanup_expired_tokens() -> Dict[str, int]:
"""
Manually trigger cleanup of expired tokens

Returns count of tokens removed.
"""
count = _token_service.cleanup_expired_tokens()
return {
"removed_tokens": count
}
3 changes: 2 additions & 1 deletion app/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pydantic import BaseSettings, AnyUrl
from pydantic import AnyUrl
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
Expand Down
2 changes: 2 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
from .config import get_settings
from .logging import setup_logging
from .api.router import router
from .api.token_router import router as token_router

settings = get_settings()
setup_logging(settings.LOG_JSON)

app = FastAPI(title="AgentAddon EventBridge", version="0.1.0")
app.include_router(router)
app.include_router(token_router)

@app.get("/health")
async def health():
Expand Down
33 changes: 33 additions & 0 deletions app/services/crypto/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
SecretGateway Cryptographic Services Module

Provides core cryptographic utilities for secure secret management:
- Secret generation
- Symmetric encryption/decryption
- Secure hashing with salt
- Ephemeral token issuance and validation
"""

from .crypto_service import CryptoService
from .token_service import TokenService
from .token_store import InMemoryTokenStore
from .token_models import (
Token,
TokenScope,
TokenIssuanceRequest,
TokenIssuanceResponse,
TokenValidationRequest,
TokenValidationResponse
)

__all__ = [
"CryptoService",
"TokenService",
"InMemoryTokenStore",
"Token",
"TokenScope",
"TokenIssuanceRequest",
"TokenIssuanceResponse",
"TokenValidationRequest",
"TokenValidationResponse",
]
Loading
Loading