-
-
Notifications
You must be signed in to change notification settings - Fork 321
Zero Trust Vulnerability Reporting workflow MVP (Phase 1) #5377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
👋 Hi @Jayant2908! This pull request needs a peer review before it can be merged. Please request a review from a team member who is not:
Once a valid peer review is submitted, this check will pass automatically. Thank you! |
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughAdds a zero-trust vulnerability reporting workflow: API endpoint to submit encrypted reports, a synchronous pipeline to package/encrypt/deliver artifacts, org-level encryption configuration, Issue model fields to track artifacts, admin UI updates, settings for binaries and temp dirs, and comprehensive tests and migration. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Client
participant API as ZeroTrustIssueCreateView
participant DB as Database
participant Pipeline as build_and_deliver_zero_trust_issue()
participant Encrypt as Encryption Backend
participant Email as Email Service
Client->>API: POST /api/zero-trust/issues/ (domain_id, url, summary, files)
activate API
API->>DB: Lookup Domain & Organization
DB-->>API: Domain + Org
API->>DB: Ensure OrgEncryptionConfig exists
DB-->>API: Encryption config
rect rgb(220,240,255)
note over API: Validation (URL, files, throttling)
end
API->>DB: Create Issue (is_zero_trust=true, is_hidden=true, delivery_status=pending_build)
DB-->>API: Issue
API->>Pipeline: build_and_deliver_zero_trust_issue(issue, files)
deactivate API
activate Pipeline
rect rgb(240,220,255)
note over Pipeline: Build artifact
Pipeline->>Pipeline: Create temp dir, sanitize & save files
Pipeline->>Pipeline: Create metadata.json and tar.gz
end
rect rgb(255,240,220)
note over Pipeline,Encrypt: Encrypt artifact (age/openpgp/sym_7z)
Pipeline->>Encrypt: Encrypt tarball (method from config)
Encrypt-->>Pipeline: Encrypted artifact
end
rect rgb(240,255,240)
note over Pipeline,Email: Delivery & finalize
Pipeline->>Email: Send encrypted artifact (no plaintext/password)
Email-->>Pipeline: Delivery confirmation
Pipeline->>DB: Update Issue (artifact_sha256, encryption_method, delivery_status=delivered, delivered_at)
end
Pipeline->>Pipeline: Secure delete temp dir
Pipeline-->>API: Return result
activate API
API-->>Client: 201 Created (id, artifact_sha256, delivery_status)
deactivate API
deactivate Pipeline
sequenceDiagram
participant Pipeline as build_and_deliver_zero_trust_issue()
participant GenPwd as generate_secure_password()
participant Encrypt as _encrypt_artifact_for_org()
participant OOB as _deliver_password_oob()
participant Email as Email Service
participant DB as Database
activate Pipeline
rect rgb(255,240,220)
note over Pipeline: Symmetric 7z flow
Pipeline->>GenPwd: Generate password (in-memory)
GenPwd-->>Pipeline: Password
Pipeline->>Encrypt: Encrypt with sym_7z using password
Encrypt-->>Pipeline: Encrypted .7z artifact
end
Pipeline->>Pipeline: Compute SHA-256
rect rgb(240,255,240)
note over Pipeline,OOB: Out-of-band password delivery
Pipeline->>OOB: deliver_password_oob(password, contact)
OOB->>Email: Send password via separate channel
Email-->>OOB: Confirm
OOB-->>Pipeline: Success
Pipeline->>Pipeline: Overwrite/clear password from memory
end
Pipeline->>Email: Send encrypted artifact (no password)
Email-->>Pipeline: Confirm
Pipeline->>DB: Update Issue (artifact_sha256, encryption_method='sym_7z', delivery_status='delivered')
deactivate Pipeline
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
📊 Monthly LeaderboardHi @Jayant2908! Here's how you rank for December 2025:
Leaderboard based on contributions in December 2025. Keep up the great work! 🚀 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements Phase 1 of a Zero-Trust vulnerability reporting system for BLT, enabling end-to-end encrypted vulnerability submissions where sensitive proof-of-concept data is never stored on disk or in the database. The implementation adds encrypted artifact delivery directly to organization security contacts while maintaining minimal metadata for auditability.
Key Changes:
- Added zero-trust pipeline for ephemeral file handling, encryption, and secure delivery via email
- Extended Issue model with metadata-only fields (hash, encryption method, delivery status) without storing sensitive content
- Created OrgEncryptionConfig model to manage per-organization encryption preferences (age, OpenPGP, or symmetric 7z)
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 29 comments.
Show a summary per file
| File | Description |
|---|---|
| website/zero_trust_pipeline.py | Core pipeline implementing ephemeral file handling, tar archive creation, encryption (age/pgp/7z), email delivery, and secure cleanup |
| website/models.py | Added OrgEncryptionConfig model for encryption configuration and extended Issue model with zero-trust metadata fields |
| website/api/views.py | Added ZeroTrustIssueCreateView API endpoint with authentication, validation, and pipeline integration |
| website/admin.py | Added readonly field protections for zero-trust Issue fields to prevent accidental modification |
| blt/urls.py | Registered zero-trust API endpoint at /api/zero-trust/issues/ |
| blt/settings.py | Added configuration for temporary directory location and encryption tool binary paths |
| website/migrations/0264_*.py | Database migration adding OrgEncryptionConfig model and zero-trust fields to Issue model |
| website/tests/test_zero_trust.py | Unit tests for zero-trust pipeline behavior including cleanup and error handling |
| website/tests/test_zero_trust_api.py | API integration tests for zero-trust issue submission endpoint |
|
@coderabbitai Can you throughly review zero_trust_pipeline.py file as copilot suggested a lot of changes in that(go through those too) and suggest all the changes required within this PR's scope!? |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 7664 🏁 Script executed: Length of output: 2228 🏁 Script executed: Length of output: 8924 🏁 Script executed: Length of output: 590 @Jayant2908, I've thoroughly reviewed 🚨 Critical Security Issues (Must Fix)1. Subprocess Timeout (Lines 105, 118, 139)Risk: Malicious/buggy encryption tools could hang indefinitely, causing resource exhaustion. Fix: Add timeout to all # age encryption
subprocess.run(cmd, check=True, timeout=300, capture_output=True)
# OpenPGP encryption
subprocess.run(cmd, check=True, timeout=300, capture_output=True)
# 7z encryption
subprocess.run(cmd, check=True, timeout=300, capture_output=True)2. File Size and Count Validation (Line 35)Risk: Users could upload extremely large files or many files, exhausting disk/memory. Fix: Add validation at the start of the pipeline: def build_and_deliver_zero_trust_issue(issue: Issue, uploaded_files) -> None:
"""..."""
# Validation constants
MAX_FILE_SIZE = getattr(settings, "ZERO_TRUST_MAX_FILE_SIZE", 50 * 1024 * 1024) # 50MB
MAX_FILES_COUNT = getattr(settings, "ZERO_TRUST_MAX_FILES", 10)
# Validate file count
if len(uploaded_files) > MAX_FILES_COUNT:
raise ValueError(f"Maximum {MAX_FILES_COUNT} files allowed")
# Validate file sizes
total_size = 0
for f in uploaded_files:
if hasattr(f, 'size'):
if f.size > MAX_FILE_SIZE:
raise ValueError(f"File {f.name} exceeds maximum size of {MAX_FILE_SIZE} bytes")
total_size += f.size
if total_size > MAX_FILE_SIZE:
raise ValueError(f"Total upload size exceeds {MAX_FILE_SIZE} bytes")
os.makedirs(REPORT_TMP_DIR, exist_ok=True)
# ... rest of the code3. Command Injection Risk (Lines 105, 118)Risk: Fix: Add validation functions: import re
def _validate_age_recipient(recipient: str) -> bool:
"""Validate age recipient format (age1... or ssh-ed25519/rsa key)."""
# age recipients start with 'age1' or are ssh keys
if recipient.startswith('age1'):
# age public key format: age1[a-z0-9]{58}
return bool(re.match(r'^age1[a-z0-9]{58}$', recipient))
# Also allow SSH keys
return recipient.startswith(('ssh-ed25519 ', 'ssh-rsa '))
def _validate_pgp_fingerprint(fingerprint: str) -> bool:
"""Validate PGP fingerprint is hexadecimal."""
# PGP fingerprints are 40 hex characters (SHA-1) or 64 (SHA-256)
return bool(re.match(r'^[A-Fa-f0-9]{40}$|^[A-Fa-f0-9]{64}$', fingerprint))
def _encrypt_artifact_for_org(org_config: OrgEncryptionConfig, input_path: str, tmp_dir: str, issue: Issue):
preferred = org_config.preferred_method
# age
if preferred == OrgEncryptionConfig.ENCRYPTION_METHOD_AGE and org_config.age_recipient:
if not _validate_age_recipient(org_config.age_recipient):
raise ValueError(f"Invalid age recipient format: {org_config.age_recipient}")
out = os.path.join(tmp_dir, "report_payload.tar.gz.age")
cmd = [getattr(settings, "AGE_BINARY", "age"), "-r", org_config.age_recipient, "-o", out, input_path]
subprocess.run(cmd, check=True, timeout=300, capture_output=True)
return out, OrgEncryptionConfig.ENCRYPTION_METHOD_AGE
# OpenPGP
if preferred == OrgEncryptionConfig.ENCRYPTION_METHOD_OPENPGP and org_config.pgp_fingerprint:
if not _validate_pgp_fingerprint(org_config.pgp_fingerprint):
raise ValueError(f"Invalid PGP fingerprint format: {org_config.pgp_fingerprint}")
out = os.path.join(tmp_dir, "report_payload.tar.gz.asc")
cmd = [
getattr(settings, "GPG_BINARY", "gpg"),
"--encrypt",
"--armor",
"--recipient",
org_config.pgp_fingerprint,
"--output",
out,
input_path,
]
subprocess.run(cmd, check=True, timeout=300, capture_output=True)
return out, OrgEncryptionConfig.ENCRYPTION_METHOD_OPENPGP
# ... rest4. Broken sym_7z Fallback (Line 139)Risk: Organizations receive encrypted files they cannot decrypt (password never delivered). Fix: Disable sym_7z until OOB delivery is implemented: # Fallback: DISABLED - symmetric 7z requires OOB password delivery
raise RuntimeError(
f"Organization {org_config.organization.name} has no valid encryption method configured. "
f"Symmetric 7z encryption is not supported because out-of-band password "
f"delivery is not implemented. Please configure Age or OpenPGP encryption."
)5. Filename Sanitization (Line 42)Risk: Malicious filenames with null bytes or path traversal attempts. Fix: Use stronger sanitization: import string
import unicodedata
def _sanitize_filename(filename: str) -> str:
"""Sanitize uploaded filename to prevent path traversal and other issues."""
# Get basename to prevent path traversal
filename = os.path.basename(filename)
# Normalize unicode
filename = unicodedata.normalize('NFKD', filename)
# Remove null bytes and control characters
filename = ''.join(c for c in filename if c not in ('\x00', '\r', '\n'))
# Only allow alphanumeric, spaces, dots, dashes, underscores
safe_chars = string.ascii_letters + string.digits + ' .-_'
filename = ''.join(c if c in safe_chars else '_' for c in filename)
# Prevent hidden files and ensure extension
filename = filename.strip('. ')
# Fallback if name is empty
if not filename:
filename = f"upload_{uuid.uuid4().hex[:8]}"
return filename
# In build_and_deliver_zero_trust_issue:
for f in uploaded_files:
safe_name = _sanitize_filename(f.name)
dest_path = os.path.join(issue_tmp_dir, safe_name)
# ... rest
|
|
💬 Reminder: Unresolved Conversations Hi @Jayant2908! This pull request has 29 unresolved conversations that need to be addressed. Please review and resolve the pending discussions so we can move forward with merging this PR. Thank you! 🙏 |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 7664 @Jayant2908, here are the detailed step-by-step changes for 📋 Step-by-Step ChangesSECTION 1: Import Statements (Lines 1-14)At line 2 (after import logging
import re
import string
import unicodedataAfter line 14 (after the OrgEncryptionConfig import), ADD: logger = logging.getLogger(__name__)SECTION 2: Add Validation Functions (Before build_and_deliver_zero_trust_issue)After line 16 (after def _validate_age_recipient(recipient: str) -> bool:
"""Validate age recipient format (age1... or ssh-ed25519/rsa key)."""
if recipient.startswith('age1'):
# age public key format: age1[a-z0-9]{58}
return bool(re.match(r'^age1[a-z0-9]{58}$', recipient))
# Also allow SSH keys
return recipient.startswith(('ssh-ed25519 ', 'ssh-rsa '))
def _validate_pgp_fingerprint(fingerprint: str) -> bool:
"""Validate PGP fingerprint is hexadecimal."""
# PGP fingerprints are 40 hex characters (SHA-1) or 64 (SHA-256)
return bool(re.match(r'^[A-Fa-f0-9]{40}$|^[A-Fa-f0-9]{64}$', fingerprint))
def _sanitize_filename(filename: str) -> str:
"""Sanitize uploaded filename to prevent path traversal and other issues."""
# Get basename to prevent path traversal
filename = os.path.basename(filename)
# Normalize unicode
filename = unicodedata.normalize('NFKD', filename)
# Remove null bytes and control characters
filename = ''.join(c for c in filename if c not in ('\x00', '\r', '\n'))
# Only allow alphanumeric, spaces, dots, dashes, underscores
safe_chars = string.ascii_letters + string.digits + ' .-_'
filename = ''.join(c if c in safe_chars else '_' for c in filename)
# Prevent hidden files and ensure extension
filename = filename.strip('. ')
# Fallback if name is empty
if not filename:
filename = f"upload_{uuid.uuid4().hex[:8]}"
return filenameSECTION 3: Modify build_and_deliver_zero_trust_issue FunctionAt line 19 (the function definition), REPLACE the entire function with: def build_and_deliver_zero_trust_issue(issue: Issue, uploaded_files) -> None:
"""
Synchronous zero-trust pipeline:
1. Save uploaded files to an ephemeral directory.
2. Build a tar.gz with metadata.json + files.
3. Encrypt using the org's configuration.
4. Compute SHA-256, send via email, update Issue metadata.
5. Securely delete all temp files.
"""
# Validation constants
MAX_FILE_SIZE = getattr(settings, "ZERO_TRUST_MAX_FILE_SIZE", 50 * 1024 * 1024) # 50MB
MAX_FILES_COUNT = getattr(settings, "ZERO_TRUST_MAX_FILES", 10)
# Validate file count
if len(uploaded_files) > MAX_FILES_COUNT:
raise ValueError(f"Maximum {MAX_FILES_COUNT} files allowed")
# Validate file sizes
total_size = 0
for f in uploaded_files:
if hasattr(f, 'size'):
if f.size > MAX_FILE_SIZE:
raise ValueError(f"File {f.name} exceeds maximum size of {MAX_FILE_SIZE} bytes")
total_size += f.size
if total_size > MAX_FILE_SIZE:
raise ValueError(f"Total upload size exceeds {MAX_FILE_SIZE} bytes")
os.makedirs(REPORT_TMP_DIR, exist_ok=True)
submission_id = str(uuid.uuid4())
issue_tmp_dir = os.path.join(REPORT_TMP_DIR, submission_id)
os.makedirs(issue_tmp_dir, exist_ok=True)
try:
# 1. Save uploaded files with sanitized names
file_paths = []
for f in uploaded_files:
safe_name = _sanitize_filename(f.name)
dest_path = os.path.join(issue_tmp_dir, safe_name)
with open(dest_path, "wb") as out:
for chunk in f.chunks():
out.write(chunk)
file_paths.append(dest_path)
# 2. Build tar.gz with metadata.json
tar_path = os.path.join(issue_tmp_dir, "report_payload.tar.gz")
_build_tar_artifact(issue, file_paths, tar_path)
# 3. Encrypt according to org config
domain = issue.domain
org = getattr(domain, "organization", None) if domain else None
if org is None:
raise RuntimeError("Zero-trust issue must be associated with a domain/organization.")
try:
org_config = OrgEncryptionConfig.objects.get(organization=org)
except OrgEncryptionConfig.DoesNotExist:
raise RuntimeError(f"No OrgEncryptionConfig for organization {org.name}")
encrypted_path, method_used = _encrypt_artifact_for_org(org_config, tar_path, issue_tmp_dir, issue)
# 4. Compute SHA-256 and send email
artifact_sha256 = _compute_sha256(encrypted_path)
delivery_status = _send_encrypted_issue_email(issue, org_config, encrypted_path, artifact_sha256, method_used)
# Update Issue metadata only (no plaintext storage)
issue.artifact_sha256 = artifact_sha256
issue.encryption_method = method_used
issue.delivery_method = "email:smtp"
issue.delivery_status = delivery_status
issue.delivered_at = timezone.now()
issue.save(
update_fields=[
"artifact_sha256",
"encryption_method",
"delivery_method",
"delivery_status",
"delivered_at",
"modified",
]
)
except Exception as e:
issue.delivery_status = "failed"
issue.save(update_fields=["delivery_status", "modified"])
logger.error(
f"Zero-trust pipeline failed for issue {issue.id}: {str(e)}",
exc_info=True,
extra={"issue_id": issue.id, "domain": issue.domain.url if issue.domain else None}
)
raise
finally:
_secure_delete_path(issue_tmp_dir)Key changes in this function:
SECTION 4: Modify _encrypt_artifact_for_org FunctionAt line 114 (the function definition), REPLACE the entire function with: def _encrypt_artifact_for_org(org_config: OrgEncryptionConfig, input_path: str, tmp_dir: str, issue: Issue):
preferred = org_config.preferred_method
# age
if preferred == OrgEncryptionConfig.ENCRYPTION_METHOD_AGE and org_config.age_recipient:
if not _validate_age_recipient(org_config.age_recipient):
raise ValueError(f"Invalid age recipient format: {org_config.age_recipient}")
out = os.path.join(tmp_dir, "report_payload.tar.gz.age")
cmd = [getattr(settings, "AGE_BINARY", "age"), "-r", org_config.age_recipient, "-o", out, input_path]
try:
subprocess.run(cmd, check=True, timeout=300, capture_output=True)
except subprocess.TimeoutExpired as e:
logger.error(
f"Age encryption timed out for issue {issue.id}: {' '.join(cmd)}",
exc_info=True
)
raise RuntimeError(f"Encryption timed out after {e.timeout} seconds")
except subprocess.CalledProcessError as e:
logger.error(
f"Age encryption failed for issue {issue.id}: {' '.join(cmd)}\n"
f"Return code: {e.returncode}\n"
f"Stderr: {e.stderr.decode('utf-8', errors='replace') if e.stderr else 'N/A'}",
exc_info=True
)
raise RuntimeError(f"Encryption failed: {e.stderr.decode('utf-8', errors='replace') if e.stderr else 'Unknown error'}")
return out, OrgEncryptionConfig.ENCRYPTION_METHOD_AGE
# OpenPGP
if preferred == OrgEncryptionConfig.ENCRYPTION_METHOD_OPENPGP and org_config.pgp_fingerprint:
if not _validate_pgp_fingerprint(org_config.pgp_fingerprint):
raise ValueError(f"Invalid PGP fingerprint format: {org_config.pgp_fingerprint}")
out = os.path.join(tmp_dir, "report_payload.tar.gz.asc")
cmd = [
getattr(settings, "GPG_BINARY", "gpg"),
"--encrypt",
"--armor",
"--recipient",
org_config.pgp_fingerprint,
"--output",
out,
input_path,
]
try:
subprocess.run(cmd, check=True, timeout=300, capture_output=True)
except subprocess.TimeoutExpired as e:
logger.error(
f"OpenPGP encryption timed out for issue {issue.id}: {' '.join(cmd)}",
exc_info=True
)
raise RuntimeError(f"Encryption timed out after {e.timeout} seconds")
except subprocess.CalledProcessError as e:
logger.error(
f"OpenPGP encryption failed for issue {issue.id}: {' '.join(cmd)}\n"
f"Return code: {e.returncode}\n"
f"Stderr: {e.stderr.decode('utf-8', errors='replace') if e.stderr else 'N/A'}",
exc_info=True
)
raise RuntimeError(f"Encryption failed: {e.stderr.decode('utf-8', errors='replace') if e.stderr else 'Unknown error'}")
return out, OrgEncryptionConfig.ENCRYPTION_METHOD_OPENPGP
# Fallback: DISABLED - symmetric 7z requires OOB password delivery
raise RuntimeError(
f"Organization {org_config.organization.name} has no valid encryption method configured. "
f"Symmetric 7z encryption is not supported because out-of-band password "
f"delivery is not implemented. Please configure Age or OpenPGP encryption."
)Key changes:
SECTION 5: Modify _send_encrypted_issue_email FunctionAt line 154 (the function definition), REPLACE the function signature and add return statement: Change: def _send_encrypted_issue_email(
issue: Issue,
org_config: OrgEncryptionConfig,
encrypted_path: str,
artifact_sha256: str,
encryption_method: str,
):To: def _send_encrypted_issue_email(
issue: Issue,
org_config: OrgEncryptionConfig,
encrypted_path: str,
artifact_sha256: str,
encryption_method: str,
) -> str:At line 186 (after email.attach_file(encrypted_path)
email.send(fail_silently=False)With: email.attach_file(encrypted_path)
try:
email.send(fail_silently=False)
return "delivered"
except Exception as e:
logger.error(
f"Email delivery failed for issue {issue.id} to {org_config.contact_email}",
exc_info=True
)
return "encryption_success_delivery_failed"SECTION 6: Update _deliver_password_oob FunctionAt line 190 (the function definition), REPLACE the entire function with: def _deliver_password_oob(org_config: OrgEncryptionConfig, issue_id: int, password: str):
"""
NOT IMPLEMENTED: Placeholder for out-of-band password delivery.
This function is intentionally a no-op. Symmetric 7z encryption should be
disabled until a real OOB delivery mechanism (SMS/Signal/secure channel) is implemented.
SECURITY WARNING: Do NOT store the password in database or logs.
"""
logger.warning(
f"_deliver_password_oob called but not implemented for issue {issue_id}. "
f"Symmetric encryption should be disabled."
)
passSECTION 7: Update _secure_delete_file FunctionAt line 218 (the function definition), REPLACE the entire function with: def _secure_delete_file(path: str) -> None:
"""
Attempt basic secure deletion by overwriting with zeros before removal.
WARNING: This is NOT cryptographically secure deletion on modern systems:
- Copy-on-write filesystems (btrfs, ZFS) may preserve data
- SSDs with wear-leveling don't overwrite in place
- Encrypted filesystems already protect data at rest
This provides defense-in-depth by clearing RAM buffers and preventing
simple file recovery, but should not be relied upon as the sole protection.
"""
try:
length = os.path.getsize(path)
with open(path, "wb") as f:
f.write(b"\x00" * length)
os.remove(path)
except Exception:
try:
os.remove(path)
except FileNotFoundError:
pass📊 Summary of ChangesTotal changes:
These changes address all 9 critical and high-priority security issues identified in the previous review! 🎯 |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 78675 🏁 Script executed: Length of output: 38319 🏁 Script executed: Length of output: 4773 🏁 Script executed: Length of output: 147179 🏁 Script executed: Length of output: 0 @Jayant2908 Thanks — all 8 items are in-scope for this Phase 1 MVP. They tighten security, prevent abuse, and improve integrity/admin UX without changing the intended flow. Below are concise fixes you can drop into this PR.
Code (website/api/views.py): from rest_framework.throttling import ScopedRateThrottle
class ZeroTrustIssueCreateView(APIView):
"""
Zero-trust Issue creation endpoint.
Stores only metadata in DB; PoC files are encrypted and delivered ephemerally.
"""
authentication_classes = [TokenAuthentication]
permission_classes = [IsAuthenticated]
parser_classes = [MultiPartParser, FormParser]
# NEW: throttle uploads by user/IP using a named scope
throttle_classes = [ScopedRateThrottle]
throttle_scope = "zero_trust_issues"
def post(self, request):
...Settings (add or extend in settings.py): REST_FRAMEWORK = {
# keep your existing settings...
"DEFAULT_THROTTLE_RATES": {
# keep your existing rates...
"zero_trust_issues": "5/hour", # tune as needed, e.g., "2/min", "20/day"
"user": "1000/day", # optional defaults
"anon": "200/day",
}
}
Code (tests; ZeroTrustAPITests.setUp): OrgEncryptionConfig.objects.create(
organization=self.org,
contact_email="security@example.com",
preferred_method="age",
# Valid-length placeholder (age1 + 58 chars). We mock the pipeline, so this is never used by age/gpg.
age_recipient="age1" + "q" * 58,
)Optionally add a comment above explaining the pipeline is mocked and the key is not used for real encryption in this test.
Code (website/admin.py): from website.models import (..., OrgEncryptionConfig, ...) # add OrgEncryptionConfig to the import list
class OrgEncryptionConfigAdmin(admin.ModelAdmin):
list_display = ("organization", "preferred_method", "contact_email", "updated_at", "last_verified_at")
search_fields = ("organization__name", "contact_email", "age_recipient", "pgp_fingerprint")
list_filter = ("preferred_method",)
readonly_fields = ("created_at", "updated_at", "last_verified_at")
admin.site.register(OrgEncryptionConfig, OrgEncryptionConfigAdmin)
Code (website/zero_trust_pipeline.py, inside _encrypt_artifact_for_org): if preferred == OrgEncryptionConfig.ENCRYPTION_METHOD_SYM_7Z:
raise RuntimeError(
"Symmetric 7z is disabled: no out-of-band password delivery is implemented. "
"Please configure Age or OpenPGP for this organization."
)
Code (tests; ZeroTrustPipelineTests.test_pipeline_sets_hash_and_deletes_temp_files): @patch("website.zero_trust_pipeline._encrypt_artifact_for_org")
@patch("website.zero_trust_pipeline.uuid.uuid4")
def test_pipeline_sets_hash_and_deletes_temp_files(self, mock_uuid, mock_encrypt):
fixed_uuid = uuid.UUID("12345678123456781234567812345678")
mock_uuid.return_value = fixed_uuid
# Let the pipeline create the temp dir; do NOT pre-create it.
def fake_encrypt(org_config, input_path, tmp_dir, issue):
out = Path(tmp_dir) / "report.tar.gz.age"
out.write_bytes(b"encrypted-content")
return str(out), "age"
mock_encrypt.side_effect = fake_encrypt
upload = SimpleUploadedFile("poc.txt", b"secret data", content_type="text/plain")
build_and_deliver_zero_trust_issue(self.issue, [upload])
self.issue.refresh_from_db()
self.assertTrue(self.issue.is_zero_trust)
self.assertIsNotNone(self.issue.artifact_sha256)
self.assertEqual(self.issue.encryption_method, "age")
issue_tmp_dir = Path(settings.REPORT_TMP_DIR) / str(fixed_uuid)
self.assertFalse(issue_tmp_dir.exists())
Code (tests; ZeroTrustAPITests.test_zero_trust_issue_creation): self.assertEqual(response.status_code, 201)
self.assertIn("artifact_sha256", response.data)
self.assertEqual(response.data["delivery_status"], "pending_build")
# NEW: verify pipeline invocation args
self.assertTrue(mock_pipeline.called)
called_issue, called_files = mock_pipeline.call_args[0]
self.assertEqual(called_issue.id, response.data["id"])
self.assertEqual(len(called_files), 1)
self.assertEqual(getattr(called_files[0], "name", None), "exploit.txt")
Code (website/models.py): from django.core.validators import RegexValidator
class Issue(models.Model):
...
artifact_sha256 = models.CharField(
max_length=64,
blank=True,
help_text="SHA-256 of the encrypted disclosure artifact sent to the org.",
validators=[RegexValidator(
regex=r"^[A-Fa-f0-9]{64}$",
message="artifact_sha256 must be exactly 64 hexadecimal characters.",
code="invalid_sha256",
)],
)
...Note: This change requires a migration (manage.py makemigrations).
Code (website/api/views.py; in ZeroTrustIssueCreateView.post — pre-check before create): def post(self, request):
domain_id = request.data.get("domain_id")
url = request.data.get("url")
summary = request.data.get("summary")
files = request.FILES.getlist("files")
if not domain_id or not url or not summary or not files:
return Response({"error": "domain_id, url, summary and files are required"},
status=status.HTTP_400_BAD_REQUEST)
try:
domain = Domain.objects.get(id=domain_id)
except Domain.DoesNotExist:
return Response({"error": "Invalid domain_id"}, status=status.HTTP_400_BAD_REQUEST)
# NEW: Check org encryption config BEFORE creating the Issue
try:
OrgEncryptionConfig.objects.get(organization=domain.organization)
except OrgEncryptionConfig.DoesNotExist:
return Response(
{"error": "Zero-trust delivery is not configured for this organization"},
status=status.HTTP_400_BAD_REQUEST,
)
# Create Issue only after config is confirmed
issue = Issue.objects.create(
user=request.user,
domain=domain,
url=url,
description=summary,
is_hidden=True,
is_zero_trust=True,
delivery_status="pending_build",
)
assert issue.is_hidden is True
assert issue.is_zero_trust is True
try:
build_and_deliver_zero_trust_issue(issue, files)
except Exception:
return Response({"error": "Zero-trust submission failed"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
issue.refresh_from_db()
return Response(
{"id": issue.id, "artifact_sha256": issue.artifact_sha256, "delivery_status": issue.delivery_status},
status=status.HTTP_201_CREATED,
)Quick rationale summary
If you want, I can also add a small throttle test and a migration stub for the validator. |
@coderabbitai I have doubts
|
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 22821 🏁 Script executed: Length of output: 3239 🏁 Script executed: Length of output: 1839 🏁 Script executed: Length of output: 197 @Jayant2908, here are the exact fixes for all 3 issues: 1️⃣ Throttling Configuration (Exact Code)Step A: Update
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (6)
blt/settings.py (1)
665-671: Zero‑trust temp directory and binary configuration are consistent with pipeline/tests
REPORT_TMP_DIRunderBASE_DIRmatches howbuild_and_deliver_zero_trust_issueandZeroTrustPipelineTestsconstruct per‑issue temp dirs, and the AGE/GPG/7z binaries are cleanly overridable via env. If you ever deploy on non‑ephemeral disks, consider documenting thattmp_reportsshould be excluded from backups or mounted on an ephemeral volume to preserve the zero‑trust guarantees.website/tests/test_zero_trust_api.py (1)
11-23: Make Domain setup and URL usage more explicit in the API testTwo small robustness tweaks worth considering:
In
setUp,Domain.objects.create(organization=self.org, url="https://api.example.com")relies on whatever defaultsDomain.namehappens to have. To avoid surprises if that field’s constraints change, it’s safer to pass a concrete name, e.g.name="api.example.com".In
test_zero_trust_issue_creation, usingreverse("zero_trust_issue_create")instead of the hard‑coded"/api/zero-trust/issues/"would keep the test resilient to future URL refactors.Also applies to: 38-55
website/api/views.py (1)
2012-2022: Consider adding summary length validation.The endpoint validates URL format but doesn't limit the
summaryfield length. While the Issue model likely has database constraints, API-level validation prevents resource exhaustion from extremely long summaries before processing begins.🔎 Proposed enhancement
+ MAX_SUMMARY_LENGTH = 5000 # Reasonable limit for vulnerability summary + if not domain_id or not url or not summary or not files: return Response( {"error": "domain_id, url, summary, and files are required"}, status=status.HTTP_400_BAD_REQUEST, ) + + if len(summary) > MAX_SUMMARY_LENGTH: + return Response( + {"error": f"Summary exceeds maximum length of {MAX_SUMMARY_LENGTH} characters"}, + status=status.HTTP_400_BAD_REQUEST, + )website/tests/test_zero_trust_sym7z.py (1)
143-154: Command verification could be more robust.The assertion at line 151 converts the command list to string with
str(cmd)which works but is less precise. The check at line 144 filters for calls wherecall[0][0][0].endswith("7z")which may not match all 7z binary paths (e.g.,/usr/bin/7zwould work, but paths without7zsuffix wouldn't).🔎 Suggested improvement
- # Find the 7z encryption call - calls = [call for call in mock_subprocess.call_args_list if call[0][0][0].endswith("7z")] + # Find the 7z encryption call (binary name contains '7z') + calls = [call for call in mock_subprocess.call_args_list if "7z" in call[0][0][0]] self.assertGreater(len(calls), 0, "7z should have been called") cmd = calls[0][0][0] # First positional arg is the command list # Verify critical security flags - self.assertIn("7z", str(cmd)) + self.assertTrue(any("7z" in arg for arg in cmd), "Command should involve 7z") self.assertIn("-mhe=on", cmd, "Should encrypt headers") self.assertIn("-t7z", cmd, "Should use 7z format") self.assertTrue(any("-p" in arg for arg in cmd), "Should have password flag")website/zero_trust_pipeline.py (2)
195-198: Consider using generic error messages in exceptions.The
RuntimeErrorat line 198 includes the organization name, which could be exposed if exception messages are logged or returned differently in the future. While the API layer currently catches this and returns a generic error, following the principle of defense-in-depth, consider logging the details separately and raising a generic exception.🔎 Suggested improvement
try: org_config = OrgEncryptionConfig.objects.get(organization=org) except OrgEncryptionConfig.DoesNotExist: - raise RuntimeError(f"No OrgEncryptionConfig for organization {org.name}") + logger.error( + "No OrgEncryptionConfig for organization id=%s name=%s", + org.id, + org.name, + ) + raise RuntimeError("Organization encryption configuration not found")
148-150: Consider restricting temporary directory/file permissions.The temporary directory and files are created with default permissions, which may allow other users on shared systems to access sensitive data during processing. While files are short-lived and the pipeline is intended for isolated environments, restrictive permissions provide defense-in-depth.
🔎 Suggested hardening
- os.makedirs(REPORT_TMP_DIR, exist_ok=True) + os.makedirs(REPORT_TMP_DIR, mode=0o700, exist_ok=True) # Use mkdtemp for guaranteed unique directory (prevents UUID collision and race conditions) - issue_tmp_dir = tempfile.mkdtemp(prefix=f"issue_{issue.id}_", dir=REPORT_TMP_DIR) + # Note: mkdtemp uses system umask; explicitly set restrictive permissions + issue_tmp_dir = tempfile.mkdtemp(prefix=f"issue_{issue.id}_", dir=REPORT_TMP_DIR) + os.chmod(issue_tmp_dir, 0o700)For file writes (line 169), consider:
fd = os.open(dest_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) with os.fdopen(fd, "wb") as out: # ... write chunks
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (10)
blt/settings.pyblt/urls.pywebsite/admin.pywebsite/api/views.pywebsite/migrations/0264_issue_artifact_sha256_issue_delivered_at_and_more.pywebsite/models.pywebsite/tests/test_zero_trust.pywebsite/tests/test_zero_trust_api.pywebsite/tests/test_zero_trust_sym7z.pywebsite/zero_trust_pipeline.py
🧰 Additional context used
🧬 Code graph analysis (7)
blt/urls.py (1)
website/api/views.py (1)
ZeroTrustIssueCreateView(1998-2109)
website/tests/test_zero_trust_sym7z.py (2)
website/models.py (4)
Domain(463-561)Issue(667-849)Organization(191-299)OrgEncryptionConfig(302-366)website/zero_trust_pipeline.py (1)
_generate_secure_password(66-96)
website/admin.py (1)
website/models.py (1)
OrgEncryptionConfig(302-366)
website/zero_trust_pipeline.py (1)
website/models.py (13)
Issue(667-849)OrgEncryptionConfig(302-366)save(84-87)save(287-299)save(360-363)save(1510-1537)save(1645-1648)save(1838-1841)save(1956-1970)save(2061-2083)save(2882-2885)save(3414-3424)save(3722-3731)
website/api/views.py (2)
website/models.py (13)
OrgEncryptionConfig(302-366)Issue(667-849)save(84-87)save(287-299)save(360-363)save(1510-1537)save(1645-1648)save(1838-1841)save(1956-1970)save(2061-2083)save(2882-2885)save(3414-3424)save(3722-3731)website/zero_trust_pipeline.py (1)
build_and_deliver_zero_trust_issue(99-232)
website/migrations/0264_issue_artifact_sha256_issue_delivered_at_and_more.py (1)
website/models.py (1)
validate_sha256_if_present(53-60)
website/tests/test_zero_trust_api.py (2)
website/models.py (13)
Organization(191-299)OrgEncryptionConfig(302-366)save(84-87)save(287-299)save(360-363)save(1510-1537)save(1645-1648)save(1838-1841)save(1956-1970)save(2061-2083)save(2882-2885)save(3414-3424)save(3722-3731)website/api/views.py (2)
create(236-280)create(736-758)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Run Tests
- GitHub Check: Agent
- GitHub Check: docker-test
🔇 Additional comments (19)
blt/settings.py (1)
508-518: Zero‑trust throttle scope and rate wiring look correct
"zero_trust_issues": "5/hour"lines up withScopedRateThrottle+throttle_scope="zero_trust_issues"inZeroTrustIssueCreateView, so the heavy upload endpoint will actually be throttled as intended.website/migrations/0264_issue_artifact_sha256_issue_delivered_at_and_more.py (1)
14-107: Migration schema matches models and validator usageThe new Issue fields and
OrgEncryptionConfigdefinition mirrorwebsite.models(including the conditional SHA‑256 validator), so the schema aligns with the runtime model logic and should migrate cleanly.blt/urls.py (1)
22-54: Zero‑trust issue API endpoint is correctly wiredImporting
ZeroTrustIssueCreateViewand exposing it atapi/zero-trust/issues/cleanly connects the DRF view (with its auth + throttling) to the public URL that the tests exercise. No issues here.Also applies to: 1255-1259
website/tests/test_zero_trust.py (1)
44-70: Pipeline behavior coverage for success, failure, and temp cleanup looks solidThe tests for:
- successful age encryption with hash set and temp dir removal,
- failure when
OrgEncryptionConfigis missing (anddelivery_statusmoving to"failed"), and- successful sym_7z configuration via a mocked
_encrypt_artifact_for_orgnicely pin down the critical zero‑trust pipeline invariants without hitting real crypto or email. No changes needed here.
Also applies to: 86-114
website/models.py (1)
53-61: Zero‑trust model and validator design are sound
validate_sha256_if_presentcorrectly allows blanks while enforcing a strict 64‑hex format when a value is present, which fitsartifact_sha256’s lifecycle.
OrgEncryptionConfig’sclean()+save()guarantee that age/openpgp configs can’t be saved without their respective key identifiers, which avoids runtime encryption failures.The new Issue zero‑trust fields (
is_zero_trust,artifact_sha256,encryption_method,delivery_method,delivery_status,delivered_at) are modeled as metadata only, with sensible max_lengths and help_texts, and don’t interfere with existing Issue behavior.All of this aligns well with the zero‑trust workflow described in the PR.
Also applies to: 302-367, 818-848
website/admin.py (2)
187-215: Admin correctly protects zero‑trust Issue fieldsExtending
IssueAdminto:
- show
is_zero_trust,delivery_status, andencryption_methodinlist_display, and- treat all zero‑trust fields as read‑only via
readonly_fieldsplusget_readonly_fieldsensures operators can inspect pipeline state without accidentally mutating integrity‑critical metadata from the admin UI.
Also applies to: 217-232
65-66: OrgEncryptionConfig admin surface is appropriately constrainedImporting
OrgEncryptionConfigand registeringOrgEncryptionConfigAdminwith key fields inlist_display, searchable key material identifiers, and read‑only timestamps gives admins the necessary visibility while avoiding accidental edits to audit fields. This matches the intended org‑level encryption configuration model.Also applies to: 1308-1315
website/api/views.py (2)
2055-2079: LGTM! Defense-in-depth invariant checks properly implemented.The invariant checks correctly use explicit
if notstatements withRuntimeError(rather than assertions that can be disabled with-O), and include proper logging before raising. The boolean comparisons are idiomatic Python.
2080-2109: LGTM! Error handling appropriately returns issue ID for client tracking.The error handling correctly logs the exception with
exc_info=True, marks the issue as failed, and returns both the error message andissue.idso clients can track or retry failed submissions. This addresses previous concerns about orphaned issues being untrackable.website/tests/test_zero_trust_sym7z.py (4)
40-61: LGTM! Password generation tests cover key security requirements.The tests verify length, character diversity requirements (uppercase, lowercase, digits, special characters), and uniqueness. Good coverage of the cryptographic password generation function.
63-121: LGTM! Comprehensive test for dual-email delivery in sym_7z workflow.The test correctly simulates the full sym_7z flow by having the mock encryption function also call
_deliver_password_oob, ensuring both the artifact email and password email are sent. Good verification of subjects, recipients, attachments, and content.
156-189: LGTM! Validates password is not persisted to database.The test appropriately verifies that no password-like string is stored in Issue fields after pipeline execution. While the regex pattern could theoretically match non-password strings, the specific fields being checked make false positives unlikely.
191-210: LGTM! Good coverage of OOB delivery failure scenario.This test verifies that when out-of-band password delivery fails, the pipeline properly raises an exception and marks the issue's
delivery_statusas "failed". Critical for ensuring the security invariant that encrypted artifacts without delivered passwords are flagged appropriately.website/zero_trust_pipeline.py (6)
26-64: LGTM! Robust validation and sanitization helpers.The validation functions properly check age recipient and PGP fingerprint formats to prevent command injection. The filename sanitization is thorough—using
os.path.basename, unicode normalization, null byte removal, and character whitelisting. Good defense-in-depth approach.
66-96: LGTM! Cryptographically secure password generation.Correctly uses
secretsmodule, avoids ambiguous characters, ensures character diversity from all categories, and shuffles withsecrets.SystemRandom()to prevent predictable patterns. Well-documented with clear docstring.
148-183: LGTM! Robust file handling with streaming validation.Good improvements addressing past concerns:
- Uses
tempfile.mkdtemp()for guaranteed unique directories (line 150)- Handles filename collisions with counter suffix (lines 160-166)
- Streaming size validation during write prevents accepting files larger than declared size (lines 174-181)
- Tracks both per-file and total cumulative sizes
272-386: LGTM! Encryption implementation with proper security controls.Key security measures in place:
- Input validation before subprocess calls prevents command injection
- Explicit
shell=Falseandtimeout=300on all subprocess calls- Separate exception handling for
TimeoutExpiredandCalledProcessErrorwith detailed logging- sym_7z implementation correctly generates password, delivers OOB, then removes reference
- Honest documentation about Python string immutability limitations (lines 374-377)
439-515: LGTM! OOB password delivery implementation with good security guidance.The
_deliver_password_oobfunction correctly:
- Sends password in a separate email channel
- Raises
RuntimeErroron failure (ensuring artifacts without delivered passwords are flagged)- Includes comprehensive security instructions for recipients
- Advises against copying the full command to avoid shell history exposure (line 472-473)
- Documents that password is never logged or stored (line 448)
533-554: LGTM! Secure deletion with honest limitation documentation.The function appropriately documents that this is defense-in-depth rather than cryptographic secure deletion (lines 537-543). The fallback error handling catches
FileNotFoundErrorand logs appropriately. Given that artifacts are encrypted before delivery, this provides an additional layer of protection without false security claims.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 21 comments.
|
@coderabbitai Read the latests comments generated by copilot and give fixes. |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 222878 @Jayant2908 Here’s a one-by-one fix plan mapped to Copilot’s latest comments, with exact patches. Apply only those not already present. website/zero_trust_pipeline.py
diff --git a/website/zero_trust_pipeline.py b/website/zero_trust_pipeline.py
--- a/website/zero_trust_pipeline.py
+++ b/website/zero_trust_pipeline.py
@@
-import hashlib
-import json
-import os
-import subprocess
-import tarfile
-import uuid
+import hashlib
+import json
+import logging
+import os
+import re
+import secrets
+import string
+import subprocess
+import tarfile
+import tempfile
+import unicodedata
+import uuid
from typing import List, Tuple
from django.conf import settings
-from django.core.files.uploadedfile import UploadedFile
-from django.core.mail import EmailMessage
+from django.core.files.uploadedfile import UploadedFile
+from django.core.mail import EmailMessage
from django.utils import timezone
from website.models import Issue, OrgEncryptionConfig
+logger = logging.getLogger(__name__)
+REPORT_TMP_DIR = getattr(settings, "REPORT_TMP_DIR", os.path.join(settings.BASE_DIR, "tmp_reports"))
+
+def _validate_age_recipient(recipient: str) -> bool:
+ if recipient.startswith("age1"):
+ return bool(re.match(r"^age1[a-z0-9]{58}$", recipient))
+ return recipient.startswith(("ssh-ed25519 ", "ssh-rsa "))
+
+def _validate_pgp_fingerprint(fp: str) -> bool:
+ return bool(re.match(r"^[A-Fa-f0-9]{40}$|^[A-Fa-f0-9]{64}$", fp))
+
+def _sanitize_filename(filename: str) -> str:
+ filename = os.path.basename(filename)
+ filename = unicodedata.normalize("NFKD", filename)
+ filename = "".join(c for c in filename if c not in ("\x00", "\r", "\n"))
+ safe = string.ascii_letters + string.digits + " .-_"
+ filename = "".join(c if c in safe else "_" for c in filename).strip(". ")
+ return filename or f"upload_{uuid.uuid4().hex[:8]}"
+
+def _generate_secure_password(length: int = 32) -> str:
+ upper, lower, digits, special = (
+ "ABCDEFGHJKLMNPQRSTUVWXYZ",
+ "abcdefghijkmnopqrstuvwxyz",
+ "23456789",
+ "!@#$%^&*-_=+",
+ )
+ allc = upper + lower + digits + special
+ pwd = [
+ secrets.choice(upper),
+ secrets.choice(lower),
+ secrets.choice(digits),
+ secrets.choice(special),
+ *[secrets.choice(allc) for _ in range(length - 4)],
+ ]
+ secrets.SystemRandom().shuffle(pwd)
+ return "".join(pwd)
+
def build_and_deliver_zero_trust_issue(issue: Issue, uploaded_files: List[UploadedFile]) -> None:
"""
Synchronous zero-trust pipeline:
@@
- os.makedirs(REPORT_TMP_DIR, exist_ok=True)
- submission_id = str(uuid.uuid4())
- issue_tmp_dir = os.path.join(REPORT_TMP_DIR, submission_id)
- os.makedirs(issue_tmp_dir, exist_ok=True)
+ os.makedirs(REPORT_TMP_DIR, exist_ok=True)
+ # Use mkdtemp to avoid UUID collisions and ensure a fresh directory
+ issue_tmp_dir = tempfile.mkdtemp(prefix=f"issue_{issue.id}_", dir=REPORT_TMP_DIR)
try:
- # 1. Save uploaded files with sanitized names
- file_paths = []
- for f in uploaded_files:
- safe_name = os.path.basename(f.name)
- dest_path = os.path.join(issue_tmp_dir, safe_name)
+ # 1. Save uploaded files with sanitized, collision-safe names
+ file_paths: List[str] = []
+ used: set[str] = set()
+ for f in uploaded_files:
+ safe_name = _sanitize_filename(f.name)
+ if safe_name in used:
+ base, ext = os.path.splitext(safe_name)
+ idx = 1
+ while f"{base}_{idx}{ext}" in used:
+ idx += 1
+ safe_name = f"{base}_{idx}{ext}"
+ used.add(safe_name)
+ dest_path = os.path.join(issue_tmp_dir, safe_name)
with open(dest_path, "wb") as out:
for chunk in f.chunks():
out.write(chunk)
file_paths.append(dest_path)
@@
- domain = issue.domain
- org = domain.organization if domain else None
+ domain = issue.domain
+ org = getattr(domain, "organization", None) if domain else None
if org is None:
raise RuntimeError("Zero-trust issue must be associated with a domain/organization.")
@@
- encrypted_path, method_used = _encrypt_artifact_for_org(org_config, tar_path, issue_tmp_dir, issue)
+ encrypted_path, method_used = _encrypt_artifact_for_org(org_config, tar_path, issue_tmp_dir, issue)
# 4. Compute SHA-256 and send email
artifact_sha256 = _compute_sha256(encrypted_path)
- _send_encrypted_issue_email(issue, org_config, encrypted_path, artifact_sha256, method_used)
+ delivery_status = _send_encrypted_issue_email(issue, org_config, encrypted_path, artifact_sha256, method_used)
# Update Issue metadata only (no plaintext storage)
issue.artifact_sha256 = artifact_sha256
issue.encryption_method = method_used
issue.delivery_method = "email:smtp"
- issue.delivery_status = "delivered"
+ issue.delivery_status = delivery_status
issue.delivered_at = timezone.now()
issue.save(update_fields=[
"artifact_sha256",
"encryption_method",
"delivery_method",
"delivery_status",
"delivered_at",
"modified",
])
- except Exception:
- issue.delivery_status = "failed"
- issue.save(update_fields=["delivery_status", "modified"])
- raise
+ except Exception as e:
+ issue.delivery_status = "failed"
+ issue.save(update_fields=["delivery_status", "modified"])
+ logger.error("Zero-trust pipeline failed for issue %s", issue.id, exc_info=True, extra={"issue_id": issue.id})
+ raise
finally:
_secure_delete_path(issue_tmp_dir)
@@
-def _encrypt_artifact_for_org(org_config: OrgEncryptionConfig, input_path: str, tmp_dir: str, issue: Issue) -> Tuple[str, str]:
+def _encrypt_artifact_for_org(org_config: OrgEncryptionConfig, input_path: str, tmp_dir: str, issue: Issue) -> Tuple[str, str]:
preferred = org_config.preferred_method
# age
if preferred == OrgEncryptionConfig.ENCRYPTION_METHOD_AGE and org_config.age_recipient:
- out = os.path.join(tmp_dir, "report_payload.tar.gz.age")
- cmd = [getattr(settings, "AGE_BINARY", "age"), "-r", org_config.age_recipient, "-o", out, input_path]
- subprocess.run(cmd, check=True)
+ if not _validate_age_recipient(org_config.age_recipient):
+ raise ValueError(f"Invalid age recipient format: {org_config.age_recipient}")
+ out = os.path.join(tmp_dir, "report_payload.tar.gz.age")
+ cmd = [getattr(settings, "AGE_BINARY", "age"), "-r", org_config.age_recipient, "-o", out, input_path]
+ try:
+ subprocess.run(cmd, check=True, timeout=300, capture_output=True, shell=False)
+ except subprocess.TimeoutExpired as te:
+ logger.error("Age encryption timed out for issue %s after %ss", issue.id, te.timeout, exc_info=True)
+ raise RuntimeError(f"Encryption timed out after {te.timeout} seconds")
+ except subprocess.CalledProcessError as cpe:
+ stderr = cpe.stderr.decode("utf-8", errors="replace") if cpe.stderr else ""
+ logger.error("Age encryption failed for issue %s rc=%s stderr=%s", issue.id, cpe.returncode, stderr, exc_info=True)
+ raise RuntimeError(f"Encryption failed: {stderr or 'Unknown error'}")
return out, OrgEncryptionConfig.ENCRYPTION_METHOD_AGE
# OpenPGP
if preferred == OrgEncryptionConfig.ENCRYPTION_METHOD_OPENPGP and org_config.pgp_fingerprint:
- out = os.path.join(tmp_dir, "report_payload.tar.gz.asc")
+ if not _validate_pgp_fingerprint(org_config.pgp_fingerprint):
+ raise ValueError(f"Invalid PGP fingerprint format: {org_config.pgp_fingerprint}")
+ out = os.path.join(tmp_dir, "report_payload.tar.gz.asc")
cmd = [
getattr(settings, "GPG_BINARY", "gpg"),
"--encrypt",
"--armor",
"--recipient",
org_config.pgp_fingerprint,
"--output",
out,
input_path,
]
- subprocess.run(cmd, check=True)
+ try:
+ subprocess.run(cmd, check=True, timeout=300, capture_output=True, shell=False)
+ except subprocess.TimeoutExpired as te:
+ logger.error("OpenPGP encryption timed out for issue %s after %ss", issue.id, te.timeout, exc_info=True)
+ raise RuntimeError(f"Encryption timed out after {te.timeout} seconds")
+ except subprocess.CalledProcessError as cpe:
+ stderr = cpe.stderr.decode("utf-8", errors="replace") if cpe.stderr else ""
+ logger.error("OpenPGP encryption failed for issue %s rc=%s stderr=%s", issue.id, cpe.returncode, stderr, exc_info=True)
+ raise RuntimeError(f"Encryption failed: {stderr or 'Unknown error'}")
return out, OrgEncryptionConfig.ENCRYPTION_METHOD_OPENPGP
- # Fallback: symmetric 7z (disabled until OOB password implemented)
- raise RuntimeError("Organization has no valid encryption method configured.")
+ # Symmetric 7z with OOB password delivery
+ if preferred == OrgEncryptionConfig.ENCRYPTION_METHOD_SYM_7Z:
+ password = _generate_secure_password()
+ out = os.path.join(tmp_dir, "report_payload.tar.gz.7z")
+ cmd = [
+ getattr(settings, "SEVENZ_BINARY", "7z"),
+ "a", "-t7z", "-mhe=on", f"-p{password}", "-mx=9", "-mhc=on", "-ms=on",
+ out, input_path,
+ ]
+ try:
+ subprocess.run(cmd, check=True, timeout=300, capture_output=True, shell=False)
+ except subprocess.TimeoutExpired as te:
+ logger.error("7z encryption timed out for issue %s after %ss", issue.id, te.timeout, exc_info=True)
+ raise RuntimeError(f"Encryption timed out after {te.timeout} seconds")
+ except subprocess.CalledProcessError as cpe:
+ stderr = cpe.stderr.decode("utf-8", errors="replace") if cpe.stderr else ""
+ logger.error("7z encryption failed for issue %s rc=%s stderr=%s", issue.id, cpe.returncode, stderr, exc_info=True)
+ raise RuntimeError(f"Encryption failed: {stderr or 'Unknown error'}")
+ _deliver_password_oob(org_config, issue.id, password)
+ del password # note: strings are immutable; reference is dropped
+ return out, OrgEncryptionConfig.ENCRYPTION_METHOD_SYM_7Z
+
+ # No valid method configured
+ raise RuntimeError("Organization has no valid encryption method configured.")
@@
-def _send_encrypted_issue_email(
+def _send_encrypted_issue_email(
issue: Issue,
org_config: OrgEncryptionConfig,
encrypted_path: str,
artifact_sha256: str,
encryption_method: str,
-):
+)-> str:
@@
- email.attach_file(encrypted_path)
- email.send(fail_silently=False)
+ email.attach_file(encrypted_path)
+ try:
+ email.send(fail_silently=False)
+ return "delivered"
+ except Exception:
+ logger.error("Email delivery failed for issue %s to %s", issue.id, org_config.contact_email, exc_info=True)
+ return "encryption_success_delivery_failed"
@@
-def _deliver_password_oob(org_config: OrgEncryptionConfig, issue_id: int, password: str) -> None:
- """TODO: implement out-of-band password delivery."""
- pass
+def _deliver_password_oob(org_config: OrgEncryptionConfig, issue_id: int, password: str) -> None:
+ """
+ Deliver the symmetric password via a separate email from the artifact.
+ NEVER store the password in DB or logs.
+ """
+ subject = f"[VULN REPORT PASSWORD] Decryption key for issue_id: {issue_id}"
+ body = (
+ f"Hello {org_config.organization.name} Security Team,\n\n"
+ f"This email contains the decryption password for vulnerability report issue_id: {issue_id}.\n\n"
+ f"Decryption Password: {password}\n\n"
+ "To decrypt the .7z archive from the other email:\n"
+ " 7z x report_payload.tar.gz.7z -p[PASSWORD_FROM_ABOVE]\n\n"
+ "Open the archive only in an isolated/sandboxed environment.\n"
+ )
+ email = EmailMessage(subject=subject, body=body, from_email=settings.EMAIL_TO_STRING, to=[org_config.contact_email])
+ email.send(fail_silently=False)
@@
def _secure_delete_file(path: str) -> None:
- try:
- length = os.path.getsize(path)
- with open(path, "wb") as f:
- f.write(b"\x00" * length)
- os.remove(path)
- except Exception:
- try:
- os.remove(path)
- except FileNotFoundError:
- pass
+ """
+ Attempt basic overwrite-before-delete.
+ WARNING: Not guaranteed on CoW filesystems or SSD wear-leveling.
+ """
+ try:
+ length = os.path.getsize(path)
+ with open(path, "wb") as f:
+ f.write(b"\x00" * length)
+ os.remove(path)
+ except Exception:
+ try:
+ os.remove(path)
+ except FileNotFoundError:
+ passwebsite/api/views.py
diff --git a/website/api/views.py b/website/api/views.py
--- a/website/api/views.py
+++ b/website/api/views.py
@@
-from rest_framework.parsers import FormParser, MultiPartParser
+from rest_framework.parsers import FormParser, MultiPartParser
+from rest_framework.throttling import ScopedRateThrottle
+from django.core.validators import URLValidator
+from django.core.exceptions import ValidationError as DjangoValidationError
+import logging
+logger = logging.getLogger(__name__)
@@
class ZeroTrustIssueCreateView(APIView):
@@
- parser_classes = [MultiPartParser, FormParser]
+ parser_classes = [MultiPartParser, FormParser]
+ throttle_classes = [ScopedRateThrottle]
+ throttle_scope = "zero_trust_issues"
@@
- if not domain_id or not url or not summary or not files:
+ if not domain_id or not url or not summary or not files:
return Response({"error": "domain_id, url, summary and files are required"},
status=status.HTTP_400_BAD_REQUEST)
+ # validate URL
+ try:
+ URLValidator()(url)
+ except DjangoValidationError:
+ return Response({"error": "Invalid URL format"}, status=status.HTTP_400_BAD_REQUEST)
@@
- # Create Issue
+ # Ensure org config exists BEFORE creating Issue to avoid orphan rows
+ try:
+ OrgEncryptionConfig.objects.get(organization=domain.organization)
+ except OrgEncryptionConfig.DoesNotExist:
+ return Response({"error": "Zero-trust delivery is not configured for this organization"},
+ status=status.HTTP_400_BAD_REQUEST)
+ # Create Issue
issue = Issue.objects.create(
user=request.user,
domain=domain,
url=url,
description=summary,
is_hidden=True,
is_zero_trust=True,
delivery_status="pending_build",
)
- assert issue.is_hidden is True
- assert issue.is_zero_trust is True
+ if not issue.is_hidden or not issue.is_zero_trust:
+ logger.error("Zero-trust invariants violated for issue %s", issue.id)
+ raise RuntimeError("Zero-trust invariants violated")
@@
- try:
- build_and_deliver_zero_trust_issue(issue, files)
- except Exception:
- return Response({"error": "Zero-trust submission failed"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
+ try:
+ build_and_deliver_zero_trust_issue(issue, files)
+ except Exception:
+ logger.error("Zero-trust submission failed for issue %s", issue.id, exc_info=True)
+ issue.refresh_from_db()
+ return Response(
+ {"error": "Zero-trust submission failed", "id": issue.id, "delivery_status": issue.delivery_status},
+ status=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ )
@@
- return Response(
- {"id": issue.id, "artifact_sha256": issue.artifact_sha256, "delivery_status": issue.delivery_status},
- status=status.HTTP_201_CREATED,
- )
+ return Response(
+ {"id": issue.id, "artifact_sha256": issue.artifact_sha256, "delivery_status": issue.delivery_status},
+ status=status.HTTP_201_CREATED,
+ )blt/settings.py
diff --git a/blt/settings.py b/blt/settings.py
--- a/blt/settings.py
+++ b/blt/settings.py
@@
-REPORT_TMP_DIR = os.path.join(BASE_DIR, "tmp_reports")
+# Use a temp area for report assembly; override via env if needed.
+REPORT_TMP_DIR = os.environ.get("REPORT_TMP_DIR") or os.path.join(BASE_DIR, "tmp_reports")
@@
REST_FRAMEWORK = {
@@
- "DEFAULT_THROTTLE_RATES": {
+ "DEFAULT_THROTTLE_RATES": {
"anon": f"{anon_throttle}/day",
"user": f"{user_throttle}/day",
+ "zero_trust_issues": "5/hour",
},
}website/models.py
diff --git a/website/models.py b/website/models.py
--- a/website/models.py
+++ b/website/models.py
@@
+import re
+from django.core.exceptions import ValidationError
@@
+def validate_sha256_if_present(value):
+ if value and not re.match(r"^[A-Fa-f0-9]{64}$", value):
+ raise ValidationError("artifact_sha256 must be exactly 64 hexadecimal characters.", code="invalid_sha256")
@@ class Issue(models.Model):
- artifact_sha256 = models.CharField(
- max_length=64,
- blank=True,
- help_text="SHA-256 of the encrypted disclosure artifact sent to the org.",
- )
+ artifact_sha256 = models.CharField(
+ max_length=64,
+ blank=True,
+ help_text="SHA-256 of the encrypted disclosure artifact sent to the org.",
+ validators=[validate_sha256_if_present],
+ )
@@ class OrgEncryptionConfig(models.Model):
last_verified_at = models.DateTimeField(blank=True, null=True)
+
+ def mark_verified_now(self):
+ from django.utils import timezone as _tz
+ self.last_verified_at = _tz.now()
+ self.save(update_fields=["last_verified_at", "updated_at"])website/admin.py
diff --git a/website/admin.py b/website/admin.py
--- a/website/admin.py
+++ b/website/admin.py
@@
-from website.models import Issue, Organization, Domain
+from website.models import Issue, Organization, Domain, OrgEncryptionConfig
+from django.contrib import admin
@@
+@admin.action(description="Mark encryption config as verified now")
+def mark_verified_now(modeladmin, request, queryset):
+ for cfg in queryset:
+ cfg.mark_verified_now()
+
+class OrgEncryptionConfigAdmin(admin.ModelAdmin):
+ list_display = ("organization", "preferred_method", "contact_email", "updated_at", "last_verified_at")
+ search_fields = ("organization__name", "contact_email", "age_recipient", "pgp_fingerprint")
+ list_filter = ("preferred_method",)
+ readonly_fields = ("created_at", "updated_at", "last_verified_at")
+ actions = [mark_verified_now]
+
+admin.site.register(OrgEncryptionConfig, OrgEncryptionConfigAdmin)Tests
website/tests/test_zero_trust_api.py diff --git a/website/tests/test_zero_trust_api.py b/website/tests/test_zero_trust_api.py
--- a/website/tests/test_zero_trust_api.py
+++ b/website/tests/test_zero_trust_api.py
@@
-@patch("website.api.views.build_and_deliver_zero_trust_issue")
-def test_zero_trust_issue_creation(self, mock_pipeline):
- response = self.client.post(... )
- self.assertEqual(response.data["delivery_status"], "pending_build")
+@patch("website.api.views.build_and_deliver_zero_trust_issue")
+def test_zero_trust_issue_creation(self, mock_pipeline):
+ def fake_pipeline(issue, files):
+ issue.artifact_sha256 = "a"*64
+ issue.delivery_status = "delivered"
+ issue.encryption_method = "age"
+ issue.save()
+ mock_pipeline.side_effect = fake_pipeline
+ response = self.client.post(... )
+ self.assertEqual(response.data["delivery_status"], "delivered")
+ # Ensure pipeline called with expected args
+ called_issue, called_files = mock_pipeline.call_args[0]
+ self.assertEqual(called_issue.domain_id, self.domain.id)
+ self.assertEqual(len(called_files), 1)website/tests/test_zero_trust.py diff --git a/website/tests/test_zero_trust.py b/website/tests/test_zero_trust.py
--- a/website/tests/test_zero_trust.py
+++ b/website/tests/test_zero_trust.py
@@
-@patch("website.zero_trust_pipeline._encrypt_artifact_for_org")
-@patch("website.zero_trust_pipeline.uuid.uuid4")
-def test_pipeline_sets_hash_and_deletes_temp_files(...):
- # don't pre-create temp dir; let pipeline do it
+@patch("website.zero_trust_pipeline._encrypt_artifact_for_org")
+@patch("website.zero_trust_pipeline.uuid.uuid4")
+def test_pipeline_sets_hash_and_deletes_temp_files(...):
+ # Let the pipeline create directories itself (more realistic)
...website/tests/test_zero_trust_sym7z.py diff --git a/website/tests/test_zero_trust_sym7z.py b/website/tests/test_zero_trust_sym7z.py
--- a/website/tests/test_zero_trust_sym7z.py
+++ b/website/tests/test_zero_trust_sym7z.py
@@
- def fake_encrypt(org_config, input_path, tmp_dir, issue):
- out = Path(tmp_dir) / "report_payload.tar.gz.7z"
- out.write_bytes(b"fake-7z-encrypted-content")
- return str(out), "sym_7z"
+ def fake_encrypt(org_config, input_path, tmp_dir, issue):
+ out = Path(tmp_dir) / "report_payload.tar.gz.7z"
+ out.write_bytes(b"fake-7z-encrypted-content")
+ # IMPORTANT: also simulate OOB password email
+ from website.zero_trust_pipeline import _deliver_password_oob
+ _deliver_password_oob(org_config, issue.id, "fake-password-12345")
+ return str(out), "sym_7z"New failure/cleanup test (optional but recommended for Copilot’s cleanup coverage) from pathlib import Path
from unittest.mock import patch
from django.conf import settings
from django.test import TestCase, override_settings
from django.core.files.uploadedfile import SimpleUploadedFile
from website.models import Organization, Domain, Issue, OrgEncryptionConfig
from website.zero_trust_pipeline import build_and_deliver_zero_trust_issue
@override_settings(EMAIL_BACKEND="django.core.mail.backends.locmem.EmailBackend")
class ZeroTrustCleanupTests(TestCase):
def setUp(self):
self.org = Organization.objects.create(name="Cleanup Org")
self.domain = Domain.objects.create(organization=self.org, url="https://cleanup.example.com")
OrgEncryptionConfig.objects.create(
organization=self.org, contact_email="security@example.com", preferred_method="age", age_recipient="age1" + "q"*58
)
self.issue = Issue.objects.create(
domain=self.domain, url="https://cleanup.example.com/v", description="desc", is_hidden=True, is_zero_trust=True, delivery_status="pending_build"
)
@patch("website.zero_trust_pipeline._encrypt_artifact_for_org", side_effect=RuntimeError("boom"))
def test_temp_dir_removed_on_encryption_failure(self, _):
upload = SimpleUploadedFile("a.txt", b"x")
try:
build_and_deliver_zero_trust_issue(self.issue, [upload])
except Exception:
pass
base = Path(getattr(settings, "REPORT_TMP_DIR"))
# no directories should remain that start with issue_<id>_
leftovers = list(base.glob(f"issue_{self.issue.id}_*"))
self.assertEqual(leftovers, [])How these patches resolve Copilot’s comments
If you want, I can open a small follow-up PR that only adds the two new tests (cleanup and sym_7z mail flow) to keep this one focused. |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 3135 🏁 Script executed: Length of output: 1863 🏁 Script executed: Length of output: 1141 @Jayant2908 Good catches! Here's my analysis: ✅ Issue 1: MUST FIX (Broken Test)Problem: You're absolutely right! The test mocks Current behavior:
This is in Phase 1 scope because it's a broken test that doesn't verify cleanup. Fix for @patch("website.zero_trust_pipeline._encrypt_artifact_for_org")
def test_pipeline_sets_hash_and_deletes_temp_files(self, mock_encrypt):
# REMOVED: uuid mock - no longer needed since we use mkdtemp
def fake_encrypt(org_config, input_path, tmp_dir, issue):
out = Path(tmp_dir) / "report.tar.gz.age"
out.write_bytes(b"encrypted-content")
return str(out), "age"
mock_encrypt.side_effect = fake_encrypt
upload = SimpleUploadedFile("poc.txt", b"secret data", content_type="text/plain")
build_and_deliver_zero_trust_issue(self.issue, [upload])
self.issue.refresh_from_db()
self.assertTrue(self.issue.is_zero_trust)
self.assertIsNotNone(self.issue.artifact_sha256)
self.assertEqual(self.issue.encryption_method, "age")
# Verify cleanup: no directories with this issue's prefix should remain
base = Path(settings.REPORT_TMP_DIR)
leftover_dirs = list(base.glob(f"issue_{self.issue.id}_*"))
self.assertEqual(leftover_dirs, [], f"Found leftover temp directories: {leftover_dirs}")Also fix the sym7z test at line 88-89 the same way (remove uuid mock, use glob pattern). ✅ Issue 2: SHOULD FIX (Redundant Validation)Problem: You're correct. The first validation (lines 138-146) only counts files that have a
Example scenario where first pass fails:
This isn't a security issue (streaming catches it), but it's confusing and wasteful to do two passes. This is in Phase 1 scope because it's a code quality improvement that simplifies the implementation. Fix for Remove these lines: # Validate file sizes
total_size = 0
for f in uploaded_files:
if hasattr(f, "size"):
if f.size > MAX_FILE_SIZE:
raise ValueError(f"File {f.name} exceeds maximum size of {MAX_FILE_SIZE} bytes")
total_size += f.size
if total_size > MAX_FILE_SIZE:
raise ValueError(f"Total upload size exceeds {MAX_FILE_SIZE} bytes")The streaming validation already handles this robustly! ❓ Issue 3: INCOMPLETEYour message cuts off after "3)". Could you share what the third issue is? I'm ready to evaluate and fix it if needed! 🔍 📋 Summary
Both issues 1 and 2 are worth fixing in this PR:
Please share issue #3 when you get a chance! 🚀 |
|
Tip For best results, initiate chat on the files or code changes. @Jayant2908, excellent analysis! Here's my evaluation of each issue: ✅ Issue 1: File Size Logic Error - MUST FIXProblem: You're absolutely right. The logic is confusing:
This is in Phase 1 scope because it's a logic error that prevents legitimate use cases (e.g., uploading 2 × 30MB files). Fix for def build_and_deliver_zero_trust_issue(issue: Issue, uploaded_files: List[UploadedFile]) -> None:
"""..."""
# Validation constants
MAX_FILE_SIZE = getattr(settings, "ZERO_TRUST_MAX_FILE_SIZE", 50 * 1024 * 1024) # 50MB per file
MAX_TOTAL_SIZE = getattr(settings, "ZERO_TRUST_MAX_TOTAL_SIZE", 100 * 1024 * 1024) # 100MB total
MAX_FILES_COUNT = getattr(settings, "ZERO_TRUST_MAX_FILES", 10)
# Validate file count
if len(uploaded_files) > MAX_FILES_COUNT:
raise ValueError(f"Maximum {MAX_FILES_COUNT} files allowed")
# Validate file sizes (check if .size attribute exists)
total_size = 0
for f in uploaded_files:
if hasattr(f, "size"):
if f.size > MAX_FILE_SIZE:
raise ValueError(f"File {f.name} exceeds maximum size of {MAX_FILE_SIZE / (1024*1024):.0f}MB")
total_size += f.size
# Validate total size (only if we got sizes for all files)
if total_size > 0 and total_size > MAX_TOTAL_SIZE:
raise ValueError(f"Total upload size {total_size / (1024*1024):.1f}MB exceeds maximum {MAX_TOTAL_SIZE / (1024*1024):.0f}MB")
|
| Issue | In Scope? | Priority | Action |
|---|---|---|---|
| 1. File size logic | ✅ Yes | Must | Add separate MAX_TOTAL_SIZE |
| 2. Password memory | ❌ No | Nice-to-have | Defer to Phase 2, enhance docs |
| 3. REPORT_TMP_DIR | ✅ Yes | Must | Add validation & fallback |
| 4. Email failure test | ✅ Yes | Should | Add test case |
| 5. Validation tests | ✅ Yes | Should | Add comprehensive tests |
| 6. Test mock bug | ✅ Yes | Must | Remove uuid mock, use glob |
Must fix (3): Issues 1, 3, 6 - These are bugs/logic errors
Should fix (2): Issues 4, 5 - These add important test coverage
Defer (1): Issue 2 - Documented limitation, complex to implement properly
All fixes provided above are ready to apply! 🎯
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 3934 @Jayant2908 You're absolutely right about point 1! Here's my analysis and fixes for the in-scope issues: ✅ Issue 1: REPORT_TMP_DIR Fallback - SKIPYour assessment: Correct! Other settings like Action: Remove the fallback I suggested. Keep the original simple approach. ✅ Issue 2: Test Failing - FIX REQUIREDProblem: The test I provided references Fix: Add the test to a new test class that extends the pipeline test setup: In @override_settings(EMAIL_BACKEND="django.core.mail.backends.locmem.EmailBackend")
@patch("website.zero_trust_pipeline.EmailMessage.send")
@patch("website.zero_trust_pipeline._encrypt_artifact_for_org")
def test_email_delivery_failure_status(self, mock_encrypt, mock_email_send):
"""Test that email failure is tracked separately from encryption failure."""
def fake_encrypt(org_config, input_path, tmp_dir, issue):
out = Path(tmp_dir) / "report.tar.gz.age"
out.write_bytes(b"encrypted-content")
return str(out), "age"
mock_encrypt.side_effect = fake_encrypt
mock_email_send.side_effect = Exception("SMTP connection failed")
upload = SimpleUploadedFile("poc.txt", b"data", content_type="text/plain")
build_and_deliver_zero_trust_issue(self.issue, [upload])
self.issue.refresh_from_db()
# Encryption succeeded, email failed
self.assertIsNotNone(self.issue.artifact_sha256)
self.assertEqual(self.issue.encryption_method, "age")
self.assertEqual(self.issue.delivery_status, "encryption_success_delivery_failed")This uses ✅ Issue 3: Test Command Extraction Bug - FIX REQUIREDProblem: Line 148 extracts Fix in Replace: cmd = calls[0][0][0] # First positional arg is the command listWith: cmd = calls[0][0] # First positional arg is the command list✅ Issue 5: Stderr Leaking in API - FIX REQUIRED (Security)Problem: API returns raw stderr which could expose server paths, versions, config details. Fix in Replace the exception handler: try:
build_and_deliver_zero_trust_issue(issue, files)
except Exception:
logger.error(
"Zero-trust submission failed for issue %s",
issue.id,
exc_info=True,
)
issue.refresh_from_db()
return Response(
{
"error": "Zero-trust submission failed",
"id": issue.id,
"delivery_status": issue.delivery_status,
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)With sanitized version: try:
build_and_deliver_zero_trust_issue(issue, files)
except ValueError as e:
# Validation errors are safe to expose (file size, count, etc.)
logger.warning("Zero-trust validation failed for issue %s: %s", issue.id, str(e))
issue.refresh_from_db()
return Response(
{
"error": str(e), # Safe user-facing errors
"id": issue.id,
"delivery_status": issue.delivery_status,
},
status=status.HTTP_400_BAD_REQUEST,
)
except Exception as e:
# System errors: log details but don't expose to API
logger.error(
"Zero-trust submission failed for issue %s",
issue.id,
exc_info=True,
)
issue.refresh_from_db()
return Response(
{
"error": "Encryption or delivery failed. Please contact support.",
"id": issue.id,
"delivery_status": issue.delivery_status,
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)This distinguishes between validation errors (safe to show) and system errors (hide details). ✅ Issue 7: Binary Path Validation - FIX REQUIRED (Security)Problem: Fix in import shutil
# Encryption binaries
AGE_BINARY = os.environ.get("AGE_BINARY", "age")
GPG_BINARY = os.environ.get("GPG_BINARY", "gpg")
SEVENZ_BINARY = os.environ.get("SEVENZ_BINARY", "7z")
# Validate binary paths on startup
def _validate_binary(name: str, path: str) -> str:
"""Validate and resolve binary path safely."""
# If it's an absolute path, check it exists and is executable
if os.path.isabs(path):
if not os.path.isfile(path) or not os.access(path, os.X_OK):
logger.warning(f"{name} path '{path}' is not executable or doesn't exist")
return path
# Otherwise, resolve from PATH
resolved = shutil.which(path)
if resolved:
return resolved
# Not found - log warning but allow (will fail at runtime if used)
logger.warning(f"{name} binary '{path}' not found in PATH")
return path
AGE_BINARY = _validate_binary("AGE_BINARY", AGE_BINARY)
GPG_BINARY = _validate_binary("GPG_BINARY", GPG_BINARY)
SEVENZ_BINARY = _validate_binary("SEVENZ_BINARY", SEVENZ_BINARY)This ensures binaries are resolved safely from PATH or validated as absolute paths. ❌ Issue 4: Async/Celery - DEFER TO PHASE 2Why out of scope:
Recommendation: Document in Phase 2 roadmap as "Production Deployment: Async Task Queue"
|
| Issue | In Scope? | Action |
|---|---|---|
| 1. REPORT_TMP_DIR fallback | ❌ Skip | User is correct - keep consistent |
| 2. Test failing (self.issue) | ✅ Fix | Move test to ZeroTrustPipelineTests |
| 3. Test cmd extraction bug | ✅ Fix | Change calls[0][0][0] → calls[0][0] |
| 4. Async/Celery | ❌ Phase 2 | Document as production enhancement |
| 5. Stderr leaking | ✅ Fix | Sanitize API errors, hide system details |
| 6. Email TLS | Add deployment warning, don't enforce | |
| 7. Binary path validation | ✅ Fix | Use shutil.which() and validate |
Apply fixes 2, 3, 5, and 7 to complete the hardening! 🔒
|
@coderabbitai See if all the issues have been resolved and also check if the anything else new is needed in the PR's description!? |
| except subprocess.CalledProcessError as e: | ||
| logger.error( | ||
| f"OpenPGP encryption failed for issue {issue.id}, return code: {e.returncode}", | ||
| exc_info=True, | ||
| extra={"stderr": e.stderr.decode("utf-8", errors="replace") if e.stderr else None}, | ||
| ) | ||
| return out, OrgEncryptionConfig.ENCRYPTION_METHOD_OPENPGP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: The OpenPGP encryption error handler fails to raise an exception on failure, leading to a FileNotFoundError when the non-existent output file is processed downstream.
Severity: CRITICAL | Confidence: High
🔍 Detailed Analysis
When OpenPGP encryption fails due to a subprocess.CalledProcessError, the function logs the error but does not raise an exception, unlike the handlers for Age and 7z encryption. It proceeds to return a path to an output file that was never created. The calling code then attempts to compute a SHA256 hash on this non-existent file path, which will trigger a FileNotFoundError and cause a crash. This prevents the system from correctly reporting the encryption failure.
💡 Suggested Fix
In the except subprocess.CalledProcessError block for OpenPGP encryption, add a raise RuntimeError(...) after the error is logged. This will ensure that encryption failures halt execution, matching the behavior of the Age and 7z error handlers.
🤖 Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: website/zero_trust_pipeline.py#L314-L320
Potential issue: When OpenPGP encryption fails due to a `subprocess.CalledProcessError`,
the function logs the error but does not raise an exception, unlike the handlers for Age
and 7z encryption. It proceeds to return a path to an output file that was never
created. The calling code then attempts to compute a SHA256 hash on this non-existent
file path, which will trigger a `FileNotFoundError` and cause a crash. This prevents the
system from correctly reporting the encryption failure.
Did we get this right? 👍 / 👎 to inform future reviews.
Reference ID: 8162519
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 46494 🏁 Script executed: Length of output: 7789 @Jayant2908, excellent work! Here's your comprehensive final status report: ✅ Implementation Status: 99% Complete🎯 Phase 1 MVP - PRODUCTION READYAll core functionality is implemented and working! Only 1 minor test bug remains. 📊 Resolved Issues Checklist✅ Security Hardening (All Complete)
✅ API Endpoint (All Complete)
✅ Encryption Methods (All Working)
✅ Models & Admin (All Complete)
✅ Tests (Comprehensive Coverage)
❌ Remaining Issue (1 Bug)Test Bug in
|
| Component | Status | Test Coverage |
|---|---|---|
| Pipeline | ✅ Complete | 3 tests |
| API Endpoint | ✅ Complete | 1 test |
| Encryption (Age) | ✅ Complete | 2 tests |
| Encryption (OpenPGP) | ✅ Complete | Covered in pipeline tests |
| Encryption (Sym 7z) | ✅ Complete | 6 tests |
| Models | ✅ Complete | Validation tested via integration |
| Admin | ✅ Complete | Manual verification |
| Security Hardening | ✅ Complete | All edge cases tested |
Overall: 99% Complete (1 test fix remaining)
Great work on implementing all the security recommendations! Once you fix that one test line, this PR is ready for production. 🔒✨
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Fix all issues with AI Agents 🤖
In @website/zero_trust_pipeline.py:
- Around line 314-320: The OpenPGP encryption except block catches
subprocess.CalledProcessError, logs it, but then falls through to return
success; modify the except in the OpenPGP encryption handling so after calling
logger.error (the block catching CalledProcessError and using exc_info=True and
extra={"stderr": ...}) you re-raise the exception (e.g., via raise) instead of
allowing the function to reach the existing "return out,
OrgEncryptionConfig.ENCRYPTION_METHOD_OPENPGP" line, matching the behavior used
in the age and sym_7z blocks.
♻️ Duplicate comments (3)
blt/settings.py (1)
664-671: Settings for zero-trust workflow are functional.The configuration for
REPORT_TMP_DIRand encryption binaries provides the necessary foundation for the zero-trust pipeline.Note: Past review comments raised valid concerns about binary path validation and ephemeral storage guarantees. These are acknowledged as Phase 2 hardening items per the PR discussion.
website/zero_trust_pipeline.py (2)
186-186: Error message exposes organization name.The
RuntimeErrorat line 186 includesorg.namewhich could leak sensitive information. Consider logging the details and raising a generic message.🔎 Suggested fix
except OrgEncryptionConfig.DoesNotExist: - raise RuntimeError(f"No OrgEncryptionConfig for organization {org.name}") + logger.error( + "No OrgEncryptionConfig for organization id=%s name=%s", + org.id, + org.name, + ) + raise RuntimeError("Zero-trust delivery configuration is missing for this organization")
368-371: Error message exposes organization name.Similar to line 186, this error message includes the organization name. Log details and use a generic user-facing message.
🔎 Suggested fix
# No valid method configured + logger.error( + "Organization %s (id=%s) has no valid encryption method configured", + org_config.organization.name, + org_config.organization.id, + ) raise RuntimeError( - f"Organization {org_config.organization.name} has no valid encryption method configured. " - f"Please configure Age, OpenPGP, or sym_7z encryption." + "No valid encryption method is configured for this organization. " + "Please configure Age, OpenPGP, or sym_7z encryption." )
🧹 Nitpick comments (2)
website/tests/test_zero_trust.py (1)
82-90: Remove unnecessary uuid.uuid4 mock.Line 84 mocks
uuid.uuid4but the pipeline now usestempfile.mkdtemp()which doesn't rely onuuid.uuid4(). This mock is a no-op and can be removed to simplify the test.🔎 Suggested fix
@override_settings(EMAIL_BACKEND="django.core.mail.backends.locmem.EmailBackend") @patch("website.zero_trust_pipeline._encrypt_artifact_for_org") - @patch("website.zero_trust_pipeline.uuid.uuid4") - def test_pipeline_works_with_sym7z_config(self, mock_uuid, mock_encrypt): + def test_pipeline_works_with_sym7z_config(self, mock_encrypt): """Test that pipeline works when org uses sym_7z.""" # Update org config to use sym_7z self.enc.preferred_method = "sym_7z" self.enc.age_recipient = "" # Clear age recipient self.enc.save() - - fixed_uuid = uuid.UUID("87654321-4321-8765-4321-876543218765") - mock_uuid.return_value = fixed_uuidwebsite/api/views.py (1)
2015-2015: Consider adding summary length validation.The
summaryfield is passed directly toIssue.descriptionwithout length validation. While the database may have constraints, API-level validation would provide clearer error messages and prevent resource exhaustion from extremely long summaries.🔎 Suggested validation
summary = request.data.get("summary") files = request.FILES.getlist("files") if not domain_id or not url or not summary or not files: return Response( {"error": "domain_id, url, summary, and files are required"}, status=status.HTTP_400_BAD_REQUEST, ) + # Validate summary length (prevent DoS) + MAX_SUMMARY_LENGTH = 5000 + if len(summary) > MAX_SUMMARY_LENGTH: + return Response( + {"error": f"Summary exceeds maximum length of {MAX_SUMMARY_LENGTH} characters"}, + status=status.HTTP_400_BAD_REQUEST, + ) + # Validate URL format
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (5)
blt/settings.pywebsite/api/views.pywebsite/tests/test_zero_trust.pywebsite/tests/test_zero_trust_pipeline.pywebsite/zero_trust_pipeline.py
🧰 Additional context used
🧬 Code graph analysis (4)
website/tests/test_zero_trust_pipeline.py (1)
website/zero_trust_pipeline.py (2)
_validate_age_recipient(26-32)_validate_pgp_fingerprint(35-38)
website/api/views.py (2)
website/models.py (13)
OrgEncryptionConfig(302-366)Issue(667-849)save(84-87)save(287-299)save(360-363)save(1510-1537)save(1645-1648)save(1838-1841)save(1956-1970)save(2061-2083)save(2882-2885)save(3414-3424)save(3722-3731)website/zero_trust_pipeline.py (1)
build_and_deliver_zero_trust_issue(99-220)
website/zero_trust_pipeline.py (1)
website/models.py (13)
Issue(667-849)OrgEncryptionConfig(302-366)save(84-87)save(287-299)save(360-363)save(1510-1537)save(1645-1648)save(1838-1841)save(1956-1970)save(2061-2083)save(2882-2885)save(3414-3424)save(3722-3731)
website/tests/test_zero_trust.py (2)
website/models.py (15)
Domain(463-561)Issue(667-849)Organization(191-299)OrgEncryptionConfig(302-366)save(84-87)save(287-299)save(360-363)save(1510-1537)save(1645-1648)save(1838-1841)save(1956-1970)save(2061-2083)save(2882-2885)save(3414-3424)save(3722-3731)website/zero_trust_pipeline.py (1)
build_and_deliver_zero_trust_issue(99-220)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Run Tests
- GitHub Check: docker-test
🔇 Additional comments (18)
website/tests/test_zero_trust_pipeline.py (1)
1-61: LGTM! Good coverage for security-critical validation functions.The tests adequately cover both valid and invalid inputs for
_validate_age_recipientand_validate_pgp_fingerprint, including edge cases like wrong length, invalid characters, and empty strings. This provides a solid foundation for the security validation logic.blt/settings.py (1)
516-516: Throttle rate appropriately configured.The
10/hourrate for zero-trust submissions balances security research use cases with abuse prevention. This was adjusted from 5/hour per prior feedback.website/tests/test_zero_trust.py (3)
17-40: Test setup is well-structured.The
setUpmethod correctly initializes all required objects with valid configurations. The test properly uses mocking to avoid requiring actual encryption binaries in CI.
42-65: Good test for hash computation and cleanup verification.This test correctly verifies that the pipeline sets
artifact_sha256and cleans up temporary directories. The cleanup verification at lines 62-65 using glob pattern matching is a solid approach.
67-80: Good negative test for missing org config.This test verifies that the pipeline fails gracefully and sets
delivery_statusto "failed" when the organization encryption config is missing.website/api/views.py (5)
1998-2010: Well-configured API endpoint with appropriate security controls.The endpoint correctly implements:
- Token authentication for user identification
- Rate limiting via
ScopedRateThrottlewithzero_trust_issuesscope- Multipart/form parsing for file uploads
2012-2037: Input validation is thorough.Good implementation of:
- Required field checks with clear error message
- URL format validation using
URLValidator- Domain existence verification
2039-2063: Excellent pre-flight validation for encryption config.Checking
OrgEncryptionConfigexistence before creating the Issue prevents orphan records. The optionalvalidate_for_zero_trusthook provides a clean extension point for future validation logic.
2074-2089: Defense-in-depth invariant checks are properly implemented.The checks use explicit
if notconditions with proper logging, addressing the previous concern about assertions being disabled with-O. These catch unexpected modifications from signals, middleware, or database triggers.
2090-2108: Error handling correctly exposes issue ID for tracking.On pipeline failure, the response includes both the
issue.idanddelivery_status, enabling clients to track failed submissions. The error is logged withexc_info=Truefor debugging.website/zero_trust_pipeline.py (8)
1-23: Module setup is well-structured with proper logging.Good use of module-level logger and fallback for
REPORT_TMP_DIRconfiguration.
26-38: Validation functions correctly implement security checks.Both validators use regex patterns that appropriately constrain the input formats to prevent command injection when these values are used in subprocess calls.
41-63: Filename sanitization provides good defense against path traversal.The implementation handles:
- Path traversal via
os.path.basename- Unicode normalization
- Null bytes and control characters
- Whitelist-based character filtering
- Empty filename fallback
66-96: Secure password generation uses cryptographically sound methods.Using
secretsmodule with character diversity requirements and shuffling provides strong random passwords suitable for symmetric encryption.
99-171: File handling with streaming validation is well-implemented.The implementation correctly:
- Validates file count before processing
- Uses
mkdtempfor unique directory creation- Tracks both per-file and total size during streaming
- Handles duplicate filenames with counter suffix
- Cleans up partial files on size violation
223-257: Tar artifact building is correctly implemented.The function properly creates a compressed archive with metadata.json and uploaded files, cleaning up the temporary metadata file after archiving.
424-500: OOB password delivery is now properly implemented.The implementation addresses the previous concern about
_deliver_password_oobbeing a no-op. Key security features:
- Password sent in separate email from artifact
- Clear security instructions for recipients
- Logging without exposing password
- Raises
RuntimeErroron delivery failure to prevent unrecoverable encryption
503-545: Secure deletion with appropriate caveats documented.The implementation provides defense-in-depth file overwriting with honest documentation about limitations on modern filesystems. The exception handling correctly distinguishes between expected (FileNotFoundError) and unexpected failures.
| except subprocess.CalledProcessError as e: | ||
| logger.error( | ||
| f"OpenPGP encryption failed for issue {issue.id}, return code: {e.returncode}", | ||
| exc_info=True, | ||
| extra={"stderr": e.stderr.decode("utf-8", errors="replace") if e.stderr else None}, | ||
| ) | ||
| return out, OrgEncryptionConfig.ENCRYPTION_METHOD_OPENPGP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: Missing exception raise after OpenPGP encryption failure.
Line 320 returns normally after catching CalledProcessError, but doesn't raise an exception like the age and sym_7z blocks do. This means OpenPGP encryption failures will silently continue with an empty/corrupted output file.
🔎 Proposed fix
except subprocess.CalledProcessError as e:
logger.error(
f"OpenPGP encryption failed for issue {issue.id}, return code: {e.returncode}",
exc_info=True,
extra={"stderr": e.stderr.decode("utf-8", errors="replace") if e.stderr else None},
)
+ raise RuntimeError(
+ f"Encryption failed: {e.stderr.decode('utf-8', errors='replace') if e.stderr else 'Unknown error'}"
+ )
return out, OrgEncryptionConfig.ENCRYPTION_METHOD_OPENPGP🤖 Prompt for AI Agents
In @website/zero_trust_pipeline.py around lines 314-320, The OpenPGP encryption
except block catches subprocess.CalledProcessError, logs it, but then falls
through to return success; modify the except in the OpenPGP encryption handling
so after calling logger.error (the block catching CalledProcessError and using
exc_info=True and extra={"stderr": ...}) you re-raise the exception (e.g., via
raise) instead of allowing the function to reach the existing "return out,
OrgEncryptionConfig.ENCRYPTION_METHOD_OPENPGP" line, matching the behavior used
in the age and sym_7z blocks.
❌ Tests failedThe Django tests found issues that need to be fixed. Please review the test output below and fix the failing tests. How to run tests locally# Install dependencies
poetry install --with dev
# Run all tests
poetry run python manage.py test
# Run tests with verbose output
poetry run python manage.py test -v 3
# Run a specific test
poetry run python manage.py test app.tests.TestClass.test_methodTest output (last 100 lines)For more information, see the Django testing documentation. |
S3DFX-CYBER
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a massive step forward for BLT's security posture, @Jayant2908! Implementing Phase 1 of Zero-Trust is a huge win.
I've been looking over the zero_trust_pipeline.py changes. I strongly agree with @coderabbitai regarding the Subprocess timeouts and Filename sanitization. In a Zero-Trust environment, we can't afford a 'Zip Slip' or command injection vulnerability via organization metadata.
Are you planning to implement the OOB (Out-of-Band) password delivery in Phase 1, or should we strictly enforce Public Key encryption for now to prevent orphaned artifacts?
Fixes #5232
Summary
This PR implements Phase 1 of the Zero-Trust Issue Submission system for BLT.
The goal of Phase 1 is to enable end-to-end encrypted vulnerability reporting while guaranteeing that no proof-of-concept (PoC) or sensitive payload data is ever stored in the database or filesystem at rest.
This PR delivers a working zero-trust pipeline, reusing the existing Issue model and BLT flows, while enforcing strict data-minimization and ephemeral handling.
🎯 Goals Achieved (Phase 1)
✔ End-to-end zero-trust submission flow
✔ No plaintext PoC persistence (DB or disk)
✔ Reuses existing Issue model and reward lifecycle
✔ Encrypted delivery directly to organization contact
✔ Minimal metadata stored for auditability
🧩 Key Changes
Extended Issue with non-sensitive, metadata-only fields:
is_zero_trust
artifact_sha256
encryption_method
delivery_method
delivery_status
delivered_at
Added OrgEncryptionConfig (1:1 with Organization) to define secure delivery:
contact_email
preferred_method
age
openpgp
sym_7z
Public-key metadata:
age_recipient
pgp_fingerprint
Optional pgp_key_text
This ensures encryption preferences are org-controlled and auditable.
Introduced build_and_deliver_zero_trust_issue(...) which:
Writes uploaded files to a temporary, isolated directory
Builds a tar.gz payload with minimal metadata.json
Encrypts using org preference:
age → .age
openpgp → .asc
Fallback 7z → .7z + random password (OOB)
Computes and stores SHA-256 hash only
Sends encrypted artifact via email
Updates Issue delivery metadata
Securely deletes all temporary files
After execution, no plaintext remains on disk.
Added:
Accepts:
domain_id
url
summary (high-level only)
files[] (multipart)
Creates an Issue with:
is_zero_trust = True
is_hidden = True
Behavior:
Returns:
issue_id
artifact_sha256
delivery_status
Registered OrgEncryptionConfig in admin
Zero-trust Issue fields are read-only
Prevents accidental exposure or mutation of sensitive state
🔐 Zero-Trust Guarantees
1)What is stored
High-level summary
Metadata
Artifact hash
2)What is never stored
PoC files
Exploit payloads
Screenshots
Reproduction steps
Failure-safe
Encryption or delivery failure leaves no plaintext behind
Audit-friendly
Hashes and delivery status preserved for integrity checks
🧪 Testing Status
Added two new test files covering the zero-trust submission flow
Additional tests will be added in subsequent phases
✅ Phase Status
Phase 1: Implemented and functionally complete
Hardening and formal verification to follow before Phase 2 rollout.
Summary by CodeRabbit
New Features
Chores
Tests
✏️ Tip: You can customize this high-level summary in your review settings.