diff --git a/tools/2ndRoundDeliberation/initialize_followup_event.py b/tools/2ndRoundDeliberation/initialize_followup_event.py index 15dbb3d..7807ed9 100644 --- a/tools/2ndRoundDeliberation/initialize_followup_event.py +++ b/tools/2ndRoundDeliberation/initialize_followup_event.py @@ -113,6 +113,8 @@ def initialize_event_collection( 'follow_up_questions': follow_up_toggle, 'extra_questions': extra_questions, # Add extra questions block 'mode': 'followup', # or "listener" / "survey" + 'interaction_limit': 450, # Default; can be customized per event later + 'second_round_prompts': { 'system_prompt': ( diff --git a/tools/2ndRoundDeliberation/initialize_listener_event.py b/tools/2ndRoundDeliberation/initialize_listener_event.py index 56c2682..bb3f08d 100644 --- a/tools/2ndRoundDeliberation/initialize_listener_event.py +++ b/tools/2ndRoundDeliberation/initialize_listener_event.py @@ -63,6 +63,10 @@ def initialize_event_collection(event_id, event_name, event_location, event_back 'language_guidance': language_guidance, 'extra_questions': extra_questions, 'mode': 'listener', # or "followup" / "survey" + 'interaction_limit': 450, # Default; can be customized per event later + 'default_model': 'gpt-4o-mini', + + 'second_round_prompts': { 'system_prompt': ( diff --git a/tools/blocked_numbers.py b/tools/blocked_numbers.py new file mode 100644 index 0000000..c348f7d --- /dev/null +++ b/tools/blocked_numbers.py @@ -0,0 +1,56 @@ +import firebase_admin +from firebase_admin import credentials, firestore +import logging +import os, json + +FIREBASE_CREDENTIALS_JSON = os.environ.get("FIREBASE_CREDENTIALS_JSON") + +if not FIREBASE_CREDENTIALS_JSON: + raise RuntimeError("Missing FIREBASE_CREDENTIALS_JSON environment variable") + +cred = credentials.Certificate(json.loads(FIREBASE_CREDENTIALS_JSON)) +firebase_admin.initialize_app(cred) +db = firestore.client() +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def initialize_blacklist_config(default_ttl_seconds=3600, initial_blocked_numbers=None): + """ + Creates or updates everything under the same 'blocked_numbers' collection. + - blocked_numbers/_config : stores cache TTL and other metadata + - blocked_numbers/ : empty doc or metadata for each blocked user + """ + blk_ref = db.collection('blocked_numbers') + + # 1. Save the cache TTL under a special _config document + config_ref = blk_ref.document('_config') + config_ref.set({ + "cache_ttl_seconds": default_ttl_seconds + }, merge=True) + + logger.info(f"[initialize_blacklist_config] Set cache_ttl_seconds={default_ttl_seconds}") + + # 2. Optionally preload blocked numbers + if initial_blocked_numbers: + for num in initial_blocked_numbers: + blk_ref.document(num).set({}) # Empty doc = blocked + logger.info(f"[initialize_blacklist_config] Added blocked number: {num}") + +if __name__ == "__main__": + initialize_blacklist_config( + default_ttl_seconds=3600, # 1-hour cache TTL + initial_blocked_numbers=[ + "+whatsapp:131xxx", + "+whatsapp:131xxx", + "whatsapp:" # add more as needed + ] + ) + + # Optional verification + doc = db.collection("blocked_numbers").document("_config").get() + print("Config exists:", doc.exists) + if doc.exists: + print("Data:", doc.to_dict()) + else: + print("⚠️ Still missing – check credentials or Firestore project ID.") diff --git a/tools/initialize_listener_event.py b/tools/initialize_listener_event.py index 88b0f79..ffab27e 100644 --- a/tools/initialize_listener_event.py +++ b/tools/initialize_listener_event.py @@ -66,6 +66,7 @@ def initialize_event_collection(event_id, event_name, event_location, event_back 'language_guidance': language_guidance, 'extra_questions': extra_questions, 'mode': 'listener' # or "followup" / "survey" + }) logger.info(f"[initialize_event_collection] Event '{event_name}' initialized/overwritten with extra questions.") diff --git a/tools/initialize_survey_event.py b/tools/initialize_survey_event.py index 83a1b0f..2c5101f 100644 --- a/tools/initialize_survey_event.py +++ b/tools/initialize_survey_event.py @@ -59,7 +59,9 @@ def initialize_event_collection( 'questions': formatted_questions, 'completion_message': completion_message, 'extra_questions': extra_questions, - 'mode': 'survey' # or "followup" / "survey" + 'mode': 'survey', # or "followup" / "survey" + 'interaction_limit': 450 # Default; can be customized per event later + }) logger.info(f"Event '{event_name}' initialized with {len(formatted_questions)} survey questions and {len(extra_questions)} extra questions.") diff --git a/whatsapp_bot/app/deliberation/tests/test_blocklist_helpers.py b/whatsapp_bot/app/deliberation/tests/test_blocklist_helpers.py new file mode 100644 index 0000000..4f16e23 --- /dev/null +++ b/whatsapp_bot/app/deliberation/tests/test_blocklist_helpers.py @@ -0,0 +1,64 @@ + +import time +import pytest + +from app.utils import blocklist_helpers + + +def test_default_ttl_is_int(): + ttl = blocklist_helpers._get_cache_ttl() + assert isinstance(ttl, int) + assert ttl > 0 + + +def test_cache_update_logic(monkeypatch): + blocklist_helpers._last_ttl_fetch = 0 + + class FakeDoc: + def __init__(self, exists=True): + self.exists = exists + def to_dict(self): + return {"cache_ttl_seconds": 42} + + class FakeDB: + def collection(self, name): + assert name == "system_settings" or name == "blocked_numbers" + return self + def document(self, name): + return self + def get(self): + return FakeDoc() + + monkeypatch.setattr(blocklist_helpers, "db", FakeDB()) + + ttl = blocklist_helpers._get_cache_ttl() + assert ttl == 42, "TTL should update from Firestore mock" + + +def test_cache_reuse(monkeypatch): + """Ensures cached TTL is reused within refresh interval.""" + blocklist_helpers._ttl_value = 99 + blocklist_helpers._last_ttl_fetch = time.time() + + class FakeDB: + def collection(self, *_): + raise AssertionError("Firestore should not be called") + monkeypatch.setattr(blocklist_helpers, "db", FakeDB()) + + ttl = blocklist_helpers._get_cache_ttl() + assert ttl == 99 + + +def test_is_blocked_number_caching(monkeypatch): + """Tests that cached phone results are reused correctly.""" + fake_phone = "12345" + now = time.time() + blocklist_helpers._cache[fake_phone] = {"value": True, "time": now} + + class FakeDB: + def collection(self, *_): + raise AssertionError("Firestore should not be called") + monkeypatch.setattr(blocklist_helpers, "db", FakeDB()) + + result = blocklist_helpers.is_blocked_number(fake_phone) + assert result is True diff --git a/whatsapp_bot/app/handlers/FollowupMode.py b/whatsapp_bot/app/handlers/FollowupMode.py index a7762c5..ddad752 100644 --- a/whatsapp_bot/app/handlers/FollowupMode.py +++ b/whatsapp_bot/app/handlers/FollowupMode.py @@ -29,6 +29,8 @@ from app.utils.validators import _norm from app.utils.validators import normalize_event_path +from app.utils.blocklist_helpers import is_blocked_number, get_interaction_limit + def is_second_round_enabled(event_id: str) -> bool: @@ -69,9 +71,13 @@ async def reply_followup(Body: str, From: str, MediaUrl0: str = None): logger.info(f"Received message from {From} with body '{Body}' and media URL {MediaUrl0}") - # Normalize phone number normalized_phone = From.replace("+", "").replace("-", "").replace(" ", "") + if is_blocked_number(normalized_phone): + logger.warning(f"[Blacklist] Ignoring message from blocked number: {normalized_phone}") + return Response(status_code=200) + + # Step 1: Retrieve or initialize user tracking document user_tracking_ref = db.collection('user_event_tracking').document(normalized_phone) user_tracking_doc = user_tracking_ref.get() @@ -746,8 +752,20 @@ def process_second_round(transaction, ref, user_msg, sr_reply=None): data = event_doc.to_dict() interactions = data.get('interactions', []) - if len(interactions) >= 450: - send_message(From, "You have reached your interaction limit with AOI. Please contact AOI for further assistance.") + interaction_limit = get_interaction_limit(current_event_id) + if len(interactions) >= interaction_limit: + logger.info(f"[FollowUpMode] {normalized_phone} reached interaction limit " + f"({len(interactions)} / {interaction_limit}) for {current_event_id}") + # Log event for moderation + db.collection("users_exceeding_limit").document(normalized_phone).set({ + "phone": normalized_phone, + "event_id": current_event_id, + "timestamp": datetime.utcnow().isoformat(), + "total_interactions": len(interactions), + "limit_used": interaction_limit + }, merge=True) + + send_message(From, f"You have reached your interaction limit ({interaction_limit}) for this event. Please contact AOI for assistance.") return Response(status_code=200) # Send user prompt to LLM diff --git a/whatsapp_bot/app/handlers/ListenerMode.py b/whatsapp_bot/app/handlers/ListenerMode.py index 7fda340..1ebe68f 100644 --- a/whatsapp_bot/app/handlers/ListenerMode.py +++ b/whatsapp_bot/app/handlers/ListenerMode.py @@ -11,7 +11,8 @@ from pydub import AudioSegment from fastapi import Response from app.deliberation.second_round_agent import run_second_round_for_user - +from app.utils.blocklist_helpers import get_interaction_limit, is_blocked_number +import random from config.config import ( db, logger, client, twilio_client, @@ -35,7 +36,8 @@ from app.utils.validators import _norm from app.utils.validators import normalize_event_path - +DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "gpt-4o-mini") +FALLBACK_MODEL = os.getenv("FALLBACK_MODEL", "gpt-4.1-mini") def is_second_round_enabled(event_id: str) -> bool: """Return True iff info.second_round_claims_source.enabled is truthy.""" @@ -65,13 +67,14 @@ def is_second_round_enabled(event_id: str) -> bool: async def reply_listener(Body: str, From: str, MediaUrl0: str = None): - - logger.info(f"Received message from {From} with body '{Body}' and media URL {MediaUrl0}") - # Normalize phone number normalized_phone = From.replace("+", "").replace("-", "").replace(" ", "") + if is_blocked_number(normalized_phone): + logger.warning(f"[Blacklist] Ignoring message from blocked number: {normalized_phone}") + return Response(status_code=200) + # Step 1: Retrieve or initialize user tracking document user_tracking_ref = db.collection('user_event_tracking').document(normalized_phone) user_tracking_doc = user_tracking_ref.get() @@ -742,8 +745,22 @@ def process_second_round(transaction, ref, user_msg, sr_reply=None): data = event_doc.to_dict() interactions = data.get('interactions', []) - if len(interactions) >= 450: - send_message(From, "You have reached your interaction limit with AOI. Please contact AOI for further assistance.") + + + interaction_limit = get_interaction_limit(current_event_id) + if len(interactions) >= interaction_limit: + logger.info(f"[Listener Mode] {normalized_phone} reached interaction limit " + f"({len(interactions)} / {interaction_limit}) for {current_event_id}") + # Log event for moderation + db.collection("users_exceeding_limit").document(normalized_phone).set({ + "phone": normalized_phone, + "event_id": current_event_id, + "timestamp": datetime.utcnow().isoformat(), + "total_interactions": len(interactions), + "limit_used": interaction_limit + }, merge=True) + + send_message(From, f"You have reached your interaction limit ({interaction_limit}) for this event. Please contact AOI for assistance.") return Response(status_code=200) # Send user prompt to LLM @@ -758,20 +775,97 @@ def process_second_round(transaction, ref, user_msg, sr_reply=None): 'interactions': firestore.ArrayUnion([{'message': Body}]) }) - run = client.beta.threads.runs.create_and_poll( - thread_id=thread.id, - assistant_id=assistant_id, - instructions=event_instructions - ) + try: + # Attempt to fetch model configuration from Firestore + event_info_ref = db.collection(normalize_event_path(current_event_id)).document("info") + event_info_doc = event_info_ref.get() + + # Pre-initialize with the environment or constant default + default_model = DEFAULT_MODEL + if event_info_doc.exists: + event_info_data = event_info_doc.to_dict() + default_model = event_info_data.get("default_model", default_model) + + logger.info(f"[LLM Config] Using model from Firestore: {default_model}") + + except Exception as e: + logger.error(f"[LLM Config] Failed to fetch model from Firestore, defaulting to {DEFAULT_MODEL}: {e}") + default_model = DEFAULT_MODEL + + + try: + # Primary model attempt + logger.info(f"[LLM Run] Starting primary run with model: {default_model}") + + run = client.beta.threads.runs.create_and_poll( + thread_id=thread.id, + assistant_id=assistant_id, + instructions=event_instructions, + model=default_model + ) + + logger.info(f"[LLM Debug] Primary run status: {getattr(run, 'status', 'N/A')}") + + # Fallback if the primary model failed or didn’t complete + if run.status != "completed": + logger.warning(f"[LLM Fallback] Model {default_model} failed, retrying with {FALLBACK_MODEL}") + + if hasattr(run, 'last_error'): + logger.error(f"[LLM Debug] last_error (primary): {run.last_error}") + if hasattr(run, 'incomplete_details'): + logger.error(f"[LLM Debug] incomplete_details (primary): {run.incomplete_details}") + + run = client.beta.threads.runs.create_and_poll( + thread_id=thread.id, + assistant_id=assistant_id, + instructions=event_instructions, + model=FALLBACK_MODEL + ) + + logger.info(f"[LLM Debug] Fallback run status: {getattr(run, 'status', 'N/A')}") + + except Exception as e: + logger.exception(f"[LLM Exception] Error while creating run: {e}") + run = None + + + # --- RESPONSE HANDLING --- + if run and run.status == "completed": + final_model = getattr(run, 'model', default_model) + logger.info(f"[LLM Success] Final model used: {final_model}") - if run.status == 'completed': messages = client.beta.threads.messages.list(thread_id=thread.id) assistant_response = extract_text_from_messages(messages) + send_message(From, assistant_response) event_doc_ref.update({ - 'interactions': firestore.ArrayUnion([{'response': assistant_response}]) + 'interactions': firestore.ArrayUnion([ + {'response': assistant_response, 'model': final_model, 'fallback': False} + ]) }) + else: - send_message(From, "There was an issue processing your request.") + logger.warning("[LLM Fallback] Both models failed or returned incomplete response.") + + if run and hasattr(run, 'last_error'): + logger.error(f"[LLM Debug] last_error (final): {run.last_error}") + if run and hasattr(run, 'incomplete_details'): + logger.error(f"[LLM Debug] incomplete_details (final): {run.incomplete_details}") + + fallback_responses = [ + "Agreed.", + "Please continue.", + "That’s an interesting point, tell me more.", + "I understand.", + "Go on, I’m listening." + ] + fallback_message = random.choice(fallback_responses) + + send_message(From, fallback_message) + event_doc_ref.update({ + 'interactions': firestore.ArrayUnion([ + {'response': fallback_message, 'model': None, 'fallback': True} + ]) + }) return Response(status_code=200) \ No newline at end of file diff --git a/whatsapp_bot/app/handlers/SurveyMode.py b/whatsapp_bot/app/handlers/SurveyMode.py index 01f3cbe..acf749b 100644 --- a/whatsapp_bot/app/handlers/SurveyMode.py +++ b/whatsapp_bot/app/handlers/SurveyMode.py @@ -32,13 +32,16 @@ ) from app.utils.survey_helpers import initialize_user_document from app.utils.validators import normalize_event_path +from app.utils.blocklist_helpers import get_interaction_limit, is_blocked_number async def reply_survey(Body: str, From: str, MediaUrl0: str = None): logger.info(f"Received message from {From} with body '{Body}' and media URL {MediaUrl0}") - # Normalize phone number normalized_phone = From.replace("+", "").replace("-", "").replace(" ", "") + if is_blocked_number(normalized_phone): + logger.warning(f"[Blacklist] Ignoring message from blocked number: {normalized_phone}") + return Response(status_code=200) # Step 1: Retrieve or initialize user tracking document user_tracking_ref = db.collection('user_event_tracking').document(normalized_phone) @@ -390,6 +393,27 @@ async def reply_survey(Body: str, From: str, MediaUrl0: str = None): send_message(From, "Survey ended. Thank you for participating!") db.collection(normalize_event_path(current_event_id)).document(normalized_phone).update({'survey_complete':True} ) return Response(status_code=200) + + # --- Interaction limit enforcement --- + interaction_limit = get_interaction_limit(current_event_id) + participant_ref = db.collection(normalize_event_path(current_event_id)).document(normalized_phone) + participant_doc = participant_ref.get() + participant_data = participant_doc.to_dict() if participant_doc.exists else {} + interactions = participant_data.get('interactions', []) + + if len(interactions) >= interaction_limit: + logger.info(f"[Survey] {normalized_phone} exceeded interaction limit ({len(interactions)} >= {interaction_limit}) for {current_event_id}") + db.collection("users_exceeding_limit").document(normalized_phone).set({ + "phone": normalized_phone, + "event_id": current_event_id, + "timestamp": datetime.utcnow().isoformat(), + "total_interactions": len(interactions), + "limit_used": interaction_limit + }, merge=True) + + send_message(From, f"You have reached your interaction limit ({interaction_limit}) for this survey. Please contact AOI for assistance.") + return Response(status_code=200) + # Step 10: Survey question loop ev_ref = db.collection(normalize_event_path(current_event_id)).document(normalized_phone) diff --git a/whatsapp_bot/app/utils/blocklist_helpers.py b/whatsapp_bot/app/utils/blocklist_helpers.py new file mode 100644 index 0000000..b3f70e8 --- /dev/null +++ b/whatsapp_bot/app/utils/blocklist_helpers.py @@ -0,0 +1,105 @@ +import time +from config.config import db, logger +from app.utils.validators import normalize_event_path + +# In-memory cache for phone lookups +_cache = {} +_DEFAULT_TTL = 60 # fallback if Firestore config missing +_last_ttl_fetch = 0 +_ttl_value = _DEFAULT_TTL +_TTL_REFRESH_INTERVAL = 60 # how often (seconds) to re-check Firestore for new TTL + + +def _get_cache_ttl() -> int: + """ + Fetch the TTL configuration from Firestore system_settings/blacklist_config. + Cached locally to avoid frequent reads. + """ + global _last_ttl_fetch, _ttl_value + + now = time.time() + # only fetch TTL value once per _TTL_REFRESH_INTERVAL + if now - _last_ttl_fetch < _TTL_REFRESH_INTERVAL: + return _ttl_value + + try: + doc = db.collection("blocked_numbers").document("_config").get() + if doc.exists: + val = doc.to_dict().get("cache_ttl_seconds", _DEFAULT_TTL) + _ttl_value = int(val) + logger.info(f"[Blacklist] TTL updated from Firestore: {_ttl_value}s") + else: + _ttl_value = _DEFAULT_TTL + except Exception as e: + logger.error(f"[Blacklist] Failed to load TTL config: {e}") + _ttl_value = _DEFAULT_TTL + + _last_ttl_fetch = now + return _ttl_value + + +def is_blocked_number(phone: str) -> bool: + """ + Return True if the normalized phone is in Firestore blocked_numbers collection. + Uses a dynamically configurable TTL for caching results. + """ + now = time.time() + ttl = _get_cache_ttl() + + cached = _cache.get(phone) + if cached and now - cached['time'] < ttl: + return cached['value'] + + try: + ref = db.collection('blocked_numbers').document(phone) + doc = ref.get() + blocked = doc.exists + _cache[phone] = {'value': blocked, 'time': now} + if blocked: + logger.info(f"[Blacklist] Blocked number detected: {phone}") + return blocked + except Exception as e: + logger.error(f"[Blacklist] Error checking {phone}: {e}") + return False + +# Cache for limits (keyed by event_id) +_LIMIT_CACHE = {} +_LIMIT_CACHE_TTL = 60 # 1 minute +_DEFAULT_LIMIT = 450 + +def get_interaction_limit(event_id: str) -> int: + """ + Fetch interaction limit for a given event. + Priority: + 1. AOI_/info.interaction_limit (per event) + 2. system_config/interaction_limits.max_interactions_per_user (global) + 3. fallback 450 + Cached per event for 1 minute. + """ + now = time.time() + cached = _LIMIT_CACHE.get(event_id) + if cached and now - cached["time"] < _LIMIT_CACHE_TTL: + return cached["value"] + + limit = _DEFAULT_LIMIT + + try: + # Try per-event config + doc = db.collection(normalize_event_path(event_id)).document("info").get() + if doc.exists: + data = doc.to_dict() or {} + limit = int(data.get("interaction_limit", _DEFAULT_LIMIT)) + else: + logger.warning(f"[SystemConfig] Missing info doc for event {event_id}; falling back to global limit.") + + # Try global fallback (only if not found) + if limit == _DEFAULT_LIMIT: + sys_doc = db.collection("system_config").document("interaction_limits").get() + if sys_doc.exists: + limit = int(sys_doc.to_dict().get("max_interactions_per_user", _DEFAULT_LIMIT)) + except Exception as e: + logger.error(f"[SystemConfig] Failed to load interaction limit for {event_id}: {e}") + limit = _DEFAULT_LIMIT + + _LIMIT_CACHE[event_id] = {"value": limit, "time": now} + return limit