Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions flow_deployments/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
from src.config import settings
from src.flows.storage.memes import ocr_uploaded_memes

deployment_ocr_uploaded_memes = Deployment.build_from_flow(
flow=ocr_uploaded_memes,
name="OCR Uploaded Memes",
schedules=[CronSchedule(cron="*/5 * * * *", timezone="Europe/London")],
work_pool_name=settings.ENVIRONMENT,
)
if settings.OCR_ENABLED:
deployment_ocr_uploaded_memes = Deployment.build_from_flow(
flow=ocr_uploaded_memes,
name="OCR Uploaded Memes",
schedules=[CronSchedule(cron="*/5 * * * *", timezone="Europe/London")],
work_pool_name=settings.ENVIRONMENT,
)

deployment_ocr_uploaded_memes.apply()
deployment_ocr_uploaded_memes.apply()
else:
print(
"Skipping Prefect deployment for OCR Uploaded Memes because OCR is disabled. "
"Set OCR_ENABLED=true to deploy the OCR flow."
)
2 changes: 2 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class Config(BaseSettings):

OPENAI_API_KEY: str | None = None

OCR_ENABLED: bool = False

# @model_validator(mode="after")
# def validate_sentry_non_local(self) -> "Config":
# if self.ENVIRONMENT.is_deployed and not self.SENTRY_DSN:
Expand Down
99 changes: 69 additions & 30 deletions src/flows/storage/memes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from prefect import flow, get_run_logger

from src.config import settings
from src.storage import ads
from src.storage.constants import MemeSourceType, MemeStatus, MemeType
from src.storage.etl import (
Expand Down Expand Up @@ -38,6 +39,13 @@ async def ocr_meme_content(
meme_id: int, content: bytes, language: str
) -> dict[str, Any] | None:
logger = get_run_logger()
if not settings.OCR_ENABLED:
logger.info(
"Skipping OCR for meme %s because OCR_ENABLED is disabled. "
"Set OCR_ENABLED=true to re-enable OCR.",
meme_id,
)
return True
logger.debug(f"OCRing meme {meme_id} content.")
if language not in ("en", "ru"):
logger.info(f"Can't OCR meme with language_code: {language}")
Expand Down Expand Up @@ -126,25 +134,31 @@ async def tg_meme_pipeline() -> None:
logger.info("Getting unloaded memes to upload to Telegram.")
unloaded_memes = await get_unloaded_tg_memes(limit=100)
logger.info(f"Received {len(unloaded_memes)} memes to upload to Telegram.")
if not settings.OCR_ENABLED:
logger.info(
"OCR is currently disabled. Memes will be processed without OCR. "
"Set OCR_ENABLED=true to resume OCR checks."
)
for unloaded_meme in unloaded_memes:
meme = await upload_meme_to_telegram(unloaded_meme)
if not meme or meme["type"] != MemeType.IMAGE:
continue

res = await ocr_meme_content(
meme["id"],
meme["__original_content"],
meme["language_code"],
)
if settings.OCR_ENABLED:
res = await ocr_meme_content(
meme["id"],
meme["__original_content"],
meme["language_code"],
)

if res is None:
logger.warning(
"""
if res is None:
logger.warning(
"""
org_meme_content returned NULL, meaning OCR doesn't work.
To save on quota I quit from tg_meme_pipeline
"""
)
return
)
return

# next step of a pipeline
await final_meme_pipeline()
Expand All @@ -164,25 +178,31 @@ async def vk_meme_pipeline() -> None:
logger.info("Getting unloaded memes to upload to Telegram.")
unloaded_memes = await get_unloaded_vk_memes(limit=100)
logger.info(f"Received {len(unloaded_memes)}" " memes to upload to Telegram.")
if not settings.OCR_ENABLED:
logger.info(
"OCR is currently disabled. Memes will be processed without OCR. "
"Set OCR_ENABLED=true to resume OCR checks."
)
for unloaded_meme in unloaded_memes:
meme = await upload_meme_to_telegram(unloaded_meme)
if not meme or meme["type"] != MemeType.IMAGE:
continue

res = await ocr_meme_content(
meme["id"],
meme["__original_content"],
meme["language_code"],
)
if settings.OCR_ENABLED:
res = await ocr_meme_content(
meme["id"],
meme["__original_content"],
meme["language_code"],
)

if res is None:
logger.warning(
"""
if res is None:
logger.warning(
"""
org_meme_content returned NULL, meaning OCR doesn't work.
To save on quota I quit from tg_meme_pipeline
"""
)
return
)
return

# next step of a pipeline
await final_meme_pipeline()
Expand All @@ -202,25 +222,31 @@ async def ig_meme_pipeline() -> None:
logger.info("Getting unloaded memes to upload to Telegram.")
unloaded_memes = await get_unloaded_ig_memes(limit=100)
logger.info(f"Received {len(unloaded_memes)}" " memes to upload to Telegram.")
if not settings.OCR_ENABLED:
logger.info(
"OCR is currently disabled. Memes will be processed without OCR. "
"Set OCR_ENABLED=true to resume OCR checks."
)
for unloaded_meme in unloaded_memes:
meme = await upload_meme_to_telegram(unloaded_meme)
if not meme or meme["type"] != MemeType.IMAGE:
continue

res = await ocr_meme_content(
meme["id"],
meme["__original_content"],
meme["language_code"],
)
if settings.OCR_ENABLED:
res = await ocr_meme_content(
meme["id"],
meme["__original_content"],
meme["language_code"],
)

if res is None:
logger.warning(
"""
if res is None:
logger.warning(
"""
org_meme_content returned NULL, meaning OCR doesn't work.
To save on quota I quit from tg_meme_pipeline
"""
)
return
)
return

await asyncio.sleep(3) # flood control

Expand All @@ -237,6 +263,14 @@ async def ocr_uploaded_memes(limit=100):
"""
logger = get_run_logger()
memes = await get_memes_to_ocr(limit=limit)
if not settings.OCR_ENABLED:
logger.info(
"OCR is disabled. Skipping OCR for uploaded memes. "
"Set OCR_ENABLED=true to re-enable OCR."
)
await final_meme_pipeline()
return

logger.info(f"Going to OCR {len(memes)} memes.")

for meme in memes:
Expand Down Expand Up @@ -286,6 +320,11 @@ async def final_meme_pipeline() -> None:

memes = await get_pending_memes()
logger.info(f"Final meme pipeline has {len(memes)} pending memes.")
if not settings.OCR_ENABLED:
logger.info(
"OCR is disabled. Duplicates will only be detected for memes with "
"existing OCR payloads. Set OCR_ENABLED=true to restore full OCR checks."
)

for meme in memes:
await analyse_meme_caption(meme)
Expand Down
14 changes: 1 addition & 13 deletions src/storage/service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any

from sqlalchemy import nulls_first, or_, select, text
from sqlalchemy import nulls_first, select, text

from src.database import (
fetch_all,
Expand Down Expand Up @@ -71,12 +71,6 @@ async def get_pending_memes() -> list[dict[str, Any]]:
select(meme)
.where(meme.c.status == MemeStatus.CREATED)
.where(meme.c.telegram_file_id.is_not(None))
.where(
or_(
meme.c.ocr_result.is_not(None),
meme.c.type != MemeType.IMAGE,
)
)
.order_by(nulls_first(meme.c.created_at))
)
return await fetch_all(select_query)
Expand Down Expand Up @@ -203,12 +197,6 @@ async def update_meme_status_of_ready_memes() -> list[dict[str, Any]]:
meme.update()
.where(meme.c.status == MemeStatus.CREATED)
.where(meme.c.telegram_file_id.is_not(None))
.where(
or_(
meme.c.ocr_result.is_not(None),
meme.c.type != MemeType.IMAGE,
)
)
.where(meme.c.duplicate_of.is_(None))
.values(status=MemeStatus.OK)
.returning(meme)
Expand Down
Loading