Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions backend/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ThreadSafeCache:

def __init__(self, ttl: int = 300, max_size: int = 100):
self._data = collections.OrderedDict()
self._timestamps = {}
self._timestamps = collections.OrderedDict()
self._ttl = ttl # Time to live in seconds
self._max_size = max_size # Maximum number of cache entries
self._lock = threading.RLock() # Reentrant lock for thread safety
Expand All @@ -34,6 +34,8 @@ def get(self, key: str = "default") -> Optional[Any]:
if current_time - self._timestamps[key] < self._ttl:
# Move to end (most recently used)
self._data.move_to_end(key)
# Note: We don't update timestamp here to maintain fixed TTL from creation/last set.
# To implement sliding expiration, we would update timestamp and move_to_end in _timestamps.
self._hits += 1
return self._data[key]
else:
Expand All @@ -51,7 +53,7 @@ def set(self, data: Any, key: str = "default") -> None:
current_time = time.time()

# Clean up expired entries before adding new one
self._cleanup_expired()
self._cleanup_expired(current_time)

# If cache is full, evict least recently used entry
if len(self._data) >= self._max_size and key not in self._data:
Expand All @@ -61,6 +63,7 @@ def set(self, data: Any, key: str = "default") -> None:
self._data[key] = data
self._data.move_to_end(key)
self._timestamps[key] = current_time
self._timestamps.move_to_end(key)

logger.debug(f"Cache set: key={key}, size={len(self._data)}")

Expand Down Expand Up @@ -109,16 +112,23 @@ def _remove_key(self, key: str) -> None:
self._data.pop(key, None)
self._timestamps.pop(key, None)

def _cleanup_expired(self) -> None:
def _cleanup_expired(self, current_time: Optional[float] = None) -> None:
"""
Internal method to clean up expired entries.
Optimized to O(K) where K is the number of expired entries.
Must be called within lock context.
"""
current_time = time.time()
expired_keys = [
key for key, timestamp in self._timestamps.items()
if current_time - timestamp >= self._ttl
]
if current_time is None:
current_time = time.time()

expired_keys = []
# Since _timestamps is an OrderedDict and we use move_to_end on set,
# we can iterate from the beginning and stop at the first non-expired entry.
for key, timestamp in self._timestamps.items():
if current_time - timestamp >= self._ttl:
expired_keys.append(key)
else:
break

for key in expired_keys:
self._remove_key(key)
Expand Down
12 changes: 4 additions & 8 deletions backend/closure_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,13 @@ def check_and_finalize_closure(grievance_id: int, db: Session) -> dict:
GrievanceFollower.grievance_id == grievance_id
).scalar()

# Optimized: Use a single GROUP BY query instead of separate count queries
confirmation_stats = db.query(
# Get all confirmation counts in a single query instead of multiple round-trips
counts = db.query(
ClosureConfirmation.confirmation_type,
func.count(ClosureConfirmation.id)
).filter(
ClosureConfirmation.grievance_id == grievance_id
).group_by(
ClosureConfirmation.confirmation_type
).all()
).filter(ClosureConfirmation.grievance_id == grievance_id).group_by(ClosureConfirmation.confirmation_type).all()

counts_dict = {c_type: count for c_type, count in confirmation_stats}
counts_dict = {ctype: count for ctype, count in counts}
confirmations_count = counts_dict.get("confirmed", 0)
disputes_count = counts_dict.get("disputed", 0)

Expand Down
32 changes: 32 additions & 0 deletions backend/hf_api_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
AUDIO_CLASS_API_URL = "https://router.huggingface.co/models/MIT/ast-finetuned-audioset-10-10-0.4593"

# Speech-to-Text Model (Whisper)
FACIAL_EMOTION_API_URL = "https://router.huggingface.co/models/dima806/facial_emotions_image_detection"
WHISPER_API_URL = "https://router.huggingface.co/models/openai/whisper-large-v3-turbo"

async def _make_request(client, url, payload):
Expand Down Expand Up @@ -456,3 +457,34 @@ async def detect_abandoned_vehicle_clip(image: Union[Image.Image, bytes], client
labels = ["abandoned car", "rusted vehicle", "car with flat tires", "wrecked car", "normal parked car"]
targets = ["abandoned car", "rusted vehicle", "car with flat tires", "wrecked car"]
return await _detect_clip_generic(image, labels, targets, client)


async def detect_facial_emotion(image: Union[Image.Image, bytes], client: httpx.AsyncClient = None):
"""
Detects facial emotions in an image using Hugging Face's dima806/facial_emotions_image_detection model.
"""
img_bytes = _prepare_image_bytes(image)

try:
headers_bin = {"Authorization": f"Bearer {token}"} if token else {}
async def do_post(c):
return await c.post(FACIAL_EMOTION_API_URL, headers=headers_bin, content=img_bytes, timeout=30.0)

if client:
response = await do_post(client)
else:
async with httpx.AsyncClient() as new_client:
response = await do_post(new_client)

if response.status_code == 200:
data = response.json()
if isinstance(data, list) and len(data) > 0:
return {"emotions": data[:3]} # Return top 3 emotions
return {"emotions": []}
else:
logger.error(f"Emotion API Error: {response.status_code} - {response.text}")
return {"error": "Failed to analyze emotions", "details": response.text}

except Exception as e:
logger.error(f"Emotion Estimation Error: {e}")
return {"error": str(e)}
50 changes: 30 additions & 20 deletions backend/routers/detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from backend.utils import process_and_detect, validate_uploaded_file, process_uploaded_image
from backend.schemas import DetectionResponse, UrgencyAnalysisRequest, UrgencyAnalysisResponse
from backend.cache import ThreadSafeCache
from backend.pothole_detection import detect_potholes, validate_image_for_processing
from backend.unified_detection_service import (
detect_vandalism as detect_vandalism_unified,
Expand Down Expand Up @@ -35,7 +36,9 @@
detect_civic_eye_clip,
detect_graffiti_art_clip,
detect_traffic_sign_clip,
detect_abandoned_vehicle_clip
detect_abandoned_vehicle_clip,
detect_facial_emotion,

)
from backend.dependencies import get_http_client
import backend.dependencies
Expand All @@ -46,35 +49,22 @@

# Cached Functions

# Simple Cache Implementation to avoid async-lru dependency issues on Render
_cache_store = {}
CACHE_TTL = 3600 # 1 hour
MAX_CACHE_SIZE = 500
# Use ThreadSafeCache for better performance and proper TTL/LRU management
detection_cache = ThreadSafeCache(ttl=3600, max_size=500)

async def _get_cached_result(key: str, func, *args, **kwargs):
current_time = time.time()

# Check cache
if key in _cache_store:
result, timestamp = _cache_store[key]
if current_time - timestamp < CACHE_TTL:
return result
else:
del _cache_store[key]

# Prune cache if too large
if len(_cache_store) > MAX_CACHE_SIZE:
keys_to_remove = list(_cache_store.keys())[:int(MAX_CACHE_SIZE * 0.2)]
for k in keys_to_remove:
del _cache_store[k]
cached_result = detection_cache.get(key)
if cached_result is not None:
return cached_result

# Execute function
if 'client' not in kwargs:
import backend.dependencies
kwargs['client'] = backend.dependencies.SHARED_HTTP_CLIENT

result = await func(*args, **kwargs)
_cache_store[key] = (result, current_time)
detection_cache.set(data=result, key=key)
return result

async def _cached_detect_severity(image_bytes: bytes):
Expand Down Expand Up @@ -465,3 +455,23 @@ async def detect_abandoned_vehicle_endpoint(image: UploadFile = File(...)):
except Exception as e:
logger.error(f"Abandoned vehicle detection error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")

@router.post("/api/detect-emotion")
async def detect_emotion_endpoint(
image: UploadFile = File(...),
client: httpx.AsyncClient = backend.dependencies.Depends(get_http_client)
):
"""
Analyze facial emotions in the image using Hugging Face inference.
"""
img_data = await validate_uploaded_file(image)
if "error" in img_data:
raise HTTPException(status_code=400, detail=img_data["error"])

processed_bytes = await run_in_threadpool(process_uploaded_image, img_data["bytes"])
result = await detect_facial_emotion(processed_bytes, client)

if "error" in result:
raise HTTPException(status_code=500, detail=result["error"])

return result
12 changes: 4 additions & 8 deletions backend/routers/grievances.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,17 +402,13 @@ def get_closure_status(
GrievanceFollower.grievance_id == grievance_id
).scalar()

# Optimized: Use a single GROUP BY query instead of separate count queries
confirmation_stats = db.query(
# Get all confirmation counts in a single query instead of multiple round-trips
counts = db.query(
ClosureConfirmation.confirmation_type,
func.count(ClosureConfirmation.id)
).filter(
ClosureConfirmation.grievance_id == grievance_id
).group_by(
ClosureConfirmation.confirmation_type
).all()
).filter(ClosureConfirmation.grievance_id == grievance_id).group_by(ClosureConfirmation.confirmation_type).all()

counts_dict = {c_type: count for c_type, count in confirmation_stats}
counts_dict = {ctype: count for ctype, count in counts}
confirmations_count = counts_dict.get("confirmed", 0)
disputes_count = counts_dict.get("disputed", 0)

Expand Down
33 changes: 33 additions & 0 deletions backend/tests/benchmark_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import time
import collections
import threading
import sys
import os

# Add parent directory to path to import backend.cache
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))

from backend.cache import ThreadSafeCache

def benchmark_cache(cache_size, num_ops):
cache = ThreadSafeCache(ttl=300, max_size=cache_size)

# Fill cache
for i in range(cache_size):
cache.set(data=i, key=f"key{i}")

start_time = time.time()
for i in range(num_ops):
# Update existing keys to keep the cache full and trigger cleanup
cache.set(data=i, key=f"key{i % cache_size}")
end_time = time.time()

return end_time - start_time

if __name__ == "__main__":
size = 1000
ops = 5000
print(f"Benchmarking ThreadSafeCache with size={size}, ops={ops}...")
duration = benchmark_cache(size, ops)
print(f"Duration: {duration:.4f} seconds")
print(f"Ops/sec: {ops / duration:.2f}")
20 changes: 9 additions & 11 deletions backend/tests/benchmark_closure_status.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import sys
import time
import os
from datetime import datetime, timedelta, timezone

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

sys.path.insert(0, os.path.abspath('.'))

from backend.database import Base, get_db
from backend.models import Grievance, GrievanceFollower, ClosureConfirmation
from backend.database import Base
from backend.models import Grievance, GrievanceFollower, ClosureConfirmation, SeverityLevel, GrievanceStatus
from backend.routers.grievances import get_closure_status
from backend.closure_service import ClosureService
from unittest.mock import patch, MagicMock

# In-memory SQLite for testing
engine = create_engine('sqlite:///:memory:', connect_args={"check_same_thread": False})
Expand All @@ -23,12 +18,15 @@ def seed_data(db):
grievance = Grievance(
unique_id="G123",
category="pothole",
status="open",
description="test",
severity=SeverityLevel.LOW,
status=GrievanceStatus.OPEN,
pincode="123456",
city="city",
district="district",
state="state"
state="state",
current_jurisdiction_id=1,
assigned_authority="test_authority",
sla_deadline=datetime.now(timezone.utc) + timedelta(days=7),
)
db.add(grievance)
db.commit()
Expand Down
33 changes: 33 additions & 0 deletions backend/tests/test_cache_perf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import time
import collections

def run_bench():
N = 1000
ops = 10000
timestamps = {f"key{i}": time.time() for i in range(N)}
current_time = time.time()
ttl = 300

start = time.time()
for _ in range(ops):
expired_keys = [
key for key, timestamp in timestamps.items()
if current_time - timestamp >= ttl
]
print(f"Current O(N) cleanup time for {ops} ops: {time.time() - start:.4f}s")

# Optimized version
timestamps_od = collections.OrderedDict(timestamps)
start = time.time()
for _ in range(ops):
# Simulated optimized cleanup
# In real code we use next(iter(self._timestamps.items()))
for key, ts in timestamps_od.items():
if current_time - ts >= ttl:
pass
else:
break
print(f"Optimized O(K) cleanup time (K=0) for {ops} ops: {time.time() - start:.4f}s")

if __name__ == "__main__":
run_bench()
Loading