Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 90 additions & 41 deletions src/backend/apps/consumer/processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import io
import json
import logging
import time
from math import floor
Expand All @@ -14,7 +13,6 @@
import boto3
import aio_pika
import asyncio
import aiofiles
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
from .db import get_all_from_db, load_index_from_db
Expand Down Expand Up @@ -57,16 +55,12 @@
S3_REGION = os.getenv("S3_REGION")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
RABBITMQ_URL = os.getenv("RABBITMQ_URL")
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL")
QUEUE_NAME = os.getenv("RABBITMQ_QUEUE_NAME")
QUEUE_MAX_BYTES = int(os.getenv("RABBITMQ_QUEUE_MAX_BYTES"))
EXCHANGE_NAME = os.getenv("RABBITMQ_EXCHANGE_NAME")
CAMERA_CACHE_REFRESH_SECONDS = int(os.getenv("CAMERA_CACHE_REFRESH_SECONDS", "60"))


#boto3.set_stream_logger('botocore', logging.DEBUG)

# S3 client configuration
config = Config(
signature_version='s3v4',
Expand Down Expand Up @@ -96,33 +90,25 @@
last_camera_refresh = 0.0
image_invalid = False

async def run_consumer():
async def consume_from(rb_url: str):
"""
Long-running RabbitMQ consumer that processes image messages and watermarks them.
Shuts down cleanly when stop_event is set.
Consumer instance bound to a single RabbitMQ URL.
"""
rb_url = os.getenv("RABBITMQ_URL")
if not rb_url:
raise RuntimeError("RABBITMQ_URL environment variable is not set.")

rows = await sync_to_async(get_all_from_db)()

global db_data, last_camera_refresh
db_data = process_camera_rows(rows)
last_camera_refresh = time.time()
if not db_data:
logger.error("No camera data available for watermarking. Consumer exiting.")
try:
db_data = await parse_rows(rb_url)
except Exception as exc:
logger.error("Failed to initialize consumer for %s: %s", rb_url, exc)
return

logger.info("Starting RabbitMQ consumer.")
logger.info("Starting RabbitMQ consumer for %s", rb_url)

connection: Optional[aio_pika.RobustConnection] = None
channel: Optional[aio_pika.RobustChannel] = None
connection = None
channel = None
queue = None

try:
connection = await aio_pika.connect_robust(rb_url)
logger.info("Connected to RabbitMQ.")
logger.info("Connected to RabbitMQ at %s", rb_url)

channel = await connection.channel()
# Limit how many unacked messages take at once
Expand All @@ -143,13 +129,13 @@ async def run_consumer():
)

await queue.bind(exchange)
logger.info("Queue bound to exchange; beginning consume loop.")
logger.info("Queue bound to exchange for %s; beginning consume loop.", rb_url)

# Consume loop
async with queue.iterator() as queue_iter:
async for message in queue_iter:
if stop_event.is_set():
logger.info("Stop requested. Breaking consume loop.")
logger.info("Stop requested. Breaking consume loop for %s.", rb_url)
break

async with message.process(ignore_processed=True):
Expand All @@ -159,39 +145,102 @@ async def run_consumer():
camera_status = calculate_camera_status(timestamp_utc)

try:
timestamp_local = generate_local_timestamp(db_data, camera_id, timestamp_utc)
# # # For testing purposes, only allow camera with IDs below to be processed
# if camera_id != "57":
timestamp_local = generate_local_timestamp(
db_data, camera_id, timestamp_utc
)

# # For testing purposes, only allow camera with IDs below to be processed
# if camera_id != "524":
# logger.info("Skipping processing for camera %s", camera_id)
# continue
await handle_image_message(camera_id, message.body, timestamp_local, camera_status)
logger.info("Processed message for camera %s.", camera_id)
await handle_image_message(
camera_id,
message.body,
timestamp_local,
camera_status,
)
logger.info("Processed message for camera %s on %s.", camera_id, rb_url)

except Exception as e:
logger.exception("Failed processing message (camera %s): %s", camera_id, e)

logger.info("Exited message iterator.")
logger.info("Exited message iterator for %s.", rb_url)

except asyncio.CancelledError:
logger.info("Consumer cancelled; shutting down.")
logger.info("Consumer cancelled for %s.", rb_url)
raise
except ChannelInvalidStateError:
logger.warning("AMQP channel closed unexpectedly during shutdown.")
logger.warning("AMQP channel closed unexpectedly for %s.", rb_url)
except Exception as e:
logger.exception("Unhandled error in consumer: %s", e)
logger.exception("Unhandled error in consumer for %s: %s", rb_url, e)
finally:
logger.info("Cleaning up RabbitMQ resources...")
logger.info("Cleaning up RabbitMQ resources for %s...", rb_url)
if channel:
try:
try:
await channel.close()
except Exception as e:
except Exception as e:
logger.warning("Error closing channel: %s", e)
if connection:
try:
try:
await connection.close()
except Exception as e:
except Exception as e:
logger.warning("Error closing connection: %s", e)

logger.info("Consumer stopped.")
logger.info("Consumer stopped for %s.", rb_url)


async def parse_rows(rb_url: str):
"""
Loads camera rows from DB and prepares DB metadata for this RabbitMQ URL.
Raises an exception if no data is available.
"""
logger.info("Fetching DB camera rows for %s", rb_url)

rows = await sync_to_async(get_all_from_db)()
db_data = process_camera_rows(rows)

if not db_data:
raise RuntimeError(
f"No camera data available for watermarking. Consumer ({rb_url}) exiting."
)

logger.info("DB camera data loaded for %s (%d rows).", rb_url, len(db_data))
return db_data


async def run_consumer():
"""
Launch consumers for Gold and GoldDR in parallel.
Each consumer listens to its own RabbitMQ instance.
"""
gold_url = os.getenv("RABBITMQ_URL_GOLD")
golddr_url = os.getenv("RABBITMQ_URL_GOLDDR")

if not gold_url and not golddr_url:
raise RuntimeError("No RabbitMQ URLs configured. At least one is required.")

tasks = []

if gold_url:
logger.info("Starting GOLD consumer...")
tasks.append(asyncio.create_task(consume_from(gold_url)))

if golddr_url:
logger.info("Starting GOLDDR consumer...")
tasks.append(asyncio.create_task(consume_from(golddr_url)))

logger.info("All configured RabbitMQ consumers started.")

# Wait until stop_event is set (SIGTERM / SIGINT)
try:
await stop_event.wait()
finally:
logger.info("Stop event received; cancelling consumers...")
for task in tasks:
task.cancel()

await asyncio.gather(*tasks, return_exceptions=True)
logger.info("All consumers stopped.")

def shutdown():
"""Signal handler to gracefully stop the consumer."""
Expand Down
1 change: 0 additions & 1 deletion src/backend/apps/webcam/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
S3_REGION = os.getenv("S3_REGION")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
RABBITMQ_URL = os.getenv("RABBITMQ_URL")
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL")

# Define PVC directory
Expand Down