Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion plugins/factoids.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime
import json
from typing import TYPE_CHECKING, Literal, Mapping, Optional, TypedDict, Union, cast, overload
from typing import TYPE_CHECKING, Any, Literal, Mapping, Optional, TypedDict, Union, cast, overload
from typing_extensions import NotRequired

from discord import AllowedMentions, Embed, Message, MessageReference, Thread
Expand Down Expand Up @@ -41,9 +41,15 @@ class GlobalConfig:
def __init__(self, *, id: int = ..., prefix: Optional[str] = ...) -> None: ...


class CooldownInfo(TypedDict):
length: int
applies_for: str


class Flags(TypedDict):
mentions: NotRequired[bool]
acl: NotRequired[str]
cooldown: NotRequired[CooldownInfo]


@registry.mapped
Expand Down Expand Up @@ -150,10 +156,32 @@ async def init() -> None:
prefix = conf.prefix


class CooldownMap:
def __init__(self):
self.cooldown_buckets = {}

def _purge_expired_buckets(self, current: int) -> None:
to_delete = [key for key, expires_at in self.cooldown_buckets.items() if expires_at <= current]
for key in to_delete:
del self.cooldown_buckets[key]

def update_cooldown(self, key: Any, length: int, current: int) -> bool:
self._purge_expired_buckets(current)

if key not in self.cooldown_buckets:
self.cooldown_buckets[key] = current + length
return False
return True


@cog
class Factoids(Cog):
"""Manage factoids."""

def __init__(self):
super().__init__()
self.cd_mapping = CooldownMap()

@Cog.listener()
async def on_message(self, msg: Message) -> None:
if msg.author.bot:
Expand Down Expand Up @@ -184,6 +212,18 @@ async def on_message(self, msg: Message) -> None:
if "mentions" in flags and flags["mentions"]:
mentions = AllowedMentions(roles=True, users=True)

cooldown = flags.get("cooldown", None)
# has a cooldown, acl evalutes to true and currently on cooldown
if (
cooldown
and evaluate_acl(cooldown["applies_for"], msg.author, msg.channel) == EvalResult.TRUE
and self.cd_mapping.update_cooldown(
(msg.channel.id, alias.factoid.id), cooldown["length"], int(msg.created_at.timestamp())
)
):
await msg.add_reaction("\u231B")
return

embed = Embed.from_dict(alias.factoid.embed_data) if alias.factoid.embed_data is not None else None
if msg.reference is not None and msg.reference.message_id is not None:
reference = MessageReference(
Expand Down Expand Up @@ -392,6 +432,9 @@ async def tag_flags(self, ctx: Context, name: str, flags: Optional[Union[CodeBlo
Configure admin-only flags for a factoid. The flags are a JSON dictionary with the following keys:
- "mentions": a boolean, if true, makes the factoid invocation ping the roles and users it involves
- "acl": a string referring to an ACL (configurable with `acl`) required to use the factoid
- "cooldown": a dictionary that if present, specifies a cooldown for the factoid
- "length": how long the cooldown lasts for
- "applies_for": a string referring to an ACL that specifies when the cooldown applies
"""
assert prefix is not None
name = validate_name(name)
Expand Down