diff --git a/.env.example b/.env.example index 77cafc5..edca347 100644 --- a/.env.example +++ b/.env.example @@ -1,39 +1,39 @@ # Discord DISCORD_TOKEN= # Optional: comma-separated guild IDs for fast command registration in dev -GUILD_IDS= +# GUILD_IDS= # Twitch (Android first-party tokens) TWITCH_ACCESS_TOKEN= TWITCH_REFRESH_TOKEN= # Optional: alternate Android refresh token (used if primary fails) -TWITCH_REFRESH_TOKEN_ANDROID= +# TWITCH_REFRESH_TOKEN_ANDROID= # Monitor + paths -TWITCH_REFRESH_MINUTES=30 -TWITCH_STATE_PATH=data/campaigns_state.json -TWITCH_GUILD_STORE_PATH=data/guild_config.json -TWITCH_NOTIFY_ON_BOOT=false +# TWITCH_REFRESH_MINUTES=30 +# TWITCH_STATE_PATH=data/campaigns_state.json +# TWITCH_GUILD_STORE_PATH=data/guild_config.json +# TWITCH_NOTIFY_ON_BOOT=false # Environment (single toggle; defaults to prod) # Set to 'dev' to enable dev-only commands like /drops_notify_random. # Any value other than 'dev' (or unset) is treated as production. -EVIRONMENT=prod +# EVIRONMENT=prod # Networking (optional overrides) # Custom UA if needed; otherwise Android UA is used by default -TWITCH_USER_AGENT= +# TWITCH_USER_AGENT= # Collage + sending behavior # Cache TTL for fetched campaigns (seconds) -DROPS_FETCH_TTL_SECONDS=120 +# DROPS_FETCH_TTL_SECONDS=120 # Delay between messages when attachments are present (ms) -DROPS_SEND_DELAY_MS=350 +# DROPS_SEND_DELAY_MS=350 # Max collages per command (0 = unlimited) -DROPS_MAX_ATTACHMENTS_PER_CMD=0 +# DROPS_MAX_ATTACHMENTS_PER_CMD=0 # Optional: cap attachments for notifier specifically (defaults to same as above) # DROPS_MAX_ATTACHMENTS_PER_NOTIFY=0 # Collage composition -DROPS_ICON_LIMIT=9 -DROPS_ICON_COLUMNS=3 -DROPS_ICON_SIZE=96 +# DROPS_ICON_LIMIT=9 +# DROPS_ICON_COLUMNS=3 +# DROPS_ICON_SIZE=96 diff --git a/DropScout.py b/DropScout.py index ee38e41..8c3d763 100644 --- a/DropScout.py +++ b/DropScout.py @@ -7,6 +7,7 @@ import os import asyncio +from datetime import datetime, timedelta, timezone import hikari import lightbulb from dotenv import load_dotenv @@ -23,6 +24,11 @@ from functionality.twitch_drops import DropsMonitor, GuildConfigStore from functionality.twitch_drops.commands import register_commands +from functionality.twitch_drops.game_catalog import ( + ensure_game_catalog_ready_hook, + register_game_catalog_handlers, + warm_game_catalog, +) # Load .env file and read token load_dotenv() @@ -55,6 +61,7 @@ client = lightbulb.client_from_app( bot, default_enabled_guilds=tuple(default_enabled_guilds), + hooks=(ensure_game_catalog_ready_hook,), ) # Start/stop Lightbulb with the Hikari app lifecycle @@ -73,9 +80,11 @@ async def on_started(_: hikari.StartedEvent) -> None: _monitor: DropsMonitor | None = None _guild_store = GuildConfigStore(GUILD_STORE_PATH) +_catalog_refresh_task: asyncio.Task | None = None # Register commands (kept separate for maintainability) register_commands(client) +register_game_catalog_handlers(client) @bot.listen(hikari.StartedEvent) @@ -88,18 +97,71 @@ async def _note_started(_: hikari.StartedEvent) -> None: interval_minutes=REFRESH_MINUTES, state_path=os.getenv("TWITCH_STATE_PATH", "data/campaigns_state.json"), guild_store_path=GUILD_STORE_PATH, + favorites_store_path=os.getenv("TWITCH_FAVORITES_STORE_PATH", "data/favorites.json"), notify_on_boot=(os.getenv("TWITCH_NOTIFY_ON_BOOT", "false").lower() == "true"), ) _monitor.start() print("DropScout bot ready. Monitoring for campaign changes...") +@bot.listen(hikari.StartedEvent) +async def _prime_game_catalog(_: hikari.StartedEvent) -> None: + async def runner() -> None: + print("📦 Preparing Twitch game cache refresh…") + try: + await warm_game_catalog(state_path=os.getenv("TWITCH_STATE_PATH", "data/campaigns_state.json")) + except Exception as exc: + print(f"Game catalog warm-up failed: {exc}") + asyncio.create_task(runner(), name="twitch-top-games-cache") + + +def _seconds_until_next_noon_utc() -> float: + now = datetime.now(timezone.utc) + target = now.replace(hour=12, minute=0, second=0, microsecond=0) + if target <= now: + target += timedelta(days=1) + return max((target - now).total_seconds(), 0.0) + + +@bot.listen(hikari.StartedEvent) +async def _schedule_daily_catalog_refresh(_: hikari.StartedEvent) -> None: + global _catalog_refresh_task + if _catalog_refresh_task and not _catalog_refresh_task.done(): + return + + state_path = os.getenv("TWITCH_STATE_PATH", "data/campaigns_state.json") + + async def scheduler() -> None: + print("📦 Daily Twitch game cache refresh scheduled for 12:00 UTC.") + while True: + delay = _seconds_until_next_noon_utc() + try: + await asyncio.sleep(delay) + except asyncio.CancelledError: + break + print("📦 Daily Twitch game cache refresh starting…") + try: + await warm_game_catalog(state_path=state_path) + except Exception as exc: + print(f"⚠️ Daily game cache refresh failed: {exc}") + + _catalog_refresh_task = asyncio.create_task(scheduler(), name="twitch-game-cache-daily") + + @bot.listen(hikari.StoppingEvent) async def _note_stopping(_: hikari.StoppingEvent) -> None: """Stop the background monitor when the app is shutting down.""" global _monitor if _monitor: await _monitor.stop() + global _catalog_refresh_task + if _catalog_refresh_task: + _catalog_refresh_task.cancel() + try: + await _catalog_refresh_task + except asyncio.CancelledError: + pass + _catalog_refresh_task = None @bot.listen(hikari.GuildJoinEvent) diff --git a/functionality/twitch_drops/__init__.py b/functionality/twitch_drops/__init__.py index 7bbb811..e0ec10c 100644 --- a/functionality/twitch_drops/__init__.py +++ b/functionality/twitch_drops/__init__.py @@ -4,6 +4,7 @@ from .differ import DropsDiff, DropsDiffer from .embeds import build_campaign_embed from .config import GuildConfigStore +from .favorites import FavoritesStore from .notifier import DropsNotifier from .monitor import DropsMonitor @@ -16,7 +17,7 @@ "DropsDiffer", "build_campaign_embed", "GuildConfigStore", + "FavoritesStore", "DropsNotifier", "DropsMonitor", ] - diff --git a/functionality/twitch_drops/commands/__init__.py b/functionality/twitch_drops/commands/__init__.py index 479fff3..7a5bb74 100644 --- a/functionality/twitch_drops/commands/__init__.py +++ b/functionality/twitch_drops/commands/__init__.py @@ -12,6 +12,8 @@ import lightbulb from ..config import GuildConfigStore +from ..favorites import FavoritesStore +from ..game_catalog import get_game_catalog from .common import SharedContext @@ -31,6 +33,8 @@ def register_commands(client: lightbulb.Client) -> List[str]: """Register all DropScout commands on a Lightbulb client and return names.""" GUILD_STORE_PATH = os.getenv("TWITCH_GUILD_STORE_PATH", "data/guild_config.json") guild_store = GuildConfigStore(GUILD_STORE_PATH) + FAVORITES_STORE_PATH = os.getenv("TWITCH_FAVORITES_STORE_PATH", "data/favorites.json") + favorites_store = FavoritesStore(FAVORITES_STORE_PATH) ICON_LIMIT = int(os.getenv("DROPS_ICON_LIMIT", "9") or 9) ICON_SIZE = int(os.getenv("DROPS_ICON_SIZE", "96") or 96) @@ -47,6 +51,8 @@ def register_commands(client: lightbulb.Client) -> List[str]: MAX_ATTACH_PER_CMD=MAX_ATTACH_PER_CMD, SEND_DELAY_MS=SEND_DELAY_MS, FETCH_TTL=FETCH_TTL, + game_catalog=get_game_catalog(), + favorites_store=favorites_store, ) names: List[str] = [] @@ -59,6 +65,7 @@ def register_commands(client: lightbulb.Client) -> List[str]: from .active import register as reg_active from .this_week import register as reg_this_week from .search_game import register as reg_search_game + from .favorites import register as reg_favorites names.append(reg_hello(client, shared)) names.append(reg_help(client, shared)) @@ -67,6 +74,7 @@ def register_commands(client: lightbulb.Client) -> List[str]: names.append(reg_set_channel(client, shared)) names.append(reg_channel(client, shared)) names.append(reg_search_game(client, shared)) + names.append(reg_favorites(client, shared)) # Dev-only commands if not _is_production(): diff --git a/functionality/twitch_drops/commands/active.py b/functionality/twitch_drops/commands/active.py index b764c04..d73797e 100644 --- a/functionality/twitch_drops/commands/active.py +++ b/functionality/twitch_drops/commands/active.py @@ -24,6 +24,7 @@ class DropsActive( async def invoke(self, ctx: lightbulb.Context) -> None: try: await ctx.defer() + setattr(ctx, "_dropscout_deferred", True) except Exception: pass recs = await shared.get_campaigns_cached() @@ -55,4 +56,3 @@ async def invoke(self, ctx: lightbulb.Context) -> None: await shared.finalize_interaction(ctx) return "drops_active" - diff --git a/functionality/twitch_drops/commands/common.py b/functionality/twitch_drops/commands/common.py index c23d601..ea0cb84 100644 --- a/functionality/twitch_drops/commands/common.py +++ b/functionality/twitch_drops/commands/common.py @@ -11,6 +11,8 @@ from hikari.files import Bytes, Resourceish from ..config import GuildConfigStore +from ..favorites import FavoritesStore +from ..game_catalog import GameCatalog from ..models import CampaignRecord @@ -25,6 +27,8 @@ class SharedContext: MAX_ATTACH_PER_CMD: int SEND_DELAY_MS: int FETCH_TTL: int + game_catalog: GameCatalog + favorites_store: FavoritesStore _cache_data: list[CampaignRecord] = field(default_factory=list) _cache_exp: float = 0.0 @@ -39,6 +43,10 @@ async def get_campaigns_cached(self) -> list[CampaignRecord]: data = await fetcher.fetch_condensed() self._cache_data = data self._cache_exp = now_ts + self.FETCH_TTL + try: + self.game_catalog.merge_from_campaign_records(data) + except Exception: + pass return data def _get_async(self, ctx: Any, name: str) -> Optional[Callable[..., Awaitable[Any]]]: @@ -47,39 +55,84 @@ def _get_async(self, ctx: Any, name: str) -> Optional[Callable[..., Awaitable[An return None return cast(Callable[..., Awaitable[Any]], fn) + def _was_deferred(self, ctx: Any) -> bool: + """Best-effort detection that an interaction was previously deferred.""" + for attr in ("_dropscout_deferred", "deferred", "_deferred"): + val = getattr(ctx, attr, None) + if isinstance(val, bool): + if val: + return True + elif val is not None and not callable(val): + try: + if bool(val): + return True + except Exception: + continue + for attr in ("is_deferred",): + val = getattr(ctx, attr, None) + if callable(val): + try: + result = val() + except Exception: + continue + else: + if isinstance(result, bool) and result: + return True + elif isinstance(val, bool) and val: + return True + interaction = getattr(ctx, "interaction", None) + if interaction is None: + return False + for attr in ("is_deferred", "has_responded"): + val = getattr(interaction, attr, None) + if callable(val): + try: + result = val() + except Exception: + continue + else: + if isinstance(result, bool) and result: + return True + elif isinstance(val, bool) and val: + return True + return False + async def finalize_interaction(self, ctx: Any, *, message: Optional[str] = None) -> None: """Clear or update the deferred 'thinking…' placeholder if present.""" - # Try delete last/initial response first to avoid clutter - fn = self._get_async(ctx, "delete_last_response") - if fn is not None: + content = message if (message is not None and message != "") else "Done." + notify = bool(message not in (None, "")) or self._was_deferred(ctx) + + async def _run(name: str, **kwargs: Any) -> bool: + fn = self._get_async(ctx, name) + if fn is None: + return False try: - await fn() - return + await fn(**kwargs) except Exception: - pass - fn = self._get_async(ctx, "delete_initial_response") - if fn is not None: - try: - await fn() + return False + return True + + if notify: + if await _run("edit_last_response", content=content): return - except Exception: - pass - # Fall back to editing the placeholder - content = message if (message is not None and message != "") else "Done." - fn = self._get_async(ctx, "edit_last_response") - if fn is not None: - try: - await fn(content=content) + if await _run("edit_initial_response", content=content): return - except Exception: - pass - fn = self._get_async(ctx, "edit_initial_response") - if fn is not None: + await _run("delete_last_response") + await _run("delete_initial_response") try: - await fn(content=content) - return + await ctx.respond(content, ephemeral=True) except Exception: pass + return + + if await _run("delete_last_response"): + return + if await _run("delete_initial_response"): + return + if await _run("edit_last_response", content=content): + return + if await _run("edit_initial_response", content=content): + return # Absolute last resort: ephemeral follow-up note try: await ctx.respond(content, ephemeral=True) diff --git a/functionality/twitch_drops/commands/dev_notify_random.py b/functionality/twitch_drops/commands/dev_notify_random.py index 21618c2..fe46cc1 100644 --- a/functionality/twitch_drops/commands/dev_notify_random.py +++ b/functionality/twitch_drops/commands/dev_notify_random.py @@ -1,7 +1,6 @@ from __future__ import annotations import random - import lightbulb from ..differ import DropsDiff @@ -9,34 +8,85 @@ from .common import SharedContext + def register(client: lightbulb.Client, shared: SharedContext) -> str: - @client.register - class DropsNotifyRandom( - lightbulb.SlashCommand, - name="drops_notify_random", - description="Dev-only: trigger notifier for a random ACTIVE campaign", - ): - @lightbulb.invoke - async def invoke(self, ctx: lightbulb.Context) -> None: - try: - await ctx.defer(ephemeral=True) - except Exception: - pass - recs = await shared.get_campaigns_cached() - active = [r for r in recs if r.status == "ACTIVE"] - if not active: - await ctx.respond("No ACTIVE campaigns available to notify.", ephemeral=True) - return - r = random.choice(active) - notifier = DropsNotifier(ctx.client.app, shared.guild_store) - try: - await notifier.notify(DropsDiff(activated=[r])) - await ctx.respond( - f"Triggered notifier for: {(r.game_name or r.name or r.id)}.", - ephemeral=True, - ) - except Exception: - await ctx.respond("Failed to trigger notifier.", ephemeral=True) + async def _reply(ctx: lightbulb.Context, deferred: bool, message: str) -> None: + if deferred: + try: + await ctx.edit_initial_response(message) + return + except Exception: + pass + await ctx.respond(message, ephemeral=True) - return "drops_notify_random" + @client.register + class DropsNotifyRandom( + lightbulb.SlashCommand, + name="drops_notify_random", + description="Dev-only: trigger notifier for a random ACTIVE campaign", + ): + @lightbulb.invoke + async def invoke(self, ctx: lightbulb.Context) -> None: + if not ctx.guild_id: + await ctx.respond("This command must be used in a server.", ephemeral=True) + return + try: + await ctx.defer(ephemeral=True) + except Exception: + deferred = False + else: + deferred = True + setattr(ctx, "_dropscout_deferred", True) + try: + recs = await shared.get_campaigns_cached() + except Exception: + await _reply(ctx, deferred, "Failed to load campaigns.") + return + active = [r for r in recs if r.status == "ACTIVE"] + if not active: + await _reply(ctx, deferred, "No ACTIVE campaigns available to notify.") + return + r = random.choice(active) + try: + if ctx.guild_id: + shared.guild_store.set_channel_id(int(ctx.guild_id), int(ctx.channel_id)) + except Exception: + pass + notifier = DropsNotifier( + ctx.client.app, + shared.guild_store, + shared.favorites_store, + shared.game_catalog, + ) + game_key = None + user_obj = getattr(ctx, "user", None) or getattr(ctx, "member", None) or getattr(ctx, "author", None) + for candidate in (r.game_slug, r.game_name): + if not candidate: + continue + entry = shared.game_catalog.get(candidate) + if entry: + game_key = entry.key + break + try: + game_key = shared.game_catalog.normalize(candidate) + except Exception: + game_key = candidate.casefold() + if game_key: + break + if game_key and user_obj is not None: + try: + shared.favorites_store.add_favorite(int(ctx.guild_id), int(getattr(user_obj, "id")), game_key) + except Exception: + pass + try: + await notifier.notify(DropsDiff(activated=[r])) + except Exception as exc: + await _reply(ctx, deferred, f"Failed to trigger notifier: {exc}") + return + await _reply( + ctx, + deferred, + f"Triggered notifier for: {(r.game_name or r.name or r.id)}.", + ) + return "drops_notify_random" diff --git a/functionality/twitch_drops/commands/favorites.py b/functionality/twitch_drops/commands/favorites.py new file mode 100644 index 0000000..b3746bd --- /dev/null +++ b/functionality/twitch_drops/commands/favorites.py @@ -0,0 +1,493 @@ +from __future__ import annotations + +from typing import List, Optional, Tuple + +import asyncio + +import hikari +import lightbulb +from lightbulb import context as lb_context +from lightbulb.commands import options as opt +from hikari.files import Bytes + +from ..game_catalog import GameEntry +from ..models import CampaignRecord +from ..embeds import build_campaign_embed +from ..images import build_benefits_collage +from ..notifier import DropsNotifier +from .common import SharedContext + +CUSTOM_ID_PREFIX = "drops:fav" +REMOVE_SELECT_ID = f"{CUSTOM_ID_PREFIX}:remove" +REFRESH_BUTTON_ID = f"{CUSTOM_ID_PREFIX}:refresh" + + +def _build_overview( + app: hikari.RESTAware, + shared: SharedContext, + guild_id: int, + user_id: int, +) -> tuple[hikari.Embed, List[hikari.api.special_endpoints.ComponentBuilder]]: + favorites = shared.favorites_store.get_user_favorites(guild_id, user_id) + lines: list[str] = [] + select_entries: list[tuple[str, str]] = [] + for idx, key in enumerate(favorites, start=1): + entry = shared.game_catalog.get(key) + name = entry.name if entry else key + lines.append(f"{idx}. **{name}**") + select_entries.append((name, key)) + description = ( + "\n".join(lines) + if lines + else "You have no favorite games yet. Use `/drops_favorites add` to follow games you care about." + ) + embed = hikari.Embed(title="Favorite Games", description=description[:4096]) + embed.set_footer("Use `/drops_favorites add` to add more games.") + + components: List[hikari.api.special_endpoints.ComponentBuilder] = [] + try: + row = app.rest.build_message_action_row() + row.add_button(hikari.ButtonStyle.SECONDARY, REFRESH_BUTTON_ID).set_label("Refresh") + components.append(row) + except Exception: + components = [] + + if select_entries and components is not None: + try: + select_row = app.rest.build_message_action_row() + menu = select_row.add_text_select_menu(REMOVE_SELECT_ID) + menu.set_placeholder("Remove favorites…") + menu.set_min_values(1) + menu.set_max_values(min(len(select_entries), 25)) + for name, key in select_entries[:25]: + option = menu.add_option(name[:100], key[:100]) + option.set_description("Remove this game") + components.append(select_row) + except Exception: + pass + + return embed, components + + +async def _find_active_campaigns(shared: SharedContext, entry: GameEntry | None) -> list[CampaignRecord]: + if entry is None: + return [] + try: + recs = await shared.get_campaigns_cached() + except Exception: + return [] + matches: list[CampaignRecord] = [] + for rec in recs: + if rec.status != "ACTIVE": + continue + try: + if shared.game_catalog.matches_campaign(entry, rec): + matches.append(rec) + except Exception: + continue + return matches + + +async def _send_ephemeral_response( + ctx: lightbulb.Context, + deferred: bool, + *, + content: Optional[str] = None, + embeds: Optional[List[hikari.Embed]] = None, + components: Optional[List[hikari.api.special_endpoints.ComponentBuilder]] = None, +) -> None: + payload: dict[str, object] = {} + if content is not None: + payload["content"] = content + if embeds is not None: + payload["embeds"] = embeds + if components is not None: + payload["components"] = components + if deferred: + try: + await ctx.edit_initial_response(**payload) + return + except hikari.errors.NotFoundError: + pass + except Exception: + pass + payload["flags"] = hikari.MessageFlag.EPHEMERAL + await ctx.respond(**payload) + + +def register(client: lightbulb.Client, shared: SharedContext) -> str: + async def _autocomplete_add_game(ctx: lb_context.AutocompleteContext[str]) -> None: + if not shared.game_catalog.is_ready(): + await ctx.respond([]) + return + prefix = str(ctx.focused.value or "").strip() + try: + matches = shared.game_catalog.search(prefix, limit=25) + except Exception: + matches = [] + await ctx.respond([(entry.name, entry.key) for entry in matches]) + + async def _autocomplete_remove_game(ctx: lb_context.AutocompleteContext[str]) -> None: + guild_id = getattr(ctx.interaction, "guild_id", None) + user = getattr(ctx.interaction, "user", None) + if guild_id is None or user is None: + await ctx.respond([]) + return + try: + gid = int(guild_id) + uid = int(user.id) + except (TypeError, ValueError): + await ctx.respond([]) + return + candidates = shared.favorites_store.get_user_favorites(gid, uid) + if not candidates: + await ctx.respond([]) + return + prefix = str(ctx.focused.value or "").strip().casefold() + results: list[Tuple[str, str]] = [] + for key in candidates: + entry = shared.game_catalog.get(key) + name = entry.name if entry else key + if prefix and prefix not in name.casefold(): + continue + results.append((name, key)) + await ctx.respond(results[:25]) + + group = lightbulb.Group( + name="drops_favorites", + description="Manage your favorite games for Drop alerts.", + ) + + @group.register + class DropsFavoritesView( + lightbulb.SlashCommand, + name="view", + description="Show the games you follow for Drop alerts.", + ): + @lightbulb.invoke + async def invoke(self, ctx: lightbulb.Context) -> None: + if not ctx.guild_id: + await ctx.respond("Favorites can only be managed inside a server.", ephemeral=True) + return + user_obj = getattr(ctx, "user", None) or getattr(ctx, "member", None) or getattr(ctx, "author", None) + if user_obj is None: + await ctx.respond("Could not resolve your user information.", ephemeral=True) + return + try: + guild_id = int(ctx.guild_id) + user_id = int(getattr(user_obj, "id")) + except (TypeError, ValueError, AttributeError): + await ctx.respond("Could not resolve your user information.", ephemeral=True) + return + + app = ctx.client.app + embed, components = _build_overview(app, shared, guild_id, user_id) + await ctx.respond(embeds=[embed], components=components, ephemeral=True) + + @group.register + class DropsFavoritesAdd( + lightbulb.SlashCommand, + name="add", + description="Add a game to your favorites.", + ): + game: str = opt.string( + "game", + "Pick the game you want to follow.", + autocomplete=_autocomplete_add_game, + ) + + @lightbulb.invoke + async def invoke(self, ctx: lightbulb.Context) -> None: + if not ctx.guild_id: + await ctx.respond("Favorites can only be managed inside a server.", ephemeral=True) + return + user_obj = getattr(ctx, "user", None) or getattr(ctx, "member", None) or getattr(ctx, "author", None) + if user_obj is None: + await ctx.respond("Could not resolve your user information.", ephemeral=True) + return + try: + guild_id = int(ctx.guild_id) + user_id = int(getattr(user_obj, "id")) + except (TypeError, ValueError, AttributeError): + await ctx.respond("Could not resolve your user information.", ephemeral=True) + return + + app = ctx.client.app + try: + await ctx.defer(ephemeral=True) + except Exception: + deferred = False + else: + deferred = True + setattr(ctx, "_dropscout_deferred", True) + + key = (self.game or "").strip() + if not key: + await _send_ephemeral_response(ctx, deferred, content="Select a game from the suggestions to add it.") + return + entry = shared.game_catalog.get(key) + if entry is None: + await _send_ephemeral_response( + ctx, + deferred, + content="Select a game from the autocomplete suggestions to add it.", + ) + return + + added = shared.favorites_store.add_favorite(guild_id, user_id, entry.key) + if added: + message = f"Added **{entry.name}** to your favorites." + else: + message = f"**{entry.name}** is already in your favorites." + + active = await _find_active_campaigns(shared, entry) + embed, components = _build_overview(app, shared, guild_id, user_id) + if active: + lines = [] + for rec in active[:5]: + ending = f" – ends " if rec.ends_ts else "" + lines.append(f"- **{rec.name}**{ending}") + embed.add_field( + name="Active Campaigns Right Now", + value="\n".join(lines)[:1024], + inline=False, + ) + + await _send_ephemeral_response( + ctx, + deferred, + content=message, + embeds=[embed], + components=components, + ) + + @group.register + class DropsFavoritesCheck( + lightbulb.SlashCommand, + name="check", + description="Check now active campaigns for your favorite games.", + ): + @lightbulb.invoke + async def invoke(self, ctx: lightbulb.Context) -> None: + if not ctx.guild_id: + await ctx.respond("Favorites can only be managed inside a server.", ephemeral=True) + return + user_obj = getattr(ctx, "user", None) or getattr(ctx, "member", None) or getattr(ctx, "author", None) + if user_obj is None: + await ctx.respond("Could not resolve your user information.", ephemeral=True) + return + try: + guild_id = int(ctx.guild_id) + user_id = int(getattr(user_obj, "id")) + except Exception: + await ctx.respond("Could not resolve your user information.", ephemeral=True) + return + try: + await ctx.defer() + setattr(ctx, "_dropscout_deferred", True) + except Exception: + pass + + favorites = shared.favorites_store.get_user_favorites(guild_id, user_id) + if not favorites: + await shared.finalize_interaction(ctx, message="You have no favorite games yet.") + return + + recs = await shared.get_campaigns_cached() + entry_cache: dict[str, GameEntry | None] = {} + matches: list[CampaignRecord] = [] + for rec in recs: + if rec.status != "ACTIVE": + continue + for fav_key in favorites: + entry = entry_cache.get(fav_key) + if entry is None: + entry = shared.game_catalog.get(fav_key) + entry_cache[fav_key] = entry + if entry and shared.game_catalog.matches_campaign(entry, rec): + matches.append(rec) + break + if not matches: + await shared.finalize_interaction(ctx, message="No active campaigns for your favorites right now.") + return + + channel_id = shared.guild_store.get_channel_id(guild_id) + if channel_id is None: + try: + channel_id = int(ctx.channel_id) + shared.guild_store.set_channel_id(guild_id, channel_id) + except Exception: + channel_id = int(ctx.channel_id) + + notifier = DropsNotifier( + ctx.client.app, + shared.guild_store, + shared.favorites_store, + shared.game_catalog, + ) + favorites_map = shared.favorites_store.get_guild_favorites(guild_id) + + attachments_budget = shared.MAX_ATTACH_PER_CMD if shared.MAX_ATTACH_PER_CMD > 0 else None + attachments_used = 0 + sent = 0 + for campaign in matches: + embed = build_campaign_embed(campaign, title_prefix="Now Active") + png_bytes: bytes | None = None + filename: str | None = None + if attachments_budget is None or attachments_used < attachments_budget: + png_bytes, filename = await build_benefits_collage( + campaign, + limit=shared.ICON_LIMIT if shared.ICON_LIMIT >= 0 else 9, + icon_size=(shared.ICON_SIZE, shared.ICON_SIZE), + columns=shared.ICON_COLUMNS, + ) + if png_bytes and filename: + attachments_used += 1 + if not png_bytes and campaign.benefits and campaign.benefits[0].image_url: + embed.set_image(campaign.benefits[0].image_url) # type: ignore[arg-type] + attachment = None + if png_bytes and filename: + attachment = Bytes(png_bytes, filename) + embed.set_image(attachment) + + keys = notifier._resolve_campaign_keys(campaign) + watcher_ids = set(notifier._collect_watchers(favorites_map, keys)) + watcher_ids.add(user_id) + mention_text = notifier._join_mentions(watcher_ids, limit=1800) + + try: + await ctx.client.app.rest.create_message( + channel_id, + content=mention_text or f"<@{user_id}>", + embeds=[embed], + ) + sent += 1 + except Exception: + continue + await asyncio.sleep(shared.SEND_DELAY_MS / 1000) + + if sent == 0: + await shared.finalize_interaction(ctx, message="Failed to send favorites alerts.") + return + + await shared.finalize_interaction(ctx) + + @group.register + class DropsFavoritesRemove( + lightbulb.SlashCommand, + name="remove", + description="Remove a game from your favorites.", + ): + game: str = opt.string( + "game", + "Pick the favorite you want to unfollow.", + autocomplete=_autocomplete_remove_game, + ) + + @lightbulb.invoke + async def invoke(self, ctx: lightbulb.Context) -> None: + if not ctx.guild_id: + await ctx.respond("Favorites can only be managed inside a server.", ephemeral=True) + return + user_obj = getattr(ctx, "user", None) or getattr(ctx, "member", None) or getattr(ctx, "author", None) + if user_obj is None: + await ctx.respond("Could not resolve your user information.", ephemeral=True) + return + try: + guild_id = int(ctx.guild_id) + user_id = int(getattr(user_obj, "id")) + except (TypeError, ValueError, AttributeError): + await ctx.respond("Could not resolve your user information.", ephemeral=True) + return + + app = ctx.client.app + try: + await ctx.defer(ephemeral=True) + except Exception: + deferred = False + else: + deferred = True + setattr(ctx, "_dropscout_deferred", True) + + key = (self.game or "").strip() + if not key: + await _send_ephemeral_response(ctx, deferred, content="Select a favorite game to remove.") + return + + removed = shared.favorites_store.remove_favorite(guild_id, user_id, key) + if removed: + message = "Removed that game from your favorites." + else: + message = "That game is not currently in your favorites." + + embed, components = _build_overview(app, shared, guild_id, user_id) + await _send_ephemeral_response( + ctx, + deferred, + content=message, + embeds=[embed], + components=components, + ) + + client.register(group) + + listen_target = getattr(client, "listen", None) + if not callable(listen_target): + app_attr = getattr(client, "app", None) + listen_target = getattr(app_attr, "listen", None) if app_attr else None + + if callable(listen_target): + @listen_target(hikari.InteractionCreateEvent) + async def _favorites_component_handler(event: hikari.InteractionCreateEvent) -> None: + interaction = event.interaction + if not isinstance(interaction, hikari.ComponentInteraction): + return + if interaction.custom_id not in {REMOVE_SELECT_ID, REFRESH_BUTTON_ID}: + return + guild_id = getattr(interaction, "guild_id", None) + user = getattr(interaction, "user", None) + if guild_id is None or user is None: + try: + await interaction.create_initial_response( + hikari.ResponseType.MESSAGE_UPDATE, + content="Favorites can only be managed inside a server.", + ) + except Exception: + pass + return + try: + gid = int(guild_id) + uid = int(user.id) + except (TypeError, ValueError): + return + + app_local = interaction.app + + if interaction.custom_id == REMOVE_SELECT_ID: + values = interaction.values or [] + removed = shared.favorites_store.remove_many(gid, uid, values) + embed, components = _build_overview(app_local, shared, gid, uid) + content = "Selected favorites removed." if removed else "Those games were not in your favorites." + try: + await interaction.create_initial_response( + hikari.ResponseType.MESSAGE_UPDATE, + content=content, + embeds=[embed], + components=components, + ) + except Exception: + pass + return + + if interaction.custom_id == REFRESH_BUTTON_ID: + embed, components = _build_overview(app_local, shared, gid, uid) + try: + await interaction.create_initial_response( + hikari.ResponseType.MESSAGE_UPDATE, + embeds=[embed], + components=components, + ) + except Exception: + pass + + return "drops_favorites" diff --git a/functionality/twitch_drops/commands/search_game.py b/functionality/twitch_drops/commands/search_game.py index 2de3f88..9f77062 100644 --- a/functionality/twitch_drops/commands/search_game.py +++ b/functionality/twitch_drops/commands/search_game.py @@ -1,92 +1,64 @@ from __future__ import annotations -import difflib - -import hikari -from hikari.files import Bytes import lightbulb +from lightbulb import context as lb_context from lightbulb.commands import options as opt +from hikari.files import Bytes from ..embeds import build_campaign_embed from ..images import build_benefits_collage -from ..models import CampaignRecord from .common import SharedContext def register(client: lightbulb.Client, shared: SharedContext) -> str: + async def _autocomplete(ctx: lb_context.AutocompleteContext[str]) -> None: + if not shared.game_catalog.is_ready(): + await ctx.respond([]) + return + prefix = str(ctx.focused.value or "").strip() + try: + matches = shared.game_catalog.search(prefix, limit=25) + except Exception: + matches = [] + await ctx.respond([(entry.name, entry.key) for entry in matches]) + @client.register class DropsSearchGame( lightbulb.SlashCommand, name="drops_search_game", - description="Search active campaigns by game name and show the best match", + description="Browse active Twitch Drops campaigns by selecting a game", ): - query: str = opt.string("query", "Game name to search for") + game: str = opt.string( + "game", + "Choose a game to view active drops", + autocomplete=_autocomplete, + ) @lightbulb.invoke async def invoke(self, ctx: lightbulb.Context) -> None: - q = (self.query or "").strip() - if not q: - await ctx.respond("Provide a game name, e.g. 'Call of Duty'.", ephemeral=True) + key = (self.game or "").strip() + entry = shared.game_catalog.get(key) + if entry is None: + await ctx.respond( + "Select a game from the provided suggestions to run this search.", + ephemeral=True, + ) return try: await ctx.defer() + setattr(ctx, "_dropscout_deferred", True) except Exception: pass recs = await shared.get_campaigns_cached() - - def norm(s: str) -> str: - import re as _re - - s = s.casefold().strip() - s = _re.sub(r"[\s_]+", " ", s) - return s - - key_to_items: dict[str, list[CampaignRecord]] = {} - keys: list[str] = [] - for r in recs: - game = (r.game_name or "").strip() - if not game: - continue - gk = norm(game) - key_to_items.setdefault(gk, []).append(r) - if gk not in keys: - keys.append(gk) - - nq = norm(q) - best_rec: CampaignRecord | None = None - ambiguous = False - - if nq in key_to_items: - best_rec = key_to_items[nq][0] - else: - try: - from rapidfuzz import process, fuzz # type: ignore - - matches = process.extract(nq, keys, scorer=fuzz.token_set_ratio, limit=5) - if matches: - best_key, best_score, _ = matches[0] - if best_score >= 45: - best_rec = key_to_items[best_key][0] - strong = [m for m in matches if m[1] >= max(best_score - 5, 70)] - ambiguous = len(strong) > 1 and nq != best_key - except Exception: - close = difflib.get_close_matches(nq, keys, n=5, cutoff=0.4) - if close: - best_key = close[0] - best_rec = key_to_items[best_key][0] - ambiguous = len(close) > 1 and nq != best_key - else: - contains = [k for k in keys if nq in k] - if contains: - best_key = contains[0] - best_rec = key_to_items[best_key][0] - ambiguous = len(contains) > 1 and nq != best_key - - if not best_rec: - await ctx.respond("No matching games with drops found.") + matches = [ + r for r in recs if shared.game_catalog.matches_campaign(entry, r) + ] + if not matches: + await ctx.respond(f"No active Twitch Drops campaigns found for **{entry.name}**.") return - r = best_rec - e = build_campaign_embed(r, title_prefix="Best Match") + + r = matches[0] + e = build_campaign_embed(r, title_prefix="Selected Game") png, fname = await build_benefits_collage( r, limit=shared.ICON_LIMIT if shared.ICON_LIMIT >= 0 else 9, @@ -97,9 +69,8 @@ def norm(s: str) -> str: e.set_image(Bytes(png, fname)) elif r.benefits and r.benefits[0].image_url: e.set_image(r.benefits[0].image_url) # type: ignore[arg-type] - hint = "Did you mean this game? If not, please be more specific." if ambiguous else None - if hint: - e.set_footer(hint) + if len(matches) > 1: + e.set_footer("Multiple campaigns found; showing the first match.") await ctx.respond(embeds=[e]) await shared.finalize_interaction(ctx) diff --git a/functionality/twitch_drops/commands/this_week.py b/functionality/twitch_drops/commands/this_week.py index ad49753..0fdcc5e 100644 --- a/functionality/twitch_drops/commands/this_week.py +++ b/functionality/twitch_drops/commands/this_week.py @@ -23,6 +23,7 @@ class DropsThisWeek( async def invoke(self, ctx: lightbulb.Context) -> None: try: await ctx.defer() + setattr(ctx, "_dropscout_deferred", True) except Exception: pass recs = await shared.get_campaigns_cached() @@ -58,4 +59,3 @@ async def invoke(self, ctx: lightbulb.Context) -> None: await shared.finalize_interaction(ctx) return "drops_this_week" - diff --git a/functionality/twitch_drops/favorites.py b/functionality/twitch_drops/favorites.py new file mode 100644 index 0000000..9d92a04 --- /dev/null +++ b/functionality/twitch_drops/favorites.py @@ -0,0 +1,180 @@ +from __future__ import annotations + +"""Persistence helpers for per-user favorite games. + +Favorites are stored per guild and keyed by the normalized game key from the +catalog. The store is JSON-backed and guarded by a process-wide lock to keep +updates atomic even when multiple commands touch favorites concurrently. +""" + +import json +import os +from threading import Lock +from typing import Iterable + +_FAVORITES_LOCK = Lock() + + +class FavoritesStore: + """JSON-backed store for user favorite games per guild.""" + + def __init__(self, path: str = "data/favorites.json") -> None: + self.path = path + + def _load_unlocked(self) -> dict[str, dict[str, list[str]]]: + try: + with open(self.path, "r", encoding="utf-8") as fh: + data = json.load(fh) + except FileNotFoundError: + return {} + except Exception: + return {} + + if not isinstance(data, dict): + return {} + + result: dict[str, dict[str, list[str]]] = {} + for guild_id, users in data.items(): + if not isinstance(users, dict): + continue + guild_map: dict[str, list[str]] = {} + for user_id, favorites in users.items(): + if not isinstance(favorites, list): + continue + unique = [] + seen: set[str] = set() + for item in favorites: + if not isinstance(item, str): + continue + key = item.strip() + if not key or key in seen: + continue + seen.add(key) + unique.append(key) + if unique: + guild_map[str(user_id)] = unique + if guild_map: + result[str(guild_id)] = guild_map + return result + + def _atomic_write(self, payload: str) -> None: + dirname = os.path.dirname(self.path) or "." + os.makedirs(dirname, exist_ok=True) + tmp = f"{self.path}.tmp" + with open(tmp, "w", encoding="utf-8") as fh: + fh.write(payload) + os.replace(tmp, self.path) + + def _save_locked(self, data: dict[str, dict[str, list[str]]]) -> None: + payload = json.dumps(data, indent=2, ensure_ascii=False) + self._atomic_write(payload) + + def load(self) -> dict[str, dict[str, list[str]]]: + with _FAVORITES_LOCK: + return self._load_unlocked() + + def add_favorite(self, guild_id: int, user_id: int, game_key: str) -> bool: + game_key = (game_key or "").strip() + if not game_key: + return False + changed = False + with _FAVORITES_LOCK: + data = self._load_unlocked() + guild_key = str(guild_id) + user_key = str(user_id) + guild_map = data.get(guild_key, {}) + current = guild_map.get(user_key, []) + if game_key not in current: + current = sorted({*current, game_key}) + guild_map[user_key] = current + data[guild_key] = guild_map + self._save_locked(data) + changed = True + return changed + + def remove_favorite(self, guild_id: int, user_id: int, game_key: str) -> bool: + game_key = (game_key or "").strip() + if not game_key: + return False + changed = False + with _FAVORITES_LOCK: + data = self._load_unlocked() + guild_key = str(guild_id) + user_key = str(user_id) + guild_map = data.get(guild_key) + if not guild_map: + return False + current = guild_map.get(user_key, []) + if game_key not in current: + return False + current = [item for item in current if item != game_key] + if current: + guild_map[user_key] = current + else: + guild_map.pop(user_key, None) + if not guild_map: + data.pop(guild_key, None) + else: + data[guild_key] = guild_map + self._save_locked(data) + changed = True + return changed + + def remove_many(self, guild_id: int, user_id: int, game_keys: Iterable[str]) -> int: + keys = {item.strip() for item in game_keys if item and item.strip()} + if not keys: + return 0 + removed = 0 + with _FAVORITES_LOCK: + data = self._load_unlocked() + guild_key = str(guild_id) + user_key = str(user_id) + guild_map = data.get(guild_key) + if not guild_map: + return 0 + current = guild_map.get(user_key, []) + if not current: + return 0 + new_items = [item for item in current if item not in keys] + removed = len(current) - len(new_items) + if new_items: + guild_map[user_key] = new_items + else: + guild_map.pop(user_key, None) + if not guild_map: + data.pop(guild_key, None) + else: + data[guild_key] = guild_map + if removed: + self._save_locked(data) + return removed + + def get_user_favorites(self, guild_id: int, user_id: int) -> list[str]: + data = self.load() + guild_map = data.get(str(guild_id), {}) + items = guild_map.get(str(user_id), []) + return list(items) + + def get_guild_favorites(self, guild_id: int) -> dict[int, set[str]]: + data = self.load() + guild_map = data.get(str(guild_id), {}) + result: dict[int, set[str]] = {} + for user_id, items in guild_map.items(): + try: + uid = int(user_id) + except ValueError: + continue + result[uid] = {item for item in items if item} + return result + + def get_watchers(self, guild_id: int, keys: Iterable[str]) -> dict[int, set[str]]: + target_keys = {item.strip() for item in keys if item} + if not target_keys: + return {} + guild_map = self.get_guild_favorites(guild_id) + result: dict[int, set[str]] = {} + for uid, games in guild_map.items(): + match = games & target_keys + if match: + result[uid] = match + return result diff --git a/functionality/twitch_drops/fetcher.py b/functionality/twitch_drops/fetcher.py index 64c6c08..80dc080 100644 --- a/functionality/twitch_drops/fetcher.py +++ b/functionality/twitch_drops/fetcher.py @@ -7,6 +7,7 @@ """ from .models import CampaignRecord, BenefitRecord +from .game_catalog import get_game_catalog from .twitch_drops import fetch_active_campaigns @@ -60,4 +61,8 @@ async def fetch_condensed(self) -> list[CampaignRecord]: benefits=benefits, ) out.append(rec) + try: + get_game_catalog().merge_from_campaign_records(out) + except Exception: + pass return out diff --git a/functionality/twitch_drops/game_catalog.py b/functionality/twitch_drops/game_catalog.py new file mode 100644 index 0000000..06eb351 --- /dev/null +++ b/functionality/twitch_drops/game_catalog.py @@ -0,0 +1,540 @@ +from __future__ import annotations + +"""Caching and lookup helpers for Twitch game metadata. + +The catalog merges popular games from the Helix `games/top` endpoint with any +games observed across active and historical Drop campaigns. A ready flag keeps +commands paused until the cache has been regenerated for the current bot run. +""" + +import asyncio +import json +import os +import re +from dataclasses import dataclass, field +from threading import Lock +from typing import Any, Iterable, Optional + +import aiohttp +import lightbulb +from lightbulb import exceptions as lb_exceptions +from lightbulb.commands import execution as lb_execution + +from .models import CampaignRecord +from .twitch_drops import ANDROID_CLIENT_ID, ensure_env_access_token + +__all__ = [ + "GameCatalog", + "GameEntry", + "GameCatalogUnavailableError", + "GameCatalogNotReady", + "ensure_game_catalog_ready_hook", + "register_game_catalog_handlers", + "get_game_catalog", + "warm_game_catalog", +] + + +def _norm(value: str) -> str: + """Normalize game identifiers for consistent matching.""" + value = value.casefold().strip() + value = re.sub(r"[\s_]+", " ", value) + return value + + +class GameCatalogUnavailableError(RuntimeError): + """Raised when Twitch game metadata cannot be fetched.""" + + +class GameCatalogNotReady(lb_exceptions.ExecutionException): + """Raised when commands are invoked before the game catalog is ready.""" + + +@dataclass +class GameEntry: + """Represents a known Twitch game in the autocomplete catalog.""" + + key: str + name: str + slug: Optional[str] = None + twitch_id: Optional[str] = None + box_art_url: Optional[str] = None + weight: int = 0 + aliases: list[str] = field(default_factory=list) + sources: list[str] = field(default_factory=list) + + def copy(self) -> "GameEntry": + return GameEntry( + key=self.key, + name=self.name, + slug=self.slug, + twitch_id=self.twitch_id, + box_art_url=self.box_art_url, + weight=self.weight, + aliases=list(self.aliases), + sources=list(self.sources), + ) + + def to_payload(self) -> dict[str, Any]: + return { + "key": self.key, + "name": self.name, + "slug": self.slug, + "twitch_id": self.twitch_id, + "box_art_url": self.box_art_url, + "weight": self.weight, + "aliases": sorted({a for a in self.aliases if a}), + "sources": sorted({s for s in self.sources if s}), + } + + @classmethod + def from_payload(cls, data: dict[str, Any]) -> "GameEntry": + aliases = data.get("aliases") + sources = data.get("sources") + entry = cls( + key=str(data.get("key") or ""), + name=str(data.get("name") or ""), + slug=(data.get("slug") or None), + twitch_id=(str(data.get("twitch_id")) if data.get("twitch_id") else None), + box_art_url=data.get("box_art_url"), + weight=int(data.get("weight") or 0), + aliases=list({str(a) for a in aliases if a}) if isinstance(aliases, list) else [], + sources=list({str(s) for s in sources if s}) if isinstance(sources, list) else [], + ) + return entry + + +class GameCatalog: + """Thread-safe cache of game metadata sourced from Helix + campaign history.""" + + def __init__(self, path: str = "data/game_catalog.json") -> None: + self.path = path + self._lock = Lock() + self._games: dict[str, GameEntry] = {} + self._alias_map: dict[str, str] = {} + self._ready_event: asyncio.Event = asyncio.Event() + self._load() + + # ------------------------------------------------------------------ # + # Internal helpers + # ------------------------------------------------------------------ # + + def _normalize_entry(self, entry: GameEntry) -> GameEntry: + entry.key = _norm(entry.key or entry.name) + entry.name = entry.name.strip() or entry.key + entry.aliases = sorted({ + a for a in ( + *entry.aliases, + entry.slug or "", + entry.key, + ) + if a + }) + entry.aliases = [ + alias for alias in {_norm(a) for a in entry.aliases} + if alias and alias != entry.key + ] + entry.sources = sorted({s for s in entry.sources if s}) + return entry + + def _load(self) -> None: + try: + with open(self.path, "r", encoding="utf-8") as f: + raw = json.load(f) + except FileNotFoundError: + return + except Exception: + return + games = raw.get("games") if isinstance(raw, dict) else None + if not isinstance(games, list): + return + loaded: dict[str, GameEntry] = {} + for item in games: + if not isinstance(item, dict): + continue + entry = GameEntry.from_payload(item) + if not entry.name: + continue + entry = self._normalize_entry(entry) + loaded[entry.key] = entry + with self._lock: + self._games = loaded + self._rebuild_alias_map_locked() + + def reset(self) -> None: + """Clear any cached games so the cache can be rebuilt.""" + with self._lock: + self._games = {} + self._alias_map = {} + self._ready_event = asyncio.Event() + tmp = f"{self.path}.tmp" + os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True) + with open(tmp, "w", encoding="utf-8") as f: + json.dump({"games": []}, f, indent=2, ensure_ascii=False) + os.replace(tmp, self.path) + + def _rebuild_alias_map_locked(self) -> None: + alias_map: dict[str, str] = {} + for key, entry in self._games.items(): + alias_map[key] = key + for alias in entry.aliases: + alias_map[alias] = key + self._alias_map = alias_map + + def _save_locked(self) -> None: + os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True) + payload = { + "games": [ + e.copy().to_payload() + for e in sorted( + self._games.values(), + key=lambda item: (-item.weight, item.name.casefold(), item.key), + ) + ] + } + tmp = f"{self.path}.tmp" + with open(tmp, "w", encoding="utf-8") as f: + json.dump(payload, f, indent=2, ensure_ascii=False) + os.replace(tmp, self.path) + + # ------------------------------------------------------------------ # + # Public API + # ------------------------------------------------------------------ # + + def normalize(self, value: str) -> str: + return _norm(value) + + def count(self) -> int: + with self._lock: + return len(self._games) + + def set_ready(self, ready: bool = True) -> None: + if ready: + self._ready_event.set() + else: + self._ready_event = asyncio.Event() + + def is_ready(self) -> bool: + return self._ready_event.is_set() + + async def wait_ready(self, timeout: float | None = None) -> bool: + if self._ready_event.is_set(): + return True + try: + if timeout is None: + await self._ready_event.wait() + else: + await asyncio.wait_for(self._ready_event.wait(), timeout=timeout) + return True + except asyncio.TimeoutError: + return False + + def merge_games(self, entries: Iterable[GameEntry]) -> bool: + changed = False + if not entries: + return False + with self._lock: + for entry in entries: + if entry is None or not entry.name: + continue + entry = self._normalize_entry(entry) + current = self._games.get(entry.key) + if current is None: + self._games[entry.key] = entry.copy() + changed = True + continue + if self._merge_entry_locked(current, entry): + changed = True + if changed: + self._rebuild_alias_map_locked() + self._save_locked() + return changed + + def _merge_entry_locked(self, current: GameEntry, incoming: GameEntry) -> bool: + updated = False + if incoming.weight > current.weight: + current.weight = incoming.weight + updated = True + if incoming.name and incoming.name != current.name: + if len(incoming.name) > len(current.name): + current.name = incoming.name + updated = True + if incoming.slug and not current.slug: + current.slug = incoming.slug + updated = True + if incoming.twitch_id and not current.twitch_id: + current.twitch_id = incoming.twitch_id + updated = True + if incoming.box_art_url and not current.box_art_url: + current.box_art_url = incoming.box_art_url + updated = True + combined_aliases = set(current.aliases) + for alias in incoming.aliases: + alias = _norm(alias) + if alias and alias != current.key and alias not in combined_aliases: + combined_aliases.add(alias) + updated = True + current.aliases = sorted(combined_aliases) + combined_sources = set(current.sources) + for src in incoming.sources: + if src and src not in combined_sources: + combined_sources.add(src) + updated = True + current.sources = sorted(combined_sources) + return updated + + def get(self, value: str) -> Optional[GameEntry]: + if not value: + return None + key = self.normalize(value) + with self._lock: + resolved = self._alias_map.get(key) + if resolved is None: + resolved = self._alias_map.get(value) + if resolved is None: + return None + entry = self._games.get(resolved) + return entry.copy() if entry else None + + def get_all(self) -> list[GameEntry]: + with self._lock: + return [ + entry.copy() + for entry in sorted( + self._games.values(), + key=lambda item: (-item.weight, item.name.casefold(), item.key), + ) + ] + + def search(self, query: Optional[str], *, limit: int = 25) -> list[GameEntry]: + normalized = self.normalize(query or "") + with self._lock: + entries = list(self._games.values()) + scored: list[tuple[float, GameEntry]] = [] + if not normalized: + for entry in entries: + scored.append((float(entry.weight), entry)) + else: + for entry in entries: + match_strength = 0.0 + for alias in (entry.key, *entry.aliases): + if alias == normalized: + match_strength = max(match_strength, 500.0) + elif alias.startswith(normalized): + match_strength = max(match_strength, 320.0) + elif normalized in alias: + match_strength = max(match_strength, 180.0) + if match_strength <= 0.0: + continue + score = float(entry.weight) + match_strength + scored.append((score, entry)) + if not scored: + return [] + scored.sort(key=lambda item: (-item[0], item[1].name.casefold(), item[1].key)) + return [entry.copy() for _, entry in scored[:limit]] + + def matches_campaign(self, entry: GameEntry, campaign: CampaignRecord) -> bool: + target_keys = {entry.key, *entry.aliases} + name_key = self.normalize(campaign.game_name or "") + slug_key = self.normalize(campaign.game_slug or "") + for candidate in (name_key, slug_key): + if candidate and candidate in target_keys: + return True + if candidate and candidate in self._alias_map: + return self._alias_map[candidate] == entry.key + return False + + def merge_from_campaign_records(self, campaigns: Iterable[CampaignRecord]) -> bool: + entries: list[GameEntry] = [] + for rec in campaigns: + name = (rec.game_name or "").strip() + slug = rec.game_slug or None + if not name and not slug: + continue + entry = GameEntry( + key=self.normalize(name or slug or ""), + name=name or slug or "Twitch Game", + slug=slug, + twitch_id=None, + box_art_url=rec.game_box_art, + weight=700, + aliases=[self.normalize(name or slug or "")], + sources=["campaign"], + ) + if slug: + entry.aliases.append(self.normalize(slug)) + entries.append(entry) + return self.merge_games(entries) + + def merge_state_snapshot(self, snapshot: dict[str, Any]) -> bool: + entries: list[GameEntry] = [] + for item in snapshot.values(): + if not isinstance(item, dict): + continue + name = str(item.get("game_name") or "").strip() + if not name: + continue + entry = GameEntry( + key=self.normalize(name), + name=name, + slug=None, + twitch_id=None, + box_art_url=item.get("game_box_art"), + weight=350, + aliases=[self.normalize(name)], + sources=["history"], + ) + entries.append(entry) + return self.merge_games(entries) + + def merge_state_file(self, path: str) -> bool: + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + except FileNotFoundError: + return False + except Exception: + return False + if not isinstance(data, dict): + return False + return self.merge_state_snapshot(data) + + async def refresh_top_games(self, *, max_pages: int | None = None) -> int: + client_id = os.getenv("TWITCH_HELIX_CLIENT_ID") or os.getenv("TWITCH_CLIENT_ID") or ANDROID_CLIENT_ID + url = "https://api.twitch.tv/helix/games/top" + limit_pages = max_pages if max_pages is not None else 20 + total = 0 + page = 0 + entries: list[GameEntry] = [] + + try: + async with aiohttp.ClientSession() as session: + token = await ensure_env_access_token(session) + after: Optional[str] = None + while True: + params = {"first": "100"} + if after: + params["after"] = after + headers = { + "Client-ID": client_id, + "Authorization": f"Bearer {token}", + "Accept": "application/json", + } + async with session.get(url, headers=headers, params=params) as resp: + text = await resp.text() + if resp.status >= 400: + raise GameCatalogUnavailableError(f"{resp.status} {text or 'Failed to fetch Twitch top games'}") + try: + payload = await resp.json() + except Exception as exc: + raise GameCatalogUnavailableError(f"Invalid JSON from Helix games/top: {exc}") from exc + data = payload.get("data") if isinstance(payload, dict) else None + if not isinstance(data, list) or not data: + break + for rank_offset, item in enumerate(data): + if not isinstance(item, dict): + continue + name = str(item.get("name") or "").strip() + if not name: + continue + twitch_id = str(item.get("id") or "") or None + box_art = item.get("box_art_url") + rank = total + rank_offset + weight = max(1000 - rank, 100) + entry = GameEntry( + key=self.normalize(name), + name=name, + slug=None, + twitch_id=twitch_id, + box_art_url=box_art, + weight=weight, + aliases=[self.normalize(name)], + sources=["helix"], + ) + entries.append(entry) + total += len(data) + pagination = payload.get("pagination") if isinstance(payload, dict) else None + after = None + if isinstance(pagination, dict): + cursor = pagination.get("cursor") + after = str(cursor) if cursor else None + page += 1 + if not after or (limit_pages and page >= limit_pages): + break + except GameCatalogUnavailableError: + raise + except Exception as exc: # aiohttp errors + raise GameCatalogUnavailableError(str(exc)) from exc + + self.merge_games(entries) + return len(entries) + + +_CATALOG: GameCatalog | None = None + + +def get_game_catalog() -> GameCatalog: + global _CATALOG + if _CATALOG is None: + cache_path = os.getenv("TWITCH_GAME_CACHE_PATH", "data/game_catalog.json") + _CATALOG = GameCatalog(cache_path) + return _CATALOG + + +async def warm_game_catalog(*, state_path: Optional[str] = None) -> None: + """Fetch Helix top games and merge with stored campaign history.""" + catalog = get_game_catalog() + catalog.reset() + print("📦 Game cache warm-up starting…") + initial_count = 0 + if state_path: + try: + if catalog.merge_state_file(state_path): + initial_count = catalog.count() + print(f"📦 Seeded game cache with {initial_count} games from historical campaigns.") + except Exception as exc: + print(f"⚠️ Failed to reuse campaign history for game cache: {exc}") + try: + fetched = await catalog.refresh_top_games() + except GameCatalogUnavailableError as exc: + print(f"⚠️ Failed to refresh Twitch top games: {exc}") + fetched = 0 + total = catalog.count() + helix_only = max(total - initial_count, 0) + if total > 0: + catalog.set_ready(True) + print( + "📦 Game cache ready: " + f"{helix_only} from Helix this boot (raw Helix pages reported {fetched}), " + f"{initial_count} from stored campaigns, total {total} unique games." + ) + else: + print("⚠️ Game cache unavailable; commands will remain disabled until caching succeeds.") + + +@lightbulb.hook(lightbulb.ExecutionSteps.CHECKS, skip_when_failed=True, name="ensure-game-catalog-ready") +async def ensure_game_catalog_ready_hook( + pipeline: lb_execution.ExecutionPipeline, ctx: lightbulb.Context +) -> None: + catalog = get_game_catalog() + if catalog.is_ready(): + return + + awaited = await catalog.wait_ready(timeout=5.0) + if awaited and catalog.is_ready(): + return + + message = "DropScout is caching Twitch games. Please try again shortly." + try: + await ctx.respond(message, ephemeral=True) + except Exception: + pass + raise GameCatalogNotReady("Game catalog is not ready yet.") + + +def register_game_catalog_handlers(client: lightbulb.Client) -> None: + """Register error handlers to silence catalog initialization failures.""" + + @client.error_handler(priority=100) + async def _ignore_catalog_not_ready(exc: lb_exceptions.ExecutionPipelineFailedException) -> bool: + return any(isinstance(cause, GameCatalogNotReady) for cause in exc.causes) diff --git a/functionality/twitch_drops/monitor.py b/functionality/twitch_drops/monitor.py index 29e5c1b..4aa99c1 100644 --- a/functionality/twitch_drops/monitor.py +++ b/functionality/twitch_drops/monitor.py @@ -12,6 +12,8 @@ from .differ import DropsDiffer from .notifier import DropsNotifier from .config import GuildConfigStore +from .favorites import FavoritesStore +from .game_catalog import get_game_catalog class DropsMonitor: @@ -24,6 +26,7 @@ def __init__( interval_minutes: int = 30, state_path: str = "data/campaigns_state.json", guild_store_path: str = "data/guild_config.json", + favorites_store_path: str = "data/favorites.json", notify_on_boot: bool = False, ) -> None: """Configure the monitor for a Hikari app. @@ -35,7 +38,12 @@ def __init__( self.interval_minutes = max(1, int(interval_minutes)) self.store = DropsStateStore(state_path) self.fetcher = DropsFetcher() - self.notifier = DropsNotifier(app, GuildConfigStore(guild_store_path)) + self.notifier = DropsNotifier( + app, + GuildConfigStore(guild_store_path), + FavoritesStore(favorites_store_path), + get_game_catalog(), + ) self.notify_on_boot = notify_on_boot self._task: Optional[asyncio.Task] = None @@ -56,6 +64,10 @@ async def stop(self) -> None: async def _run_loop(self) -> None: """Main loop: fetch → diff → notify → persist → sleep.""" prev = self.store.load() + try: + get_game_catalog().merge_state_snapshot(prev) + except Exception: + pass first_run = True differ = DropsDiffer() while True: diff --git a/functionality/twitch_drops/notifier.py b/functionality/twitch_drops/notifier.py index 1598a3f..0e11d54 100644 --- a/functionality/twitch_drops/notifier.py +++ b/functionality/twitch_drops/notifier.py @@ -8,27 +8,47 @@ """ import asyncio +import copy import os +from dataclasses import dataclass +from typing import Iterable, List import hikari from hikari.files import Bytes +from .config import GuildConfigStore from .differ import DropsDiff from .embeds import build_campaign_embed -from .config import GuildConfigStore +from .favorites import FavoritesStore +from .game_catalog import GameCatalog from .images import build_benefits_collage +from .models import CampaignRecord + + +@dataclass(frozen=True, slots=True) +class NotifyTarget: + guild_id: int + channel_id: int class DropsNotifier: """Sends change notifications to each configured guild channel.""" - def __init__(self, app: hikari.RESTAware, guild_store: GuildConfigStore) -> None: + def __init__( + self, + app: hikari.RESTAware, + guild_store: GuildConfigStore, + favorites_store: FavoritesStore, + game_catalog: GameCatalog, + ) -> None: """Create a notifier bound to a Hikari app with REST access. Accepts any object implementing RESTAware (e.g., GatewayBot or RESTBot). """ self.app = app self.guild_store = guild_store + self.favorites_store = favorites_store + self.game_catalog = game_catalog # Collage + sending behavior (reuse command env vars when present) self.icon_limit = int(os.getenv("DROPS_ICON_LIMIT", "9") or 9) self.icon_size = int(os.getenv("DROPS_ICON_SIZE", "96") or 96) @@ -38,77 +58,122 @@ def __init__(self, app: hikari.RESTAware, guild_store: GuildConfigStore) -> None self.max_attachments = int(max_att or 0) self.send_delay_ms = int(os.getenv("DROPS_SEND_DELAY_MS", "350") or 350) - async def _resolve_targets(self) -> list[int]: - """Return the list of channel IDs to notify across all guilds.""" - channel_ids: list[int] = [] + async def _resolve_targets(self) -> list[NotifyTarget]: + """Return the list of channels (with guild context) to notify.""" + targets: list[NotifyTarget] = [] try: guilds = await self.app.rest.fetch_my_guilds() except Exception: guilds = [] for g in guilds: - cid = self.guild_store.get_channel_id(int(g.id)) + gid = int(g.id) + cid = self.guild_store.get_channel_id(gid) if cid: - channel_ids.append(cid) + targets.append(NotifyTarget(guild_id=gid, channel_id=int(cid))) continue scid = getattr(g, "system_channel_id", None) if scid: - channel_ids.append(int(scid)) - return channel_ids + targets.append(NotifyTarget(guild_id=gid, channel_id=int(scid))) + return targets + + def _resolve_campaign_keys(self, campaign: "CampaignRecord") -> set[str]: + keys: set[str] = set() + for candidate in (campaign.game_slug, campaign.game_name): + if not candidate: + continue + entry = self.game_catalog.get(candidate) + if entry is not None: + keys.add(entry.key) + else: + try: + normalized = self.game_catalog.normalize(candidate) + except Exception: + normalized = candidate.casefold() + if normalized: + keys.add(normalized) + return keys + + def _collect_watchers(self, favorites_map: dict[int, set[str]], keys: Iterable[str]) -> list[int]: + target = {k for k in keys if k} + if not target: + return [] + users = [uid for uid, games in favorites_map.items() if games & target] + users.sort() + return users + + def _join_mentions(self, user_ids: Iterable[int], *, limit: int) -> str: + mentions: list[str] = [] + total = 0 + for uid in sorted(dict.fromkeys(int(u) for u in user_ids)): + token = f"<@{uid}>" + added = len(token) if not mentions else len(token) + 1 + if total + added > limit: + if mentions: + mentions.append("…") + break + mentions.append(token) + total += added + return " ".join(mentions) async def notify(self, diff: DropsDiff) -> None: """Post embeds for any newly ACTIVE campaigns (with reward collages).""" - embeds: list[hikari.Embed] = [] - attachments_aligned: list[Bytes | None] = [] - attaches_done = 0 - - for c in diff.activated: - e = build_campaign_embed(c, title_prefix="Now Active") - - # Attempt to build a collage; cap total attachments if configured - png, fname = (None, None) - if self.max_attachments <= 0 or attaches_done < self.max_attachments: - png, fname = await build_benefits_collage( - c, + if not diff.activated: + return + + payloads: List[tuple["CampaignRecord", hikari.Embed, bytes | None, str | None]] = [] + attachments_budget = self.max_attachments if self.max_attachments > 0 else None + attachments_used = 0 + + for campaign in diff.activated: + embed = build_campaign_embed(campaign, title_prefix="Now Active") + png_bytes: bytes | None = None + filename: str | None = None + if attachments_budget is None or attachments_used < attachments_budget: + png_bytes, filename = await build_benefits_collage( + campaign, limit=self.icon_limit if self.icon_limit >= 0 else 9, icon_size=(self.icon_size, self.icon_size), columns=self.icon_cols, ) + if png_bytes and filename: + attachments_used += 1 + if not png_bytes and campaign.benefits and campaign.benefits[0].image_url: + embed.set_image(campaign.benefits[0].image_url) # type: ignore[arg-type] + payloads.append((campaign, embed, png_bytes, filename)) - if png and fname: - attachments_aligned.append(Bytes(png, fname)) - attaches_done += 1 - else: - # Fallback: show the first benefit icon directly if available - if c.benefits and c.benefits[0].image_url: - e.set_image(c.benefits[0].image_url) # type: ignore[arg-type] - attachments_aligned.append(None) - - embeds.append(e) - - if not embeds: + if not payloads: return targets = await self._resolve_targets() if not targets: return - any_attachments = any(a is not None for a in attachments_aligned) for target in targets: - if any_attachments: - # Send each embed individually to ensure correct attachment mapping - for e, a in zip(embeds, attachments_aligned): - try: - if a is not None: - e.set_image(a) - await self.app.rest.create_message(target, embeds=[e]) - except Exception: - pass - await asyncio.sleep(self.send_delay_ms / 1000) - else: - # No attachments: send in chunks for efficiency - for i in range(0, len(embeds), 10): - chunk = embeds[i : i + 10] - try: - await self.app.rest.create_message(target, embeds=chunk) - except Exception: - pass + favorites_map = self.favorites_store.get_guild_favorites(target.guild_id) + for campaign, base_embed, png_bytes, filename in payloads: + embed = copy.deepcopy(base_embed) + keys = self._resolve_campaign_keys(campaign) + watchers = self._collect_watchers(favorites_map, keys) + content = None + if watchers: + mention_text = self._join_mentions(watchers, limit=1800) + if mention_text: + content = f"Favorites alert: {mention_text}" + try: + if png_bytes and filename: + attachment = Bytes(png_bytes, filename) + embed.set_image(attachment) + await self.app.rest.create_message( + target.channel_id, + content=content, + embeds=[embed], + ) + else: + await self.app.rest.create_message( + target.channel_id, + content=content, + embeds=[embed], + ) + except Exception: + pass + await asyncio.sleep(self.send_delay_ms / 1000) diff --git a/tests/test_commands_common.py b/tests/test_commands_common.py new file mode 100644 index 0000000..4ca9d7d --- /dev/null +++ b/tests/test_commands_common.py @@ -0,0 +1,182 @@ +import hikari +from hikari.files import Bytes +import pytest + +from functionality.twitch_drops.commands.common import SharedContext +from functionality.twitch_drops.config import GuildConfigStore +from functionality.twitch_drops.favorites import FavoritesStore +from functionality.twitch_drops.models import BenefitRecord, CampaignRecord + + +class StubCatalog: + def __init__(self) -> None: + self.merged: list[list[CampaignRecord]] = [] + + def merge_from_campaign_records(self, recs): + self.merged.append(recs) + return True + + +@pytest.fixture() +def shared(tmp_path): + return SharedContext( + guild_store=GuildConfigStore(str(tmp_path / "guild.json")), + ICON_LIMIT=6, + ICON_SIZE=96, + ICON_COLUMNS=3, + MAX_ATTACH_PER_CMD=0, + SEND_DELAY_MS=0, + FETCH_TTL=60, + game_catalog=StubCatalog(), + favorites_store=FavoritesStore(str(tmp_path / "favorites.json")), + ) + + +class FakeFetcher: + call_count = 0 + + async def fetch_condensed(self): + FakeFetcher.call_count += 1 + return [ + CampaignRecord( + id=f"c{FakeFetcher.call_count}", + name="Campaign", + status="ACTIVE", + game_name="Game", + game_slug="game", + game_box_art=None, + starts_at=None, + ends_at=None, + benefits=[BenefitRecord(id="b", name="Reward", image_url=None)], + ) + ] + + +@pytest.mark.asyncio +async def test_shared_context_caches_and_expires(monkeypatch, shared): + monkeypatch.setattr("functionality.twitch_drops.fetcher.DropsFetcher", FakeFetcher) + first = await shared.get_campaigns_cached() + assert FakeFetcher.call_count == 1 + second = await shared.get_campaigns_cached() + assert FakeFetcher.call_count == 1 # cache hit + assert shared.game_catalog.merged and shared.game_catalog.merged[-1] is first + + # Force expiration and ensure refetch occurs + shared._cache_exp = 0 + third = await shared.get_campaigns_cached() + assert FakeFetcher.call_count == 2 + assert third[0].id == "c2" + + +class FinalizeCtx: + def __init__(self, *, deferred: bool) -> None: + self._dropscout_deferred = deferred + self.calls: list[tuple[str, str | None]] = [] + self.responded: list[tuple[str, dict]] = [] + + async def edit_last_response(self, *, content=None): + self.calls.append(("edit_last", content)) + return + + async def edit_initial_response(self, *, content=None): + self.calls.append(("edit_initial", content)) + raise RuntimeError("should not reach") + + async def delete_last_response(self): + self.calls.append(("delete_last", None)) + return + + async def delete_initial_response(self): + self.calls.append(("delete_initial", None)) + return + + async def respond(self, content, ephemeral=False): + self.responded.append((content, {"ephemeral": ephemeral})) + + +@pytest.mark.asyncio +async def test_finalize_interaction_prefers_edits(shared): + ctx = FinalizeCtx(deferred=True) + await shared.finalize_interaction(ctx) + assert ctx.calls[0] == ("edit_last", "Done.") + assert not ctx.responded + + +class FinalizeCtxNoops: + def __init__(self) -> None: + self.responded: list[tuple[str, dict]] = [] + + async def edit_last_response(self, *, content=None): + raise RuntimeError("fail") + + async def edit_initial_response(self, *, content=None): + raise RuntimeError("fail") + + async def delete_last_response(self): + raise RuntimeError("fail") + + async def delete_initial_response(self): + raise RuntimeError("fail") + + async def respond(self, content, ephemeral=False): + self.responded.append((content, {"ephemeral": ephemeral})) + + +@pytest.mark.asyncio +async def test_finalize_interaction_falls_back_to_ephemeral(shared): + ctx = FinalizeCtxNoops() + await shared.finalize_interaction(ctx, message="All done") + assert ctx.responded == [("All done", {"ephemeral": True})] + + +class SendCtx: + def __init__(self, *, channel_id=1): + self.channel_id = channel_id + self.respond_calls: list[dict] = [] + self.sent: list[tuple[int, list[hikari.Embed]]] = [] + + class _Rest: + def __init__(self, outer): + self.outer = outer + + async def create_message(self, channel_id, *, embeds=None, **kwargs): + self.outer.sent.append((int(channel_id), list(embeds or []))) + + class _App: + def __init__(self, outer): + self.rest = SendCtx._Rest(outer) # type: ignore[attr-defined] + + class _Client: + def __init__(self, outer): + self.app = SendCtx._App(outer) # type: ignore[attr-defined] + + # Attach helper classes with closures + SendCtx._Rest = _Rest # type: ignore[attr-defined] + SendCtx._App = _App # type: ignore[attr-defined] + SendCtx._Client = _Client # type: ignore[attr-defined] + + self.client = SendCtx._Client(self) + + async def respond(self, **payload): + self.respond_calls.append(payload) + + +@pytest.mark.asyncio +async def test_send_embeds_with_attachments(shared): + ctx = SendCtx() + embed = hikari.Embed(title="Hello") + attachment = Bytes(b"123", "a.png") + await shared.send_embeds(ctx, [embed], attachments_aligned=[attachment]) + assert ctx.sent and ctx.sent[0][0] == ctx.channel_id + assert ctx.sent[0][1][0].title == "Hello" + assert not ctx.respond_calls + + +@pytest.mark.asyncio +async def test_send_embeds_chunks_without_attachments(shared): + ctx = SendCtx() + embeds = [hikari.Embed(title=f"E{i}") for i in range(11)] + await shared.send_embeds(ctx, embeds, attachments_aligned=None) + assert len(ctx.respond_calls) == 2 # 10 + 1 embeds + assert len(ctx.respond_calls[0]["embeds"]) == 10 + assert len(ctx.respond_calls[1]["embeds"]) == 1 diff --git a/tests/test_commands_finalize.py b/tests/test_commands_finalize.py index 6e3886d..3bebce7 100644 --- a/tests/test_commands_finalize.py +++ b/tests/test_commands_finalize.py @@ -1,6 +1,6 @@ import asyncio from datetime import datetime, timezone, timedelta -from typing import cast +from typing import Optional, cast import pytest @@ -30,6 +30,9 @@ def __init__(self) -> None: self.responses: list[dict] = [] self.deleted_initial = False self.deleted_last = False + self.edited_initial = False + self.edited_last = False + self.last_edit_content: Optional[str] = None async def defer(self, *_, **__): self.deferred = True @@ -45,6 +48,14 @@ async def delete_last_response(self): async def delete_initial_response(self): self.deleted_initial = True + async def edit_last_response(self, *, content: Optional[str] = None, **kwargs): + self.edited_last = True + self.last_edit_content = content + + async def edit_initial_response(self, *, content: Optional[str] = None, **kwargs): + self.edited_initial = True + self.last_edit_content = content + def _active_week_campaign() -> CampaignRecord: now = datetime.now(timezone.utc) @@ -100,5 +111,8 @@ async def _no_collage(*args, **kwargs): assert ctx.responses, "Command did not produce any response output" # And the deferred placeholder should be finalized (deleted or edited). - # The helper prefers delete_last_response first. - assert ctx.deleted_last or ctx.deleted_initial, "Deferred placeholder was not finalized" + assert ( + ctx.deleted_last or ctx.deleted_initial or ctx.edited_last or ctx.edited_initial + ), "Deferred placeholder was not finalized" + if ctx.edited_last or ctx.edited_initial: + assert ctx.last_edit_content == "Done." diff --git a/tests/test_commands_search.py b/tests/test_commands_search.py new file mode 100644 index 0000000..d821bbe --- /dev/null +++ b/tests/test_commands_search.py @@ -0,0 +1,127 @@ +import types + +import hikari +import lightbulb +import pytest + +from functionality.twitch_drops.commands import search_game as search_mod +from functionality.twitch_drops.commands.common import SharedContext +from functionality.twitch_drops.config import GuildConfigStore +from functionality.twitch_drops.favorites import FavoritesStore +from functionality.twitch_drops.game_catalog import GameCatalog, GameEntry +from functionality.twitch_drops.models import BenefitRecord, CampaignRecord + + +class StubClient: + def __init__(self) -> None: + self.registered = [] + + def register(self, item): + self.registered.append(item) + return item + + +@pytest.fixture() +def shared(tmp_path): + catalog = GameCatalog(str(tmp_path / "catalog.json")) + catalog.merge_games( + [ + GameEntry( + key="valorant", + name="Valorant", + weight=500, + aliases=["valorant"], + sources=["seed"], + ) + ] + ) + catalog.set_ready(True) + return SharedContext( + guild_store=GuildConfigStore(str(tmp_path / "guild.json")), + ICON_LIMIT=6, + ICON_SIZE=96, + ICON_COLUMNS=3, + MAX_ATTACH_PER_CMD=0, + SEND_DELAY_MS=0, + FETCH_TTL=30, + game_catalog=catalog, + favorites_store=FavoritesStore(str(tmp_path / "favorites.json")), + ) + + +class FakeCtx: + def __init__(self): + self.responses: list[tuple[tuple, dict]] = [] + self.deferred = False + self.channel_id = 555 + self.client = type("Client", (), {"app": type("App", (), {"rest": object()})()})() + + async def respond(self, *args, **kwargs): + self.responses.append((args, kwargs)) + + async def defer(self, *args, **kwargs): + self.deferred = True + + +@pytest.fixture() +def command(shared): + client = StubClient() + search_mod.register(client, shared) + cmd_cls = next(cls for cls in client.registered if cls.__name__ == "DropsSearchGame") + return cmd_cls, shared + + +@pytest.mark.asyncio +async def test_search_game_requires_selection(command): + cmd_cls, shared = command + ctx = FakeCtx() + instance = object.__new__(cmd_cls) + instance.game = "" + await cmd_cls.invoke.__get__(instance, cmd_cls)(ctx) + assert ctx.responses + args, kwargs = ctx.responses[0] + assert args and "Select a game" in args[0] + assert kwargs["ephemeral"] is True + + +@pytest.mark.asyncio +async def test_search_game_matches_and_finalizes(monkeypatch, command): + cmd_cls, shared = command + ctx = FakeCtx() + instance = object.__new__(cmd_cls) + instance.game = "valorant" + + campaign = CampaignRecord( + id="c1", + name="Valorant Drops", + status="ACTIVE", + game_name="Valorant", + game_slug="valorant", + game_box_art=None, + starts_at=None, + ends_at=None, + benefits=[BenefitRecord(id="b1", name="Reward", image_url=None)], + ) + + async def fake_cached(self): + return [campaign] + + async def fake_finalize(self, ctx_obj): + ctx_obj.finalized = True + + shared.get_campaigns_cached = types.MethodType(fake_cached, shared) + shared.finalize_interaction = types.MethodType(fake_finalize, shared) + + async def fake_collage(campaign, **kwargs): + return None, None + + monkeypatch.setattr("functionality.twitch_drops.commands.search_game.build_benefits_collage", fake_collage) + + await cmd_cls.invoke.__get__(instance, cmd_cls)(ctx) + + assert ctx.deferred is True + assert ctx.responses + args, kwargs = ctx.responses[0] + assert "embeds" in kwargs + assert kwargs["embeds"][0].title == "Valorant" + assert getattr(ctx, "finalized", False) is True diff --git a/tests/test_differ.py b/tests/test_differ.py index 5a3c01d..5f22c1d 100644 --- a/tests/test_differ.py +++ b/tests/test_differ.py @@ -32,3 +32,9 @@ def test_differ_activated_when_newly_seen(): ids = [c.id for c in d.activated] assert "c3" in ids + +def test_differ_skips_still_active(): + prev = {"c1": {"status": "ACTIVE"}} + curr = [rec("c1", "ACTIVE")] + d = DropsDiffer().diff(prev, curr) + assert d.activated == [] diff --git a/tests/test_favorites_commands.py b/tests/test_favorites_commands.py new file mode 100644 index 0000000..8dd1ca7 --- /dev/null +++ b/tests/test_favorites_commands.py @@ -0,0 +1,222 @@ +import hikari +import lightbulb +import pytest + +from functionality.twitch_drops.commands import favorites as favorites_mod +from functionality.twitch_drops.commands.common import SharedContext +from functionality.twitch_drops.config import GuildConfigStore +from functionality.twitch_drops.favorites import FavoritesStore +from functionality.twitch_drops.game_catalog import GameCatalog, GameEntry +from functionality.twitch_drops.models import BenefitRecord, CampaignRecord + + +class StubClient: + def __init__(self) -> None: + self.registered: list[object] = [] + + def register(self, item, *args, **kwargs): + self.registered.append(item) + return item + + +class StubAutocompleteContext: + def __init__(self, *, focused: str = "", guild_id: int | None = None, user_id: int | None = None) -> None: + self.focused = type("Focus", (), {"value": focused})() + self.options: dict[str, str] = {} + self._choices: list[tuple[str, str]] | None = None + if guild_id is not None and user_id is not None: + user = type("User", (), {"id": user_id})() + self.interaction = type("Interaction", (), {"guild_id": guild_id, "user": user})() + else: + self.interaction = None + + async def respond(self, choices): + self._choices = choices + + +@pytest.fixture() +def shared(tmp_path) -> SharedContext: + game_catalog = GameCatalog(str(tmp_path / "catalog.json")) + favorites_store = FavoritesStore(str(tmp_path / "favorites.json")) + return SharedContext( + guild_store=GuildConfigStore(str(tmp_path / "guild.json")), + ICON_LIMIT=9, + ICON_SIZE=96, + ICON_COLUMNS=3, + MAX_ATTACH_PER_CMD=0, + SEND_DELAY_MS=0, + FETCH_TTL=30, + game_catalog=game_catalog, + favorites_store=favorites_store, + ) + + +@pytest.fixture() +def favorites_group(shared: SharedContext): + client = StubClient() + name = favorites_mod.register(client, shared) + assert name == "drops_favorites" + group = next(item for item in client.registered if isinstance(item, lightbulb.commands.groups.Group)) + return group, shared + + +def test_favorites_commands_group_structure(favorites_group): + group, _ = favorites_group + assert set(group.subcommands.keys()) == {"view", "add", "check", "remove"} + + view_cmd = group.subcommands["view"] + assert issubclass(view_cmd, lightbulb.SlashCommand) + assert view_cmd._command_data.options == {} + + add_cmd = group.subcommands["add"] + assert issubclass(add_cmd, lightbulb.SlashCommand) + add_option = add_cmd._command_data.options["game"] + assert getattr(add_option.type, "name", None) == "STRING" + assert add_option.autocomplete_provider is not hikari.UNDEFINED + + remove_cmd = group.subcommands["remove"] + assert issubclass(remove_cmd, lightbulb.SlashCommand) + remove_option = remove_cmd._command_data.options["game"] + assert getattr(remove_option.type, "name", None) == "STRING" + assert remove_option.autocomplete_provider is not hikari.UNDEFINED + + check_cmd = group.subcommands["check"] + assert issubclass(check_cmd, lightbulb.SlashCommand) + assert check_cmd._command_data.options == {} + + +@pytest.mark.asyncio +async def test_add_autocomplete_returns_catalog_matches(favorites_group): + group, shared = favorites_group + add_option = group.subcommands["add"]._command_data.options["game"] + provider = add_option.autocomplete_provider + + shared.game_catalog.merge_games( + [ + GameEntry(key="valorant", name="Valorant", weight=500), + GameEntry(key="apex", name="Apex Legends", weight=300), + ] + ) + shared.game_catalog.set_ready(True) + + ctx = StubAutocompleteContext(focused="val") + await provider(ctx) + assert ctx._choices == [("Valorant", "valorant")] + + +@pytest.mark.asyncio +async def test_add_autocomplete_empty_when_catalog_not_ready(favorites_group): + group, shared = favorites_group + add_option = group.subcommands["add"]._command_data.options["game"] + provider = add_option.autocomplete_provider + + shared.game_catalog.set_ready(False) + + ctx = StubAutocompleteContext(focused="anything") + await provider(ctx) + assert ctx._choices == [] + + +@pytest.mark.asyncio +async def test_remove_autocomplete_only_user_favorites(favorites_group): + group, shared = favorites_group + remove_option = group.subcommands["remove"]._command_data.options["game"] + provider = remove_option.autocomplete_provider + + shared.favorites_store.add_favorite(123, 1, "valorant") + shared.favorites_store.add_favorite(123, 1, "apex") + shared.favorites_store.add_favorite(123, 2, "fortnite") + + ctx = StubAutocompleteContext(focused="ap", guild_id=123, user_id=1) + await provider(ctx) + assert ctx._choices == [("apex", "apex")] + + +@pytest.mark.asyncio +async def test_check_sends_now_active_messages(monkeypatch, favorites_group): + group, shared = favorites_group + check_cmd = group.subcommands["check"] + cmd_instance = object.__new__(check_cmd) + + shared.game_catalog.merge_games( + [ + GameEntry(key="valorant", name="Valorant", weight=500), + ] + ) + shared.game_catalog.set_ready(True) + shared.favorites_store.add_favorite(123, 42, "valorant") + + campaign = CampaignRecord( + id="camp-1", + name="Valorant Drops", + status="ACTIVE", + game_name="Valorant", + game_slug="valorant", + game_box_art=None, + starts_at=None, + ends_at=None, + benefits=[BenefitRecord(id="b1", name="Reward", image_url=None)], + ) + + async def fake_campaigns(): + return [campaign] + + monkeypatch.setattr(shared, "get_campaigns_cached", fake_campaigns) + + sent_messages = [] + + class FakeRest: + async def create_message(self, channel_id, content=None, embeds=None): + sent_messages.append((int(channel_id), content, embeds)) + + class FakeApp: + def __init__(self) -> None: + self.rest = FakeRest() + + class FakeClient: + def __init__(self) -> None: + self.app = FakeApp() + + class FakeCtx: + def __init__(self) -> None: + self.guild_id = 123 + self.channel_id = 999 + self.user = type("User", (), {"id": 42})() + self.client = FakeClient() + + async def defer(self, *args, **kwargs): + return + + async def respond(self, *args, **kwargs): + raise AssertionError("respond should not be called in success case") + + async def edit_initial_response(self, *args, **kwargs): + return + + async def delete_last_response(self, *args, **kwargs): + return + + async def delete_initial_response(self, *args, **kwargs): + return + + async def edit_last_response(self, *args, **kwargs): + return + + ctx = FakeCtx() + + finalized = [] + + async def fake_finalize(ctx_obj, *, message=None): + finalized.append(message) + + monkeypatch.setattr(shared, "finalize_interaction", fake_finalize) + + bound_invoke = check_cmd.invoke.__get__(cmd_instance, check_cmd) + await bound_invoke(ctx) + + assert sent_messages, "Expected at least one message sent" + channel_id, content, embeds = sent_messages[0] + assert channel_id == 999 + assert "<@42>" in (content or "") + assert embeds and embeds[0].title + assert finalized[-1] is None diff --git a/tests/test_favorites_store.py b/tests/test_favorites_store.py new file mode 100644 index 0000000..1f51871 --- /dev/null +++ b/tests/test_favorites_store.py @@ -0,0 +1,67 @@ +from functionality.twitch_drops.favorites import FavoritesStore + + +def test_add_and_remove_favorites(tmp_path): + path = tmp_path / "favorites.json" + store = FavoritesStore(str(path)) + + assert store.get_user_favorites(1, 10) == [] + + assert store.add_favorite(1, 10, "apex-legends") + assert store.add_favorite(1, 10, "valorant") + # Duplicate inserts should be ignored. + assert not store.add_favorite(1, 10, "valorant") + + assert store.get_user_favorites(1, 10) == ["apex-legends", "valorant"] + + assert store.remove_favorite(1, 10, "apex-legends") + assert not store.remove_favorite(1, 10, "apex-legends") + assert store.get_user_favorites(1, 10) == ["valorant"] + + +def test_remove_many_and_cleanup(tmp_path): + path = tmp_path / "favorites.json" + store = FavoritesStore(str(path)) + + for key in ("apex-legends", "valorant", "overwatch"): + store.add_favorite(5, 20, key) + + removed = store.remove_many(5, 20, ["valorant", "unknown"]) + assert removed == 1 + assert store.get_user_favorites(5, 20) == ["apex-legends", "overwatch"] + + removed = store.remove_many(5, 20, ["apex-legends", "overwatch"]) + assert removed == 2 + assert store.get_user_favorites(5, 20) == [] + + +def test_guild_favorites_snapshot(tmp_path): + path = tmp_path / "favorites.json" + store = FavoritesStore(str(path)) + + store.add_favorite(99, 1, "apex") + store.add_favorite(99, 1, "valorant") + store.add_favorite(99, 2, "apex") + + result = store.get_guild_favorites(99) + assert result == {1: {"apex", "valorant"}, 2: {"apex"}} + + +def test_watchers_lookup(tmp_path): + path = tmp_path / "favorites.json" + store = FavoritesStore(str(path)) + + store.add_favorite(7, 101, "apex") + store.add_favorite(7, 102, "valorant") + store.add_favorite(7, 103, "apex") + store.add_favorite(7, 103, "valorant") + + watchers = store.get_watchers(7, ["valorant"]) + assert watchers == {102: {"valorant"}, 103: {"valorant"}} + + watchers = store.get_watchers(7, ["apex", "valorant"]) + assert watchers == { + 101: {"apex"}, + 102: {"valorant"}, + 103: {"apex", "valorant"}, + } diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py index a65e768..6f63e71 100644 --- a/tests/test_fetcher.py +++ b/tests/test_fetcher.py @@ -1,5 +1,4 @@ import pytest -import asyncio from functionality.twitch_drops import fetcher as fetcher_mod from functionality.twitch_drops.models import CampaignRecord @@ -34,3 +33,97 @@ async def test_fetcher_prefers_display_name_and_slug(monkeypatch): alpha = next(r for r in recs if r.id == "c_active") assert alpha.game_name == "Alpha" assert alpha.game_slug == "alpha" + + +async def _fake_fetch_active_campaigns_mixed(): + return { + "campaigns": [ + { + "id": "c_active", + "name": "Primary", + "status": "ACTIVE", + "game": {"displayName": "Game One", "slug": "game-one"}, + "timeBasedDrops": [ + { + "benefitEdges": [ + {"benefit": {"id": "b1", "name": "Reward A", "imageAssetURL": "https://img/a.png"}}, + {"benefit": {"id": "b1", "name": "Reward A (dup)", "imageAssetURL": "https://img/a2.png"}}, + ] + }, + { + "benefitEdges": [ + {"benefit": {"id": "b2", "name": "Reward B", "imageAssetURL": "https://img/b.png"}}, + ] + }, + ], + }, + { + "id": "c_future", + "name": "Future Campaign", + "status": "UPCOMING", + "game": {"displayName": "Future Game"}, + }, + { + "id": "c_invalid", + "name": "Invalid", + "status": "ACTIVE", + "game": None, + "timeBasedDrops": [ + {}, + { + "benefitEdges": [ + {"benefit": None}, + {"benefit": {}}, + ] + }, + ], + }, + "not-a-dict", + { + "id": "c_missing_status", + }, + ] + } + + +class DummyCatalog: + def __init__(self, should_raise: bool = False) -> None: + self.records: list[CampaignRecord] | None = None + self.should_raise = should_raise + + def merge_from_campaign_records(self, recs: list[CampaignRecord]) -> bool: + self.records = recs + if self.should_raise: + raise RuntimeError("boom") + return True + + +async def test_fetcher_filters_invalid_and_deduplicates(monkeypatch): + monkeypatch.setattr(fetcher_mod, "fetch_active_campaigns", _fake_fetch_active_campaigns_mixed) + catalog = DummyCatalog() + monkeypatch.setattr(fetcher_mod, "get_game_catalog", lambda: catalog) + + f = fetcher_mod.DropsFetcher() + recs = await f.fetch_condensed() + + assert len(recs) == 2 # c_active + c_invalid (even with minimal data) + active = next(r for r in recs if r.id == "c_active") + assert active.game_name == "Game One" + assert active.game_slug == "game-one" + # Benefit IDs should be unique even across multiple drops + assert [b.id for b in active.benefits] == ["b1", "b2"] + assert catalog.records is recs + + +async def test_fetcher_swallows_catalog_errors(monkeypatch): + async def fake_fetch(): + return {"campaigns": [{"id": "c1", "name": "Ok", "status": "ACTIVE", "timeBasedDrops": []}]} + + monkeypatch.setattr(fetcher_mod, "fetch_active_campaigns", fake_fetch) + catalog = DummyCatalog(should_raise=True) + monkeypatch.setattr(fetcher_mod, "get_game_catalog", lambda: catalog) + + f = fetcher_mod.DropsFetcher() + recs = await f.fetch_condensed() + assert len(recs) == 1 + assert recs[0].id == "c1" diff --git a/tests/test_game_catalog.py b/tests/test_game_catalog.py new file mode 100644 index 0000000..b9697d9 --- /dev/null +++ b/tests/test_game_catalog.py @@ -0,0 +1,149 @@ +import pytest + +from functionality.twitch_drops.game_catalog import GameCatalog, GameEntry +from functionality.twitch_drops.models import CampaignRecord, BenefitRecord + + +def test_merge_games_prefers_higher_weight(tmp_path): + catalog_path = tmp_path / "catalog.json" + catalog = GameCatalog(str(catalog_path)) + + first = GameEntry( + key="game a", + name="Game A", + weight=100, + aliases=["game a"], + sources=["helix"], + ) + second = GameEntry( + key="game a", + name="Game A Deluxe", + weight=250, + aliases=["game a"], + sources=["campaign"], + ) + + assert catalog.merge_games([first]) is True + assert catalog.merge_games([second]) is True + + entry = catalog.get("game a") + assert entry is not None + assert entry.weight == 250 + assert entry.name == "Game A Deluxe" + assert "campaign" in entry.sources + assert "helix" in entry.sources + + +def test_matches_campaign_handles_slug(tmp_path): + catalog_path = tmp_path / "catalog.json" + catalog = GameCatalog(str(catalog_path)) + + entry = GameEntry( + key="game b", + name="Game B", + weight=200, + aliases=["game b"], + sources=["helix"], + ) + catalog.merge_games([entry]) + + campaign = CampaignRecord( + id="1", + name="Summer Drops", + status="ACTIVE", + game_name="Game B", + game_slug="game-b", + game_box_art=None, + starts_at=None, + ends_at=None, + benefits=[BenefitRecord(id="b1", name="Reward", image_url=None)], + ) + + resolved = catalog.get("game b") + assert resolved is not None + assert catalog.matches_campaign(resolved, campaign) is True + + +def test_search_returns_weighted_results(tmp_path): + catalog_path = tmp_path / "catalog.json" + catalog = GameCatalog(str(catalog_path)) + catalog.merge_games( + [ + GameEntry( + key="game c", + name="Game C", + weight=300, + aliases=["game c"], + sources=["helix"], + ), + GameEntry( + key="game d", + name="Adventure D", + weight=150, + aliases=["adventure d"], + sources=["campaign"], + ), + ] + ) + + results = catalog.search("adventure") + assert results + assert results[0].name == "Adventure D" + assert len(results) == 1 + + +def test_search_filters_out_non_matches(tmp_path): + catalog_path = tmp_path / "catalog.json" + catalog = GameCatalog(str(catalog_path)) + catalog.merge_games( + [ + GameEntry( + key="game e", + name="Game E", + weight=400, + aliases=["game e"], + sources=["helix"], + ) + ] + ) + + assert catalog.search("zzz") == [] + + +def test_merge_from_campaign_records_adds_aliases(tmp_path): + catalog = GameCatalog(str(tmp_path / "catalog.json")) + campaign = CampaignRecord( + id="1", + name="Winter Bash", + status="ACTIVE", + game_name="My Game", + game_slug="my-game", + game_box_art=None, + starts_at=None, + ends_at=None, + benefits=[BenefitRecord(id="b", name="Reward", image_url=None)], + ) + assert catalog.merge_from_campaign_records([campaign]) is True + entry = catalog.get("My Game") + assert entry is not None + assert "my-game" in entry.aliases + + +def test_merge_state_snapshot_handles_invalid(tmp_path): + catalog = GameCatalog(str(tmp_path / "catalog.json")) + snapshot = { + "good": {"game_name": "Valid Game", "game_box_art": "https://example"}, + "bad": {"game_name": ""}, + "also_bad": "not a dict", + } + assert catalog.merge_state_snapshot(snapshot) is True + assert catalog.get("Valid Game") is not None + + +@pytest.mark.asyncio +async def test_ready_event_waits(tmp_path): + catalog = GameCatalog(str(tmp_path / "catalog.json")) + assert await catalog.wait_ready(timeout=0.05) is False + catalog.set_ready(True) + assert catalog.is_ready() is True + assert await catalog.wait_ready(timeout=0.05) is True diff --git a/tests/test_models.py b/tests/test_models.py new file mode 100644 index 0000000..a5c9bdc --- /dev/null +++ b/tests/test_models.py @@ -0,0 +1,36 @@ +from datetime import datetime, timezone + +from functionality.twitch_drops.models import BenefitRecord, CampaignRecord + + +def test_campaign_record_timestamp_properties(): + rec = CampaignRecord( + id="camp", + name="Test", + status="ACTIVE", + game_name="Game", + game_slug="game", + game_box_art=None, + starts_at="2024-04-01T12:00:00Z", + ends_at="2024-04-03T05:30:00+02:00", + benefits=[BenefitRecord(id="b", name="Reward", image_url=None)], + ) + assert rec.starts_ts == int(datetime(2024, 4, 1, 12, tzinfo=timezone.utc).timestamp()) + expected_end = datetime(2024, 4, 3, 3, 30, tzinfo=timezone.utc) # converted from +02:00 + assert rec.ends_ts == int(expected_end.timestamp()) + + +def test_campaign_record_handles_invalid_timestamps(): + rec = CampaignRecord( + id="camp", + name="Invalid Times", + status="ACTIVE", + game_name=None, + game_slug=None, + game_box_art=None, + starts_at="not-a-date", + ends_at=None, + benefits=[], + ) + assert rec.starts_ts is None + assert rec.ends_ts is None diff --git a/tests/test_monitor.py b/tests/test_monitor.py new file mode 100644 index 0000000..a436aee --- /dev/null +++ b/tests/test_monitor.py @@ -0,0 +1,115 @@ +import pytest + +from functionality.twitch_drops.models import BenefitRecord, CampaignRecord +from functionality.twitch_drops.monitor import DropsMonitor + + +class StubCatalog: + def __init__(self): + self.merged = [] + + def merge_state_snapshot(self, data): + self.merged.append(data) + + +class StubStore: + def __init__(self): + self._data = {} + self.saves: list[list[str]] = [] + + def load(self): + return dict(self._data) + + def save(self, campaigns): + self.saves.append([c.id for c in campaigns]) + self._data = {c.id: {"status": c.status} for c in campaigns} + + +class StubFetcher: + def __init__(self, campaigns): + self.campaigns = campaigns + self.calls = 0 + + async def fetch_condensed(self): + self.calls += 1 + return self.campaigns + + +class StubNotifier: + def __init__(self): + self.calls = 0 + self.payloads = [] + + async def notify(self, diff): + self.calls += 1 + self.payloads.append(diff) + + +class StubApp: + def __init__(self): + self.rest = object() + + +def make_campaign(cid: str) -> CampaignRecord: + return CampaignRecord( + id=cid, + name="Camp", + status="ACTIVE", + game_name="Game", + game_slug="game", + game_box_art=None, + starts_at=None, + ends_at=None, + benefits=[BenefitRecord(id="b1", name="Reward", image_url=None)], + ) + + +@pytest.mark.asyncio +async def test_monitor_skips_notify_on_first_run(monkeypatch): + monkeypatch.setattr("functionality.twitch_drops.monitor.get_game_catalog", lambda: StubCatalog()) + + app = StubApp() + monitor = DropsMonitor(app, interval_minutes=1, notify_on_boot=False) + store = StubStore() + fetcher = StubFetcher([make_campaign("c1")]) + notifier = StubNotifier() + monitor.store = store + monitor.fetcher = fetcher + monitor.notifier = notifier + + async def stop_sleep(*args, **kwargs): + raise StopAsyncIteration + + monkeypatch.setattr("functionality.twitch_drops.monitor.asyncio.sleep", stop_sleep) + + with pytest.raises(StopAsyncIteration): + await monitor._run_loop() + + assert notifier.calls == 0 + assert fetcher.calls == 1 + assert store.saves == [["c1"]] + + +@pytest.mark.asyncio +async def test_monitor_notifies_when_enabled(monkeypatch): + monkeypatch.setattr("functionality.twitch_drops.monitor.get_game_catalog", lambda: StubCatalog()) + + app = StubApp() + monitor = DropsMonitor(app, interval_minutes=1, notify_on_boot=True) + store = StubStore() + fetcher = StubFetcher([make_campaign("c2")]) + notifier = StubNotifier() + monitor.store = store + monitor.fetcher = fetcher + monitor.notifier = notifier + + async def stop_sleep(*args, **kwargs): + raise StopAsyncIteration + + monkeypatch.setattr("functionality.twitch_drops.monitor.asyncio.sleep", stop_sleep) + + with pytest.raises(StopAsyncIteration): + await monitor._run_loop() + + assert notifier.calls == 1 + assert notifier.payloads[0].activated[0].id == "c2" diff --git a/tests/test_notifier.py b/tests/test_notifier.py new file mode 100644 index 0000000..c84bb77 --- /dev/null +++ b/tests/test_notifier.py @@ -0,0 +1,109 @@ +import pytest + +from functionality.twitch_drops.config import GuildConfigStore +from functionality.twitch_drops.differ import DropsDiff +from functionality.twitch_drops.models import BenefitRecord, CampaignRecord +from functionality.twitch_drops.favorites import FavoritesStore +from functionality.twitch_drops.game_catalog import GameCatalog, GameEntry +from functionality.twitch_drops.notifier import DropsNotifier + + +class StubRest: + def __init__(self, guild_id: int, channel_id: int): + self._guild_id = guild_id + self._channel_id = channel_id + self.sent: list[tuple[int, str | None, list]] = [] + + class _Guild: + def __init__(self, guild_id: int): + self.id = guild_id + self.system_channel_id = None + + async def fetch_my_guilds(self): + return [self._Guild(self._guild_id)] + + async def create_message(self, channel_id, *, content=None, embeds=None, **kwargs): + self.sent.append((int(channel_id), content, list(embeds or []))) + + +class StubApp: + def __init__(self, rest: StubRest): + self.rest = rest + + +@pytest.mark.asyncio +async def test_notifier_posts_collage_and_mentions(monkeypatch, tmp_path): + rest = StubRest(guild_id=123, channel_id=999) + app = StubApp(rest) + + guild_store = GuildConfigStore(str(tmp_path / "guild.json")) + guild_store.set_channel_id(123, 999) + favorites = FavoritesStore(str(tmp_path / "favorites.json")) + favorites.add_favorite(123, 111, "valorant") + favorites.add_favorite(123, 222, "valorant") + + catalog = GameCatalog(str(tmp_path / "catalog.json")) + catalog.merge_games( + [ + GameEntry( + key="valorant", + name="Valorant", + weight=500, + aliases=["valorant"], + sources=["seed"], + ) + ] + ) + + monkeypatch.setenv("DROPS_MAX_ATTACHMENTS_PER_NOTIFY", "1") + monkeypatch.setenv("DROPS_SEND_DELAY_MS", "0") + + async def fake_collage(campaign, **kwargs): + return b"png-bytes", "file.png" + + monkeypatch.setattr("functionality.twitch_drops.notifier.build_benefits_collage", fake_collage) + + async def no_sleep(*args, **kwargs): + return None + + monkeypatch.setattr("functionality.twitch_drops.notifier.asyncio.sleep", no_sleep) + + notifier = DropsNotifier(app, guild_store, favorites, catalog) + + campaign = CampaignRecord( + id="camp1", + name="Valorant Drops", + status="ACTIVE", + game_name="Valorant", + game_slug="valorant", + game_box_art=None, + starts_at=None, + ends_at=None, + benefits=[BenefitRecord(id="b1", name="Reward", image_url="https://img.example/1.png")], + ) + diff = DropsDiff(activated=[campaign]) + + await notifier.notify(diff) + + assert rest.sent, "notification should be sent" + channel_id, content, embeds = rest.sent[0] + assert channel_id == 999 + assert "<@111>" in (content or "") + assert "<@222>" in (content or "") + assert embeds and embeds[0].title == "Valorant" + assert embeds[0].image is not None + + +def test_join_mentions_truncates(tmp_path): + rest = StubRest(guild_id=123, channel_id=999) + app = StubApp(rest) + guild_store = GuildConfigStore(str(tmp_path / "guild.json")) + favorites = FavoritesStore(str(tmp_path / "favorites.json")) + catalog = GameCatalog(str(tmp_path / "catalog.json")) + notifier = DropsNotifier(app, guild_store, favorites, catalog) + + ids = [100 + i for i in range(10)] + text = notifier._join_mentions(ids, limit=15) + assert text.endswith("…") + # Should include at least the first user mention + assert "<@100>" in text diff --git a/tests/test_register_commands.py b/tests/test_register_commands.py index 147bdfe..7194240 100644 --- a/tests/test_register_commands.py +++ b/tests/test_register_commands.py @@ -9,4 +9,14 @@ def test_register_commands_adds_expected(): bot = hikari.GatewayBot(token="X", intents=hikari.Intents.ALL_UNPRIVILEGED) client = lightbulb.client_from_app(bot) names = set(register_commands(client)) - assert {"hello", "help", "drops_active", "drops_this_week", "drops_set_channel", "drops_channel", "drops_search_game"}.issubset(names) + expected = { + "hello", + "help", + "drops_active", + "drops_this_week", + "drops_set_channel", + "drops_channel", + "drops_search_game", + "drops_favorites", + } + assert expected.issubset(names) diff --git a/tests/test_twitch_drops_core.py b/tests/test_twitch_drops_core.py new file mode 100644 index 0000000..2f0dc1c --- /dev/null +++ b/tests/test_twitch_drops_core.py @@ -0,0 +1,29 @@ +from functionality.twitch_drops.twitch_drops import ( + GQLOperation, + _merge_data, + is_first_party_validate, + ANDROID_CLIENT_ID, +) + + +def test_gql_operation_with_variables_merges_copy(): + ops = GQLOperation("TestOp", "abc123", variables={"foo": "bar"}) + merged = ops.with_variables({"foo": "override", "baz": 1}) + assert merged is not ops + assert merged["variables"]["foo"] == "override" + assert merged["variables"]["baz"] == 1 + + +def test_merge_data_prefers_primary_values(): + a = {"id": 1, "nested": {"value": "primary", "other": 1}} + b = {"nested": {"value": "secondary", "extra": 2}, "new": 3} + result = _merge_data(a, b) + assert result["nested"]["value"] == "primary" + assert result["nested"]["extra"] == 2 + assert result["new"] == 3 + + +def test_is_first_party_validate(): + payload = {"client_id": ANDROID_CLIENT_ID} + assert is_first_party_validate(payload) is True + assert is_first_party_validate({"client_id": "other"}) is False