From 239828cb6c3a6fab20e69c19c2b06c0b55e00490 Mon Sep 17 00:00:00 2001 From: Marcello Date: Fri, 27 Mar 2026 01:25:59 +0100 Subject: [PATCH] feat: add beauty functionality --- .env.example | 3 +- .gitignore | 4 +- requirements_dev.txt | 1 + src/beauty.py | 125 +++++++++++++++++++++++++++++++++++++++++++ src/handlers.py | 50 +++++++++-------- tests/test_beauty.py | 96 +++++++++++++++++++++++++++++++++ 6 files changed, 255 insertions(+), 24 deletions(-) create mode 100644 src/beauty.py create mode 100644 tests/test_beauty.py diff --git a/.env.example b/.env.example index 4b679c2..09c905f 100644 --- a/.env.example +++ b/.env.example @@ -1 +1,2 @@ -BOT_TOKEN="INSERT_YOUR_BOT_TOKEN_HERE_MY_FRIEND" \ No newline at end of file +BOT_TOKEN="INSERT_YOUR_BOT_TOKEN_HERE_MY_FRIEND" +PIXABAY_TOKEN="INSERT_YOUR_PIXABAY_TOKEN_MY_FRIEND" \ No newline at end of file diff --git a/.gitignore b/.gitignore index 7cc80c0..66e30eb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ **/__pycache__ .venv/ -.tmp/ .env -.coverage \ No newline at end of file +.coverage +tmp/ \ No newline at end of file diff --git a/requirements_dev.txt b/requirements_dev.txt index 40417d7..8dd6dd4 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -7,3 +7,4 @@ flake8 black isort types-yt-dlp +types-requests \ No newline at end of file diff --git a/src/beauty.py b/src/beauty.py new file mode 100644 index 0000000..6fa8a09 --- /dev/null +++ b/src/beauty.py @@ -0,0 +1,125 @@ +import asyncio +import os +import random +from datetime import date +from pathlib import Path + +import requests +from dotenv import load_dotenv +from telegram import Update + +DEFAULT_QUERY = "woman,models,real,beauty,pose" +DEFAULT_OUTPUT_PATH = "tmp/downloads/beauty_of_the_day.png" +_BEAUTY_CACHE_LOCK = asyncio.Lock() + + +async def handle_beauty(update: Update): + message = update.message + if message is None: + return + + path = await get_beauty_image_path() + if path is None: + print("Errore nel download dell'immagine") + return + + with path.open("rb") as image_file: + await message.reply_photo(photo=image_file) + + +def is_beauty_image_fresh(path: Path, today: date | None = None) -> bool: + if not path.exists(): + return False + + if today is None: + today = date.today() + + return date.fromtimestamp(path.stat().st_mtime) == today + + +async def get_beauty_image_path( + output_path: str = DEFAULT_OUTPUT_PATH, + downloader=None, +) -> Path | None: + if downloader is None: + downloader = download_beauty_image + + async with _BEAUTY_CACHE_LOCK: + path = Path(output_path) + + if is_beauty_image_fresh(path): + print("già scaricata, invio l'immagine") + return path + + result = await asyncio.to_thread(downloader, DEFAULT_QUERY, output_path) + if result is None: + return None + + path = Path(result) + print("immagine aggiornata, invio l'immagine") + return path + + +def download_beauty_image( + query: str = DEFAULT_QUERY, + output_path: str = DEFAULT_OUTPUT_PATH, +) -> str | None: + load_dotenv() + + api_key = os.getenv("PIXABAY_TOKEN") + if not api_key: + print("❌ PIXABAY_TOKEN non configurato") + return None + + risultati = cerca_immagini_pixabay(query, api_key) + if not risultati: + print("❌ Nessuna immagine trovata") + return None + + link = random.choice(risultati) + if scarica_risorsa(link, output_path): + return output_path + + return None + + +def cerca_immagini_pixabay(tag, api_key): + url = "https://pixabay.com/api/" + params = { + "key": api_key, + "q": tag, + "image_type": "photo", + "safesearch": "false", + "per_page": 150, + } + + response = requests.get(url, params=params, timeout=15) + + if response.status_code == 200: + dati = response.json() + return [foto["webformatURL"] for foto in dati.get("hits", [])] + + print(f"Errore Pixabay: {response.status_code}") + return [] + + +def scarica_risorsa(url, percorso_salvataggio): + try: + cartella = os.path.dirname(percorso_salvataggio) + if cartella and not os.path.exists(cartella): + os.makedirs(cartella) + + with requests.get(url, stream=True, timeout=15) as r: + r.raise_for_status() + + with open(percorso_salvataggio, "wb") as f: + for chunk in r.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + print(f"✅ Scaricato con successo: {percorso_salvataggio}") + return True + + except requests.RequestException as e: + print(f"❌ Errore nel download: {e}") + return False diff --git a/src/handlers.py b/src/handlers.py index 839882d..5867115 100644 --- a/src/handlers.py +++ b/src/handlers.py @@ -1,13 +1,14 @@ # pylint: disable=unused-argument + import validators from telegram import Update from telegram.ext import ContextTypes +from src.beauty import handle_beauty from src.buttons import get_main_menu async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - if update.message and update.effective_user: await update.message.reply_text( f"Ciao, {update.effective_user.first_name} il tuo compare di fiducia ti aiuta a scaricare i tuoi media!" @@ -25,34 +26,41 @@ async def service(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def download(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - if update.message: + if not update.message or not update.message.text: + return - if not update.message or not update.message.text: - return + parts = update.message.text.split(" ", 1) + + if len(parts) != 2: + await update.message.reply_text( + "Devi fornire un messaggio così formato /download " + ) + return - parts = update.message.text.split(" ", 1) + url_video = parts[1] - if len(parts) != 2: - await update.message.reply_text( - "Devi fornire un messaggio così formato /download " - ) + # Verifico se è un url ben formato. + if validators.url(url_video): + + if context.user_data is None: return - url_video = parts[1] + context.user_data["url"] = url_video - # Verifico se è un url ben formato. - if validators.url(url_video): + await update.message.reply_text( + "Ciao, scarica il tuo contenuto!", reply_markup=get_main_menu() + ) - if context.user_data is None: - return - context.user_data["url"] = url_video +async def beauty(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if not update.message or not update.message.text: + return - await update.message.reply_text( - "Ciao, scarica il tuo contenuto!", reply_markup=get_main_menu() - ) + raw_text = update.message.text.strip() + command = raw_text.split()[0] + if raw_text != command: + await update.message.reply_text("Usa solo /beauty senza altri argomenti.") + return -async def beauty(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - if update.message: - await update.message.reply_text("Ciao, bellezza del giorno!") + await handle_beauty(update) diff --git a/tests/test_beauty.py b/tests/test_beauty.py new file mode 100644 index 0000000..1bfacd9 --- /dev/null +++ b/tests/test_beauty.py @@ -0,0 +1,96 @@ +import asyncio +import os +from datetime import date, timedelta +from datetime import datetime +from unittest.mock import AsyncMock, MagicMock + +from src import beauty + + +def test_is_beauty_image_fresh_returns_false_when_file_does_not_exist(tmp_path): + path = tmp_path / "missing.png" + + assert beauty.is_beauty_image_fresh(path, today=date(2026, 3, 28)) is False + + +def test_is_beauty_image_fresh_returns_true_for_file_from_same_day(tmp_path): + path = tmp_path / "beauty.png" + path.write_bytes(b"image") + + today = date.today() + _set_file_mtime(path, datetime.combine(today, datetime.min.time()).timestamp()) + + assert beauty.is_beauty_image_fresh(path, today=today) is True + + +def test_is_beauty_image_fresh_returns_false_for_file_from_previous_day(tmp_path): + path = tmp_path / "beauty.png" + path.write_bytes(b"image") + + today = date.today() + yesterday = today - timedelta(days=1) + _set_file_mtime(path, datetime.combine(yesterday, datetime.min.time()).timestamp()) + + assert beauty.is_beauty_image_fresh(path, today=today) is False + + +def test_get_beauty_image_path_returns_cached_file_without_downloading(tmp_path, monkeypatch): + path = tmp_path / "beauty.png" + path.write_bytes(b"cached") + monkeypatch.setattr(beauty, "is_beauty_image_fresh", lambda candidate: candidate == path) + + downloader = MagicMock() + to_thread = AsyncMock() + monkeypatch.setattr(beauty.asyncio, "to_thread", to_thread) + + result = asyncio.run(beauty.get_beauty_image_path(str(path), downloader)) + + assert result == path + downloader.assert_not_called() + to_thread.assert_not_awaited() + + +def test_get_beauty_image_path_downloads_stale_file_in_thread(tmp_path, monkeypatch): + path = tmp_path / "beauty.png" + monkeypatch.setattr(beauty, "is_beauty_image_fresh", lambda candidate: False) + + downloader = MagicMock(return_value=str(path)) + to_thread = AsyncMock(return_value=str(path)) + monkeypatch.setattr(beauty.asyncio, "to_thread", to_thread) + + result = asyncio.run(beauty.get_beauty_image_path(str(path), downloader)) + + assert result == path + to_thread.assert_awaited_once_with(downloader, beauty.DEFAULT_QUERY, str(path)) + + +def test_handle_beauty_replies_with_photo(monkeypatch, tmp_path): + path = tmp_path / "beauty.png" + path.write_bytes(b"image-content") + + message = MagicMock() + message.reply_photo = AsyncMock() + update = MagicMock(message=message) + + get_path = AsyncMock(return_value=path) + monkeypatch.setattr(beauty, "get_beauty_image_path", get_path) + + asyncio.run(beauty.handle_beauty(update)) + + message.reply_photo.assert_awaited_once() + sent_photo = message.reply_photo.await_args.kwargs["photo"] + assert sent_photo.name == str(path) + + +def test_handle_beauty_returns_when_download_fails(monkeypatch): + message = MagicMock() + message.reply_photo = AsyncMock() + update = MagicMock(message=message) + + monkeypatch.setattr(beauty, "get_beauty_image_path", AsyncMock(return_value=None)) + + asyncio.run(beauty.handle_beauty(update)) + + message.reply_photo.assert_not_awaited() +def _set_file_mtime(path, timestamp: float) -> None: + os.utime(path, (timestamp, timestamp))