Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions backend/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class ThreadSafeCache:

def __init__(self, ttl: int = 300, max_size: int = 100):
    """Initialize an empty thread-safe TTL cache.

    Args:
        ttl: Time-to-live for each entry, in seconds.
        max_size: Maximum number of cache entries retained.
    """
    self._data = collections.OrderedDict()
    # OrderedDict (not a plain dict) so expiration cleanup can walk entries
    # oldest-first and stop at the first non-expired one (O(K) cleanup).
    self._timestamps = collections.OrderedDict()
    self._ttl = ttl  # Time to live in seconds
    self._max_size = max_size  # Maximum number of cache entries
    self._lock = threading.RLock()  # Reentrant lock for thread safety
Expand Down Expand Up @@ -60,7 +61,10 @@ def set(self, data: Any, key: str = "default") -> None:
# Set new data atomically (adds to end, updating if exists)
self._data[key] = data
self._data.move_to_end(key)

# Update timestamp and move to end to maintain O(K) cleanup order
self._timestamps[key] = current_time
self._timestamps.move_to_end(key)

logger.debug(f"Cache set: key={key}, size={len(self._data)}")

def _cleanup_expired(self) -> None:
    """Remove all expired entries from the cache.

    Must be called while holding ``self._lock``.
    Runs in O(K) where K is the number of expired entries: because
    ``self._timestamps`` is ordered oldest-first, iteration can stop at
    the first non-expired entry.
    """
    current_time = time.time()
    expired_keys = []

    # Iterating the OrderedDict yields entries in insertion/update order
    # (oldest first), so expired entries form a prefix of the iteration.
    for key, timestamp in self._timestamps.items():
        if current_time - timestamp >= self._ttl:
            expired_keys.append(key)
        else:
            # First non-expired entry reached: every later entry is newer,
            # hence also non-expired — no need to scan further.
            break

    for key in expired_keys:
        self._remove_key(key)
Expand Down Expand Up @@ -163,4 +174,6 @@ def invalidate(self):
recent_issues_cache = ThreadSafeCache(ttl=300, max_size=20) # 5 minutes TTL, max 20 entries
nearby_issues_cache = ThreadSafeCache(ttl=60, max_size=100) # 1 minute TTL, max 100 entries
user_upload_cache = ThreadSafeCache(ttl=3600, max_size=1000) # 1 hour TTL for upload limits
user_issues_cache = ThreadSafeCache(ttl=300, max_size=100) # 5 minutes TTL for user history
blockchain_last_hash_cache = ThreadSafeCache(ttl=3600, max_size=1) # 1 hour TTL, holds only the latest chain hash
grievance_last_hash_cache = ThreadSafeCache(ttl=3600, max_size=1) # 1 hour TTL, single "last_hash" entry for grievance hash chaining
Comment on lines +177 to +179
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Consider max_size for grievance_last_hash_cache with concurrent requests.

grievance_last_hash_cache has max_size=1, which is appropriate for a single "last_hash" key. However, in backend/grievance_service.py, there's a race condition window between get("last_hash") and set("last_hash") when multiple concurrent grievance creations occur. The cache itself is thread-safe, but the get-compute-set pattern is not atomic.

This could result in two concurrent grievances computing their hash using the same prev_hash, leading to incorrect chain linkage. Consider using a lock around the entire hash computation block in GrievanceService.create_grievance().

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/cache.py` around lines 177 - 179, The grievance_last_hash_cache is
sized for a single key but the get-then-set sequence in
GrievanceService.create_grievance() is not atomic and allows a race where two
requests read the same prev_hash; surround the entire sequence that reads
last_hash, computes the new grievance hash, and writes last_hash with a mutex to
serialize access (e.g., add a threading.Lock or asyncio.Lock used by
GrievanceService and acquire it around the get/compute/set block referencing
grievance_last_hash_cache and the create_grievance method) so only one requester
can compute and update the last_hash at a time.

22 changes: 21 additions & 1 deletion backend/grievance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

import json
import uuid
import hashlib
from typing import Dict, Any, Optional, List
from sqlalchemy.orm import Session, joinedload
from datetime import datetime, timezone, timedelta

from backend.models import Grievance, Jurisdiction, GrievanceStatus, SeverityLevel, Issue
from backend.database import SessionLocal
from backend.cache import grievance_last_hash_cache
from backend.routing_service import RoutingService
from backend.sla_config_service import SLAConfigService
from backend.escalation_engine import EscalationEngine
Expand Down Expand Up @@ -84,6 +86,22 @@ def create_grievance(self, grievance_data: Dict[str, Any], db: Session = None) -
# Generate unique ID
unique_id = str(uuid.uuid4())[:8].upper()

# Blockchain feature: calculate integrity hash for the grievance
# Performance Boost: Use thread-safe cache to eliminate DB query for last hash
prev_hash = grievance_last_hash_cache.get("last_hash")
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: The read-compute-write cycle on grievance_last_hash_cache is not atomic across the full operation. Concurrent create_grievance calls can both read the same prev_hash, producing two grievances that share the same previous_integrity_hash and forking the chain. Consider holding an external lock (or using a DB-level advisory lock) around the entire hash-chaining section to serialize chain extensions.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/grievance_service.py, line 91:

<comment>The read-compute-write cycle on `grievance_last_hash_cache` is not atomic across the full operation. Concurrent `create_grievance` calls can both read the same `prev_hash`, producing two grievances that share the same `previous_integrity_hash` and forking the chain. Consider holding an external lock (or using a DB-level advisory lock) around the entire hash-chaining section to serialize chain extensions.</comment>

<file context>
@@ -84,6 +86,22 @@ def create_grievance(self, grievance_data: Dict[str, Any], db: Session = None) -
 
+            # Blockchain feature: calculate integrity hash for the grievance
+            # Performance Boost: Use thread-safe cache to eliminate DB query for last hash
+            prev_hash = grievance_last_hash_cache.get("last_hash")
+            if prev_hash is None:
+                # Cache miss: Fetch only the last hash from DB
</file context>
Fix with Cubic

if prev_hash is None:
# Cache miss: Fetch only the last hash from DB
prev_grievance = db.query(Grievance.integrity_hash).order_by(Grievance.id.desc()).first()
prev_hash = prev_grievance[0] if prev_grievance and prev_grievance[0] else ""
grievance_last_hash_cache.set(data=prev_hash, key="last_hash")

# SHA-256 chaining for grievance integrity
hash_content = f"{unique_id}|{grievance_data.get('category', 'general')}|{severity.value}|{prev_hash}"
integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest()

# Update cache for next grievance
grievance_last_hash_cache.set(data=integrity_hash, key="last_hash")
Comment on lines +89 to +103
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Race condition in concurrent grievance hash computation.

The get-compute-set pattern between lines 91-103 is not atomic. Under concurrent grievance creation:

  1. Thread A reads prev_hash = "X" from cache
  2. Thread B reads prev_hash = "X" (same value, before A updates)
  3. Both compute hashes using the same prev_hash
  4. Both store grievances with identical previous_integrity_hash

This breaks the blockchain chain integrity since two records will share the same predecessor.

🔧 Suggested fix: Use a dedicated lock for hash computation
+import threading
+
+# Module-level lock for grievance hash computation
+_grievance_hash_lock = threading.Lock()
+
 class GrievanceService:
     # ...
     def create_grievance(self, grievance_data: Dict[str, Any], db: Session = None) -> Optional[Grievance]:
         # ... earlier code ...
 
-            # Blockchain feature: calculate integrity hash for the grievance
-            prev_hash = grievance_last_hash_cache.get("last_hash")
-            if prev_hash is None:
-                prev_grievance = db.query(Grievance.integrity_hash).order_by(Grievance.id.desc()).first()
-                prev_hash = prev_grievance[0] if prev_grievance and prev_grievance[0] else ""
-                grievance_last_hash_cache.set(data=prev_hash, key="last_hash")
-
-            hash_content = f"{unique_id}|{grievance_data.get('category', 'general')}|{severity.value}|{prev_hash}"
-            integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest()
-
-            grievance_last_hash_cache.set(data=integrity_hash, key="last_hash")
+            # Blockchain feature: calculate integrity hash with lock to prevent race conditions
+            with _grievance_hash_lock:
+                prev_hash = grievance_last_hash_cache.get("last_hash")
+                if prev_hash is None:
+                    prev_grievance = db.query(Grievance.integrity_hash).order_by(Grievance.id.desc()).first()
+                    prev_hash = prev_grievance[0] if prev_grievance and prev_grievance[0] else ""
+
+                hash_content = f"{unique_id}|{grievance_data.get('category', 'general')}|{severity.value}|{prev_hash}"
+                integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest()
+                grievance_last_hash_cache.set(data=integrity_hash, key="last_hash")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Blockchain feature: calculate integrity hash for the grievance
# Performance Boost: Use thread-safe cache to eliminate DB query for last hash
prev_hash = grievance_last_hash_cache.get("last_hash")
if prev_hash is None:
# Cache miss: Fetch only the last hash from DB
prev_grievance = db.query(Grievance.integrity_hash).order_by(Grievance.id.desc()).first()
prev_hash = prev_grievance[0] if prev_grievance and prev_grievance[0] else ""
grievance_last_hash_cache.set(data=prev_hash, key="last_hash")
# SHA-256 chaining for grievance integrity
hash_content = f"{unique_id}|{grievance_data.get('category', 'general')}|{severity.value}|{prev_hash}"
integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest()
# Update cache for next grievance
grievance_last_hash_cache.set(data=integrity_hash, key="last_hash")
# Blockchain feature: calculate integrity hash with lock to prevent race conditions
with _grievance_hash_lock:
prev_hash = grievance_last_hash_cache.get("last_hash")
if prev_hash is None:
prev_grievance = db.query(Grievance.integrity_hash).order_by(Grievance.id.desc()).first()
prev_hash = prev_grievance[0] if prev_grievance and prev_grievance[0] else ""
hash_content = f"{unique_id}|{grievance_data.get('category', 'general')}|{severity.value}|{prev_hash}"
integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest()
grievance_last_hash_cache.set(data=integrity_hash, key="last_hash")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/grievance_service.py` around lines 89 - 103, There is a race between
reading grievance_last_hash_cache and updating it so concurrent create flows can
compute identical previous_integrity_hash; wrap the read-compute-set sequence in
a dedicated lock (e.g., create a module-level threading.Lock named
grievance_hash_lock) so that in the critical section you: acquire
grievance_hash_lock, read prev_hash from grievance_last_hash_cache (fall back to
the DB query using db.query(Grievance.integrity_hash).order_by(...).first() if
missing), compute hash_content and integrity_hash, update
grievance_last_hash_cache with integrity_hash, then release the lock; ensure the
lock covers both the cache miss DB fetch and the subsequent cache set to make
the operation atomic.

Comment on lines +102 to +103
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Cache-DB inconsistency risk on transaction rollback.

Line 103 updates the cache with the new integrity_hash before the DB commit on line 133. If db.commit() fails and triggers rollback (line 139), the cache retains the hash of a grievance that doesn't exist, causing the next grievance to chain from a phantom record.

Consider updating the cache only after successful commit:

🔧 Suggested fix
             # Compute hash but defer cache update
             hash_content = f"{unique_id}|{grievance_data.get('category', 'general')}|{severity.value}|{prev_hash}"
             integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest()
-
-            # Update cache for next grievance
-            grievance_last_hash_cache.set(data=integrity_hash, key="last_hash")

             # ... create grievance object ...

             db.add(grievance)
             db.commit()
             db.refresh(grievance)

+            # Update cache only after successful commit
+            grievance_last_hash_cache.set(data=integrity_hash, key="last_hash")
+
             return grievance

Also applies to: 132-134

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/grievance_service.py` around lines 102 - 103, The cache update is
happening before the database commit, causing a cache-DB consistency risk if
db.commit() fails; move the grievance_last_hash_cache.set(...) call so it
executes only after a successful db.commit() (and after any rollback/exception
handling completes) — e.g., relocate the grievance_last_hash_cache.set
invocation currently placed near the top of the transaction to immediately after
the call to db.commit() (and ensure any exception path does not update the
cache), or wrap the DB work in a transaction/context manager and perform the
cache write inside the success branch; refer to grievance_last_hash_cache.set
and db.commit()/rollback exception handling in this file to make the change.

Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Cache is updated with the new integrity_hash before db.commit(). If the commit fails and rolls back, the cache retains a hash for a non-existent grievance, permanently corrupting the chain for all subsequent records. Move the cache update to after the successful commit, and add grievance_last_hash_cache.invalidate("last_hash") in the except block as a safety net.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/grievance_service.py, line 103:

<comment>Cache is updated with the new `integrity_hash` before `db.commit()`. If the commit fails and rolls back, the cache retains a hash for a non-existent grievance, permanently corrupting the chain for all subsequent records. Move the cache update to after the successful commit, and add `grievance_last_hash_cache.invalidate("last_hash")` in the `except` block as a safety net.</comment>

<file context>
@@ -84,6 +86,22 @@ def create_grievance(self, grievance_data: Dict[str, Any], db: Session = None) -
+            integrity_hash = hashlib.sha256(hash_content.encode()).hexdigest()
+
+            # Update cache for next grievance
+            grievance_last_hash_cache.set(data=integrity_hash, key="last_hash")
+
             # Extract location data
</file context>
Fix with Cubic


# Extract location data
location_data = grievance_data.get('location', {})
latitude = location_data.get('latitude') if isinstance(location_data, dict) else None
Expand All @@ -106,7 +124,9 @@ def create_grievance(self, grievance_data: Dict[str, Any], db: Session = None) -
assigned_authority=assigned_authority,
sla_deadline=sla_deadline,
status=GrievanceStatus.OPEN,
issue_id=grievance_data.get('issue_id')
issue_id=grievance_data.get('issue_id'),
integrity_hash=integrity_hash,
previous_integrity_hash=prev_hash
)

db.add(grievance)
Expand Down
11 changes: 11 additions & 0 deletions backend/init_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ def index_exists(table, index_name):
if not index_exists("grievances", "ix_grievances_category_status"):
conn.execute(text("CREATE INDEX IF NOT EXISTS ix_grievances_category_status ON grievances (category, status)"))

if not column_exists("grievances", "integrity_hash"):
conn.execute(text("ALTER TABLE grievances ADD COLUMN integrity_hash VARCHAR"))
logger.info("Added integrity_hash column to grievances")

if not column_exists("grievances", "previous_integrity_hash"):
conn.execute(text("ALTER TABLE grievances ADD COLUMN previous_integrity_hash VARCHAR"))
logger.info("Added previous_integrity_hash column to grievances")

if not index_exists("grievances", "ix_grievances_previous_integrity_hash"):
conn.execute(text("CREATE INDEX IF NOT EXISTS ix_grievances_previous_integrity_hash ON grievances (previous_integrity_hash)"))

# Field Officer Visits Table (Issue #288)
# This table is newly created for field officer check-in system
if not inspector.has_table("field_officer_visits"):
Expand Down
20 changes: 10 additions & 10 deletions backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,20 @@ async def lifespan(app: FastAPI):
logger.info("Starting database initialization...")
await run_in_threadpool(Base.metadata.create_all, bind=engine)
logger.info("Base.metadata.create_all completed.")
# Temporarily disabled - comment out to debug startup issues
# await run_in_threadpool(migrate_db)
logger.info("Database initialized successfully (migrations skipped for local dev).")
# Enabled database migrations for production deployment
await run_in_threadpool(migrate_db)
logger.info("Database initialized successfully with migrations.")
except Exception as e:
logger.error(f"Database initialization failed: {e}", exc_info=True)
# We continue to allow health checks even if DB has issues (for debugging)

# Startup: Initialize Grievance Service (needed for escalation engine)
try:
logger.info("Initializing grievance service...")
# Temporarily disabled for local dev
# grievance_service = GrievanceService()
# app.state.grievance_service = grievance_service
logger.info("Grievance service initialization skipped for local dev.")
# Enabled grievance service for escalation logic
grievance_service = GrievanceService()
app.state.grievance_service = grievance_service
logger.info("Grievance service initialized successfully.")
except Exception as e:
logger.error(f"Error initializing grievance service: {e}", exc_info=True)

Expand Down Expand Up @@ -126,9 +126,9 @@ async def lifespan(app: FastAPI):
app = FastAPI(
title="VishwaGuru Backend",
description="AI-powered civic issue reporting and resolution platform",
version="1.0.0"
# Temporarily disable lifespan for local dev debugging
# lifespan=lifespan
version="1.0.0",
# Enable lifespan context manager for resource management and startup tasks
lifespan=lifespan
)

# Add centralized exception handlers
Expand Down
2 changes: 2 additions & 0 deletions backend/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class Grievance(Base):
pending_closure = Column(Boolean, default=False, index=True)

issue_id = Column(Integer, ForeignKey("issues.id"), nullable=True, index=True)
integrity_hash = Column(String, nullable=True) # Blockchain integrity seal
previous_integrity_hash = Column(String, nullable=True, index=True) # Linked hash for O(1) verification

# Relationships
jurisdiction = relationship("Jurisdiction", back_populates="grievances")
Expand Down
87 changes: 73 additions & 14 deletions backend/routers/grievances.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import func
from typing import List, Optional
from sqlalchemy import func, case
from typing import List, Optional, Union, Dict, Any
import os
import json
import logging
import hashlib
from datetime import datetime, timezone

from backend.database import get_db
Expand All @@ -15,7 +16,7 @@
FollowGrievanceRequest, FollowGrievanceResponse,
RequestClosureRequest, RequestClosureResponse,
ConfirmClosureRequest, ConfirmClosureResponse,
ClosureStatusResponse
ClosureStatusResponse, BlockchainVerificationResponse
)
from backend.grievance_service import GrievanceService
from backend.closure_service import ClosureService
Expand Down Expand Up @@ -387,30 +388,88 @@ def confirm_grievance_closure(
raise HTTPException(status_code=500, detail="Failed to confirm closure")


@router.get("/grievances/{grievance_id}/blockchain-verify", response_model=BlockchainVerificationResponse)
def verify_grievance_blockchain_integrity(grievance_id: int, db: Session = Depends(get_db)):
    """
    Verify the cryptographic integrity of a grievance using blockchain-style chaining.

    Recomputes SHA-256(unique_id|category|severity|prev_hash) and compares the
    result against the stored seal. The projected previous_integrity_hash
    column gives O(1) verification; legacy rows (created before that column
    existed) fall back to looking up the preceding grievance's hash.
    """
    # Project only the columns needed for verification — avoids loading the
    # full ORM entity and any secondary lookups.
    row = db.query(
        Grievance.id,
        Grievance.unique_id,
        Grievance.category,
        Grievance.severity,
        Grievance.integrity_hash,
        Grievance.previous_integrity_hash,
    ).filter(Grievance.id == grievance_id).first()

    if row is None:
        raise HTTPException(status_code=404, detail="Grievance not found")

    prev_hash = row.previous_integrity_hash
    if prev_hash is None:
        # Legacy record: derive the previous hash from the most recent
        # grievance with a smaller id, as the creation-time chaining did.
        prior = (
            db.query(Grievance.integrity_hash)
            .filter(Grievance.id < grievance_id)
            .order_by(Grievance.id.desc())
            .first()
        )
        prev_hash = prior[0] if prior and prior[0] else ""

    # Mirror the chaining formula used by GrievanceService at creation time:
    # hash(unique_id|category|severity|prev_hash).
    severity_text = row.severity.value if hasattr(row.severity, 'value') else str(row.severity)
    payload = f"{row.unique_id}|{row.category}|{severity_text}|{prev_hash}"
    recomputed = hashlib.sha256(payload.encode()).hexdigest()

    matches = recomputed == row.integrity_hash
    message = (
        "Integrity verified. This grievance is cryptographically sealed and has not been tampered with."
        if matches
        else "Integrity check failed! The grievance data does not match its cryptographic seal."
    )

    return BlockchainVerificationResponse(
        is_valid=matches,
        current_hash=row.integrity_hash,
        computed_hash=recomputed,
        message=message,
    )


@router.get("/grievances/{grievance_id}/closure-status", response_model=ClosureStatusResponse)
def get_closure_status(
grievance_id: int,
db: Session = Depends(get_db)
):
"""Get current closure confirmation status for a grievance"""
"""
Get current closure confirmation status for a grievance.
Optimized: Uses single aggregate queries to reduce database round-trips.
"""
try:
grievance = db.query(Grievance).filter(Grievance.id == grievance_id).first()
grievance = db.query(
Grievance.id,
Grievance.pending_closure,
Grievance.closure_approved,
Grievance.closure_confirmation_deadline
).filter(Grievance.id == grievance_id).first()

if not grievance:
raise HTTPException(status_code=404, detail="Grievance not found")

# Combined count for followers and confirmation types in fewer round-trips
total_followers = db.query(func.count(GrievanceFollower.id)).filter(
GrievanceFollower.grievance_id == grievance_id
).scalar()

confirmations_count = db.query(func.count(ClosureConfirmation.id)).filter(
ClosureConfirmation.grievance_id == grievance_id,
ClosureConfirmation.confirmation_type == "confirmed"
).scalar()
# Optimized: Count confirmations and disputes in a single query using case statements
confirmation_stats = db.query(
func.sum(case((ClosureConfirmation.confirmation_type == "confirmed", 1), else_=0)).label("confirmed"),
func.sum(case((ClosureConfirmation.confirmation_type == "disputed", 1), else_=0)).label("disputed")
).filter(ClosureConfirmation.grievance_id == grievance_id).first()

disputes_count = db.query(func.count(ClosureConfirmation.id)).filter(
ClosureConfirmation.grievance_id == grievance_id,
ClosureConfirmation.confirmation_type == "disputed"
).scalar()
confirmations_count = int(confirmation_stats.confirmed or 0)
disputes_count = int(confirmation_stats.disputed or 0)

required_confirmations = max(1, int(total_followers * ClosureService.CONFIRMATION_THRESHOLD))

Expand Down
Loading
Loading