Skip to content

Security

rUv edited this page Jul 31, 2025 · 1 revision

Security

Comprehensive security guide for FACT, covering authentication, data protection, secure deployment, and best practices.

🛡️ Security Overview

FACT implements a multi-layered security approach designed to protect data, ensure system integrity, and maintain user privacy across all deployment scenarios.

Security Principles

| Principle | Implementation | Impact |
|-----------|----------------|--------|
| Defense in Depth | Multiple security layers | Comprehensive protection |
| Least Privilege | Minimal access rights | Reduced attack surface |
| Zero Trust | Verify everything | Enhanced security posture |
| Data Encryption | At rest and in transit | Data confidentiality |

🔐 Authentication & Authorization

API Key Management

import os
import secrets
import hashlib
from datetime import datetime, timedelta

class APIKeyManager:
    """Issue, validate and revoke FACT API keys.

    A key has the form ``fact_<key_id>.<secret>``. ``key_id`` is a public
    lookup handle under which the metadata is stored; only a SHA-256 digest
    of ``secret`` is persisted, so the stored metadata alone is not enough
    to present a valid key.
    """

    def __init__(self, config: SecurityConfig):
        """Keep ``config`` and create the key store and rate limiter."""
        self.config = config
        self.key_store = SecureKeyStore()
        self.rate_limiter = RateLimiter()

    def generate_api_key(self, user_id: str, permissions: List[str]) -> str:
        """Generate a secure API key that expires after 90 days.

        Args:
            user_id: Owner of the key, recorded in the stored metadata.
            permissions: Permission strings recorded in the stored metadata.

        Returns:
            The full key ``fact_<key_id>.<secret>``. The secret is not
            stored, so this return value is the only place it is available.
        """
        issued_at = datetime.utcnow()

        # The lookup handle and the secret are generated once, and the very
        # same values are both stored (id + digest) and returned, so that
        # validate_key() can find and verify what was issued here.
        key_id = secrets.token_hex(4)
        secret = secrets.token_urlsafe(32)

        key_data = {
            'user_id': user_id,
            'permissions': permissions,
            'created_at': issued_at.isoformat(),
            'expires_at': (issued_at + timedelta(days=90)).isoformat(),
            'nonce': secrets.token_hex(16),
            'secret_hash': hashlib.sha256(secret.encode()).hexdigest(),
        }

        # NOTE(review): store_key is called synchronously here while get_key
        # and revoke_key are awaited below — confirm the key store's API.
        self.key_store.store_key(key_id, key_data)

        return f"fact_{key_id}.{secret}"

    async def validate_key(self, api_key: str) -> Optional[dict]:
        """Validate an API key and return its metadata.

        Args:
            api_key: Key as returned by :meth:`generate_api_key`.

        Returns:
            The stored metadata without ``secret_hash`` when the key is well
            formed, known, carries the right secret and has not expired;
            otherwise ``None``. Any error, including an exceeded rate limit,
            is logged and also results in ``None``.
        """
        try:
            if not api_key.startswith('fact_'):
                return None

            key_parts = api_key[5:].split('.')
            if len(key_parts) != 2:
                return None

            key_prefix, key_suffix = key_parts

            # Rate-limit per key id rather than per full token; otherwise
            # every guessed secret would get a fresh bucket.
            if not await self.rate_limiter.check_limit(key_prefix):
                raise SecurityError("Rate limit exceeded")

            key_data = await self.key_store.get_key(key_prefix)
            if not key_data:
                return None

            # Verify the secret part in constant time against the stored
            # digest; entries without a digest can never validate.
            stored_hash = key_data.get('secret_hash')
            presented_hash = hashlib.sha256(key_suffix.encode()).hexdigest()
            if not stored_hash or not secrets.compare_digest(presented_hash, stored_hash):
                return None

            # Expired keys are revoked on first use after expiry.
            expires_at = datetime.fromisoformat(key_data['expires_at'])
            if datetime.utcnow() > expires_at:
                await self.key_store.revoke_key(key_prefix)
                return None

            # Never hand the digest to callers.
            return {name: value for name, value in key_data.items()
                    if name != 'secret_hash'}

        except Exception as e:
            # Fail closed: any unexpected problem means "not authenticated".
            logger.warning(f"API key validation failed: {e}")
            return None

    async def revoke_key(self, api_key: str, reason: str = "manual_revocation"):
        """Revoke the key identified by the id part of ``api_key``.

        Args:
            api_key: Full key; only the part between ``fact_`` and the
                first ``.`` is used.
            reason: Text passed to the key store and written to the log.
        """
        key_prefix = api_key[5:].split('.')[0]
        await self.key_store.revoke_key(key_prefix, reason)
        logger.info(f"API key revoked: {key_prefix} - {reason}")

Role-Based Access Control (RBAC)

from enum import Enum
from typing import Set, Dict

class Permission(Enum):
    """Permissions that roles can grant.

    Each member's value is a string of the form ``<action>:<resource>``.
    """

    # Query access
    READ_QUERIES = "read:queries"
    WRITE_QUERIES = "write:queries"
    # Cache management
    MANAGE_CACHE = "manage:cache"
    # System administration
    ADMIN_SYSTEM = "admin:system"
    # Metrics visibility
    VIEW_METRICS = "view:metrics"

class Role:
    """A named set of permissions."""

    def __init__(self, name: str, permissions: Set[Permission]):
        """Store the role name and its permission set.

        The given set is kept by reference, not copied.
        """
        self.name, self.permissions = name, permissions

class RBACManager:
    """Role-based access control: maps users to roles and roles to permissions."""

    def __init__(self):
        """Define the built-in roles and start with no user assignments."""
        role_definitions = (
            ('viewer', {Permission.READ_QUERIES, Permission.VIEW_METRICS}),
            ('user', {Permission.READ_QUERIES, Permission.WRITE_QUERIES,
                      Permission.VIEW_METRICS}),
            ('operator', {Permission.READ_QUERIES, Permission.WRITE_QUERIES,
                          Permission.MANAGE_CACHE, Permission.VIEW_METRICS}),
            # The admin role receives every member of the Permission enum.
            ('admin', set(Permission)),
        )
        self.roles = {
            role_name: Role(role_name, granted)
            for role_name, granted in role_definitions
        }
        # user_id -> names of the roles assigned to that user
        self.user_roles: Dict[str, Set[str]] = {}

    def assign_role(self, user_id: str, role_name: str):
        """Add ``role_name`` to the roles of ``user_id``.

        Raises:
            ValueError: If ``role_name`` is not a defined role.
        """
        if role_name not in self.roles:
            raise ValueError(f"Unknown role: {role_name}")

        self.user_roles.setdefault(user_id, set()).add(role_name)
        logger.info(f"Assigned role {role_name} to user {user_id}")

    def check_permission(self, user_id: str, permission: Permission) -> bool:
        """Return True if any role assigned to the user grants ``permission``.

        Users without assignments, and assigned role names that are not
        defined, grant nothing.
        """
        assigned = (
            self.roles.get(role_name)
            for role_name in self.user_roles.get(user_id, set())
        )
        return any(role and permission in role.permissions for role in assigned)

    def get_user_permissions(self, user_id: str) -> Set[Permission]:
        """Return the union of the permissions of all roles held by the user."""
        assigned = (
            self.roles.get(role_name)
            for role_name in self.user_roles.get(user_id, set())
        )
        return set().union(*(role.permissions for role in assigned if role))

🔒 Data Protection

Encryption at Rest

import cryptography
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os

class DataEncryption:
    """Encrypt and decrypt strings and files with a Fernet cipher."""

    # Suffix appended by encrypt_file() and stripped by decrypt_file().
    _ENCRYPTED_SUFFIX = '.encrypted'

    def __init__(self, master_key: bytes = None):
        """Create the cipher.

        Args:
            master_key: Fernet key. When omitted, the key is derived from
                the ``FACT_MASTER_KEY`` environment variable.

        Raises:
            SecurityError: If no key is given and ``FACT_MASTER_KEY`` is
                unset or empty.
        """
        if master_key is None:
            master_key = self._derive_key_from_env()

        self.cipher = Fernet(master_key)

    def _derive_key_from_env(self) -> bytes:
        """Derive the encryption key from ``FACT_MASTER_KEY`` via PBKDF2.

        Raises:
            SecurityError: If the environment variable is unset or empty.
        """
        password = os.environ.get('FACT_MASTER_KEY', '').encode()
        if not password:
            raise SecurityError("FACT_MASTER_KEY environment variable not set")

        # SECURITY: a fixed salt keeps the derived key stable across
        # restarts, but it should be random and stored securely in
        # production. Changing it invalidates all existing ciphertexts.
        salt = b'fact_salt_2024'

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )

        # Fernet expects a url-safe base64 encoding of the 32 derived bytes.
        return base64.urlsafe_b64encode(kdf.derive(password))

    def encrypt_data(self, data: str) -> str:
        """Encrypt ``data`` and return the url-safe base64 text of the result."""
        encrypted_bytes = self.cipher.encrypt(data.encode())
        return base64.urlsafe_b64encode(encrypted_bytes).decode()

    def decrypt_data(self, encrypted_data: str) -> str:
        """Reverse :meth:`encrypt_data`.

        Raises:
            SecurityError: If decoding or decryption fails; the original
                exception is attached as the cause.
        """
        try:
            encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode())
            decrypted_bytes = self.cipher.decrypt(encrypted_bytes)
            return decrypted_bytes.decode()
        except Exception as e:
            raise SecurityError(f"Decryption failed: {e}") from e

    def encrypt_file(self, file_path: str, output_path: str = None):
        """Encrypt a file's contents.

        Args:
            file_path: File to read.
            output_path: Destination; defaults to ``<file_path>.encrypted``.
        """
        if output_path is None:
            output_path = f"{file_path}{self._ENCRYPTED_SUFFIX}"

        with open(file_path, 'rb') as infile:
            file_data = infile.read()

        encrypted_data = self.cipher.encrypt(file_data)

        with open(output_path, 'wb') as outfile:
            outfile.write(encrypted_data)

    def decrypt_file(self, encrypted_path: str, output_path: str = None):
        """Decrypt a file's contents.

        Args:
            encrypted_path: File to read.
            output_path: Destination. By default the trailing ``.encrypted``
                suffix is removed; when the path has no such suffix,
                ``.decrypted`` is appended instead so that the input file
                is never overwritten.
        """
        if output_path is None:
            suffix = self._ENCRYPTED_SUFFIX
            # Strip only the trailing suffix: str.replace() would also
            # remove '.encrypted' from directory names or the middle of the
            # file name, and would return the input path unchanged when the
            # suffix is missing.
            if encrypted_path.endswith(suffix) and len(encrypted_path) > len(suffix):
                output_path = encrypted_path[:-len(suffix)]
            else:
                output_path = f"{encrypted_path}.decrypted"

        with open(encrypted_path, 'rb') as infile:
            encrypted_data = infile.read()

        decrypted_data = self.cipher.decrypt(encrypted_data)

        with open(output_path, 'wb') as outfile:
            outfile.write(decrypted_data)

Secure Cache Storage

class SecureCache:
    """Cache wrapper that can encrypt entries and checks them on read."""

    def __init__(self, config: CacheConfig):
        """Create the encryption helper, the backing cache and the checker."""
        self.encryption = DataEncryption()
        self.base_cache = Cache(config)
        self.integrity_checker = IntegrityChecker()

    async def set(self, key: str, value: Any, ttl: int = None, sensitive: bool = False):
        """Serialize ``value`` to JSON and store it, encrypted if ``sensitive``.

        The stored entry is a dict with the keys ``encrypted``, ``data`` and
        ``checksum``; the checksum is always computed over the plain JSON
        text, whether or not the data is encrypted.
        """
        payload = json.dumps(value, default=str)

        stored = self.encryption.encrypt_data(payload) if sensitive else payload
        entry = {
            'encrypted': bool(sensitive),
            'data': stored,
            'checksum': self.integrity_checker.calculate_checksum(payload),
        }

        await self.base_cache.set(key, entry, ttl)

        mode = 'encrypted' if sensitive else 'plain'
        logger.debug(f"Cached {mode} data for key: {key[:10]}...")

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or ``None``.

        ``None`` is returned when the key is missing, when the checksum does
        not match (the entry is then deleted), or when reading the entry
        raises for any reason (the error is logged).
        """
        entry = await self.base_cache.get(key)
        if entry is None:
            return None

        try:
            encrypted = entry.get('encrypted', False)
            raw = entry['data']
            expected = entry.get('checksum')

            payload = self.encryption.decrypt_data(raw) if encrypted else raw

            # Entries without a checksum skip the integrity check.
            if expected and self.integrity_checker.calculate_checksum(payload) != expected:
                logger.warning(f"Integrity check failed for cache key: {key}")
                await self.base_cache.delete(key)  # drop the corrupted entry
                return None

            return json.loads(payload)

        except Exception as e:
            logger.error(f"Failed to retrieve cached data for key {key}: {e}")
            return None

class IntegrityChecker:
    """SHA-256 checksum helpers for detecting modified data."""

    @staticmethod
    def calculate_checksum(data: str) -> str:
        """Return the hex SHA-256 digest of ``data`` (encoded with ``str.encode()``)."""
        digest = hashlib.sha256()
        digest.update(data.encode())
        return digest.hexdigest()

    @staticmethod
    def verify_checksum(data: str, expected_checksum: str) -> bool:
        """Return True when the checksum of ``data`` equals ``expected_checksum``."""
        return IntegrityChecker.calculate_checksum(data) == expected_checksum

🚨 Input Validation & Sanitization

Query Sanitization

import re
from typing import List, Dict
import html

class QuerySanitizer:
    """Validate and sanitize user queries and their context.

    The raw query is screened against ``DANGEROUS_PATTERNS`` and two
    heuristics (bracket count, share of special characters); the returned
    text is HTML-escaped and whitespace-normalized.
    """

    # Dangerous patterns that should be blocked or sanitized
    DANGEROUS_PATTERNS = [
        r'<script[^>]*>.*?</script>',  # Script tags
        r'javascript:',                # JavaScript URLs
        r'on\w+\s*=',                 # Event handlers (onclick, onload, etc.)
        r'data:text/html',            # Data URLs with HTML
        r'vbscript:',                 # VBScript URLs
        r'expression\s*\(',           # CSS expressions
        r'import\s+[\'"]',            # Import statements
        r'eval\s*\(',                 # Eval function calls
        r'exec\s*\(',                 # Exec function calls
        r'subprocess',                # System commands
        r'os\.system',                # OS system calls
        r'__import__',                # Dynamic imports
    ]

    # Maximum allowed lengths
    MAX_QUERY_LENGTH = 10000
    MAX_CONTEXT_SIZE = 50000

    def __init__(self):
        """Pre-compile the patterns (case-insensitive, ``.`` matches newlines)."""
        self.compiled_patterns = [re.compile(pattern, re.IGNORECASE | re.DOTALL)
                                  for pattern in self.DANGEROUS_PATTERNS]

    def sanitize_query(self, query: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Sanitize a user query and its optional context.

        Args:
            query: Raw query text.
            context: Optional mapping of extra values; see
                :meth:`_sanitize_context` for what is kept.

        Returns:
            ``{'query': <sanitized text>, 'context': <sanitized dict>,
            'sanitized': True}``.

        Raises:
            ValidationError: If ``query`` is not a string, is longer than
                ``MAX_QUERY_LENGTH`` or is empty/whitespace only.
            SecurityError: If the query matches a dangerous pattern or trips
                one of the heuristics.
        """
        # Basic input validation
        if not isinstance(query, str):
            raise ValidationError("Query must be a string")

        if len(query) > self.MAX_QUERY_LENGTH:
            raise ValidationError(f"Query too long (max {self.MAX_QUERY_LENGTH} characters)")

        if len(query.strip()) == 0:
            raise ValidationError("Query cannot be empty")

        # Screen the RAW text, before escaping. html.escape() rewrites
        # '<', '>', '&' and quotes into entities, so patterns such as the
        # <script> tag or import '...' can never match escaped text, and the
        # added '&', '#', ';' characters would distort the special-character
        # ratio.
        self._check_dangerous_patterns(query)

        sanitized_query = self._sanitize_text(query)
        sanitized_context = self._sanitize_context(context) if context else {}

        return {
            'query': sanitized_query,
            'context': sanitized_context,
            'sanitized': True
        }

    def _sanitize_text(self, text: str) -> str:
        """HTML-escape ``text``, drop NUL bytes, collapse whitespace and trim."""
        sanitized = html.escape(text)
        sanitized = sanitized.replace('\x00', '')
        sanitized = re.sub(r'\s+', ' ', sanitized)
        return sanitized.strip()

    def _sanitize_context(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Return a sanitized copy of ``context``.

        Keys must be strings of at most 100 characters and are reduced to
        word characters and ``-``; entries whose key becomes empty are
        dropped. String values are sanitized as text, numbers and booleans
        are kept, lists and dicts are converted with ``str()`` and sanitized,
        and any other type is skipped. Processing stops once the accumulated
        length of text values exceeds ``MAX_CONTEXT_SIZE``.
        """
        sanitized_context = {}
        context_size = 0

        for key, value in context.items():
            # Validate key
            if not isinstance(key, str) or len(key) > 100:
                continue

            # Sanitize key
            sanitized_key = re.sub(r'[^\w\-_]', '', key)
            if not sanitized_key:
                continue

            # Sanitize value based on type
            if isinstance(value, str):
                sanitized_value = self._sanitize_text(value)
                context_size += len(sanitized_value)
            elif isinstance(value, (int, float, bool)):
                sanitized_value = value
            elif isinstance(value, (list, dict)):
                # Convert to string and sanitize
                sanitized_value = self._sanitize_text(str(value))
                context_size += len(sanitized_value)
            else:
                # Skip unsupported types
                continue

            # The entry that crosses the limit, and all later ones, are dropped.
            if context_size > self.MAX_CONTEXT_SIZE:
                logger.warning("Context size limit exceeded, truncating")
                break

            sanitized_context[sanitized_key] = sanitized_value

        return sanitized_context

    def _check_dangerous_patterns(self, text: str):
        """Raise SecurityError if ``text`` looks like an injection attempt.

        Raises:
            SecurityError: On a pattern match, more than 20 ``(`` or ``[``
                characters, or when more than 30% of the characters are
                neither ASCII alphanumerics nor whitespace.
        """
        if any(pattern.search(text) for pattern in self.compiled_patterns):
            raise SecurityError("Dangerous pattern detected in input")

        # Additional checks
        if text.count('(') > 20 or text.count('[') > 20:
            raise SecurityError("Excessive parentheses or brackets detected")

        if len(re.sub(r'[a-zA-Z0-9\s]', '', text)) > len(text) * 0.3:
            raise SecurityError("Excessive special characters detected")


class ValidationError(Exception):
    """Input validation error"""


class SecurityError(Exception):
    """Security-related error"""

🌐 Network Security

TLS Configuration

import ssl
import certifi
from aiohttp import ClientSession, TCPConnector

class SecureHTTPClient:
    """HTTP client factory with a hardened TLS configuration.

    All sessions created by one instance share a single connection pool;
    call :meth:`close` when the client is no longer needed.
    """

    def __init__(self):
        """Build the SSL context and the shared TCP connector."""
        # Create secure SSL context backed by the certifi CA bundle
        self.ssl_context = ssl.create_default_context(cafile=certifi.where())

        # Enhanced security settings
        self.ssl_context.check_hostname = True
        self.ssl_context.verify_mode = ssl.CERT_REQUIRED
        self.ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2

        # Disable weak ciphers
        self.ssl_context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')

        # Create connector with security settings
        self.connector = TCPConnector(
            ssl=self.ssl_context,
            limit=100,              # Connection pool limit
            limit_per_host=30,      # Per-host connection limit
            ttl_dns_cache=300,      # DNS cache TTL
            use_dns_cache=True,
        )

    async def create_session(self) -> ClientSession:
        """Create an HTTP session on the shared connector.

        Returns:
            A ``ClientSession`` with a 30 s total / 10 s connect timeout
            that raises for HTTP error statuses. Closing the session does
            not close the shared connector.
        """
        # Imported here because the file-level aiohttp import does not
        # include ClientTimeout.
        from aiohttp import ClientTimeout

        # NOTE(review): the X-* entries are normally response headers;
        # sending them on requests is harmless but has no protective effect.
        headers = {
            'User-Agent': 'FACT/1.0 (Secure Client)',
            'X-Content-Type-Options': 'nosniff',
            'X-Frame-Options': 'DENY',
            'X-XSS-Protection': '1; mode=block',
        }

        return ClientSession(
            connector=self.connector,
            # The connector is shared between sessions; without this flag the
            # first session to close would close the pool for all others.
            connector_owner=False,
            headers=headers,
            timeout=ClientTimeout(total=30, connect=10),
            raise_for_status=True
        )

    async def close(self):
        """Close the shared connector once all sessions are done with it."""
        await self.connector.close()

# Rate limiting configuration
class RateLimiter:
    """Token bucket rate limiter with one bucket per identifier."""

    def __init__(self, requests_per_minute: int = 60):
        """Create a limiter.

        Args:
            requests_per_minute: Bucket capacity and per-minute refill rate.
        """
        self.requests_per_minute = requests_per_minute
        self.tokens = {}        # identifier -> tokens currently available
        self.last_update = {}   # identifier -> time of the last refill

    async def check_limit(self, identifier: str) -> bool:
        """Consume one token for ``identifier`` if one is available.

        Returns:
            True if the request is within the limit, False otherwise.
        """
        # Local import: `time` is not imported at file level. A monotonic
        # clock keeps the refill correct across wall-clock adjustments.
        import time

        now = time.monotonic()
        capacity = self.requests_per_minute

        if identifier in self.tokens:
            # Refill in proportion to the time since the last request.
            elapsed = now - self.last_update[identifier]
            available = min(
                capacity,
                self.tokens[identifier] + elapsed * (capacity / 60.0)
            )
        else:
            # A new bucket starts full; the first request still has to pay
            # for its token like any other.
            available = capacity

        self.last_update[identifier] = now

        if available >= 1:
            self.tokens[identifier] = available - 1
            return True

        self.tokens[identifier] = available
        return False

🔍 Security Monitoring

Audit Logging

import logging
import json
from datetime import datetime
from typing import Optional, Dict, Any

class SecurityAuditor:
    """Write security events as JSON lines to the ``fact.security`` logger."""

    def __init__(self, config: AuditConfig):
        """Configure the security logger to append to the audit log file.

        Args:
            config: Object providing ``audit_log_path``.
        """
        # Local import: only needed for the UTC converter below.
        import time

        self.config = config

        # Configure security logger
        self.security_logger = logging.getLogger('fact.security')
        self.security_logger.setLevel(logging.INFO)

        handler = logging.FileHandler(
            config.audit_log_path,
            mode='a',
            encoding='utf-8'
        )

        # The logger is process-wide. If an earlier instance already
        # attached a handler for the same file, adding another one would
        # write every event twice.
        duplicate = any(
            getattr(existing, 'baseFilename', None) == handler.baseFilename
            for existing in self.security_logger.handlers
        )
        if duplicate:
            handler.close()
            return

        formatter = logging.Formatter(
            '%(asctime)s | %(levelname)s | %(name)s | %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S UTC'
        )
        # The date format is labelled "UTC", so render timestamps in UTC
        # rather than the formatter's default local time.
        formatter.converter = time.gmtime
        handler.setFormatter(formatter)
        self.security_logger.addHandler(handler)

    def _emit(self, level: int, event: Dict[str, Any]):
        """Serialize ``event`` to JSON and log it at ``level``."""
        # default=str is deliberate: an unusual value in `details` must not
        # make the audit call itself raise.
        self.security_logger.log(level, json.dumps(event, default=str))

    def log_authentication(self,
                          user_id: str,
                          success: bool,
                          method: str = 'api_key',
                          ip_address: str = None,
                          user_agent: str = None):
        """Log an authentication attempt.

        Successful attempts are logged at INFO, failures at WARNING; a
        failure additionally triggers the brute-force check.
        """
        event = {
            'event_type': 'authentication',
            'user_id': user_id,
            'success': success,
            'method': method,
            'ip_address': ip_address,
            'user_agent': user_agent,
            'timestamp': datetime.utcnow().isoformat()
        }

        self._emit(logging.INFO if success else logging.WARNING, event)

        # Alert on repeated failures
        if not success:
            self._check_brute_force(user_id, ip_address)

    def log_authorization(self,
                         user_id: str,
                         resource: str,
                         action: str,
                         success: bool,
                         ip_address: str = None):
        """Log an authorization check (INFO when granted, WARNING when denied)."""
        event = {
            'event_type': 'authorization',
            'user_id': user_id,
            'resource': resource,
            'action': action,
            'success': success,
            'ip_address': ip_address,
            'timestamp': datetime.utcnow().isoformat()
        }

        self._emit(logging.INFO if success else logging.WARNING, event)

    def log_query_processing(self,
                           user_id: str,
                           query_hash: str,
                           processing_time: float,
                           cache_hit: bool,
                           ip_address: str = None):
        """Log a processed query at INFO level."""
        event = {
            'event_type': 'query_processing',
            'user_id': user_id,
            'query_hash': query_hash,
            'processing_time': processing_time,
            'cache_hit': cache_hit,
            'ip_address': ip_address,
            'timestamp': datetime.utcnow().isoformat()
        }

        self._emit(logging.INFO, event)

    def log_security_violation(self,
                              violation_type: str,
                              user_id: str = None,
                              details: Dict[str, Any] = None,
                              ip_address: str = None,
                              severity: str = 'medium'):
        """Log a security violation.

        Args:
            violation_type: Short identifier of the violation.
            user_id: Affected or offending user, if known.
            details: Extra data stored under ``details`` (``{}`` if omitted).
            ip_address: Source address, if known.
            severity: ``'low'``, ``'medium'``, ``'high'`` or ``'critical'``;
                unknown values are logged at WARNING. ``'high'`` and
                ``'critical'`` additionally send a security alert.
        """
        event = {
            'event_type': 'security_violation',
            'violation_type': violation_type,
            'user_id': user_id,
            'details': details or {},
            'ip_address': ip_address,
            'severity': severity,
            'timestamp': datetime.utcnow().isoformat()
        }

        # Map severity to log level
        level_map = {
            'low': logging.INFO,
            'medium': logging.WARNING,
            'high': logging.ERROR,
            'critical': logging.CRITICAL
        }

        self._emit(level_map.get(severity, logging.WARNING), event)

        # Alert on high/critical violations
        if severity in ('high', 'critical'):
            self._send_security_alert(event)

    def _check_brute_force(self, user_id: str, ip_address: str):
        """Check for brute force attacks (placeholder, currently a no-op)."""
        # Implementation would check recent failed attempts
        # and trigger alerts/blocks if threshold exceeded
        pass

    def _send_security_alert(self, event: Dict[str, Any]):
        """Send a security alert to the monitoring system (placeholder, no-op)."""
        # Implementation would send alert to monitoring/alerting system
        pass

🚀 Secure Deployment

Docker Security

# Multi-stage secure Docker build
FROM python:3.11-slim AS builder

# Install dependencies in builder stage (runs as root, so packages land in
# /root/.local and are copied into the final image below)
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

# Production stage
FROM python:3.11-slim

# Install security updates
RUN apt-get update && apt-get upgrade -y && \
    apt-get install -y --no-install-recommends \
    ca-certificates && \
    rm -rf /var/lib/apt/lists/*

# Create non-root user with a fixed UID/GID. 1000 matches the
# runAsUser/runAsGroup/fsGroup values used by the Kubernetes deployment, so
# file ownership inside the image is valid for the process that runs there.
RUN groupadd --gid 1000 fact && \
    useradd --uid 1000 --gid fact --create-home --shell /usr/sbin/nologin fact

# Copy dependencies from builder, owned by the runtime user
COPY --from=builder --chown=fact:fact /root/.local /home/fact/.local

# Create app directory
WORKDIR /app

# Copy application code
COPY --chown=fact:fact . .

# WORKDIR creates /app as root; hand the directory itself to the runtime
# user, otherwise the 750 mode set below would lock that user out.
# Then remove unnecessary files and set permissions. `find` is used for the
# .py files so nested modules are covered and the build does not fail when
# there is no .py file directly under /app.
RUN chown fact:fact /app && \
    find /app -type f \( -name "*.pyc" -o -name "*.pyo" -o -name "*~" \) -delete && \
    chmod -R 750 /app && \
    find /app -type f -name "*.py" -exec chmod 640 {} +

# Switch to non-root user
USER fact

# Set PATH
ENV PATH=/home/fact/.local/bin:$PATH

# Security settings
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONHASHSEED=random

# Health check: urlopen raises on connection errors and on HTTP error
# statuses, so a failing /health endpoint marks the container unhealthy.
# It uses only the standard library and a bounded timeout.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1

# Expose port (non-privileged)
EXPOSE 8000

# Run application
CMD ["python", "-m", "fact.main"]

Kubernetes Security

# Secure Kubernetes deployment
# Runs three replicas of the FACT container as a non-root user with a
# read-only root filesystem, no Linux capabilities and no service account
# token mounted.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fact-deployment
  labels:
    app: fact
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fact
  template:
    metadata:
      labels:
        app: fact
    spec:
      # Security context
      # NOTE(review): UID/GID 1000 must match a user that can read the
      # application files inside the image — verify against the Dockerfile.
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        runAsGroup: 1000
        fsGroup: 1000
        seccompProfile:
          type: RuntimeDefault
      
      # Service account
      # The service account token is not mounted into the pod.
      serviceAccountName: fact-service-account
      automountServiceAccountToken: false
      
      containers:
      - name: fact
        # NOTE(review): `latest` is a mutable tag; consider pinning a
        # version or digest so rollouts are reproducible.
        image: fact:latest
        
        # Container security context
        securityContext:
          allowPrivilegeEscalation: false
          # Writable paths are limited to the volumes mounted below.
          readOnlyRootFilesystem: true
          runAsNonRoot: true
          runAsUser: 1000
          capabilities:
            drop:
            - ALL
        
        # Resource limits
        resources:
          limits:
            cpu: 1000m
            memory: 2Gi
          requests:
            cpu: 500m
            memory: 1Gi
        
        # Environment variables
        # The master key is read from the `fact-secrets` Secret rather than
        # being written into the manifest.
        env:
        - name: FACT_MASTER_KEY
          valueFrom:
            secretKeyRef:
              name: fact-secrets
              key: master-key
        
        # Ports
        ports:
        - containerPort: 8000
          name: http
          protocol: TCP
        
        # Health checks
        livenessProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 30
          periodSeconds: 10
        
        readinessProbe:
          httpGet:
            path: /ready
            port: http
          initialDelaySeconds: 5
          periodSeconds: 5
        
        # Volume mounts
        # Writable scratch space, needed because the root filesystem is
        # read-only.
        volumeMounts:
        - name: tmp-volume
          mountPath: /tmp
        - name: cache-volume
          mountPath: /app/cache
      
      # NOTE(review): the emptyDir volumes have no sizeLimit — confirm
      # whether a cap is wanted.
      volumes:
      - name: tmp-volume
        emptyDir: {}
      - name: cache-volume
        emptyDir: {}
      
      # Pod security
      restartPolicy: Always
      terminationGracePeriodSeconds: 30

📋 Security Checklist

Development Security

  • Use secure coding practices
  • Implement input validation
  • Enable static code analysis
  • Regular dependency updates
  • Code review for security
  • Secret management (no hardcoded secrets)
  • Error handling (no information leakage)
  • Logging security events

Deployment Security

  • TLS/SSL configuration
  • Non-root container execution
  • Resource limits
  • Network policies
  • Secret management
  • Security scanning
  • Monitoring and alerting
  • Backup encryption

Operational Security

  • Regular security audits
  • Vulnerability assessments
  • Incident response plan
  • Access control reviews
  • Security training
  • Compliance monitoring
  • Documentation updates
  • Emergency procedures

This guide describes the security controls available for FACT deployments. Review and update these measures regularly to maintain a strong security posture.

Clone this wiki locally