# Notification templates for the bio-bait handler.
#
# Every template is rendered with str.format() and expects two placeholders:
#   {user_mention} - mention string for the offending user
#   {rules_link}   - URL of the group rules
# The *_NO_RESTRICT variants omit the "Pengguna telah dibatasi." sentence and
# are used when restricting the user failed.
#
# The leading/trailing emoji had been stored as mis-decoded UTF-8 bytes
# (mojibake); they are restored to the intended code points here.

# Bio bait spam notification (e.g. "cek bio aku" / "lihat byoh")
BIO_BAIT_SPAM_NOTIFICATION = (
    "🚫 *Spam Bio Bait Terdeteksi*\n\n"
    "Pesan dari {user_mention} telah dihapus karena berisi ajakan "
    "untuk mengecek bio/profil, pola yang umum dipakai untuk spam/promosi/scam.\n\n"
    "Pengguna telah dibatasi.\n\n"
    "📌 [Peraturan Grup]({rules_link})"
)

BIO_BAIT_SPAM_NOTIFICATION_NO_RESTRICT = (
    "🚫 *Spam Bio Bait Terdeteksi*\n\n"
    "Pesan dari {user_mention} telah dihapus karena berisi ajakan "
    "untuk mengecek bio/profil, pola yang umum dipakai untuk spam/promosi/scam.\n\n"
    "📌 [Peraturan Grup]({rules_link})"
)

# Bio profile link spam (user's profile bio contains promo/scam links)
BIO_LINK_SPAM_NOTIFICATION = (
    "🚫 *Spam Bio Profil Terdeteksi*\n\n"
    "Pesan dari {user_mention} telah dihapus karena akun ini memiliki "
    "bio profil dengan tautan/mention Telegram mencurigakan.\n\n"
    "Pengguna telah dibatasi.\n\n"
    "📌 [Peraturan Grup]({rules_link})"
)

BIO_LINK_SPAM_NOTIFICATION_NO_RESTRICT = (
    "🚫 *Spam Bio Profil Terdeteksi*\n\n"
    "Pesan dari {user_mention} telah dihapus karena akun ini memiliki "
    "bio profil dengan tautan/mention Telegram mencurigakan.\n\n"
    "📌 [Peraturan Grup]({rules_link})"
)
These domains are allowed even during probation period # Matches exact domain or subdomains (e.g., "github.com" matches "www.github.com") diff --git a/src/bot/group_config.py b/src/bot/group_config.py index 224d70a..154504d 100644 --- a/src/bot/group_config.py +++ b/src/bot/group_config.py @@ -41,6 +41,7 @@ class GroupConfig(BaseModel): duplicate_spam_threshold: int = 2 duplicate_spam_min_length: int = 20 duplicate_spam_similarity: float = 0.95 + bio_bait_enabled: bool = True @field_validator("group_id") @classmethod @@ -193,6 +194,7 @@ def build_group_registry(settings: object) -> GroupRegistry: duplicate_spam_threshold=settings.duplicate_spam_threshold, duplicate_spam_min_length=settings.duplicate_spam_min_length, duplicate_spam_similarity=settings.duplicate_spam_similarity, + bio_bait_enabled=settings.bio_bait_enabled, ) registry.register(config) diff --git a/src/bot/handlers/bio_bait.py b/src/bot/handlers/bio_bait.py new file mode 100644 index 0000000..12efed3 --- /dev/null +++ b/src/bot/handlers/bio_bait.py @@ -0,0 +1,356 @@ +""" +Bio bait spam detection handler. + +Spammers commonly post short messages telling other members to check their +profile bio, where the bio itself contains a link to a Telegram channel/group +(typically scam/promo/gambling). To evade keyword filters they obfuscate the +word "bio" with misspellings, separators, and Cyrillic look-alikes +(e.g. "byooh", "b.i.o", "ะฌั–ะพ", "b1o", "bioohh"). + +This handler covers TWO related vectors: + +1. Bait phrase in the message text (e.g. "cek bio aku", "liat byoh"). +2. The user's *Telegram profile bio* itself contains promo/scam links + (private t.me/+ invite links and/or non-whitelisted @mentions). In + this case the group message may be innocuous; the spam is in the bio. + We fetch the bio once per hour per user and cache it. + +On match the handler deletes the message, restricts the user, and posts a +notification to the warning topic. 
+""" + +import logging +import re +import unicodedata +from time import monotonic + +from telegram import Update +from telegram.ext import ApplicationHandlerStop, ContextTypes + +from bot.constants import ( + BIO_BAIT_SPAM_NOTIFICATION, + BIO_BAIT_SPAM_NOTIFICATION_NO_RESTRICT, + BIO_LINK_SPAM_NOTIFICATION, + BIO_LINK_SPAM_NOTIFICATION_NO_RESTRICT, + RESTRICTED_PERMISSIONS, + WHITELISTED_TELEGRAM_PATHS, +) +from bot.group_config import get_group_config_for_update +from bot.handlers.anti_spam import is_url_whitelisted +from bot.services.telegram_utils import get_user_mention + +logger = logging.getLogger(__name__) + +# Maximum normalized text length to consider as bait. Real bait is short. +BIO_BAIT_MAX_LENGTH = 80 + +# Per-user bio cache (TTL in seconds). Stored in context.bot_data. +USER_BIO_CACHE_KEY = "user_bio_cache" +USER_BIO_CACHE_TTL_SECONDS = 3600 + +# Strip common zero-width characters used to break keyword filters. +ZERO_WIDTH_RE = re.compile(r"[\u200b-\u200f\u2060-\u2064\ufeff]") + +# Canonicalize obfuscated "bio" variants to a plain "bio" token. +# Covers: bio, b1o, b!o, b.i.o, b i o, b-i-o, bioh, bioo, bioohh, plus +# Cyrillic look-alikes ัŒ and ั–. (Input is already lowercased.) +BIO_OBFUSCATED_RE = re.compile( + r"\b[bัŒ][\s._\-]*[i1!ั–][\s._\-]*[o0ะพ](?:[\s._\-]*h+)?\b" +) + +# Canonicalize "byo / byoh / byooh" variants. +BYO_OBFUSCATED_RE = re.compile( + r"\b[bัŒ][\s._\-]*y[\s._\-]*[o0ะพ](?:[\s._\-]*h+)?\b" +) + +# Catch elongated forms after partial canonicalization, e.g. "biooo", "byoooh". +BIO_ELONGATED_RE = re.compile(r"\bb(?:i|y)o+h*\b") + +# Common Indonesian first-person possessives + English equivalents. +_BIO_OWNER_RE = r"\b(?:aku|gw|gue|saya|ku|ane|me|my)\b" +# Optional address particle that often follows bait phrases. +_BIO_SUFFIX_RE = r"(?:\s+\b(?:dong|ya|kak|bro|sis)\b)?" + +# Bait phrase patterns matched against the normalized text. 
+# Each requires either: +# (a) imperative cue + bio (with optional address particle), OR +# (b) bio + first-person possessive at end of message, OR +# (c) imperative cue + profil/profile + possessive, OR +# (d) imperative cue + my + profile/bio. +BIO_BAIT_PATTERNS = ( + re.compile( + r"\b(?:cek|check|liat|lihat|buka|open|view|see|kunjungi|kunjungin)\b" + rf"(?:\s+\w+){{0,2}}\s+\bbio\b{_BIO_SUFFIX_RE}" + ), + re.compile( + rf"\bbio\b\s+{_BIO_OWNER_RE}" + rf"(?:\s+\b(?:update|updated|baru|new)\b)?" + rf"{_BIO_SUFFIX_RE}$" + ), + re.compile( + r"\b(?:cek|check|liat|lihat|buka|open|view|see)\b" + r"\s+\b(?:profil|profile)\b" + rf"\s+{_BIO_OWNER_RE}{_BIO_SUFFIX_RE}" + ), + re.compile( + r"\b(?:cek|check|liat|lihat|buka|open|view|see)\b" + r"\s+\bmy\b" + r"\s+\b(?:profile|bio)\b" + ), +) + +# Telegram private invite links (t.me/+). +TELEGRAM_INVITE_LINK_RE = re.compile( + r"(?:https?://)?(?:t\.me|telegram\.me)/\+[A-Za-z0-9_-]{8,}", + re.IGNORECASE, +) + +# Telegram public channel/user links (e.g. t.me/somechannel). +TELEGRAM_LINK_RE = re.compile( + r"((?:https?://)?(?:t\.me|telegram\.me)/[A-Za-z][A-Za-z0-9_]{4,31}(?:/[^\s]+)?)", + re.IGNORECASE, +) + +# Bare @username mentions. +TELEGRAM_USERNAME_RE = re.compile(r"(? str: + """ + Normalize text for bio-bait detection. + + Applies NFKC, lowercases, strips zero-width characters, canonicalizes + obfuscated bio/byo variants to "bio", strips remaining punctuation, + and collapses whitespace. + + Args: + text: Raw message text or caption. + + Returns: + Normalized text suitable for regex matching. 
+ """ + text = unicodedata.normalize("NFKC", text).lower() + text = ZERO_WIDTH_RE.sub("", text) + text = BIO_OBFUSCATED_RE.sub(" bio ", text) + text = BYO_OBFUSCATED_RE.sub(" bio ", text) + text = BIO_ELONGATED_RE.sub(" bio ", text) + text = re.sub(r"[^\w\s]", " ", text, flags=re.UNICODE) + text = re.sub(r"\s+", " ", text).strip() + return text + + +def is_bio_bait_spam(text: str) -> bool: + """ + Check whether the given text matches any bio bait pattern. + + Args: + text: Raw message text or caption. + + Returns: + bool: True if text matches a bait pattern within the length cap. + """ + normalized = normalize_bio_bait_text(text) + if not normalized: + return False + if len(normalized) > BIO_BAIT_MAX_LENGTH: + return False + return any(pattern.search(normalized) for pattern in BIO_BAIT_PATTERNS) + + +def has_suspicious_bio_links(bio: str) -> bool: + """ + Check whether a user's bio text contains suspicious Telegram promo refs. + + Triggers on: + - Any t.me/+... private invite link. + - Any non-whitelisted t.me/{username} link. + - Two or more non-whitelisted bare @mentions. + - A single non-whitelisted @mention combined with a promo hint word. + + Args: + bio: Raw bio string from the user's profile. + + Returns: + bool: True if the bio is considered spammy. 
+ """ + if not bio: + return False + + normalized = unicodedata.normalize("NFKC", bio) + lowered = normalized.lower() + + if TELEGRAM_INVITE_LINK_RE.search(normalized): + return True + + for match in TELEGRAM_LINK_RE.finditer(normalized): + if not is_url_whitelisted(match.group(1)): + return True + + mentions = { + m.group(1).lower() + for m in TELEGRAM_USERNAME_RE.finditer(normalized) + if m.group(1).lower() not in WHITELISTED_TELEGRAM_PATHS + } + if len(mentions) >= 2: + return True + if mentions and any(hint in lowered for hint in BIO_PROMO_HINTS): + return True + + return False + + +def _get_user_bio_cache( + context: ContextTypes.DEFAULT_TYPE, +) -> dict[int, tuple[float, str | None]]: + """Get or initialize the per-user bio cache stored in bot_data.""" + return context.bot_data.setdefault(USER_BIO_CACHE_KEY, {}) + + +def clear_cached_user_bio( + context: ContextTypes.DEFAULT_TYPE, user_id: int +) -> None: + """Remove a user's bio cache entry (call after restriction).""" + _get_user_bio_cache(context).pop(user_id, None) + + +async def get_cached_user_bio( + context: ContextTypes.DEFAULT_TYPE, user_id: int +) -> str | None: + """ + Fetch the user's profile bio with a per-user TTL cache. + + Returns the cached bio if the entry is still fresh. Otherwise calls + bot.get_chat(user_id) and stores the result. Errors are swallowed and + cause this function to return None for that call. 
+ """ + cache = _get_user_bio_cache(context) + now = monotonic() + + cached = cache.get(user_id) + if cached and cached[0] > now: + return cached[1] + + try: + chat = await context.bot.get_chat(user_id) + bio = (getattr(chat, "bio", None) or "").strip() or None + except Exception: + logger.debug("Failed to fetch user bio: user_id=%s", user_id, exc_info=True) + return None + + cache[user_id] = (now + USER_BIO_CACHE_TTL_SECONDS, bio) + return bio + + +async def handle_bio_bait_spam( + update: Update, context: ContextTypes.DEFAULT_TYPE +) -> None: + """ + Handle bio-bait spam (phrase in message OR promo links in user's bio). + + Skips bots and admins. On match, deletes the message, restricts the + user, and notifies the warning topic. Always raises ApplicationHandlerStop + after handling a detected message to prevent downstream handlers from + re-processing it. + + Args: + update: Telegram update containing the message. + context: Bot context with helper methods. + """ + if not update.message or not update.message.from_user: + return + + group_config = get_group_config_for_update(update) + if group_config is None: + return + + if not group_config.bio_bait_enabled: + return + + user = update.message.from_user + if user.is_bot: + return + + admin_ids = context.bot_data.get("group_admin_ids", {}).get(group_config.group_id, []) + if user.id in admin_ids: + return + + text = update.message.text or update.message.caption or "" + + detection_reason: str | None = None + if text and is_bio_bait_spam(text): + detection_reason = "message_bait" + else: + user_bio = await get_cached_user_bio(context, user.id) + if user_bio and has_suspicious_bio_links(user_bio): + detection_reason = "bio_links" + + if detection_reason is None: + return + + user_mention = get_user_mention(user) + logger.info( + f"Bio bait spam detected: user_id={user.id}, " + f"group_id={group_config.group_id}, reason={detection_reason}" + ) + + try: + await update.message.delete() + logger.info(f"Deleted bio bait 
spam from user_id={user.id}") + except Exception: + logger.error( + f"Failed to delete bio bait spam: user_id={user.id}", + exc_info=True, + ) + + restricted = False + try: + await context.bot.restrict_chat_member( + chat_id=group_config.group_id, + user_id=user.id, + permissions=RESTRICTED_PERMISSIONS, + ) + restricted = True + clear_cached_user_bio(context, user.id) + logger.info(f"Restricted user_id={user.id} for bio bait spam") + except Exception: + logger.error( + f"Failed to restrict user for bio bait spam: user_id={user.id}", + exc_info=True, + ) + + try: + if detection_reason == "bio_links": + template = ( + BIO_LINK_SPAM_NOTIFICATION if restricted + else BIO_LINK_SPAM_NOTIFICATION_NO_RESTRICT + ) + else: + template = ( + BIO_BAIT_SPAM_NOTIFICATION if restricted + else BIO_BAIT_SPAM_NOTIFICATION_NO_RESTRICT + ) + notification_text = template.format( + user_mention=user_mention, + rules_link=group_config.rules_link, + ) + await context.bot.send_message( + chat_id=group_config.group_id, + message_thread_id=group_config.warning_topic_id, + text=notification_text, + parse_mode="Markdown", + ) + logger.info(f"Sent bio bait spam notification for user_id={user.id}") + except Exception: + logger.error( + f"Failed to send bio bait spam notification: user_id={user.id}", + exc_info=True, + ) + + raise ApplicationHandlerStop diff --git a/src/bot/main.py b/src/bot/main.py index f59f59b..c9726f8 100644 --- a/src/bot/main.py +++ b/src/bot/main.py @@ -19,6 +19,7 @@ from bot.group_config import get_group_registry, init_group_registry from bot.handlers import captcha from bot.handlers.anti_spam import handle_contact_spam, handle_inline_keyboard_spam, handle_new_user_spam +from bot.handlers.bio_bait import handle_bio_bait_spam from bot.handlers.duplicate_spam import handle_duplicate_spam from bot.handlers.dm import handle_dm from bot.handlers.message import handle_message @@ -317,15 +318,27 @@ def main() -> None: ) logger.info("Registered handler: inline_keyboard_spam_handler 
(group=1)") + # Handler: Bio bait spam handler - catches "cek bio aku" / "lihat byoh" style + # messages where spammers point users to their profile bio (which contains + # external promo/scam links). + application.add_handler( + MessageHandler( + filters.ChatType.GROUPS & ~filters.COMMAND, + handle_bio_bait_spam, + ), + group=2, + ) + logger.info("Registered handler: bio_bait_spam_handler (group=2)") + # Handler: Contact spam handler - blocks contact card sharing for all members application.add_handler( MessageHandler( filters.ChatType.GROUPS & filters.CONTACT, handle_contact_spam, ), - group=2, + group=3, ) - logger.info("Registered handler: contact_spam_handler (group=2)") + logger.info("Registered handler: contact_spam_handler (group=3)") # Handler 9: New-user anti-spam handler - checks for forwards/links from users on probation application.add_handler( @@ -333,9 +346,9 @@ def main() -> None: filters.ChatType.GROUPS, handle_new_user_spam, ), - group=3, + group=4, ) - logger.info("Registered handler: anti_spam_handler (group=3)") + logger.info("Registered handler: anti_spam_handler (group=4)") # Handler 10: Duplicate message spam handler - detects repeated identical messages application.add_handler( @@ -343,9 +356,9 @@ def main() -> None: filters.ChatType.GROUPS & ~filters.COMMAND, handle_duplicate_spam, ), - group=4, + group=5, ) - logger.info("Registered handler: duplicate_spam_handler (group=4)") + logger.info("Registered handler: duplicate_spam_handler (group=5)") # Handler 11: Group message handler - monitors messages in monitored # groups and warns/restricts users with incomplete profiles @@ -354,9 +367,9 @@ def main() -> None: filters.ChatType.GROUPS & ~filters.COMMAND, handle_message, ), - group=5, + group=6, ) - logger.info("Registered handler: message_handler (group=5)") + logger.info("Registered handler: message_handler (group=6)") # Register auto-restriction job to run every 5 minutes if application.job_queue: diff --git a/tests/test_bio_bait.py 
"""Tests for the bio bait spam detection handler."""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from telegram import Chat, Message, User
from telegram.ext import ApplicationHandlerStop

from bot.group_config import GroupConfig
from bot.handlers.bio_bait import (
    BIO_BAIT_MAX_LENGTH,
    USER_BIO_CACHE_KEY,
    USER_BIO_CACHE_TTL_SECONDS,
    clear_cached_user_bio,
    get_cached_user_bio,
    handle_bio_bait_spam,
    has_suspicious_bio_links,
    is_bio_bait_spam,
    normalize_bio_bait_text,
)

# Cyrillic look-alike spelling of "Bio": capital soft sign (U+042C),
# Byelorussian-Ukrainian i (U+0456), Cyrillic o (U+043E). Written with escapes
# so the test input cannot be silently corrupted by a re-encoding.
CYRILLIC_BIO = "\u042c\u0456\u043e"


class TestNormalizeBioBaitText:
    """Tests for the normalize_bio_bait_text function."""

    def test_lowercase(self):
        assert normalize_bio_bait_text("CEK BIO") == "cek bio"

    def test_strip_zero_width(self):
        result = normalize_bio_bait_text("cek b\u200bi\u200bo aku")
        assert "bio" in result
        assert "aku" in result

    def test_canonicalize_b1o(self):
        assert "bio" in normalize_bio_bait_text("cek b1o aku")

    def test_canonicalize_b_dot_i_dot_o(self):
        assert "bio" in normalize_bio_bait_text("cek b.i.o aku")

    def test_canonicalize_spaced(self):
        assert "bio" in normalize_bio_bait_text("cek b i o aku")

    def test_canonicalize_byoh(self):
        assert "bio" in normalize_bio_bait_text("liat byoh")

    def test_canonicalize_bioohh(self):
        assert "bio" in normalize_bio_bait_text("cek bioohh aku")

    def test_canonicalize_cyrillic(self):
        # Gets lowercased by normalization, then matched as a look-alike.
        assert "bio" in normalize_bio_bait_text(f"cek {CYRILLIC_BIO} aku")

    def test_strip_punctuation(self):
        assert normalize_bio_bait_text("cek bio, aku!") == "cek bio aku"

    def test_collapse_whitespace(self):
        # Mixed runs of spaces and a tab must collapse to single spaces.
        assert normalize_bio_bait_text("cek   bio \t  aku") == "cek bio aku"

    def test_empty_string(self):
        assert normalize_bio_bait_text("") == ""


class TestIsBioBaitSpam:
    """Tests for the is_bio_bait_spam function."""

    @pytest.mark.parametrize("text", [
        "cek bio",
        "lihat bio aku",
        "liat byoh",
        "buka b1o aku",
        "cek b!o aku",
        "b.i.o aku",
        "b i o aku",
        "bioooo aku",
        f"{CYRILLIC_BIO} aku",
        "open my bio",
        "check my profile",
        "cek\nbio aku",
        "lihat profil aku",
        "cek bioohh aku",
        "cek bio kak",
        "lihat bio dong",
        "bio aku update",
        "bio aku updated",
        "bio aku baru",
    ])
    def test_detects_bait(self, text):
        assert is_bio_bait_spam(text) is True

    @pytest.mark.parametrize("text", [
        "biology itu menarik banget",
        "bioinformatics adalah bidang yang luas",
        "biome dan biodiversity penting",
        "DM aku",
        "pm aku",
        "profile picture saya rusak",
        "halo semua",
        "info ada di sini bro",
        "thank you my bro",
        "bio aku ada di README",
        "bio aku untuk eksperimen regex",
        "",
    ])
    def test_does_not_detect_safe(self, text):
        assert is_bio_bait_spam(text) is False

    def test_too_long_not_detected(self):
        # Bait phrase is present, but the message exceeds the length cap.
        text = "cek bio aku " + ("padding " * 30)
        assert is_bio_bait_spam(text) is False

    def test_length_cap_constant(self):
        assert BIO_BAIT_MAX_LENGTH > 0
class TestHasSuspiciousBioLinks:
    """Tests for has_suspicious_bio_links."""

    def test_empty_bio(self):
        assert has_suspicious_bio_links("") is False

    def test_invite_link(self):
        # Private invite link (t.me/+hash) embedded in surrounding text.
        bio = "VIP promo t.me/+exampleinvitehash ASP"
        assert has_suspicious_bio_links(bio) is True

    def test_invite_link_with_https(self):
        assert has_suspicious_bio_links("https://t.me/+exampleinvitehash") is True

    def test_non_whitelisted_public_link(self):
        assert has_suspicious_bio_links("Join t.me/somerandomscamchannel") is True

    def test_whitelisted_public_link_alone(self):
        # A bio mentioning the official group is fine.
        assert has_suspicious_bio_links("Member of t.me/pythonid") is False

    def test_single_bare_mention_not_enough(self):
        # One @mention without a promo hint word must not trigger.
        assert has_suspicious_bio_links("Contact: @somerandomname") is False

    def test_two_non_whitelisted_mentions(self):
        assert has_suspicious_bio_links("@channel_one @channel_two") is True

    def test_single_mention_with_promo_hint(self):
        # "VIP" is expected to count as a promo hint here.
        assert has_suspicious_bio_links("VIP @channel_one") is True

    def test_whitelisted_mention_alone(self):
        assert has_suspicious_bio_links("@pythonid") is False

    def test_plain_bio_no_links(self):
        assert has_suspicious_bio_links("Just a Python developer from Indonesia.") is False


class TestUserBioCache:
    """Tests for get_cached_user_bio / clear_cached_user_bio."""

    @pytest.fixture
    def context(self):
        # Minimal stand-in for the bot context: a real dict for bot_data (so
        # cache writes are observable) and an awaitable get_chat.
        ctx = MagicMock()
        ctx.bot_data = {}
        ctx.bot = MagicMock()
        ctx.bot.get_chat = AsyncMock()
        return ctx

    async def test_fetch_and_cache(self, context):
        chat = MagicMock()
        chat.bio = " hello world "
        context.bot.get_chat.return_value = chat

        bio = await get_cached_user_bio(context, 42)
        # Surrounding whitespace is stripped before the bio is returned.
        assert bio == "hello world"
        assert 42 in context.bot_data[USER_BIO_CACHE_KEY]

    async def test_cache_hit_skips_api(self, context):
        chat = MagicMock()
        chat.bio = "first"
        context.bot.get_chat.return_value = chat

        await get_cached_user_bio(context, 7)
        await get_cached_user_bio(context, 7)
        # Second lookup must be served from the cache.
        assert context.bot.get_chat.call_count == 1

    async def test_empty_bio_cached_as_none(self, context):
        chat = MagicMock()
        chat.bio = ""
        context.bot.get_chat.return_value = chat

        bio = await get_cached_user_bio(context, 9)
        assert bio is None
        # Index 1 of the cache entry is the stored bio value.
        assert context.bot_data[USER_BIO_CACHE_KEY][9][1] is None

    async def test_missing_bio_attribute_cached_as_none(self, context):
        chat = MagicMock(spec=[])  # no bio attribute
        context.bot.get_chat.return_value = chat

        bio = await get_cached_user_bio(context, 11)
        assert bio is None

    async def test_get_chat_error_returns_none(self, context):
        context.bot.get_chat = AsyncMock(side_effect=Exception("boom"))
        bio = await get_cached_user_bio(context, 13)
        assert bio is None
        # Failures are NOT cached so we retry next time.
        assert 13 not in context.bot_data.get(USER_BIO_CACHE_KEY, {})

    def test_clear_cache(self, context):
        context.bot_data[USER_BIO_CACHE_KEY] = {42: (123.0, "x")}
        clear_cached_user_bio(context, 42)
        assert 42 not in context.bot_data[USER_BIO_CACHE_KEY]

    def test_clear_cache_missing(self, context):
        # Should not raise even if the entry doesn't exist.
        clear_cached_user_bio(context, 999)

    def test_ttl_constant_positive(self):
        assert USER_BIO_CACHE_TTL_SECONDS > 0
class TestHandleBioBaitSpam:
    """Tests for the handle_bio_bait_spam handler."""

    @pytest.fixture
    def group_config(self):
        # Group id matches mock_update.effective_chat.id and the admin map key
        # in mock_context.
        return GroupConfig(
            group_id=-100,
            warning_topic_id=999,
            bio_bait_enabled=True,
        )

    @pytest.fixture
    def mock_update(self):
        # Default update: non-bot, non-admin user (id 42) sending a bait phrase.
        update = MagicMock()
        update.message = MagicMock(spec=Message)
        update.message.from_user = MagicMock(spec=User)
        update.message.from_user.id = 42
        update.message.from_user.is_bot = False
        update.message.from_user.full_name = "Test User"
        update.message.from_user.username = "testuser"
        update.message.text = "cek bio aku"
        update.message.caption = None
        update.message.message_id = 100
        update.message.delete = AsyncMock()
        update.effective_chat = MagicMock(spec=Chat)
        update.effective_chat.id = -100
        return update

    @pytest.fixture
    def mock_context(self):
        # Users 1 and 2 are admins of group -100.
        context = MagicMock()
        context.bot_data = {"group_admin_ids": {-100: [1, 2]}}
        context.bot = MagicMock()
        context.bot.restrict_chat_member = AsyncMock()
        context.bot.send_message = AsyncMock()
        # Default: empty bio so bio-link branch won't trigger unintentionally.
        chat = MagicMock()
        chat.bio = ""
        context.bot.get_chat = AsyncMock(return_value=chat)
        return context

    async def test_skips_no_message(self, mock_context, group_config):
        # Passes if the handler returns without raising.
        update = MagicMock()
        update.message = None
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            await handle_bio_bait_spam(update, mock_context)

    async def test_skips_no_user(self, mock_context, group_config):
        # Passes if the handler returns without raising.
        update = MagicMock()
        update.message = MagicMock(spec=Message)
        update.message.from_user = None
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            await handle_bio_bait_spam(update, mock_context)

    async def test_skips_unmonitored_group(self, mock_update, mock_context):
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=None):
            await handle_bio_bait_spam(mock_update, mock_context)
        mock_update.message.delete.assert_not_called()

    async def test_skips_when_disabled(self, mock_update, mock_context, group_config):
        group_config.bio_bait_enabled = False
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            await handle_bio_bait_spam(mock_update, mock_context)
        mock_update.message.delete.assert_not_called()

    async def test_skips_bots(self, mock_update, mock_context, group_config):
        mock_update.message.from_user.is_bot = True
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            await handle_bio_bait_spam(mock_update, mock_context)
        mock_update.message.delete.assert_not_called()

    async def test_skips_admins(self, mock_update, mock_context, group_config):
        # User id 1 is listed as an admin in mock_context.
        mock_update.message.from_user.id = 1
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            await handle_bio_bait_spam(mock_update, mock_context)
        mock_update.message.delete.assert_not_called()

    async def test_skips_innocuous_message_with_clean_bio(
        self, mock_update, mock_context, group_config
    ):
        mock_update.message.text = "halo semua, ada yang tahu cara install python?"
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            await handle_bio_bait_spam(mock_update, mock_context)
        mock_update.message.delete.assert_not_called()

    async def test_detects_message_bait_and_restricts(
        self, mock_update, mock_context, group_config
    ):
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            with pytest.raises(ApplicationHandlerStop):
                await handle_bio_bait_spam(mock_update, mock_context)

        mock_update.message.delete.assert_called_once()
        mock_context.bot.restrict_chat_member.assert_called_once()
        mock_context.bot.send_message.assert_called_once()
        call_kwargs = mock_context.bot.send_message.call_args.kwargs
        # Message-bait template, restricted variant.
        assert "Bio Bait" in call_kwargs["text"]
        assert "dibatasi" in call_kwargs["text"]

    async def test_uses_caption_when_no_text(self, mock_update, mock_context, group_config):
        mock_update.message.text = None
        mock_update.message.caption = "lihat bio aku"
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            with pytest.raises(ApplicationHandlerStop):
                await handle_bio_bait_spam(mock_update, mock_context)
        mock_update.message.delete.assert_called_once()

    async def test_detects_via_bio_links_with_innocuous_message(
        self, mock_update, mock_context, group_config
    ):
        mock_update.message.text = "halo"
        chat = MagicMock()
        chat.bio = "VIP promo t.me/+exampleinvitehash"
        mock_context.bot.get_chat = AsyncMock(return_value=chat)

        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            with pytest.raises(ApplicationHandlerStop):
                await handle_bio_bait_spam(mock_update, mock_context)

        mock_update.message.delete.assert_called_once()
        mock_context.bot.restrict_chat_member.assert_called_once()
        call_kwargs = mock_context.bot.send_message.call_args.kwargs
        # Bio-link template is used when the bio, not the message, matched.
        assert "Bio Profil" in call_kwargs["text"]

    async def test_no_text_no_bad_bio_does_nothing(
        self, mock_update, mock_context, group_config
    ):
        mock_update.message.text = None
        mock_update.message.caption = None
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            await handle_bio_bait_spam(mock_update, mock_context)
        mock_update.message.delete.assert_not_called()

    async def test_no_text_with_bad_bio_triggers_restriction(
        self, mock_update, mock_context, group_config
    ):
        mock_update.message.text = None
        mock_update.message.caption = None
        chat = MagicMock()
        chat.bio = "VIP t.me/+exampleinvitehash"
        mock_context.bot.get_chat = AsyncMock(return_value=chat)

        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            with pytest.raises(ApplicationHandlerStop):
                await handle_bio_bait_spam(mock_update, mock_context)
        mock_update.message.delete.assert_called_once()

    async def test_restriction_clears_bio_cache(
        self, mock_update, mock_context, group_config
    ):
        mock_update.message.text = "halo"
        chat = MagicMock()
        chat.bio = "VIP t.me/+exampleinvitehash"
        mock_context.bot.get_chat = AsyncMock(return_value=chat)

        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            with pytest.raises(ApplicationHandlerStop):
                await handle_bio_bait_spam(mock_update, mock_context)

        cache = mock_context.bot_data.get(USER_BIO_CACHE_KEY, {})
        assert mock_update.message.from_user.id not in cache

    async def test_delete_failure_continues(self, mock_update, mock_context, group_config):
        mock_update.message.delete = AsyncMock(side_effect=Exception("Delete failed"))
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            with pytest.raises(ApplicationHandlerStop):
                await handle_bio_bait_spam(mock_update, mock_context)
        # Restriction and notification must still happen after a failed delete.
        mock_context.bot.restrict_chat_member.assert_called_once()
        mock_context.bot.send_message.assert_called_once()

    async def test_restrict_failure_uses_no_restrict_template(
        self, mock_update, mock_context, group_config
    ):
        mock_context.bot.restrict_chat_member = AsyncMock(side_effect=Exception("Restrict failed"))
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            with pytest.raises(ApplicationHandlerStop):
                await handle_bio_bait_spam(mock_update, mock_context)
        mock_context.bot.send_message.assert_called_once()
        call_kwargs = mock_context.bot.send_message.call_args.kwargs
        # The notification must not claim the user was restricted.
        assert "dibatasi" not in call_kwargs["text"]

    async def test_restrict_failure_for_bio_link_uses_no_restrict_template(
        self, mock_update, mock_context, group_config
    ):
        mock_update.message.text = "halo"
        chat = MagicMock()
        chat.bio = "VIP t.me/+exampleinvitehash"
        mock_context.bot.get_chat = AsyncMock(return_value=chat)
        mock_context.bot.restrict_chat_member = AsyncMock(side_effect=Exception("fail"))

        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            with pytest.raises(ApplicationHandlerStop):
                await handle_bio_bait_spam(mock_update, mock_context)
        call_kwargs = mock_context.bot.send_message.call_args.kwargs
        assert "Bio Profil" in call_kwargs["text"]
        assert "dibatasi" not in call_kwargs["text"]

    async def test_notification_failure_still_raises_stop(
        self, mock_update, mock_context, group_config
    ):
        mock_context.bot.send_message = AsyncMock(side_effect=Exception("Send failed"))
        with patch("bot.handlers.bio_bait.get_group_config_for_update", return_value=group_config):
            with pytest.raises(ApplicationHandlerStop):
                await handle_bio_bait_spam(mock_update, mock_context)