From 155151e5067a7abda877433ad72df9830979f50e Mon Sep 17 00:00:00 2001 From: Daniil Okhlopkov <5613295+ohld@users.noreply.github.com> Date: Sun, 5 Oct 2025 14:41:00 +0300 Subject: [PATCH] Add OCR feature flag and guard pipelines when disabled --- flow_deployments/storage.py | 20 +++++--- src/config.py | 2 + src/flows/storage/memes.py | 99 ++++++++++++++++++++++++++----------- src/storage/service.py | 14 +----- 4 files changed, 85 insertions(+), 50 deletions(-) diff --git a/flow_deployments/storage.py b/flow_deployments/storage.py index 32084b32..83c252c7 100644 --- a/flow_deployments/storage.py +++ b/flow_deployments/storage.py @@ -4,11 +4,17 @@ from src.config import settings from src.flows.storage.memes import ocr_uploaded_memes -deployment_ocr_uploaded_memes = Deployment.build_from_flow( - flow=ocr_uploaded_memes, - name="OCR Uploaded Memes", - schedules=[CronSchedule(cron="*/5 * * * *", timezone="Europe/London")], - work_pool_name=settings.ENVIRONMENT, -) +if settings.OCR_ENABLED: + deployment_ocr_uploaded_memes = Deployment.build_from_flow( + flow=ocr_uploaded_memes, + name="OCR Uploaded Memes", + schedules=[CronSchedule(cron="*/5 * * * *", timezone="Europe/London")], + work_pool_name=settings.ENVIRONMENT, + ) -deployment_ocr_uploaded_memes.apply() + deployment_ocr_uploaded_memes.apply() +else: + print( + "Skipping Prefect deployment for OCR Uploaded Memes because OCR is disabled. " + "Set OCR_ENABLED=true to deploy the OCR flow." + ) diff --git a/src/config.py b/src/config.py index 12d7f100..26bab90f 100644 --- a/src/config.py +++ b/src/config.py @@ -44,6 +44,8 @@ class Config(BaseSettings): OPENAI_API_KEY: str | None = None + OCR_ENABLED: bool = False + # @model_validator(mode="after") # def validate_sentry_non_local(self) -> "Config": # if self.ENVIRONMENT.is_deployed and not self.SENTRY_DSN: diff --git a/src/flows/storage/memes.py b/src/flows/storage/memes.py index 51ff6029..4552766e 100644 --- a/src/flows/storage/memes.py +++ b/src/flows/storage/memes.py @@ -4,6 +4,7 @@ from prefect import flow, get_run_logger +from src.config import settings from src.storage import ads from src.storage.constants import MemeSourceType, MemeStatus, MemeType from src.storage.etl import ( @@ -38,6 +39,13 @@ async def ocr_meme_content( meme_id: int, content: bytes, language: str ) -> dict[str, Any] | None: logger = get_run_logger() + if not settings.OCR_ENABLED: + logger.info( + "Skipping OCR for meme %s because OCR_ENABLED is disabled. " + "Set OCR_ENABLED=true to re-enable OCR.", + meme_id, + ) + return True logger.debug(f"OCRing meme {meme_id} content.") if language not in ("en", "ru"): logger.info(f"Can't OCR meme with language_code: {language}") @@ -126,25 +134,31 @@ async def tg_meme_pipeline() -> None: logger.info("Getting unloaded memes to upload to Telegram.") unloaded_memes = await get_unloaded_tg_memes(limit=100) logger.info(f"Received {len(unloaded_memes)} memes to upload to Telegram.") + if not settings.OCR_ENABLED: + logger.info( + "OCR is currently disabled. Memes will be processed without OCR. " + "Set OCR_ENABLED=true to resume OCR checks." + ) for unloaded_meme in unloaded_memes: meme = await upload_meme_to_telegram(unloaded_meme) if not meme or meme["type"] != MemeType.IMAGE: continue - res = await ocr_meme_content( - meme["id"], - meme["__original_content"], - meme["language_code"], - ) + if settings.OCR_ENABLED: + res = await ocr_meme_content( + meme["id"], + meme["__original_content"], + meme["language_code"], + ) - if res is None: - logger.warning( - """ + if res is None: + logger.warning( + """ org_meme_content returned NULL, meaning OCR doesn't work. To save on quota I quit from tg_meme_pipeline """ - ) - return + ) + return # next step of a pipeline await final_meme_pipeline() @@ -164,25 +178,31 @@ async def vk_meme_pipeline() -> None: logger.info("Getting unloaded memes to upload to Telegram.") unloaded_memes = await get_unloaded_vk_memes(limit=100) logger.info(f"Received {len(unloaded_memes)}" " memes to upload to Telegram.") + if not settings.OCR_ENABLED: + logger.info( + "OCR is currently disabled. Memes will be processed without OCR. " + "Set OCR_ENABLED=true to resume OCR checks." + ) for unloaded_meme in unloaded_memes: meme = await upload_meme_to_telegram(unloaded_meme) if not meme or meme["type"] != MemeType.IMAGE: continue - res = await ocr_meme_content( - meme["id"], - meme["__original_content"], - meme["language_code"], - ) + if settings.OCR_ENABLED: + res = await ocr_meme_content( + meme["id"], + meme["__original_content"], + meme["language_code"], + ) - if res is None: - logger.warning( - """ + if res is None: + logger.warning( + """ org_meme_content returned NULL, meaning OCR doesn't work. To save on quota I quit from tg_meme_pipeline """ - ) - return + ) + return # next step of a pipeline await final_meme_pipeline() @@ -202,25 +222,31 @@ async def ig_meme_pipeline() -> None: logger.info("Getting unloaded memes to upload to Telegram.") unloaded_memes = await get_unloaded_ig_memes(limit=100) logger.info(f"Received {len(unloaded_memes)}" " memes to upload to Telegram.") + if not settings.OCR_ENABLED: + logger.info( + "OCR is currently disabled. Memes will be processed without OCR. " + "Set OCR_ENABLED=true to resume OCR checks." + ) for unloaded_meme in unloaded_memes: meme = await upload_meme_to_telegram(unloaded_meme) if not meme or meme["type"] != MemeType.IMAGE: continue - res = await ocr_meme_content( - meme["id"], - meme["__original_content"], - meme["language_code"], - ) + if settings.OCR_ENABLED: + res = await ocr_meme_content( + meme["id"], + meme["__original_content"], + meme["language_code"], + ) - if res is None: - logger.warning( - """ + if res is None: + logger.warning( + """ org_meme_content returned NULL, meaning OCR doesn't work. To save on quota I quit from tg_meme_pipeline """ - ) - return + ) + return await asyncio.sleep(3) # flood control @@ -237,6 +263,14 @@ async def ocr_uploaded_memes(limit=100): """ logger = get_run_logger() memes = await get_memes_to_ocr(limit=limit) + if not settings.OCR_ENABLED: + logger.info( + "OCR is disabled. Skipping OCR for uploaded memes. " + "Set OCR_ENABLED=true to re-enable OCR." + ) + await final_meme_pipeline() + return + logger.info(f"Going to OCR {len(memes)} memes.") for meme in memes: @@ -286,6 +320,11 @@ async def final_meme_pipeline() -> None: memes = await get_pending_memes() logger.info(f"Final meme pipeline has {len(memes)} pending memes.") + if not settings.OCR_ENABLED: + logger.info( + "OCR is disabled. Duplicates will only be detected for memes with " + "existing OCR payloads. Set OCR_ENABLED=true to restore full OCR checks." + ) for meme in memes: await analyse_meme_caption(meme) diff --git a/src/storage/service.py b/src/storage/service.py index 9e02ed81..674d1339 100644 --- a/src/storage/service.py +++ b/src/storage/service.py @@ -1,6 +1,6 @@ from typing import Any -from sqlalchemy import nulls_first, or_, select, text +from sqlalchemy import nulls_first, select, text from src.database import ( fetch_all, @@ -71,12 +71,6 @@ async def get_pending_memes() -> list[dict[str, Any]]: select(meme) .where(meme.c.status == MemeStatus.CREATED) .where(meme.c.telegram_file_id.is_not(None)) - .where( - or_( - meme.c.ocr_result.is_not(None), - meme.c.type != MemeType.IMAGE, - ) - ) .order_by(nulls_first(meme.c.created_at)) ) return await fetch_all(select_query) @@ -203,12 +197,6 @@ async def update_meme_status_of_ready_memes() -> list[dict[str, Any]]: meme.update() .where(meme.c.status == MemeStatus.CREATED) .where(meme.c.telegram_file_id.is_not(None)) - .where( - or_( - meme.c.ocr_result.is_not(None), - meme.c.type != MemeType.IMAGE, - ) - ) .where(meme.c.duplicate_of.is_(None)) .values(status=MemeStatus.OK) .returning(meme)