diff --git a/.gitignore b/.gitignore index ccefa01..5631163 100644 --- a/.gitignore +++ b/.gitignore @@ -155,3 +155,7 @@ fabric.properties *.tmp *.temp *.sass-cache + +# Python stuff: +.venv +__pycache__ diff --git a/appwrite/discord-bots/public-bot/addonsearch.py b/appwrite/discord-bots/public-bot/addonsearch.py deleted file mode 100644 index 6894674..0000000 --- a/appwrite/discord-bots/public-bot/addonsearch.py +++ /dev/null @@ -1,52 +0,0 @@ -import discord -import meilisearch -import dotenv -import os - -dotenv.load_dotenv() -try: - client = meilisearch.Client(os.getenv("MEILISEARCH_URL"), os.getenv("SEARCH_TOKEN")) - index = client.index("addons") -except Exception as e: - print(f"Error initializing Meilisearch: {e}") - exit() - -loaders = ["Forge", "NeoForge", "Fabric", "Quilt"] - - -async def addonsearch(interaction: discord.Interaction, query: str, limit: int = 1): - request = index.search(query, {"limit": limit}) - data = request.get("hits", []) # Récupérer en toute sécurité 'hits' ou une liste vide - await send_embeds(interaction, data) - return data - - -async def send_embeds(interaction: discord.Interaction, data: list[dict]): - for mod_data in data: # `data` est une liste de dictionnaires d'addons - embed = await gen_embed(mod_data) - await interaction.followup.send(embed=embed) - - -def format_loaders(loaders: list[str]) -> str: - if not loaders: - return "No loaders available" - return " ".join([f"{loader}" for loader in loaders]) - - -async def gen_embed(data: dict) -> discord.Embed: - loaders_field = format_loaders(data.get("loaders", [])) - embed = discord.Embed(title="🟦 " + data["name"] +" 🟦", description=data["description"], color=0x689AEE) - embed.set_author(name="📋 ᴀᴜᴛʜᴏʀ : " + data["author"]) - embed.set_thumbnail(url=data["icon"]) - - embed.add_field(name="🚀 ʟᴏᴀᴅᴇʀs", value=loaders_field, inline=False) - embed.add_field(name="🔵 ᴀᴠᴀɪʟᴀʙʟᴇ ᴏɴ :", value="", inline=False) - if data.get("modrinth_raw"): - embed.add_field(name="", value=f'ᴍᴏᴅʀɪɴᴛʜ 🟢') - - if data.get("curseforge_raw"): - embed.add_field(name="", value=f'ᴄᴜʀsᴇғᴏʀɢᴇ 🟠') - - embed.add_field(name="🔎 ɪɴғᴏs", value=f"https://nottelling.youthe.domain/addons/{data['slug']}", inline=False) - embed.set_footer(text=f" ⚡ {data['downloads']} ᴅᴏᴡɴʟᴏᴀᴅs") - return embed diff --git a/appwrite/discord-bots/public-bot/bot.py b/appwrite/discord-bots/public-bot/bot.py index 728039e..9563791 100644 --- a/appwrite/discord-bots/public-bot/bot.py +++ b/appwrite/discord-bots/public-bot/bot.py @@ -1,104 +1,77 @@ +""" This is the main file for the bot. """ +import os +import asyncio +import traceback +import dotenv import discord -from discord import app_commands from discord.ext import commands -from colorama import Back, Style -from addonsearch import addonsearch -from schematicsearch import schematicsearch -import dotenv -import os dotenv.load_dotenv() +TOKEN = os.getenv("PUBLIC_DISCORD_TOKEN") +GUILD_ID = os.getenv("GUILD_ID") intents = discord.Intents.default() intents.message_content = True -bot = commands.Bot(command_prefix="??", intents=intents) - -# Remove the old help command +bot = commands.Bot(command_prefix="!!", intents=intents, help_command=None) bot.remove_command("help") +@bot.event +async def setup_hook(): + """Runs async setup before the bot logs in.""" + print("Loading cogs...") + try: + await bot.load_extension("cogs.general") + await bot.load_extension("cogs.events") + await bot.load_extension("cogs.developer") + await bot.load_extension("cogs.moderation") + await bot.load_extension("cogs.leveling") + await bot.load_extension("cogs.search") + except commands.ExtensionError as e: + print(f"Failed to load cog: {type(e).__name__} - {e}") + print("Cogs loaded.") + + @bot.event async def on_ready(): - # Synchronize slash commands with Discord - await bot.tree.sync() - print(f"{Back.GREEN}Logged in as {bot.user}{Style.RESET_ALL}") + """Synchronize slash commands with Discord""" + # In production, use the manual sync commands instead + try: + guild_commands = await bot.tree.sync() + print(f"App command sync complete: {len(guild_commands)} commands registered.") + except discord.DiscordServerError as e: + print(f"Command sync failed: Discord server error: {e}") + except discord.HTTPException as e: + print(f"Command sync failed: HTTP error: {e}") + except discord.DiscordException as e: + print(f"Command sync failed: {e}") -@bot.tree.command(name="help", description="Displays the list of available commands") -async def help_command(interaction: discord.Interaction): - embed = discord.Embed( - title="Commands available", - description="Addons & Schematics commands", - color=discord.Color.blurple() - ) - embed.add_field( - name="Addon Commands", - value="/addon search [limit=1] - Search for addons on Blueprint.", - inline=False - ) - embed.add_field( - name="Schematic Commands", - value="/schematic search [limit=1]", - inline=False - ) - embed.add_field( - name="Data types", - value=""" - <> - needed - [] - optional - """, - inline=False - ) - await interaction.response.send_message(embed=embed) -# Slash command for addon search -@bot.tree.command(name="addon", description="Manages commands related to addons") -async def addon_command(interaction: discord.Interaction, query: str, limit: int = 1): - limit = max(1, min(limit, 5)) - - # Defer the response to avoid InteractionResponded error - await interaction.response.defer(ephemeral=True) - - # Search for addons - await addonsearch(interaction=interaction, query=query, limit=limit) +async def main(): + """Main function to start the bot""" + if TOKEN is None: + print("ERROR: Bot token not found in .env file!") + return -# Slash command for schematic search -@bot.tree.command(name="schematic", description="Manages commands related to schematics") -async def schematic_command(interaction: discord.Interaction, query: str, limit: int = 1): - limit = max(1, min(limit, 5)) - - # Defer the response to avoid InteractionResponded error - await interaction.response.defer(ephemeral=True) - - # Search for schematics - await schematicsearch(interaction=interaction, query=query, limit=limit) + try: + print("Starting bot...") + await bot.start(TOKEN) + except discord.LoginFailure: + print("ERROR: Improper token passed. Check your .env file.") + except discord.PrivilegedIntentsRequired: + print( + "ERROR: Privileged Intents (Members/Message Content) are not enabled in the Developer Portal or here." + ) + except discord.HTTPException as e: + print(f"ERROR: HTTP request failed: {e}") + except discord.ConnectionClosed as e: + print(f"ERROR: Discord connection closed unexpectedly: {e}") + except (RuntimeError, TypeError, ValueError, OSError) as e: + print(f"CRITICAL ERROR: An unexpected error occurred: {e}") + traceback.print_exc() -@bot.tree.command(name="link", description="Displays the link to the official website") -async def site_command(interaction: discord.Interaction): - embed = discord.Embed( - title="Official Website", - description="Visit our website to discover more addons and schematics!", - color=discord.Color.blue(), - url="https://your-website.com" # Replace with your actual URL - ) - - # Add an image to the embed (optional) - embed.set_thumbnail(url="https://your-website.com/logo.png") # Replace with your logo URL - - # Add additional information - embed.add_field( - name="Latest Updates", - value="Check out the latest addons and schematics added to our site!", - inline=False - ) - - embed.add_field( - name="Support", - value="Need help? Visit our forum or contact us directly on the site.", - inline=False - ) - - # Add a footer - embed.set_footer(text="© 2025 Your Name - All rights reserved") - - await interaction.response.send_message(embed=embed) -bot.run(os.getenv("PUBLIC_DISCORD_TOKEN")) \ No newline at end of file +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("Bot shutdown requested.") diff --git a/appwrite/discord-bots/public-bot/cogs/developer.py b/appwrite/discord-bots/public-bot/cogs/developer.py new file mode 100644 index 0000000..e1c5014 --- /dev/null +++ b/appwrite/discord-bots/public-bot/cogs/developer.py @@ -0,0 +1,85 @@ +"""Developer-only commands for the bot.""" + +import os +import json + +import discord +from discord.ext import commands +from discord import app_commands # For slash commands + +OWNER_ID = os.getenv("OWNER_ID") +SERVER_ID = os.getenv("SERVER_ID") +CHANNEL_ID = os.getenv("CHANNEL_ID") +DEV_ROLE_ID = os.getenv("DEV_ROLE_ID") +NO_DEV_ROLE_MSG = "You need the Developer role to use this command." +DEV_SERVER_MSG = "This command can only be used in the developer server." + + +class DeveloperCog(commands.Cog): + """ + Developer-only commands for the bot. + Args: + commands (commands.Bot): The bot instance to which this cog is attached. + """ + + def __init__(self, bot: commands.Bot): + self.bot = bot + self.config_path = "./config/testingurl.json" # Adjusted path slightly + # Ensure owner_id is set on the bot instance for checks if needed elsewhere + self.bot.owner_id = int(OWNER_ID) if OWNER_ID else None + self.dev_role_id = int(DEV_ROLE_ID) if DEV_ROLE_ID else None + self.server_id = int(SERVER_ID) if SERVER_ID else None + self.channel_id = int(CHANNEL_ID) if CHANNEL_ID else None + + # Print out debug info + print("Developer Cog initialized with settings:") + print(f"Owner ID: {self.bot.owner_id}") + print(f"Developer Role ID: {self.dev_role_id}") + print(f"Server ID: {self.server_id}") + print(f"Channel ID: {self.channel_id}") + + # --- Helper: Check if user is a dev --- + def _is_dev(self, user: discord.User | discord.Member) -> bool: + """Check if a user is a developer based on role or owner status""" + if user.id == self.bot.owner_id: + return True + + if isinstance(user, discord.Member) and self.dev_role_id: + return any(role.id == self.dev_role_id for role in user.roles) + + return False + + # --- Helper for prefix commands (for backward compatibility) --- + async def is_dev(self, ctx: commands.Context) -> bool: + """Legacy method for prefix command checks""" + if not ctx.guild: + return False + + return self._is_dev(ctx.author) + + # --- Helper: Load/Save Config --- + def _load_testing_config(self): + try: + with open(self.config_path, "r", encoding="utf-8") as f: + return json.load(f) + except (FileNotFoundError, json.JSONDecodeError): + # Return default structure if file missing or invalid + return {"url": None} + + def _save_testing_config(self, config_data): + try: + # Create directory if it doesn't exist + os.makedirs(os.path.dirname(self.config_path), exist_ok=True) + with open(self.config_path, "w", encoding="utf-8") as f: + json.dump(config_data, f, indent=4) + return True + except (OSError, IOError, TypeError) as e: + print(f"Error saving testing config: {e}") + return False + + +# Setup function to add the Cog to the bot +async def setup(bot: commands.Bot): + """Setup function to add the Cog to the bot""" + await bot.add_cog(DeveloperCog(bot)) + print(" DeveloperCog loaded.") diff --git a/appwrite/discord-bots/public-bot/cogs/events.py b/appwrite/discord-bots/public-bot/cogs/events.py new file mode 100644 index 0000000..8a57daa --- /dev/null +++ b/appwrite/discord-bots/public-bot/cogs/events.py @@ -0,0 +1,72 @@ +""" +This cog handles events, such as message processing and command suggestions. +""" + +import random +import discord +from discord.ext import commands + + +class EventsCog(commands.Cog): + """ + A Discord event handling cog that processes messages and provides command migration guidance. + + This cog listens to all messages, processes prefix commands, and occasionally suggests + migrating from legacy prefix commands to new slash commands by providing helpful reminders. + """ + + def __init__(self, bot: commands.Bot): + self.bot = bot + + # Use Cog.listener() for events + @commands.Cog.listener() + async def on_message(self, message: discord.Message): + """ + Processes incoming messages to handle prefix commands and suggest slash commands. + + Args: + message (discord.Message): The message object representing the incoming message. + """ + # Ignore bots + if message.author == self.bot.user or message.author.bot: + return + + # Process prefix commands + await self.bot.process_commands(message) + + # Debug: Print received message for testing + print(f"Message received from {message.author.name}: {message.content[:30]}...") + + # Check for old command usage and suggest slash commands instead + # This helps users transition to the new slash command system + if message.content.startswith("!!") and message.guild: + command = message.content[2:].split(" ")[0].lower() + + # Mapping of old prefix commands to new slash commands + command_mapping = { + "yapping": "yapping", + "wiki": "wiki", + "plshelp": "wiki", + "github": "github", + "git": "github", + "socials": "socials", + "members": "members", + "statusbot": "statusbot", + "issue8ball": "issue8ball", + } + + debug_commands = ["ping", "syncguild", "syncglobal", "cmdinfo"] + + if command in command_mapping and command not in debug_commands: + # Only remind occasionally to avoid spamming + if random.random() < 0.3: + slash_command = command_mapping[command] + await message.channel.send( + f"💡 **Tip:** We're moving to slash commands! Try using /{slash_command} instead of !!{command}", + delete_after=10, + ) + +async def setup(bot: commands.Bot): + """ Add the EventsCog to the bot """ + await bot.add_cog(EventsCog(bot)) + print(" EventsCog loaded.") diff --git a/appwrite/discord-bots/public-bot/cogs/general.py b/appwrite/discord-bots/public-bot/cogs/general.py new file mode 100644 index 0000000..07fecb3 --- /dev/null +++ b/appwrite/discord-bots/public-bot/cogs/general.py @@ -0,0 +1,257 @@ +""" +This cog handles events such as message processing and command processing. +""" + +import random + +import discord +from discord import app_commands # Required for slash commands +from discord.ext import commands + + +# Cog class for general commands +class GeneralCog(commands.Cog): + """ + A Discord bot cog that handles general commands, including slash commands and prefix commands. + This cog includes commands for debugging, information retrieval, and social media links. + """ + + def __init__(self, bot: commands.Bot): + self.bot = bot + + @commands.command(name="cmdinfo") + async def command_info(self, ctx: commands.Context): + """Show information about registered commands.""" + app_command_count = len(self.bot.tree.get_commands()) + prefix_command_count = len(self.bot.commands) + + embed = discord.Embed(title="Bot Command Information", color=0x00FF00) + embed.add_field( + name="Application (Slash) Commands", + value=str(app_command_count), + inline=True, + ) + embed.add_field( + name="Prefix Commands", value=str(prefix_command_count), inline=True + ) + embed.add_field( + name="Prefix", value=f"`{self.bot.command_prefix}`", inline=True + ) + + if app_command_count > 0: + slash_cmd_names = ", ".join( + [cmd.name for cmd in self.bot.tree.get_commands()] + ) + embed.add_field(name="Slash Commands", value=slash_cmd_names, inline=False) + + await ctx.send(embed=embed) + + # --- Slash Commands --- + + @app_commands.command(name="help", description="List available slash commands") + async def help_slash(self, interaction: discord.Interaction): + """Shows the list of available slash commands""" + embed = discord.Embed( + title="Blueprint Bot Commands", + description="Here are the available slash commands:", + color=0x3498DB, + ) + + slash_commands = self.bot.tree.get_commands() + slash_commands.sort(key=lambda x: x.name) + + # Add each command to the embed + for cmd in slash_commands: + embed.add_field( + name=f"/{cmd.name}", + value=cmd.description or "No description provided", + inline=False, + ) + + embed.set_footer( + text="💡 Use these commands by typing / and selecting from the list" + ) + + await interaction.response.send_message(embed=embed) + + @app_commands.command( + name="ping", + description="Simple ping command to check if the bot is responding.", + ) + async def ping_slash(self, interaction: discord.Interaction): + """Simple ping command to check if the bot is responding""" + await interaction.response.send_message( + f"Pong! 🏓 Bot latency: {round(self.bot.latency * 1000)}ms" + ) + + @app_commands.command(name="yapping", description="Tells people to calm down.") + async def yapping_slash(self, interaction: discord.Interaction): + """Tells people to calm down.""" + await interaction.response.send_message("Shhhh! Calm down!") + + @app_commands.command( + name="statusbot", description="Checks if the bot is responding." + ) + async def statusbot_slash(self, interaction: discord.Interaction): + """Checks if the bot is responding.""" + embed = discord.Embed(title="Blueprint-Bot status", color=0x00CC73) + embed.add_field( + name="Uhhh", value="As you can see I responded, that means I'm online!" + ) + await interaction.response.send_message(embed=embed) + + @app_commands.command( + name="wiki", description="Sends a link to the Blueprint wiki." + ) + async def wiki_slash(self, interaction: discord.Interaction): + """Sends a link to the Blueprint wiki.""" + await interaction.response.send_message("https://wiki.blueprint-create.com/") + + # Combining plshelp into wiki for slash commands makes sense + # You could add an alias if needed, but separate command is redundant + + @app_commands.command( + name="github", description="Sends a link to the Blueprint GitHub." + ) + async def github_slash(self, interaction: discord.Interaction): + """Sends a link to the Blueprint GitHub.""" + embed = discord.Embed( + title="Our GitHub url!", + description="Here: https://github.com/blueprint-site/blueprint-site.github.io", + color=0x282828, + ) + await interaction.response.send_message(embed=embed) + + @app_commands.command( + name="socials", description="Shows Blueprint social media links." + ) + async def socials_slash(self, interaction: discord.Interaction): + """Shows Blueprint social media links.""" + embed = discord.Embed( + title="Our Socials!", description="check them out", color=0x1DA1F2 + ) + embed.add_field( + name="X (twitter)", value="https://x.com/blueprint_site", inline=False + ) + embed.add_field( + name="Bluesky", + value="https://bsky.app/profile/blueprint-site.bsky.social", + inline=False, + ) + embed.add_field( + name="Mastodon", + value="https://mastodon.social/@blueprint_site", + inline=False, + ) + await interaction.response.send_message(embed=embed) + + @app_commands.command( + name="issue8ball", description="Guess our resolution for your issue!" + ) + @app_commands.describe(issue="Briefly describe the issue (optional)") + async def issue8ball_slash( + self, interaction: discord.Interaction, issue: str | None = None + ): + """A funny 8-ball command for issues""" + # This command is a joke, so we don't need to do anything with the issue + # The 'issue' arg isn't used by the logic, but good practice to include if describing + responses = [ + "Will be fixed", + "Will be fixed soon", + "We will think on the resolution", + "We will not fix this", + "We don't know how to fix this", + "I gotta ask MrSpinn", + "Have you tried turning it off and on again?", + "That's a feature, not a bug", + "Sounds like a skill issue", + "The bug has been promoted to a feature", + "Error 404: Fix not found", + "It works on my machine", + "That's impossible, our code is perfect", + "Let's call it a 'known limitation'", + "The hamsters powering our servers need a break", + "Mercury must be in retrograde", + "Did you read the README?", + "That's above my pay grade", + "Have you tried asking Stack Overflow?", + "Sounds like undefined behavior to me", + "Let's put that in the backlog", + "We'll fix it in the next major version", + "That's not a bug, that's surprise functionality", + "Working as intended™", + "Let me forward that to /dev/null", + ] + + embed = discord.Embed( + title="The 8-ball has spoken! 🎱", + description=random.choice(responses), + color=0xFFD700, + ) + embed.set_footer( + text="The Blueprint team is not beholden to responses from this 8-ball. Use at your own risk." + ) + await interaction.response.send_message(embed=embed) + + @app_commands.command( + name="members", description="Shows the current member count of the server." + ) + async def members_slash(self, interaction: discord.Interaction): + """Shows the current member count of the server.""" + if interaction.guild: # Check if used in a server + embed = discord.Embed( + title="How many members on our discord?", color=0xFFD700 + ) + embed.add_field( + name="This many:", + value=f"{interaction.guild.member_count}", + inline=False, + ) + await interaction.response.send_message(embed=embed) + else: + # Ephemeral message only visible to the user + await interaction.response.send_message( + "This command can only be used in a server.", ephemeral=True + ) + + @app_commands.command( + name="link", description="Displays the link to the official website" + ) + async def site_command(self, interaction: discord.Interaction): + """Displays the link to the official website""" + embed = discord.Embed( + title="Official Website", + description="Visit our website to discover more addons and schematics!", + color=discord.Color.blue(), + url="https://your-website.com", # Replace with your actual URL + ) + + # Add an image to the embed (optional) + embed.set_thumbnail( + url="https://your-website.com/logo.png" + ) # Replace with your logo URL + + # Add additional information + embed.add_field( + name="Latest Updates", + value="Check out the latest addons and schematics added to our site!", + inline=False, + ) + + embed.add_field( + name="Support", + value="Need help? Visit our forum or contact us directly on the site.", + inline=False, + ) + + # Add a footer + embed.set_footer(text="© 2025 Your Name - All rights reserved") + + await interaction.response.send_message(embed=embed) + + +# Setup function to add the Cog to the bot +async def setup(bot: commands.Bot): + """Add the GeneralCog to the bot.""" + await bot.add_cog(GeneralCog(bot)) + print(" GeneralCog loaded.") diff --git a/appwrite/discord-bots/public-bot/cogs/leveling.py b/appwrite/discord-bots/public-bot/cogs/leveling.py new file mode 100644 index 0000000..cf9dfb1 --- /dev/null +++ b/appwrite/discord-bots/public-bot/cogs/leveling.py @@ -0,0 +1,792 @@ +""" +Leveling system for Discord bot that tracks user activity and awards XP. +Includes voice channel XP, text message XP tracking, and admin commands. +""" +import sqlite3 +import time +from typing import Optional, Union, List, Dict, Any, Callable + +import discord +from discord import app_commands +from discord.ext import commands, tasks + +# Configuration +CONFIG = { + "database": {"path": "leveling.db", "pool_size": 5}, + "xp": {"base_voice_xp": 1, "base_message_xp": 5, "min_voice_users": 2}, + "channels": { + "blacklisted_voice": [1352349213614145547], + "bonus_channels": [1242015121040080917, 1270359419862909020], + }, + "roles": {"moderator": 1242051406580416574}, + "cache": {"ttl": 300}, # Cache time-to-live in seconds +} + + +class DatabaseManager: + """Manages database connections and operations for the leveling system.""" + + def __init__(self, db_path: str): + self.db_path = db_path + self.connection_pool = [] + self._setup_database() + + def _setup_database(self): + """Set up the database schema if it doesn't exist.""" + with self.get_connection() as conn: + cursor = conn.cursor() + + # Create users table + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS users ( + user_id INTEGER, + guild_id INTEGER, + xp INTEGER DEFAULT 0, + level INTEGER DEFAULT 0, + last_message_time INTEGER DEFAULT 0, + PRIMARY KEY (user_id, guild_id) + ) + """ + ) + + # Create settings table + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS settings ( + guild_id INTEGER PRIMARY KEY, + xp_multiplier REAL DEFAULT 1.0, + expires_at INTEGER DEFAULT NULL + ) + """ + ) + + # Create indexes + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_users_guild ON users(guild_id)" + ) + cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_xp ON users(xp)") + + conn.commit() + + def get_connection(self): + """Get a database connection from the pool or create a new one.""" + if self.connection_pool: + return self.connection_pool.pop() + return sqlite3.connect(self.db_path) + + def release_connection(self, conn): + """Return a connection to the pool.""" + if len(self.connection_pool) < CONFIG["database"]["pool_size"]: + self.connection_pool.append(conn) + else: + conn.close() + + def execute(self, query: str, params: tuple = ()) -> Any: + """Execute a query and return the result.""" + conn = self.get_connection() + try: + cursor = conn.cursor() + cursor.execute(query, params) + conn.commit() + return cursor.fetchall() + finally: + self.release_connection(conn) + + def execute_many(self, query: str, params_list: List[tuple]) -> None: + """Execute multiple queries with different parameters.""" + conn = self.get_connection() + try: + cursor = conn.cursor() + cursor.executemany(query, params_list) + conn.commit() + finally: + self.release_connection(conn) + + +class Cache: + """Simple time-based cache for reducing database calls.""" + + def __init__(self, ttl: int = 300): + self.cache = {} + self.ttl = ttl + + def get(self, key: str) -> Any: + """Get a value from the cache if it exists and is not expired.""" + if key in self.cache: + value, timestamp = self.cache[key] + if time.time() - timestamp < self.ttl: + return value + # Remove expired entry + del self.cache[key] + return None + + def set(self, key: str, value: Any) -> None: + """Store a value in the cache with the current timestamp.""" + self.cache[key] = (value, time.time()) + + def invalidate(self, key: str) -> None: + """Remove a specific key from the cache.""" + if key in self.cache: + del self.cache[key] + + def clear(self) -> None: + """Clear all cache entries.""" + self.cache.clear() + + +class XPManager: + """Manages XP operations and calculations.""" + + def __init__(self, db: DatabaseManager, cache: Cache): + self.db = db + self.cache = cache + + def get_user_data(self, user_id: int, guild_id: int) -> dict: + """Get a user's XP data, with caching.""" + cache_key = f"user:{user_id}:{guild_id}" + + # Check cache first + cached_data = self.cache.get(cache_key) + if cached_data: + return cached_data + + # Get from database + result = self.db.execute( + "SELECT xp, level FROM users WHERE user_id=? AND guild_id=?", + (user_id, guild_id), + ) + + if result: + data = {"xp": result[0][0], "level": result[0][1]} + else: + # Initialize new user + data = {"xp": 0, "level": 0} + self.db.execute( + "INSERT INTO users (user_id, guild_id, xp, level) VALUES (?, ?, 0, 0)", + (user_id, guild_id), + ) + + # Cache the result + self.cache.set(cache_key, data) + return data + + def add_xp(self, user_id: int, guild_id: int, amount: int) -> dict: + """Add XP to a user and update their level if needed.""" + user_data = self.get_user_data(user_id, guild_id) + new_xp = user_data["xp"] + amount + current_level = user_data["level"] + + # Calculate new level + new_level = self.calculate_level(new_xp) + leveled_up = new_level > current_level + + # Update database + self.db.execute( + "UPDATE users SET xp=?, level=? WHERE user_id=? AND guild_id=?", + (new_xp, new_level, user_id, guild_id), + ) + + # Update cache + updated_data = {"xp": new_xp, "level": new_level} + self.cache.set(f"user:{user_id}:{guild_id}", updated_data) + + return {"data": updated_data, "leveled_up": leveled_up} + + def remove_xp(self, user_id: int, guild_id: int, amount: int) -> dict: + """Remove XP from a user and update their level if needed.""" + user_data = self.get_user_data(user_id, guild_id) + new_xp = max(0, user_data["xp"] - amount) # Ensure XP doesn't go below 0 + current_level = user_data["level"] + + # Calculate new level + new_level = self.calculate_level(new_xp) + leveled_down = new_level < current_level + + # Update database + self.db.execute( + "UPDATE users SET xp=?, level=? WHERE user_id=? AND guild_id=?", + (new_xp, new_level, user_id, guild_id), + ) + + # Update cache + updated_data = {"xp": new_xp, "level": new_level} + self.cache.set(f"user:{user_id}:{guild_id}", updated_data) + + return {"data": updated_data, "leveled_down": leveled_down} + + def get_multiplier(self, guild_id: int) -> float: + """Get the current XP multiplier for a guild, considering expiration.""" + cache_key = f"multiplier:{guild_id}" + + # Check cache first + cached_multiplier = self.cache.get(cache_key) + if cached_multiplier is not None: + return cached_multiplier + + # Get from database + result = self.db.execute( + "SELECT xp_multiplier, expires_at FROM settings WHERE guild_id=?", + (guild_id,), + ) + + if not result: + # Default multiplier + return 1.0 + + multiplier, expires_at = result[0] + + # Check if expired + if expires_at and int(time.time()) > expires_at: + # Reset to default + self.db.execute( + "UPDATE settings SET xp_multiplier=1.0, expires_at=NULL WHERE guild_id=?", + (guild_id,), + ) + multiplier = 1.0 + + # Cache the result + self.cache.set(cache_key, multiplier) + return multiplier + + def set_multiplier(self, guild_id: int, value: float, minutes: int = 0) -> None: + """Set the XP multiplier for a guild.""" + expires_at = None if minutes <= 0 else int(time.time()) + (minutes * 60) + + self.db.execute( + """ + INSERT INTO settings (guild_id, xp_multiplier, expires_at) + VALUES (?, ?, ?) + ON CONFLICT(guild_id) + DO UPDATE SET xp_multiplier=excluded.xp_multiplier, expires_at=excluded.expires_at + """, + (guild_id, value, expires_at), + ) + + # Update cache + self.cache.set(f"multiplier:{guild_id}", value) + + def get_leaderboard(self, guild_id: int, limit: int = 10) -> List[tuple]: + """Get the top users by XP for a guild.""" + return self.db.execute( + "SELECT user_id, xp, level FROM users WHERE guild_id=? ORDER BY xp DESC LIMIT ?", + (guild_id, limit), + ) + + def reset_guild_data(self, guild_id: int) -> int: + """Reset all level data for a guild. Returns number of affected users.""" + result = self.db.execute("DELETE FROM users WHERE guild_id=?", (guild_id,)) + + # Invalidate cache for this guild + # A more sophisticated approach would be to track all keys related to this guild + self.cache.clear() + + return len(result) if result else 0 + + @staticmethod + def calculate_level(xp: int) -> int: + """Calculate level based on XP.""" + # Using the formula from the original code: level^4 = xp + return int(xp**0.25) # 4th root of xp + + @staticmethod + def calculate_xp_for_level(level: int) -> int: + """Calculate XP required for a specific level.""" + return level**4 + + +def requires_moderator(): + """Decorator to check if user has moderator permissions.""" + + async def predicate(interaction: discord.Interaction) -> bool: + # Check for administrator permission + if interaction.user.guild_permissions.administrator: + return True + + # Check for moderator role + moderator_role_id = CONFIG["roles"]["moderator"] + if any(role.id == moderator_role_id for role in interaction.user.roles): + return True + + await interaction.response.send_message( + "You need moderator permissions to use this command.", ephemeral=True + ) + return False + + return app_commands.check(predicate) + + +def requires_guild(): + """Decorator to check if command is used in a guild.""" + + async def predicate(interaction: discord.Interaction) -> bool: + if not interaction.guild: + await interaction.response.send_message( + "This command can only be used in a server.", ephemeral=True + ) + return False + return True + + return app_commands.check(predicate) + + +class LevelingCog(commands.Cog): + """ + A cog that handles user leveling and XP management. + Tracks user activity in text and voice channels and awards XP accordingly. + """ + + def __init__(self, bot: commands.Bot): + self.bot = bot + self.db = DatabaseManager(CONFIG["database"]["path"]) + self.cache = Cache(CONFIG["cache"]["ttl"]) + self.xp_manager = XPManager(self.db, self.cache) + + # Start the voice XP loop + self.voice_xp_loop.start() + + async def cog_unload(self): + # Stop the task when the cog unloads + self.voice_xp_loop.cancel() + + # Close all database connections + for conn in self.db.connection_pool: + conn.close() + self.db.connection_pool.clear() + + @tasks.loop(minutes=1) + async def voice_xp_loop(self): + """Award XP to users in voice channels every minute.""" + # Wait until the bot is ready before starting the task + await self.bot.wait_until_ready() + + # Batch updates for better performance + xp_updates = [] + + for guild in self.bot.guilds: + try: + # Get the XP multiplier for this guild + multiplier = self.xp_manager.get_multiplier(guild.id) + base_xp = CONFIG["xp"]["base_voice_xp"] + xp_gain = int(base_xp * multiplier) + + if xp_gain <= 0: + continue # Skip if no XP would be gained + + for vc in guild.voice_channels: + # Skip blacklisted channels + if vc.id in CONFIG["channels"]["blacklisted_voice"]: + continue + + # Get members who aren't bots and aren't AFK + active_members = [ + m + for m in vc.members + if not m.bot and not (m.voice and m.voice.afk) + ] + + # Only award XP if there are at least the minimum required users in the channel + min_users = CONFIG["xp"]["min_voice_users"] + if len(active_members) >= min_users: + for member in active_members: + # Apply bonus for special channels + actual_xp = xp_gain + if vc.id in CONFIG["channels"]["bonus_channels"]: + actual_xp *= 2 + + # Add to batch update + xp_updates.append((member.id, guild.id, actual_xp)) + + except (discord.HTTPException, sqlite3.Error) as e: + print(f"Error in voice_xp_loop for guild {guild.id}: {e}") + + # Process all XP updates at once + if xp_updates: + try: + for user_id, guild_id, amount in xp_updates: + result = self.xp_manager.add_xp(user_id, guild_id, amount) + + # Check for level up - in a real implementation, this would send notifications + if result["leveled_up"]: + member = self.bot.get_guild(guild_id).get_member(user_id) + if member: + new_level = result["data"]["level"] + print(f"{member.display_name} leveled up to {new_level}!") + # In a real implementation, send level up message to appropriate channel + + except (discord.HTTPException, sqlite3.Error) as e: + print(f"Error processing voice XP batch: {e}") + + @app_commands.command( + name="rank", description="Check your rank or someone else's rank" + ) + @app_commands.describe(user="The user whose rank you want to check") + @requires_guild() + async def rank( + self, interaction: discord.Interaction, user: Optional[discord.Member] = None + ): + """Check the rank and XP of a user.""" + target_user = user or interaction.user + + try: + # Get user data + user_data = self.xp_manager.get_user_data( + target_user.id, interaction.guild.id + ) + xp = user_data["xp"] + level = user_data["level"] + + # Calculate XP needed for next level + next_level = level + 1 + next_level_xp = self.xp_manager.calculate_xp_for_level(next_level) + xp_needed = next_level_xp - xp + + # Create embed + embed = discord.Embed( + title=f"{target_user.display_name}'s Rank", color=0x3498DB + ) + embed.add_field(name="Level", value=f"**{level}**", inline=True) + embed.add_field(name="XP", value=f"**{xp:,}**", inline=True) + embed.add_field( + name="Progress", + value=f"**{xp:,}** / **{next_level_xp:,}** XP\n" + f"**{xp_needed:,}** XP needed for Level {next_level}", + inline=False, + ) + + # Set user avatar if available + if target_user.avatar: + embed.set_thumbnail(url=target_user.avatar.url) + + await interaction.response.send_message(embed=embed) + + except (discord.HTTPException, sqlite3.Error, ValueError) as e: + print(f"Error in rank command: {e}") + await interaction.response.send_message( + "An error occurred while retrieving rank data.", ephemeral=True + ) + + @app_commands.command(name="top", description="Show the top users by XP") + @app_commands.describe(count="Number of users to show (max 25)") + @requires_guild() + async def top(self, interaction: discord.Interaction, count: int = 10): + """Show the top users by XP.""" + # Validate count parameter + if count < 1: + count = 1 + elif count > 25: + count = 25 + + try: + # Get leaderboard data + leaderboard = self.xp_manager.get_leaderboard(interaction.guild.id, count) + + if leaderboard: + embed = discord.Embed( + title=f"Top {count} Users in {interaction.guild.name}", + description="Ranked by XP gained", + color=0xF1C40F, + ) + + for position, (user_id, xp, level) in enumerate(leaderboard, start=1): + # Try to get member information + member = interaction.guild.get_member(user_id) + + if member: + name = member.display_name + else: + try: + # Attempt to fetch member if not in cache + member = await interaction.guild.fetch_member(user_id) + name = member.display_name + except discord.NotFound: + name = f"Unknown User ({user_id})" + except discord.HTTPException: + name = f"User {user_id}" + + # Add to the leaderboard with formatting + medal = "" + if position == 1: + medal = "🥇 " + elif position == 2: + medal = "🥈 " + elif position == 3: + medal = "🥉 " + + embed.add_field( + name=f"{medal}#{position}: {name}", + value=f"Level: **{level}** | XP: **{xp:,}**", + inline=False, + ) + + await interaction.response.send_message(embed=embed) + else: + await interaction.response.send_message( + "No ranking data found for this server.", ephemeral=True + ) + + except (discord.HTTPException, sqlite3.Error, ValueError) as e: + print(f"Error in top command: {e}") + await interaction.response.send_message( + "An error occurred while retrieving leaderboard data.", ephemeral=True + ) + + @app_commands.command(name="addxp", description="Add XP to a user") + @app_commands.describe(user="The user to add XP to", amount="Amount of XP to add") + @requires_guild() + @requires_moderator() + async def addxp( + self, interaction: discord.Interaction, user: discord.Member, amount: int + ): + """Add XP to a user (moderator only).""" + if amount <= 0: + await interaction.response.send_message( + "Please provide a positive amount of XP to add.", ephemeral=True + ) + return + + try: + # Add XP + result = self.xp_manager.add_xp(user.id, interaction.guild.id, amount) + + await interaction.response.send_message( + f"Added **{amount:,}** XP to {user.mention}.", + allowed_mentions=discord.AllowedMentions.none(), + ) + + # Check for level up + if result["leveled_up"]: + new_level = result["data"]["level"] + await interaction.followup.send( + f"🎉 {user.mention} leveled up to **Level {new_level}**!", + allowed_mentions=discord.AllowedMentions.none(), + ) + + except (discord.HTTPException, sqlite3.Error, ValueError) as e: + print(f"Error in addxp command: {e}") + await interaction.response.send_message( + f"An error occurred while adding XP: {e}", ephemeral=True + ) + + @app_commands.command(name="removexp", description="Remove XP from a user") + @app_commands.describe( + user="The user to remove XP from", amount="Amount of XP to remove" + ) + @requires_guild() + @requires_moderator() + async def removexp( + self, interaction: discord.Interaction, user: discord.Member, amount: int + ): + """Remove XP from a user (moderator only).""" + if amount <= 0: + await interaction.response.send_message( + "Please provide a positive amount of XP to remove.", ephemeral=True + ) + return + + try: + # Remove XP + result = self.xp_manager.remove_xp(user.id, interaction.guild.id, amount) + + await interaction.response.send_message( + f"Removed **{amount:,}** XP from {user.mention}.", + allowed_mentions=discord.AllowedMentions.none(), + ) + + # Check for level down + if result.get("leveled_down", False): + new_level = result["data"]["level"] + await interaction.followup.send( + f"{user.mention} dropped to **Level {new_level}**.", + allowed_mentions=discord.AllowedMentions.none(), + ) + + except (discord.HTTPException, sqlite3.Error, ValueError) as e: + print(f"Error in removexp command: {e}") + await interaction.response.send_message( + f"An error occurred while removing XP: {e}", ephemeral=True + ) + + @app_commands.command( + name="setmultiplier", description="Set the XP multiplier for this server" + ) + @app_commands.describe( + value="Multiplier value (0.1 or greater)", + minutes="Duration in minutes (0 for permanent)", + ) + @requires_guild() + @requires_moderator() + async def setmultiplier( + self, interaction: discord.Interaction, value: float, minutes: int = 0 + ): + """Set the XP multiplier for the server (moderator only).""" + if value < 0.1: + await interaction.response.send_message( + "Multiplier must be 0.1 or greater.", ephemeral=True + ) + return + + try: + self.xp_manager.set_multiplier(interaction.guild.id, value, minutes) + + duration_text = "permanently" if minutes <= 0 else f"for {minutes} minutes" + await interaction.response.send_message( + f"Set XP multiplier to **{value}x** {duration_text}." + ) + + except (discord.HTTPException, sqlite3.Error, ValueError) as e: + print(f"Error in setmultiplier command: {e}") + await interaction.response.send_message( + f"An error occurred while setting the multiplier: {e}", ephemeral=True + ) + + @app_commands.command( + name="resetlevels", description="Reset all level data for this server" + ) + @requires_guild() + async def resetlevels(self, interaction: discord.Interaction): + """Reset all level data for the server (admin only).""" + # Check for administrator permissions + if not interaction.user.guild_permissions.administrator: + await interaction.response.send_message( + "You need Administrator permissions to use this command.", + ephemeral=True, + ) + return + + # Create confirmation button ui + class ConfirmView(discord.ui.View): + def __init__(self, parent_cog): + super().__init__(timeout=30.0) + self.parent_cog = parent_cog + self.value = None + self.message = None + + @discord.ui.button(label="Reset All Data", style=discord.ButtonStyle.danger) + async def confirm( + self, interaction: discord.Interaction, _: discord.ui.Button + ): + self.value = True + self.stop() + + # Disable all buttons + for child in self.children: + child.disabled = True + + # Perform the reset + try: + deleted_rows = self.parent_cog.xp_manager.reset_guild_data( + interaction.guild.id + ) + + success_embed = discord.Embed( + title="✅ Level Data Deleted", + description=f"All level data for {deleted_rows} users in this server has been deleted.", + color=discord.Color.green(), + ) + await interaction.response.edit_message( + embed=success_embed, view=self + ) + except (discord.HTTPException, sqlite3.Error, ValueError) as e: + print(f"Error in resetlevels command: {e}") + error_embed = discord.Embed( + title="❌ Error", + description=f"An error occurred during deletion: {e}", + color=discord.Color.red(), + ) + await interaction.response.edit_message( + embed=error_embed, view=self + ) + + @discord.ui.button(label="Cancel", style=discord.ButtonStyle.secondary) + async def cancel( + self, interaction: discord.Interaction, _: discord.ui.Button + ): + self.value = False + self.stop() + + # Disable all buttons + for child in self.children: + child.disabled = True + + cancel_embed = discord.Embed( + title="❌ Operation Cancelled", + description="No data was deleted.", + color=discord.Color.blurple(), + ) + await interaction.response.edit_message(embed=cancel_embed, view=self) + + async def on_timeout(self): + # Update the message when the view times out + timeout_embed = discord.Embed( + title="⏰ Timeout", + description="Reset operation cancelled due to timeout.", + color=discord.Color.dark_gray(), + ) + + # Disable all buttons + for child in self.children: + child.disabled = True + + # Try to edit the original message + try: + await self.message.edit(embed=timeout_embed, view=self) + except discord.HTTPException: + pass # Original message might have been deleted + + # Create warning embed + warning_embed = discord.Embed( + title="⚠️ WARNING: Reset ALL Level Data?", + description=( + f"This will **permanently delete all level and XP data** " + f"for **{interaction.guild.name}**.\n\n" + f"This action cannot be undone!" + ), + color=discord.Color.red(), + ) + + # Create and send the view + view = ConfirmView(self) + await interaction.response.send_message(embed=warning_embed, view=view) + + # Store message for timeout handling + message = await interaction.original_response() + view.message = message + + # Event listeners + @commands.Cog.listener() + async def on_message(self, message: discord.Message): + """Award XP for messages.""" + # Skip if message is from a bot or not in a guild + if message.author.bot or not message.guild: + return + + # Get the multiplier for this guild + multiplier = self.xp_manager.get_multiplier(message.guild.id) + base_xp = CONFIG["xp"]["base_message_xp"] + xp_gain = int(base_xp * multiplier) + + if xp_gain <= 0: + return # Skip if no XP would be gained + + # Check if message is in a special channel + if message.channel.id in CONFIG["channels"]["bonus_channels"]: + xp_gain *= 2 + + # Add XP to the user + result = self.xp_manager.add_xp(message.author.id, message.guild.id, xp_gain) + + # Check for level up + if result["leveled_up"]: + new_level = result["data"]["level"] + await message.channel.send( + f"🎉 {message.author.mention} leveled up to **Level {new_level}**!", + allowed_mentions=discord.AllowedMentions.none(), + ) + + +# Setup function to add the Cog to the bot +async def setup(bot: commands.Bot): + """Add the LevelingCog to the bot.""" + await bot.add_cog(LevelingCog(bot)) + print("LevelingCog loaded.") diff --git a/appwrite/discord-bots/public-bot/cogs/moderation.py b/appwrite/discord-bots/public-bot/cogs/moderation.py new file mode 100644 index 0000000..4dd6948 --- /dev/null +++ b/appwrite/discord-bots/public-bot/cogs/moderation.py @@ -0,0 +1,439 @@ +"""Moderation commands for the bot.""" + +import json +import os +import requests + +import discord +from discord import app_commands +from discord.ext import commands + +MODERATOR_ROLE_ID = os.getenv("MODERATOR_ROLE_ID") +DEV_ROLE_ID = os.getenv("DEV_ROLE_ID") + + +# Moderation cog for commands that require elevated permissions +class ModerationCog(commands.Cog): + """Moderation commands for the bot.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + self.config_path = "./config/replyingconfig.json" + self.testing_url_path = "./config/testingurl.json" + + # Try to load config files + try: + with open(self.config_path, "r", encoding="utf-8") as f: + self.config = json.load(f) + except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"Error loading replyingconfig.json: {e}") + self.config = {"autoreplying": {"enabled": True}} + + try: + with open(self.testing_url_path, "r", encoding="utf-8") as f: + self.testing_url = json.load(f) + except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"Error loading testingurl.json: {e}") + self.testing_url = {"url": "https://example.com"} + + # Helper function to check moderator role + async def is_moderator(self, interaction: discord.Interaction) -> bool: + """Check if a user has the moderator role.""" + # Check for Discord's default admin permission first + if interaction.user.guild_permissions.administrator: + return True + + # Check if user has the moderator role + if any(role.id == MODERATOR_ROLE_ID for role in interaction.user.roles): + return True + + # If we get here, user doesn't have permission + await interaction.response.send_message( + "You don't have permission to use this command.", ephemeral=True + ) + return False + + # Helper function to check developer role + async def is_developer(self, interaction: discord.Interaction) -> bool: + """Check if a user has the developer role.""" + # Check for Discord's default admin permission first + if interaction.user.guild_permissions.administrator: + return True + + # Check if user has the developer role + if any(role.id == DEV_ROLE_ID for role in interaction.user.roles): + return True + + # If we get here, user doesn't have permission + await interaction.response.send_message( + "You don't have permission to use this command.", ephemeral=True + ) + return False + + # --- Auto-reply Commands --- + + @commands.command(name="syncguild") + @commands.has_permissions(administrator=True) + async def sync_guild_commands(self, ctx: commands.Context): + """Sync slash commands to this guild only.""" + + # Print debug info + print(f"Syncing commands to guild {ctx.guild.name} (ID: {ctx.guild.id})") + + # Debug: Print all app commands registered in the tree + bot_commands = self.bot.tree.get_commands() + print(f"App commands found: {len(bot_commands)}") + for cmd in bot_commands: + print(f" - {cmd.name}") + + if not ctx.guild: + await ctx.send("This command must be used in a server.") + return + + try: + synced = await self.bot.tree.sync(guild=ctx.guild) + await ctx.send(f"Synced {len(synced)} command(s) to this server!") + print( + f"Synced {len(synced)} command(s) to guild {ctx.guild.name} (ID: {ctx.guild.id})" + ) + except discord.DiscordServerError as e: + await ctx.send(f"Failed to sync commands: {e}") + print(f"Error syncing commands to guild {ctx.guild.id}: {e}") + except discord.HTTPException as e: + await ctx.send(f"Failed to sync commands: {e}") + print(f"Error syncing commands to guild {ctx.guild.id}: {e}") + + @commands.command(name="syncglobal") + @commands.has_permissions(administrator=True) + async def sync_global_commands(self, ctx: commands.Context): + """Sync slash commands globally.""" + try: + synced = await self.bot.tree.sync() + await ctx.send(f"Synced {len(synced)} command(s) globally!") + print(f"Synced {len(synced)} command(s) globally") + except discord.HTTPException as e: + await ctx.send(f"Failed to sync commands: {e}") + print(f"Error syncing commands globally: {e}") + + @app_commands.command(name="replyon", description="Enable auto-replies") + async def replyon(self, interaction: discord.Interaction): + """Enable auto-replies.""" + # First check if the user has permission + if not await self.is_moderator(interaction): + return + + # Update config + self.config["autoreplying"]["enabled"] = True + + # Save to file + try: + with open(self.config_path, "w", encoding="utf-8") as f: + json.dump(self.config, f) + await interaction.response.send_message( + "Auto-replies have been enabled!", ephemeral=True + ) + except (IOError, PermissionError, FileNotFoundError) as e: + await interaction.response.send_message( + f"Failed to save config: {e}", ephemeral=True + ) + + @app_commands.command(name="replyoff", description="Disable auto-replies") + async def replyoff(self, interaction: discord.Interaction): + """Disable auto-replies.""" + # First check if the user has permission + if not await self.is_moderator(interaction): + return + + # Update config + self.config["autoreplying"]["enabled"] = False + + # Save to file + try: + with open(self.config_path, "w", encoding="utf-8") as f: + json.dump(self.config, f) + await interaction.response.send_message( + "Auto-replies have been disabled!", ephemeral=True + ) + except (IOError, PermissionError, FileNotFoundError) as e: + await interaction.response.send_message( + f"Failed to save config: {e}", ephemeral=True + ) + + # --- Testing Site Commands --- + + @app_commands.command( + name="notifytesters", description="Send notification with test URL to testers" + ) + async def notifytesters(self, interaction: discord.Interaction): + """Notify testers of the testing URL.""" + # Check if used in the right server (dev server) + if not interaction.guild or interaction.guild.id != 1232693376646643836: + await interaction.response.send_message( + "This command can only be used in the development server.", + ephemeral=True, + ) + return + + # Get URL from config + try: + with open(self.testing_url_path, "r", encoding="utf-8") as f: + config = json.load(f) + url = config["url"] + except (FileNotFoundError, json.JSONDecodeError, KeyError) as e: + await interaction.response.send_message( + f"Failed to get testing URL: {e}", ephemeral=True + ) + return + + # Create notification embed + embed = discord.Embed( + title="New testing URL", description=f"{url}", color=0xFF0000 + ) + embed.set_thumbnail(url="https://images.cooltext.com/5724188.gif") + embed.set_author( + name=f"{interaction.user.name}", + icon_url=( + interaction.user.display_avatar.url + if interaction.user.display_avatar + else None + ), + ) + embed.set_footer(text="Open the link and give us ur feedback pls") + + # Send to testing channel + channel = self.bot.get_channel(1342156641843548182) + if channel: + await channel.send(embed=embed) + await interaction.response.send_message( + "Testing notification sent!", ephemeral=True + ) + else: + await interaction.response.send_message( + "Could not find testing channel.", ephemeral=True + ) + + @app_commands.command( + name="istesting", description="Update the testing site status" + ) + @app_commands.describe(status="Whether the testing site is up or down") + @app_commands.choices( + status=[ + app_commands.Choice(name="Up - Ready for testing", value="up"), + app_commands.Choice(name="Down - Not available", value="down"), + ] + ) + async def istesting(self, interaction: discord.Interaction, status: str): + """Update whether the testing site is up or down.""" + # Get the announcement channel + channel = self.bot.get_channel(1341080131841556532) + if not channel: + await interaction.response.send_message( + "Could not find announcement channel.", ephemeral=True + ) + return + + # Create and send appropriate embed + if status.lower() == "up": + embed = discord.Embed( + title="Testing site is **up**", + description="You can test it", + color=0x63EEAA, + ) + embed.set_footer(text="The URL is inside the testing-link channel") + await channel.send(embed=embed) + await interaction.response.send_message( + "Testing site status set to UP!", ephemeral=True + ) + elif status.lower() == "down": + embed = discord.Embed( + title="Testing site is **down**", + description="You can't test it", + color=0xFF5151, + ) + embed.set_footer(text="The URL is inside the testing-link channel") + await channel.send(embed=embed) + await interaction.response.send_message( + "Testing site status set to DOWN!", ephemeral=True + ) + else: + await interaction.response.send_message( + "Invalid status. Use 'up' or 'down'.", ephemeral=True + ) + + @app_commands.command(name="testingurlset", description="Set the testing URL") + @app_commands.describe( + new_url="The new testing URL (must start with http:// or https://)" + ) + async def testingurlset(self, interaction: discord.Interaction, new_url: str): + """Set the testing URL.""" + # Check developer permissions + if not await self.is_developer(interaction): + return + + # Verify this is used in the dev server + if not interaction.guild or interaction.guild.id != 1232693376646643836: + await interaction.response.send_message( + "Please don't set sensitive data in public servers.", ephemeral=True + ) + try: + await interaction.user.send( + "Please don't send sensitive data on the public server." + ) + except discord.Forbidden: + pass # User might have DMs disabled + return + + # Validate URL format + if not (new_url.startswith("http://") or new_url.startswith("https://")): + await interaction.response.send_message( + "Please provide a valid URL starting with http:// or https://", + ephemeral=True, + ) + return + + # Update the testing URL + try: + # Load existing config first + with open(self.testing_url_path, "r", encoding="utf-8") as f: + config = json.load(f) + + # Update URL + config["url"] = new_url + + # Save updated config + with open(self.testing_url_path, "w", encoding="utf-8") as f: + json.dump(config, f, indent=4) + + await interaction.response.send_message( + f"The testing URL has been updated to: {new_url}", ephemeral=True + ) + except (IOError, json.JSONDecodeError, PermissionError) as e: + await interaction.response.send_message( + f"An error occurred while updating the URL: {str(e)}", ephemeral=True + ) + + @app_commands.command( + name="testingurlget", description="Get the current testing URL" + ) + async def testingurlget(self, interaction: discord.Interaction): + """Get the current testing URL (DM only).""" + # Verify this is used in the dev server + if not interaction.guild or interaction.guild.id != 1232693376646643836: + await interaction.response.send_message( + "This command can only be used in the development server.", + ephemeral=True, + ) + return + + # Get and send the URL + try: + with open(self.testing_url_path, "r", encoding="utf-8") as f: + config = json.load(f) + + # Send as DM for security + try: + await interaction.user.send( + f"The current testing URL is: {config['url']}" + ) + await interaction.response.send_message( + "The testing URL has been sent to your DMs.", ephemeral=True + ) + except discord.Forbidden: + await interaction.response.send_message( + "I couldn't send you a DM. Please enable DMs from server members.", + ephemeral=True, + ) + except (FileNotFoundError, json.JSONDecodeError, KeyError) as e: + await interaction.response.send_message( + f"An error occurred: {str(e)}", ephemeral=True + ) + + # --- Status Command --- + + @app_commands.command(name="status", description="Check Blueprint site status") + async def status(self, interaction: discord.Interaction): + """Check the status of Blueprint services.""" + # Let the user know we're checking statuses + await interaction.response.defer(thinking=True) + + # Get statuses (with error handling) + try: + # Production site + try: + production_response = requests.get( + "https://blueprint-create.com", timeout=5 + ) + production = ( + f"Online (Status: {production_response.status_code})" + if production_response.status_code == 200 + else f"Issues detected (Status: {production_response.status_code})" + ) + except (requests.RequestException, ConnectionError, TimeoutError) as e: + production = f"Offline / Not working (Error: {str(e)[:50]}...)" + + # API + try: + api_url = os.getenv( + "PING2", "https://api.blueprint-create.com/health" + ) # Fallback to common endpoint if env var missing + api_response = requests.get(api_url, timeout=5) + api = ( + f"Online (Status: {api_response.status_code})" + if api_response.status_code == 200 + else f"Issues detected (Status: {api_response.status_code})" + ) + except (requests.RequestException, ConnectionError, TimeoutError) as e: + api = f"Offline / Not working (Error: {str(e)[:50]}...)" + + # Meilisearch + try: + meili_url = os.getenv( + "PING1", "https://search.blueprint-create.com/health" + ) # Fallback to common endpoint + meili_response = requests.get(meili_url, timeout=5) + meilisearch = ( + f"Online (Status: {meili_response.status_code})" + if meili_response.status_code == 200 + else f"Issues detected (Status: {meili_response.status_code})" + ) + except (requests.RequestException, ConnectionError, TimeoutError) as e: + meilisearch = f"Offline / Not working (Error: {str(e)[:50]}...)" + + # Legacy GitHub Pages site + try: + legacy_response = requests.get( + "https://blueprint-site.github.io/", timeout=5 + ) + production_gh = ( + f"Online (Status: {legacy_response.status_code})" + if legacy_response.status_code == 200 + else f"Issues detected (Status: {legacy_response.status_code})" + ) + except (requests.RequestException, ConnectionError, TimeoutError) as e: + production_gh = f"Offline / Not working (Error: {str(e)[:50]}...)" + + # Create the status embed + embed = discord.Embed(title="The Blueprint Status", color=0x362D52) + embed.add_field(name="Site (Production)", value=production, inline=False) + embed.add_field(name="API", value=api, inline=False) + embed.add_field(name="Meilisearch API", value=meilisearch, inline=False) + embed.add_field( + name="Site (Legacy, Github Pages)", value=production_gh, inline=False + ) + embed.add_field(name="Bot", value="Online ✅", inline=False) + + # Add timestamp to show when check was performed + embed.timestamp = discord.utils.utcnow() + + await interaction.followup.send(embed=embed) + + except (requests.RequestException, ConnectionError, ValueError, IOError) as e: + await interaction.followup.send( + f"An error occurred while checking statuses: {str(e)}" + ) + + +# Setup function to add the Cog to the bot +async def setup(bot: commands.Bot): + await bot.add_cog(ModerationCog(bot)) + print(" ModerationCog loaded.") diff --git a/appwrite/discord-bots/public-bot/cogs/search.py b/appwrite/discord-bots/public-bot/cogs/search.py new file mode 100644 index 0000000..9a4ca92 --- /dev/null +++ b/appwrite/discord-bots/public-bot/cogs/search.py @@ -0,0 +1,51 @@ +"""Search commands for the bot""" + +import discord +from discord import app_commands +from discord.ext import commands + +from functions.addonsearch import addonsearch +from functions.schematicsearch import schematicsearch + + +# Cog class for general commands +class SearchCog(commands.Cog): + """ + A Discord bot cog that handles general commands, including slash commands and prefix commands. + This cog includes commands for debugging, information retrieval, and social media links. + """ + + # Slash command for addon search + @app_commands.command( + name="addon", description="Manages commands related to addons" + ) + async def addon_command( + self, interaction: discord.Interaction, query: str, limit: int = 1 + ): + """Search for addons based on the query and limit provided""" + limit = max(1, min(limit, 5)) + await interaction.response.defer(ephemeral=True) + await addonsearch(interaction=interaction, query=query, limit=limit) + + # Slash command for schematic search + @app_commands.command( + name="schematic", description="Manages commands related to schematics" + ) + async def schematic_command( + self, interaction: discord.Interaction, query: str, limit: int = 1 + ): + """Search for schematics based on the query and limit provided""" + limit = max(1, min(limit, 5)) + + # Defer the response to avoid InteractionResponded error + await interaction.response.defer(ephemeral=True) + + # Search for schematics + await schematicsearch(interaction=interaction, query=query, limit=limit) + + +# Setup function to add the Cog to the bot +async def setup(bot: commands.Bot): + """Add the SearchCog to the bot.""" + await bot.add_cog(SearchCog(bot)) + print(" SearchCog loaded.") diff --git a/appwrite/discord-bots/public-bot/config/replyingconfig.json b/appwrite/discord-bots/public-bot/config/replyingconfig.json new file mode 100644 index 0000000..e7405b6 --- /dev/null +++ b/appwrite/discord-bots/public-bot/config/replyingconfig.json @@ -0,0 +1 @@ +{"autoreplying": {"enabled": true}} \ No newline at end of file diff --git a/appwrite/discord-bots/public-bot/config/testingurl.json b/appwrite/discord-bots/public-bot/config/testingurl.json new file mode 100644 index 0000000..9771f02 --- /dev/null +++ b/appwrite/discord-bots/public-bot/config/testingurl.json @@ -0,0 +1,3 @@ +{ + "url": "https://testewe.cywa.com" +} \ No newline at end of file diff --git a/appwrite/discord-bots/public-bot/functions/addonsearch.py b/appwrite/discord-bots/public-bot/functions/addonsearch.py new file mode 100644 index 0000000..70f509e --- /dev/null +++ b/appwrite/discord-bots/public-bot/functions/addonsearch.py @@ -0,0 +1,74 @@ +"""Addon Search Functions""" + +import os +import meilisearch +import dotenv + +import discord + +dotenv.load_dotenv() +try: + client = meilisearch.Client(os.getenv("MEILISEARCH_URL"), os.getenv("SEARCH_TOKEN")) + index = client.index("addons") +except (meilisearch.errors.MeiliSearchError, ConnectionError) as e: + print(f"Error initializing Meilisearch: {e}") + exit() + +loaders = ["Forge", "NeoForge", "Fabric", "Quilt"] + + +async def addonsearch(interaction: discord.Interaction, query: str, limit: int = 1): + """Search for addons based on the query and limit provided""" + try: + request = index.search(query, {"limit": limit}) + data = request.get("hits", []) + await send_embeds(interaction, data) + return data + except meilisearch.errors.MeiliSearchError as e: + print(f"Meilisearch error: {e}") + return [] + + +async def send_embeds(interaction: discord.Interaction, data: list[dict]): + """Send embeds to the interaction""" + for mod_data in data: + try: + embed = await gen_embed(mod_data) + await interaction.followup.send(embed=embed) + except discord.HTTPException as e: + print(f"Discord HTTP error: {e}") + + +def format_loaders(loader_to_format: list[str]) -> str: + """Format the loader_to_format into a string""" + if not loader_to_format: + return "No loaders available" + return " ".join([f"{loader}" for loader in loader_to_format]) + + +async def gen_embed(data: dict) -> discord.Embed: + """Generate an embed from the data""" + loaders_field = format_loaders(data.get("loaders", [])) + embed = discord.Embed( + title="🟦 " + data["name"] + " 🟦", + description=data["description"], + color=0x689AEE, + ) + embed.set_author(name="📋 ᴀᴜᴛʜᴏʀ : " + data["author"]) + embed.set_thumbnail(url=data["icon"]) + + embed.add_field(name="🚀 ʟᴏᴀᴅᴇʀs", value=loaders_field, inline=False) + embed.add_field(name="🔵 ᴀᴠᴀɪʟᴀʙʟᴇ ᴏɴ :", value="", inline=False) + if data.get("modrinth_raw"): + embed.add_field(name="", value="ᴍᴏᴅʀɪɴᴛʜ 🟢") + + if data.get("curseforge_raw"): + embed.add_field(name="", value="ᴄᴜʀsᴇғᴏʀɢᴇ 🟠") + + embed.add_field( + name="🔎 ɪɴғᴏs", + value=f"https://nottelling.youthe.domain/addons/{data['slug']}", + inline=False, + ) + embed.set_footer(text=f" ⚡ {data['downloads']} ᴅᴏᴡɴʟᴏᴀᴅs") + return embed diff --git a/appwrite/discord-bots/public-bot/schematicsearch.py b/appwrite/discord-bots/public-bot/functions/schematicsearch.py similarity index 100% rename from appwrite/discord-bots/public-bot/schematicsearch.py rename to appwrite/discord-bots/public-bot/functions/schematicsearch.py diff --git a/appwrite/discord-bots/public-bot/leveling.db b/appwrite/discord-bots/public-bot/leveling.db new file mode 100644 index 0000000..c9cc011 Binary files /dev/null and b/appwrite/discord-bots/public-bot/leveling.db differ