Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 7 additions & 130 deletions cogs/check_su_platform_authorisation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@
from enum import Enum
from typing import TYPE_CHECKING, override

import aiohttp
import bs4
import discord
from discord.ext import tasks

from config import settings
from utils import GLOBAL_SSL_CONTEXT, CommandChecks, TeXBotBaseCog
from utils import CommandChecks, TeXBotBaseCog
from utils.error_capture_decorators import (
capture_guild_does_not_exist_error,
)
from utils.msl import get_su_platform_access_cookie_status, get_su_platform_organisations

if TYPE_CHECKING:
from collections.abc import Iterable, Mapping, Sequence
from collections.abc import Sequence
from collections.abc import Set as AbstractSet
from logging import Logger
from typing import Final
Expand All @@ -30,21 +29,6 @@

logger: "Final[Logger]" = logging.getLogger("TeX-Bot")

REQUEST_HEADERS: "Final[Mapping[str, str]]" = {
"Cache-Control": "no-cache",
"Pragma": "no-cache",
"Expires": "0",
}

REQUEST_COOKIES: "Final[Mapping[str, str]]" = {
".ASPXAUTH": settings["SU_PLATFORM_ACCESS_COOKIE"]
}

SU_PLATFORM_PROFILE_URL: "Final[str]" = "https://guildofstudents.com/profile"
SU_PLATFORM_ORGANISATION_URL: "Final[str]" = (
"https://www.guildofstudents.com/organisation/admin"
)


class SUPlatformAccessCookieStatus(Enum):
"""Enum class defining the status of the SU Platform Access Cookie."""
Expand Down Expand Up @@ -72,114 +56,7 @@ class SUPlatformAccessCookieStatus(Enum):
)


class CheckSUPlatformAuthorisationBaseCog(TeXBotBaseCog):
"""Cog class that defines the base functionality for cookie authorisation checks."""

async def _fetch_url_content_with_session(self, url: str) -> str:
"""Fetch the HTTP content at the given URL, using a shared aiohttp session."""
async with (
aiohttp.ClientSession(
headers=REQUEST_HEADERS, cookies=REQUEST_COOKIES
) as http_session,
http_session.get(url=url, ssl=GLOBAL_SSL_CONTEXT) as http_response,
):
return await http_response.text()

async def get_su_platform_access_cookie_status(self) -> SUPlatformAccessCookieStatus:
"""Retrieve the current validity status of the SU platform access cookie."""
response_object: bs4.BeautifulSoup = bs4.BeautifulSoup(
await self._fetch_url_content_with_session(SU_PLATFORM_PROFILE_URL), "html.parser"
)
page_title: bs4.Tag | bs4.NavigableString | None = response_object.find("title")
if not page_title or "Login" in str(page_title):
logger.debug("Token is invalid or expired.")
return SUPlatformAccessCookieStatus.INVALID

organisation_admin_url: str = (
f"{SU_PLATFORM_ORGANISATION_URL}/{settings['ORGANISATION_ID']}"
)
response_html: str = await self._fetch_url_content_with_session(organisation_admin_url)

if "admin tools" in response_html.lower():
return SUPlatformAccessCookieStatus.AUTHORISED

if "You do not have any permissions for this organisation" in response_html.lower():
return SUPlatformAccessCookieStatus.VALID

logger.warning(
"Unexpected response when checking SU platform access cookie authorisation."
)
return SUPlatformAccessCookieStatus.INVALID

async def get_su_platform_organisations(self) -> "Iterable[str]":
"""Retrieve the MSL organisations the current SU platform cookie has access to."""
response_object: bs4.BeautifulSoup = bs4.BeautifulSoup(
await self._fetch_url_content_with_session(SU_PLATFORM_PROFILE_URL), "html.parser"
)

page_title: bs4.Tag | bs4.NavigableString | None = response_object.find("title")

if not page_title:
logger.warning(
"Profile page returned no content when checking "
"SU platform access cookie's authorisation."
)
return ()

if "Login" in str(page_title):
logger.warning(
"Authentication redirected to login page. "
"SU platform access cookie is invalid or expired."
)
return ()

profile_section_html: bs4.Tag | bs4.NavigableString | None = response_object.find(
"div", {"id": "profile_main"}
)

if profile_section_html is None:
logger.warning(
"Couldn't find the profile section of the user "
"when scraping the SU platform's website HTML."
)
logger.debug("Retrieved HTML: %s", response_object.text)
return ()

user_name: bs4.Tag | bs4.NavigableString | int | None = profile_section_html.find("h1")

if not isinstance(user_name, bs4.Tag):
logger.warning(
"Found user profile on the SU platform but couldn't find their name."
)
logger.debug("Retrieved HTML: %s", response_object.text)
return ()

parsed_html: bs4.Tag | bs4.NavigableString | None = response_object.find(
"ul", {"id": "ulOrgs"}
)

if parsed_html is None or isinstance(parsed_html, bs4.NavigableString):
NO_ADMIN_TABLE_MESSAGE: Final[str] = (
f"Failed to retrieve the admin table for user: {user_name.string}. "
"Please check you have used the correct SU platform access token!"
)
logger.warning(NO_ADMIN_TABLE_MESSAGE)
return ()

organisations: Iterable[str] = [
list_item.get_text(strip=True) for list_item in parsed_html.find_all("li")
]

logger.debug(
"SU platform access cookie has admin authorisation to: %s as user %s",
organisations,
user_name.text,
)

return organisations


class CheckSUPlatformAuthorisationCommandCog(CheckSUPlatformAuthorisationBaseCog):
class CheckSUPlatformAuthorisationCommandCog(TeXBotBaseCog):
"""Cog class that defines the "/check-su-platform-authorisation" command."""

@discord.slash_command( # type: ignore[no-untyped-call, misc]
Expand All @@ -200,7 +77,7 @@ async def check_su_platform_authorisation(self, ctx: "TeXBotApplicationContext")

async with ctx.typing():
su_platform_access_cookie_organisations: AbstractSet[str] = set(
await self.get_su_platform_organisations()
await get_su_platform_organisations()
)

await ctx.followup.send(
Expand All @@ -223,7 +100,7 @@ async def check_su_platform_authorisation(self, ctx: "TeXBotApplicationContext")
)


class CheckSUPlatformAuthorisationTaskCog(CheckSUPlatformAuthorisationBaseCog):
class CheckSUPlatformAuthorisationTaskCog(TeXBotBaseCog):
"""Cog class defining a repeated task for checking SU platform access cookie."""

@override
Expand Down Expand Up @@ -255,7 +132,7 @@ async def su_platform_access_cookie_check_task(self) -> None:
logger.debug("Running SU platform access cookie check task...")

su_platform_access_cookie_status: tuple[int, str] = (
await self.get_su_platform_access_cookie_status()
await get_su_platform_access_cookie_status()
).value

logger.log(
Expand Down
109 changes: 7 additions & 102 deletions cogs/make_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@
import re
from typing import TYPE_CHECKING

import aiohttp
import bs4
import discord
from bs4 import BeautifulSoup
from django.core.exceptions import ValidationError

from config import settings
from db.core.models import GroupMadeMember
from exceptions import ApplicantRoleDoesNotExistError, GuestRoleDoesNotExistError
from utils import GLOBAL_SSL_CONTEXT, CommandChecks, TeXBotBaseCog
from utils import CommandChecks, TeXBotBaseCog
from utils.msl import get_membership_count, is_student_id_member

if TYPE_CHECKING:
from collections.abc import Mapping, Sequence
Expand Down Expand Up @@ -154,56 +152,7 @@ async def make_member(self, ctx: "TeXBotApplicationContext", group_member_id: st
)
return

guild_member_ids: set[str] = set()

async with (
aiohttp.ClientSession(
headers=REQUEST_HEADERS, cookies=REQUEST_COOKIES
) as http_session,
http_session.get(
url=GROUPED_MEMBERS_URL, ssl=GLOBAL_SSL_CONTEXT
) as http_response,
):
response_html: str = await http_response.text()

MEMBER_HTML_TABLE_IDS: Final[frozenset[str]] = frozenset(
{
"ctl00_Main_rptGroups_ctl05_gvMemberships",
"ctl00_Main_rptGroups_ctl03_gvMemberships",
"ctl00_ctl00_Main_AdminPageContent_rptGroups_ctl03_gvMemberships",
"ctl00_ctl00_Main_AdminPageContent_rptGroups_ctl05_gvMemberships",
}
)
table_id: str
for table_id in MEMBER_HTML_TABLE_IDS:
parsed_html: bs4.Tag | bs4.NavigableString | None = BeautifulSoup(
response_html, "html.parser"
).find("table", {"id": table_id})

if parsed_html is None or isinstance(parsed_html, bs4.NavigableString):
continue

guild_member_ids.update(
row.contents[2].text
for row in parsed_html.find_all("tr", {"class": ["msl_row", "msl_altrow"]})
)

guild_member_ids.discard("")
guild_member_ids.discard("\n")
guild_member_ids.discard(" ")

if not guild_member_ids:
await self.command_send_error(
ctx,
error_code="E1041",
logging_message=OSError(
"The guild member IDs could not be retrieved from "
"the MEMBERS_LIST_URL."
),
)
return

if group_member_id not in guild_member_ids:
if not await is_student_id_member(student_id=group_member_id):
await self.command_send_error(
ctx,
message=(
Expand Down Expand Up @@ -276,53 +225,9 @@ async def member_count(self, ctx: "TeXBotApplicationContext") -> None: # type:
await ctx.defer(ephemeral=False)

async with ctx.typing():
async with (
aiohttp.ClientSession(
headers=REQUEST_HEADERS, cookies=REQUEST_COOKIES
) as http_session,
http_session.get(
url=BASE_MEMBERS_URL, ssl=GLOBAL_SSL_CONTEXT
) as http_response,
):
response_html: str = await http_response.text()

member_list_div: bs4.Tag | bs4.NavigableString | None = BeautifulSoup(
response_html, "html.parser"
).find("div", {"class": "memberlistcol"})

if member_list_div is None or isinstance(member_list_div, bs4.NavigableString):
await self.command_send_error(
ctx=ctx,
error_code="E1041",
logging_message=OSError(
"The member count could not be retrieved from the MEMBERS_LIST_URL."
),
)
return

if "showing 100 of" in member_list_div.text.lower():
member_count: str = member_list_div.text.split(" ")[3]
await ctx.followup.send(
content=f"{self.bot.group_full_name} has {member_count} members! :tada:"
)
return

member_table: bs4.Tag | bs4.NavigableString | None = BeautifulSoup(
response_html, "html.parser"
).find("table", {"id": "ctl00_ctl00_Main_AdminPageContent_gvMembers"})

if member_table is None or isinstance(member_table, bs4.NavigableString):
await self.command_send_error(
ctx=ctx,
error_code="E1041",
logging_message=OSError(
"The member count could not be retrieved from the MEMBERS_LIST_URL."
),
)
return

await ctx.followup.send(
content=f"{self.bot.group_full_name} has {
len(member_table.find_all('tr', {'class': ['msl_row', 'msl_altrow']}))
} members! :tada:"
content=(
f"{self.bot.group_full_name} has "
f"{await get_membership_count()} members! :tada:"
)
)
18 changes: 18 additions & 0 deletions utils/msl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""MSL utility classes & functions provided for use across the whole of the project."""

from typing import TYPE_CHECKING

from .authorisation import get_su_platform_access_cookie_status, get_su_platform_organisations
from .memberships import get_full_membership_list, get_membership_count, is_student_id_member

if TYPE_CHECKING:
from collections.abc import Sequence

__all__: "Sequence[str]" = (
"GLOBAL_SSL_CONTEXT",
"get_full_membership_list",
"get_membership_count",
"get_su_platform_access_cookie_status",
"get_su_platform_organisations",
"is_student_id_member",
)
Loading