diff --git a/plugins/factoids.py b/plugins/factoids.py index ba8d229..509e772 100644 --- a/plugins/factoids.py +++ b/plugins/factoids.py @@ -1,6 +1,6 @@ from datetime import datetime import json -from typing import TYPE_CHECKING, Literal, Mapping, Optional, TypedDict, Union, cast, overload +from typing import TYPE_CHECKING, Any, Literal, Mapping, Optional, TypedDict, Union, cast, overload from typing_extensions import NotRequired from discord import AllowedMentions, Embed, Message, MessageReference, Thread @@ -41,9 +41,15 @@ class GlobalConfig: def __init__(self, *, id: int = ..., prefix: Optional[str] = ...) -> None: ... +class CooldownInfo(TypedDict): + length: int + applies_for: str + + class Flags(TypedDict): mentions: NotRequired[bool] acl: NotRequired[str] + cooldown: NotRequired[CooldownInfo] @registry.mapped @@ -150,10 +156,32 @@ async def init() -> None: prefix = conf.prefix +class CooldownMap: + def __init__(self): + self.cooldown_buckets = {} + + def _purge_expired_buckets(self, current: int) -> None: + to_delete = [key for key, expires_at in self.cooldown_buckets.items() if expires_at <= current] + for key in to_delete: + del self.cooldown_buckets[key] + + def update_cooldown(self, key: Any, length: int, current: int) -> bool: + self._purge_expired_buckets(current) + + if key not in self.cooldown_buckets: + self.cooldown_buckets[key] = current + length + return False + return True + + @cog class Factoids(Cog): """Manage factoids.""" + def __init__(self): + super().__init__() + self.cd_mapping = CooldownMap() + @Cog.listener() async def on_message(self, msg: Message) -> None: if msg.author.bot: @@ -184,6 +212,18 @@ async def on_message(self, msg: Message) -> None: if "mentions" in flags and flags["mentions"]: mentions = AllowedMentions(roles=True, users=True) + cooldown = flags.get("cooldown", None) + # has a cooldown, acl evalutes to true and currently on cooldown + if ( + cooldown + and evaluate_acl(cooldown["applies_for"], msg.author, msg.channel) == EvalResult.TRUE + and self.cd_mapping.update_cooldown( + (msg.channel.id, alias.factoid.id), cooldown["length"], int(msg.created_at.timestamp()) + ) + ): + await msg.add_reaction("\u231B") + return + embed = Embed.from_dict(alias.factoid.embed_data) if alias.factoid.embed_data is not None else None if msg.reference is not None and msg.reference.message_id is not None: reference = MessageReference( @@ -392,6 +432,9 @@ async def tag_flags(self, ctx: Context, name: str, flags: Optional[Union[CodeBlo Configure admin-only flags for a factoid. The flags are a JSON dictionary with the following keys: - "mentions": a boolean, if true, makes the factoid invocation ping the roles and users it involves - "acl": a string referring to an ACL (configurable with `acl`) required to use the factoid + - "cooldown": a dictionary that if present, specifies a cooldown for the factoid + - "length": how long the cooldown lasts for + - "applies_for": a string referring to an ACL that specifies when the cooldown applies """ assert prefix is not None name = validate_name(name)