Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/features/gdpr/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# GDPR Compliance Package
from .endpoints import gdpr_bp
from .service import GDPRService
from .audit import log_gdpr_action, get_audit_logs, get_compliance_report

__all__ = [
'gdpr_bp',
'GDPRService',
'log_gdpr_action',
'get_audit_logs',
'get_compliance_report'
]
50 changes: 50 additions & 0 deletions src/features/gdpr/audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# GDPR Audit Logging
import json
from datetime import datetime
from typing import Dict, Any

# In a real implementation, this would connect to a database or logging service
audit_log_storage = []


def log_gdpr_action(user_id: str, action: str, details: str = ""):
"""Log GDPR-related actions for compliance auditing"""
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'action': action,
'details': details,
'ip_address': '', # Would capture from request in actual implementation
'user_agent': '' # Would capture from request in actual implementation
}

# In a real implementation, this would save to a database or logging system
# For now, we'll just append to our in-memory list
audit_log_storage.append(log_entry)

# Also print to standard output for visibility during development
print(f"GDPR Audit Log: {json.dumps(log_entry)}")


def get_audit_logs(user_id: str = None) -> list:
"""Retrieve audit logs, optionally filtered by user_id"""
if user_id:
return [log for log in audit_log_storage if log['user_id'] == user_id]
return audit_log_storage


def get_compliance_report() -> Dict[str, Any]:
"""Generate a GDPR compliance report"""
total_actions = len(audit_log_storage)

action_counts = {}
for log in audit_log_storage:
action = log['action']
action_counts[action] = action_counts.get(action, 0) + 1

return {
'report_generated_at': datetime.utcnow().isoformat(),
'total_audit_entries': total_actions,
'action_breakdown': action_counts,
'recent_activities': audit_log_storage[-10:] # Last 10 entries
}
121 changes: 121 additions & 0 deletions src/features/gdpr/endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# GDPR Compliance Endpoints
import json
import csv
import io
from datetime import datetime
from flask import Blueprint, request, jsonify, Response
from src.features.gdpr.service import GDPRService
from src.features.gdpr.audit import log_gdpr_action

gdpr_bp = Blueprint('gdpr', __name__)

gdpr_service = GDPRService()


def validate_user_id(user_id):
"""Validate user ID format"""
if not user_id or not isinstance(user_id, str) or len(user_id.strip()) == 0:
return False
return True


def validate_auth_token(token):
"""Validate authentication token"""
# In a real implementation, this would verify JWT or session token
# For now, we'll implement basic validation
if not token or not isinstance(token, str) or len(token.strip()) < 10:
return False
return True


@gdpr_bp.route('/export/<user_id>', methods=['GET'])
def export_user_data(user_id):
"""Export user data in JSON or CSV format"""
auth_header = request.headers.get('Authorization')

if not validate_user_id(user_id):
log_gdpr_action(user_id, 'export_attempt_failed', 'Invalid user ID')
return jsonify({'error': 'Invalid user ID'}), 400

if not auth_header or not validate_auth_token(auth_header.replace('Bearer ', '')):
log_gdpr_action(user_id, 'export_attempt_failed', 'Unauthorized access attempt')
return jsonify({'error': 'Unauthorized'}), 401

format_type = request.args.get('format', 'json').lower()

try:
user_data = gdpr_service.get_user_data(user_id)

if not user_data:
log_gdpr_action(user_id, 'export_attempt_failed', 'User not found')
return jsonify({'error': 'User not found'}), 404

if format_type == 'csv':
csv_content = gdpr_service.convert_to_csv(user_data)
log_gdpr_action(user_id, 'data_exported', f'Exported as CSV, {len(csv_content)} bytes')
return Response(
csv_content,
mimetype='text/csv',
headers={'Content-Disposition': f'attachment; filename=user_{user_id}_data.csv'}
)
else:
log_gdpr_action(user_id, 'data_exported', f'Exported as JSON, {len(json.dumps(user_data))} bytes')
return Response(
json.dumps(user_data, indent=2, default=str),
mimetype='application/json',
headers={'Content-Disposition': f'attachment; filename=user_{user_id}_data.json'}
)
except Exception as e:
log_gdpr_action(user_id, 'export_error', str(e))
return jsonify({'error': 'Internal server error during export'}), 500


@gdpr_bp.route('/delete/<user_id>', methods=['DELETE'])
def delete_user_data(user_id):
"""Delete all user data"""
auth_header = request.headers.get('Authorization')

if not validate_user_id(user_id):
log_gdpr_action(user_id, 'delete_attempt_failed', 'Invalid user ID')
return jsonify({'error': 'Invalid user ID'}), 400

if not auth_header or not validate_auth_token(auth_header.replace('Bearer ', '')):
log_gdpr_action(user_id, 'delete_attempt_failed', 'Unauthorized access attempt')
return jsonify({'error': 'Unauthorized'}), 401

confirmation = request.json.get('confirm', False) if request.json else False

if not confirmation:
log_gdpr_action(user_id, 'delete_attempt_failed', 'Deletion not confirmed')
return jsonify({'error': 'Deletion must be confirmed by setting confirm=true'}), 400

try:
result = gdpr_service.delete_user_data(user_id)

if result:
log_gdpr_action(user_id, 'data_deleted', 'All user data successfully deleted')
return jsonify({'message': 'User data successfully deleted'}), 200
else:
log_gdpr_action(user_id, 'delete_failed', 'User not found')
return jsonify({'error': 'User not found'}), 404
except Exception as e:
log_gdpr_action(user_id, 'delete_error', str(e))
return jsonify({'error': 'Internal server error during deletion'}), 500


@gdpr_bp.route('/audit/<user_id>', methods=['GET'])
def get_audit_log(user_id):
"""Get audit log for a specific user"""
auth_header = request.headers.get('Authorization')

if not validate_user_id(user_id):
return jsonify({'error': 'Invalid user ID'}), 400

if not auth_header or not validate_auth_token(auth_header.replace('Bearer ', '')):
return jsonify({'error': 'Unauthorized'}), 401

try:
audit_logs = gdpr_service.get_audit_logs(user_id)
return jsonify(audit_logs), 200
except Exception as e:
return jsonify({'error': 'Error retrieving audit logs'}), 500
135 changes: 135 additions & 0 deletions src/features/gdpr/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# GDPR Compliance Service
import json
import csv
import io
from typing import Dict, List, Optional
from datetime import datetime


class GDPRService:
def __init__(self):
# In a real implementation, this would connect to your database
# For now, we'll simulate with in-memory storage
self.user_data_store = {}
self.audit_log_store = []

def get_user_data(self, user_id: str) -> Optional[Dict]:
"""Retrieve all personal data for a user"""
# Simulate database lookup
# In a real implementation, this would query your actual database
# and collect all personal data associated with the user

# This is a mock implementation - in reality you'd fetch from your DB
# and collect data from all tables/models that store user information
mock_user_data = {
'user_id': user_id,
'personal_info': {
'name': 'John Doe',
'email': 'john@example.com',
'phone': '+1234567890',
'address': '123 Main St, City, State'
},
'financial_data': [
{'id': 'trans_1', 'amount': 100.00, 'date': '2023-01-01', 'description': 'Grocery'},
{'id': 'trans_2', 'amount': 50.00, 'date': '2023-01-02', 'description': 'Gas'}
],
'preferences': {
'notifications_enabled': True,
'currency': 'USD',
'budget_alerts': True
},
'account_info': {
'created_at': '2023-01-01T00:00:00Z',
'last_login': '2023-12-01T10:00:00Z',
'status': 'active'
}
}

# Return mock data if user exists, otherwise None
if user_id.startswith('test_') or user_id.isdigit():
return mock_user_data
return None

def convert_to_csv(self, user_data: Dict) -> str:
"""Convert user data to CSV format"""
output = io.StringIO()
writer = csv.writer(output)

# Write header
writer.writerow(['Category', 'Field', 'Value'])

# Flatten the user data into rows
def flatten_dict(d, parent_key=''):
items = []
for k, v in d.items():
new_key = f'{parent_key}.{k}' if parent_key else k
if isinstance(v, dict):
items.extend(flatten_dict(v, new_key).items())
elif isinstance(v, list):
for i, item in enumerate(v):
if isinstance(item, dict):
items.extend(flatten_dict(item, f'{new_key}[{i}]').items())
else:
items.append((f'{new_key}[{i}]', str(item)))
else:
items.append((new_key, str(v)))
return dict(items)

flattened = flatten_dict(user_data)
for key, value in flattened.items():
writer.writerow(['Personal Data', key, value])

return output.getvalue()

def delete_user_data(self, user_id: str) -> bool:
"""Delete all personal data for a user"""
# In a real implementation, this would:
# 1. Delete user records from all relevant tables
# 2. Remove any related data (transactions, preferences, etc.)
# 3. Ensure foreign key constraints are handled properly
# 4. Log the deletion action

# Mock implementation - mark user as deleted
# In real implementation, actually remove from database
user_exists = self.get_user_data(user_id) is not None

if user_exists:
# Here you would perform actual deletion from database
# For example:
# db.session.execute(delete(User).where(User.id == user_id))
# db.session.execute(delete(Transaction).where(Transaction.user_id == user_id))
# db.session.commit()

# For this mock, just return success
return True

return False

def get_audit_logs(self, user_id: str) -> List[Dict]:
"""Get audit logs for a specific user"""
# In a real implementation, this would query the audit log table
# and return logs related to the specified user

# Mock implementation
mock_logs = [
{
'timestamp': '2023-12-01T10:00:00Z',
'action': 'data_accessed',
'details': 'User profile accessed'
},
{
'timestamp': '2023-12-01T10:05:00Z',
'action': 'data_exported',
'details': 'Data exported as JSON'
}
]

return mock_logs

def get_all_users_for_deletion(self, days_inactive: int = 365) -> List[str]:
"""Get list of users eligible for data deletion based on inactivity"""
# In a real implementation, this would query the database
# for users who have been inactive for the specified number of days

# Mock implementation
return ['inactive_user_1', 'inactive_user_2']