diff --git a/cogs/make_member.py b/cogs/make_member.py index 6066948f..ad3ace56 100644 --- a/cogs/make_member.py +++ b/cogs/make_member.py @@ -4,28 +4,29 @@ import re from typing import TYPE_CHECKING -import aiohttp -import bs4 import discord -from bs4 import BeautifulSoup from django.core.exceptions import ValidationError from config import settings from db.core.models import GroupMadeMember from exceptions import ApplicantRoleDoesNotExistError, GuestRoleDoesNotExistError -from utils import GLOBAL_SSL_CONTEXT, CommandChecks, TeXBotBaseCog +from utils import CommandChecks, TeXBotBaseCog +from utils.msl import fetch_community_group_members_count, is_id_a_community_group_member if TYPE_CHECKING: - from collections.abc import Mapping, Sequence + from collections.abc import Sequence from logging import Logger from typing import Final from utils import TeXBotApplicationContext + __all__: "Sequence[str]" = ("MakeMemberCommandCog", "MemberCountCommandCog") + logger: "Final[Logger]" = logging.getLogger("TeX-Bot") + _GROUP_MEMBER_ID_ARGUMENT_DESCRIPTIVE_NAME: "Final[str]" = f"""{ "Student" if ( @@ -49,21 +50,6 @@ _GROUP_MEMBER_ID_ARGUMENT_DESCRIPTIVE_NAME.lower().replace(" ", "") ) -REQUEST_HEADERS: "Final[Mapping[str, str]]" = { - "Cache-Control": "no-cache", - "Pragma": "no-cache", - "Expires": "0", -} - -REQUEST_COOKIES: "Final[Mapping[str, str]]" = { - ".ASPXAUTH": settings["SU_PLATFORM_ACCESS_COOKIE"] -} - -BASE_MEMBERS_URL: "Final[str]" = ( - f"https://guildofstudents.com/organisation/memberlist/{settings['ORGANISATION_ID']}" -) -GROUPED_MEMBERS_URL: "Final[str]" = f"{BASE_MEMBERS_URL}/?sort=groups" - class MakeMemberCommandCog(TeXBotBaseCog): """Cog class that defines the "/make-member" command and its call-back method.""" @@ -101,10 +87,12 @@ class MakeMemberCommandCog(TeXBotBaseCog): required=True, max_length=7, min_length=7, - parameter_name="group_member_id", + parameter_name="raw_group_member_id", ) @CommandChecks.check_interaction_user_in_main_guild - async def make_member(self, ctx: "TeXBotApplicationContext", group_member_id: str) -> None: # type: ignore[misc] + async def make_member( # type: ignore[misc] + self, ctx: "TeXBotApplicationContext", raw_group_member_id: str + ) -> None: """ Definition & callback response of the "make_member" command. @@ -116,6 +104,20 @@ async def make_member(self, ctx: "TeXBotApplicationContext", group_member_id: st member_role: discord.Role = await self.bot.member_role interaction_member: discord.Member = await ctx.bot.get_main_guild_member(ctx.user) + INVALID_GROUP_MEMBER_ID_MESSAGE: Final[str] = ( + f"{raw_group_member_id!r} is not a valid {self.bot.group_member_id_type} ID." + ) + + if not re.fullmatch(r"\A\d{7}\Z", raw_group_member_id): + await self.command_send_error(ctx, message=(INVALID_GROUP_MEMBER_ID_MESSAGE)) + return + + try: + group_member_id: int = int(raw_group_member_id) + except ValueError: + await self.command_send_error(ctx, message=INVALID_GROUP_MEMBER_ID_MESSAGE) + return + await ctx.defer(ephemeral=True) async with ctx.typing(): if member_role in interaction_member.roles: @@ -128,16 +130,6 @@ async def make_member(self, ctx: "TeXBotApplicationContext", group_member_id: st ) return - if not re.fullmatch(r"\A\d{7}\Z", group_member_id): - await self.command_send_error( - ctx, - message=( - f"{group_member_id!r} is not a valid " - f"{self.bot.group_member_id_type} ID." - ), - ) - return - if await GroupMadeMember.objects.filter( hashed_group_member_id=GroupMadeMember.hash_group_member_id( group_member_id, self.bot.group_member_id_type @@ -154,56 +146,7 @@ async def make_member(self, ctx: "TeXBotApplicationContext", group_member_id: st ) return - guild_member_ids: set[str] = set() - - async with ( - aiohttp.ClientSession( - headers=REQUEST_HEADERS, cookies=REQUEST_COOKIES - ) as http_session, - http_session.get( - url=GROUPED_MEMBERS_URL, ssl=GLOBAL_SSL_CONTEXT - ) as http_response, - ): - response_html: str = await http_response.text() - - MEMBER_HTML_TABLE_IDS: Final[frozenset[str]] = frozenset( - { - "ctl00_Main_rptGroups_ctl05_gvMemberships", - "ctl00_Main_rptGroups_ctl03_gvMemberships", - "ctl00_ctl00_Main_AdminPageContent_rptGroups_ctl03_gvMemberships", - "ctl00_ctl00_Main_AdminPageContent_rptGroups_ctl05_gvMemberships", - } - ) - table_id: str - for table_id in MEMBER_HTML_TABLE_IDS: - parsed_html: bs4.Tag | bs4.NavigableString | None = BeautifulSoup( - response_html, "html.parser" - ).find("table", {"id": table_id}) - - if parsed_html is None or isinstance(parsed_html, bs4.NavigableString): - continue - - guild_member_ids.update( - row.contents[2].text - for row in parsed_html.find_all("tr", {"class": ["msl_row", "msl_altrow"]}) - ) - - guild_member_ids.discard("") - guild_member_ids.discard("\n") - guild_member_ids.discard(" ") - - if not guild_member_ids: - await self.command_send_error( - ctx, - error_code="E1041", - logging_message=OSError( - "The guild member IDs could not be retrieved from " - "the MEMBERS_LIST_URL." - ), - ) - return - - if group_member_id not in guild_member_ids: + if not await is_id_a_community_group_member(member_id=group_member_id): await self.command_send_error( ctx, message=( @@ -222,7 +165,7 @@ async def make_member(self, ctx: "TeXBotApplicationContext", group_member_id: st ) try: - await GroupMadeMember.objects.acreate(group_member_id=group_member_id) # type: ignore[misc] + await GroupMadeMember.objects.acreate(group_member_id=raw_group_member_id) # type: ignore[misc] except ValidationError as create_group_made_member_error: error_is_already_exists: bool = ( "hashed_group_member_id" in create_group_made_member_error.message_dict @@ -276,53 +219,9 @@ async def member_count(self, ctx: "TeXBotApplicationContext") -> None: # type: await ctx.defer(ephemeral=False) async with ctx.typing(): - async with ( - aiohttp.ClientSession( - headers=REQUEST_HEADERS, cookies=REQUEST_COOKIES - ) as http_session, - http_session.get( - url=BASE_MEMBERS_URL, ssl=GLOBAL_SSL_CONTEXT - ) as http_response, - ): - response_html: str = await http_response.text() - - member_list_div: bs4.Tag | bs4.NavigableString | None = BeautifulSoup( - response_html, "html.parser" - ).find("div", {"class": "memberlistcol"}) - - if member_list_div is None or isinstance(member_list_div, bs4.NavigableString): - await self.command_send_error( - ctx, - error_code="E1041", - logging_message=OSError( - "The member count could not be retrieved from the MEMBERS_LIST_URL." - ), - ) - return - - if "showing 100 of" in member_list_div.text.lower(): - member_count: str = member_list_div.text.split(" ")[3] - await ctx.followup.send( - content=f"{self.bot.group_full_name} has {member_count} members! :tada:" - ) - return - - member_table: bs4.Tag | bs4.NavigableString | None = BeautifulSoup( - response_html, "html.parser" - ).find("table", {"id": "ctl00_ctl00_Main_AdminPageContent_gvMembers"}) - - if member_table is None or isinstance(member_table, bs4.NavigableString): - await self.command_send_error( - ctx, - error_code="E1041", - logging_message=OSError( - "The member count could not be retrieved from the MEMBERS_LIST_URL." - ), - ) - return - await ctx.followup.send( - content=f"{self.bot.group_full_name} has { - len(member_table.find_all('tr', {'class': ['msl_row', 'msl_altrow']})) - } members! :tada:" + content=( + f"{self.bot.group_full_name} has " + f"{await fetch_community_group_members_count()} members! :tada:" + ) ) diff --git a/exceptions/__init__.py b/exceptions/__init__.py index 4fc98909..dc3c44ce 100644 --- a/exceptions/__init__.py +++ b/exceptions/__init__.py @@ -24,6 +24,7 @@ MessagesJSONFileMissingKeyError, MessagesJSONFileValueError, ) +from .msl import MSLMembershipError from .strike import NoAuditLogsStrikeTrackingError, StrikeTrackingError if TYPE_CHECKING: @@ -44,6 +45,7 @@ "InvalidActionDescriptionError", "InvalidActionTargetError", "InvalidMessagesJSONFileError", + "MSLMembershipError", "MemberRoleDoesNotExistError", "MessagesJSONFileMissingKeyError", "MessagesJSONFileValueError", diff --git a/exceptions/msl.py b/exceptions/msl.py new file mode 100644 index 00000000..6f768075 --- /dev/null +++ b/exceptions/msl.py @@ -0,0 +1,26 @@ +"""Custom exception classes raised when errors occur during use of MSL features.""" + +from typing import TYPE_CHECKING, override + +from typed_classproperties import classproperty + +from .base import BaseTeXBotError + +if TYPE_CHECKING: + from collections.abc import Sequence + + +__all__: "Sequence[str]" = ("MSLMembershipError",) + + +class MSLMembershipError(BaseTeXBotError, RuntimeError): + """ + Exception class to raise when any error occurs while checking MSL membership. + + If this error occurs, it is likely that MSL features will not work correctly. + """ + + @classproperty + @override + def DEFAULT_MESSAGE(cls) -> str: + return "An error occurred while trying to fetch membership data from MSL." diff --git a/utils/msl/__init__.py b/utils/msl/__init__.py new file mode 100644 index 00000000..99e985e1 --- /dev/null +++ b/utils/msl/__init__.py @@ -0,0 +1,18 @@ +"""MSL utility classes & functions provided for use across the whole of the project.""" + +from typing import TYPE_CHECKING + +from .memberships import ( + fetch_community_group_members_count, + fetch_community_group_members_list, + is_id_a_community_group_member, +) + +if TYPE_CHECKING: + from collections.abc import Sequence + +__all__: "Sequence[str]" = ( + "fetch_community_group_members_count", + "fetch_community_group_members_list", + "is_id_a_community_group_member", +) diff --git a/utils/msl/memberships.py b/utils/msl/memberships.py new file mode 100644 index 00000000..99ceb920 --- /dev/null +++ b/utils/msl/memberships.py @@ -0,0 +1,124 @@ +"""Module for checking membership status.""" + +import contextlib +import logging +from typing import TYPE_CHECKING + +import aiohttp +import bs4 +from bs4 import BeautifulSoup + +from config import settings +from exceptions import MSLMembershipError +from utils import GLOBAL_SSL_CONTEXT + +if TYPE_CHECKING: + from collections.abc import Mapping, Sequence + from logging import Logger + from typing import Final + + +__all__: "Sequence[str]" = ( + "fetch_community_group_members_count", + "fetch_community_group_members_list", + "is_id_a_community_group_member", +) + + +logger: "Final[Logger]" = logging.getLogger("TeX-Bot") + + +BASE_SU_PLATFORM_WEB_HEADERS: "Final[Mapping[str, str]]" = { + "Cache-Control": "no-cache", + "Pragma": "no-cache", + "Expires": "0", +} + +BASE_SU_PLATFORM_WEB_COOKIES: "Final[Mapping[str, str]]" = { + ".ASPXAUTH": settings["SU_PLATFORM_ACCESS_COOKIE"], +} + +MEMBERS_LIST_URL: "Final[str]" = f"https://guildofstudents.com/organisation/memberlist/{settings['ORGANISATION_ID']}/?sort=groups" + +_membership_list_cache: set[int] = set() + + +async def fetch_community_group_members_list() -> set[int]: + """ + Make a web request to fetch your community group's full membership list. + + Returns a set of IDs. + """ + async with ( + aiohttp.ClientSession( + headers=BASE_SU_PLATFORM_WEB_HEADERS, cookies=BASE_SU_PLATFORM_WEB_COOKIES + ) as http_session, + http_session.get(url=MEMBERS_LIST_URL, ssl=GLOBAL_SSL_CONTEXT) as http_response, + ): + response_html: str = await http_response.text() + + parsed_html: BeautifulSoup = BeautifulSoup(markup=response_html, features="html.parser") + + member_ids: set[int] = set() + + table_id: str + for table_id in ( + "ctl00_ctl00_Main_AdminPageContent_rptGroups_ctl03_gvMemberships", + "ctl00_ctl00_Main_AdminPageContent_rptGroups_ctl05_gvMemberships", + ): + filtered_table: bs4.Tag | bs4.NavigableString | None = parsed_html.find( + name="table", attrs={"id": table_id} + ) + + if filtered_table is None: + logger.warning("Membership table with ID %s could not be found.", table_id) + logger.debug(response_html) + continue + + if isinstance(filtered_table, bs4.NavigableString): + INVALID_MEMBER_TABLE_FORMAT_MESSAGE: str = ( + f"Membership table with ID {table_id} was found but is in the wrong format." + ) + logger.warning(INVALID_MEMBER_TABLE_FORMAT_MESSAGE) + logger.debug(filtered_table) + raise MSLMembershipError(message=INVALID_MEMBER_TABLE_FORMAT_MESSAGE) + + with contextlib.suppress(IndexError): + member: bs4.Tag + for member in filtered_table.find_all(name="tr")[1:]: + raw_id: str = member.find_all(name="td")[1].text.strip() + try: + member_ids.add(int(raw_id)) + except ValueError: + logger.warning( + "Failed to convert ID '%s' in membership table to an integer", raw_id + ) + + if not member_ids: # NOTE: this should never be possible, because to fetch the page you need to have admin access, which requires being a member. + NO_MEMBERS_MESSAGE: Final[str] = "No members were found in either membership table." + logger.warning(NO_MEMBERS_MESSAGE) + logger.debug(response_html) + raise MSLMembershipError(message=NO_MEMBERS_MESSAGE) + + _membership_list_cache.clear() + _membership_list_cache.update(member_ids) + + return _membership_list_cache + + +async def is_id_a_community_group_member(member_id: int) -> bool: + """Check whether the given ID is a member of your community group.""" + if member_id in _membership_list_cache: + return True + + logger.debug( + "ID %s not found in community group membership list cache; Fetching updated list.", + member_id, + ) + + return member_id in await fetch_community_group_members_list() + + +async def fetch_community_group_members_count() -> int: + """Return the total number of members in your community group.""" + return len(await fetch_community_group_members_list())