diff --git a/src/backend/apps/consumer/processor.py b/src/backend/apps/consumer/processor.py
index 98891b25d..e74fed417 100644
--- a/src/backend/apps/consumer/processor.py
+++ b/src/backend/apps/consumer/processor.py
@@ -1,5 +1,4 @@
 import io
-import json
 import logging
 import time
 from math import floor
@@ -14,7 +13,6 @@
 import boto3
 import aio_pika
 import asyncio
-import aiofiles
 from pathlib import Path
 from PIL import Image, ImageDraw, ImageFont
 from .db import get_all_from_db, load_index_from_db
@@ -57,16 +55,12 @@
 S3_REGION = os.getenv("S3_REGION")
 S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
 S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
-RABBITMQ_URL = os.getenv("RABBITMQ_URL")
 S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL")
 QUEUE_NAME = os.getenv("RABBITMQ_QUEUE_NAME")
 QUEUE_MAX_BYTES = int(os.getenv("RABBITMQ_QUEUE_MAX_BYTES"))
 EXCHANGE_NAME = os.getenv("RABBITMQ_EXCHANGE_NAME")
 CAMERA_CACHE_REFRESH_SECONDS = int(os.getenv("CAMERA_CACHE_REFRESH_SECONDS", "60"))
 
-
-#boto3.set_stream_logger('botocore', logging.DEBUG)
-
 # S3 client configuration
 config = Config(
     signature_version='s3v4',
@@ -96,33 +90,25 @@ last_camera_refresh = 0.0
 image_invalid = False
 
 
-async def run_consumer():
+async def consume_from(rb_url: str):
     """
-    Long-running RabbitMQ consumer that processes image messages and watermarks them.
-    Shuts down cleanly when stop_event is set.
+    Consumer instance bound to a single RabbitMQ URL.
     """
-    rb_url = os.getenv("RABBITMQ_URL")
-    if not rb_url:
-        raise RuntimeError("RABBITMQ_URL environment variable is not set.")
-
-    rows = await sync_to_async(get_all_from_db)()
-
-    global db_data, last_camera_refresh
-    db_data = process_camera_rows(rows)
-    last_camera_refresh = time.time()
-    if not db_data:
-        logger.error("No camera data available for watermarking. Consumer exiting.")
+    try:
+        db_data = await parse_rows(rb_url)
+    except Exception as exc:
+        logger.error("Failed to initialize consumer for %s: %s", rb_url, exc)
         return
 
-    logger.info("Starting RabbitMQ consumer.")
+    logger.info("Starting RabbitMQ consumer for %s", rb_url)
 
-    connection: Optional[aio_pika.RobustConnection] = None
-    channel: Optional[aio_pika.RobustChannel] = None
+    connection = None
+    channel = None
     queue = None
 
     try:
         connection = await aio_pika.connect_robust(rb_url)
-        logger.info("Connected to RabbitMQ.")
+        logger.info("Connected to RabbitMQ at %s", rb_url)
 
         channel = await connection.channel()
         # Limit how many unacked messages take at once
@@ -143,13 +129,13 @@ async def run_consumer():
         )
         await queue.bind(exchange)
 
-        logger.info("Queue bound to exchange; beginning consume loop.")
+        logger.info("Queue bound to exchange for %s; beginning consume loop.", rb_url)
 
         # Consume loop
         async with queue.iterator() as queue_iter:
             async for message in queue_iter:
                 if stop_event.is_set():
-                    logger.info("Stop requested. Breaking consume loop.")
+                    logger.info("Stop requested. Breaking consume loop for %s.", rb_url)
                     break
 
                 async with message.process(ignore_processed=True):
@@ -159,39 +145,102 @@ async def run_consumer():
                     camera_status = calculate_camera_status(timestamp_utc)
                     try:
-                        timestamp_local = generate_local_timestamp(db_data, camera_id, timestamp_utc)
-                        # # # For testing purposes, only allow camera with IDs below to be processed
-                        # if camera_id != "57":
+                        timestamp_local = generate_local_timestamp(
+                            db_data, camera_id, timestamp_utc
+                        )
+
+                        # # For testing purposes, only allow camera with IDs below to be processed
+                        # if camera_id != "524":
                         #     logger.info("Skipping processing for camera %s", camera_id)
                         #     continue
 
-                        await handle_image_message(camera_id, message.body, timestamp_local, camera_status)
-                        logger.info("Processed message for camera %s.", camera_id)
+                        await handle_image_message(
+                            camera_id,
+                            message.body,
+                            timestamp_local,
+                            camera_status,
+                        )
+                        logger.info("Processed message for camera %s on %s.", camera_id, rb_url)
+
                     except Exception as e:
                         logger.exception("Failed processing message (camera %s): %s", camera_id, e)
 
-        logger.info("Exited message iterator.")
+        logger.info("Exited message iterator for %s.", rb_url)
 
     except asyncio.CancelledError:
-        logger.info("Consumer cancelled; shutting down.")
+        logger.info("Consumer cancelled for %s.", rb_url)
         raise
     except ChannelInvalidStateError:
-        logger.warning("AMQP channel closed unexpectedly during shutdown.")
+        logger.warning("AMQP channel closed unexpectedly for %s.", rb_url)
     except Exception as e:
-        logger.exception("Unhandled error in consumer: %s", e)
+        logger.exception("Unhandled error in consumer for %s: %s", rb_url, e)
     finally:
-        logger.info("Cleaning up RabbitMQ resources...")
+        logger.info("Cleaning up RabbitMQ resources for %s...", rb_url)
         if channel:
-            try:
+            try:
                 await channel.close()
-            except Exception as e:
+            except Exception as e:
                 logger.warning("Error closing channel: %s", e)
         if connection:
-            try:
+            try:
                 await connection.close()
-            except Exception as e:
+            except Exception as e:
                 logger.warning("Error closing connection: %s", e)
-        logger.info("Consumer stopped.")
+        logger.info("Consumer stopped for %s.", rb_url)
+
+
+async def parse_rows(rb_url: str):
+    """
+    Loads camera rows from DB and prepares DB metadata for this RabbitMQ URL.
+    Raises an exception if no data is available.
+    """
+    logger.info("Fetching DB camera rows for %s", rb_url)
+
+    rows = await sync_to_async(get_all_from_db)()
+    db_data = process_camera_rows(rows)
+
+    if not db_data:
+        raise RuntimeError(
+            f"No camera data available for watermarking. Consumer ({rb_url}) exiting."
+        )
+
+    logger.info("DB camera data loaded for %s (%d rows).", rb_url, len(db_data))
+    return db_data
+
+
+async def run_consumer():
+    """
+    Launch consumers for Gold and GoldDR in parallel.
+    Each consumer listens to its own RabbitMQ instance.
+    """
+    gold_url = os.getenv("RABBITMQ_URL_GOLD")
+    golddr_url = os.getenv("RABBITMQ_URL_GOLDDR")
+
+    if not gold_url and not golddr_url:
+        raise RuntimeError("No RabbitMQ URLs configured. At least one is required.")
+
+    tasks = []
+
+    if gold_url:
+        logger.info("Starting GOLD consumer...")
+        tasks.append(asyncio.create_task(consume_from(gold_url)))
+
+    if golddr_url:
+        logger.info("Starting GOLDDR consumer...")
+        tasks.append(asyncio.create_task(consume_from(golddr_url)))
+
+    logger.info("All configured RabbitMQ consumers started.")
+
+    # Wait until stop_event is set (SIGTERM / SIGINT)
+    try:
+        await stop_event.wait()
+    finally:
+        logger.info("Stop event received; cancelling consumers...")
+        for task in tasks:
+            task.cancel()
+
+        await asyncio.gather(*tasks, return_exceptions=True)
+        logger.info("All consumers stopped.")
 
 
 def shutdown():
     """Signal handler to gracefully stop the consumer."""
diff --git a/src/backend/apps/webcam/tasks.py b/src/backend/apps/webcam/tasks.py
index 4eaac9692..24a37e401 100644
--- a/src/backend/apps/webcam/tasks.py
+++ b/src/backend/apps/webcam/tasks.py
@@ -60,7 +60,6 @@
 S3_REGION = os.getenv("S3_REGION")
 S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
 S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
-RABBITMQ_URL = os.getenv("RABBITMQ_URL")
 S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL")
 
 # Define PVC directory