From 122ccd7ee756c8687ffd35c4e2fd6e5f1921472d Mon Sep 17 00:00:00 2001 From: WolfSkin Torske Date: Tue, 14 Feb 2023 14:21:17 +0000 Subject: [PATCH 1/2] Added email to student id conversion function. Added aula authentication, and refactored auth.py to improve readability. Several packages updated to fix bugs. --- .gitignore | 1 + covscraper/aulaapi.py | 29 ++++ covscraper/auth.py | 342 +++++++++++++++++++++++++++--------------- requirements.txt | 8 +- 4 files changed, 251 insertions(+), 129 deletions(-) create mode 100644 covscraper/aulaapi.py diff --git a/.gitignore b/.gitignore index 19800f8..3c5db5d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ __pycache__ *~ /venv/ +.idea/ diff --git a/covscraper/aulaapi.py b/covscraper/aulaapi.py new file mode 100644 index 0000000..69ad442 --- /dev/null +++ b/covscraper/aulaapi.py @@ -0,0 +1,29 @@ +import sys + +from requests import Session + +from covscraper.auth import Authenticator + + +def email_to_id(session: Session, student_email: str): + # Nuke @uni. 
in case it got copied in + student_email = student_email.replace("@uni.", "@") + + # Grab internal aula id for student first + aula_id = session.get( + "https://apiv2.coventry.aula.education/search/v2/users/{}?size=1".format( + student_email + ), + ).json()["users"][0]["id"] + + # Grab student object and extract corresponding student ID + data = {"userIds": [aula_id]} + return session.post( + "https://apiv2.coventry.aula.education/users/getByIds", + json=data, + ).json()["users"][0]["custom"]["studentId"] + + +if __name__ == "__main__": + auth = Authenticator(sys.argv[1], sys.argv[2]) + print(email_to_id(auth, sys.argv[3])) diff --git a/covscraper/auth.py b/covscraper/auth.py index 45aff57..6b408c7 100644 --- a/covscraper/auth.py +++ b/covscraper/auth.py @@ -1,166 +1,258 @@ -import requests -from requests_ntlm import HttpNtlmAuth +import re +import sys +from urllib import parse + +import urllib3 from bs4 import BeautifulSoup -import datetime, sys, re, os -import json -import urllib -from requests.packages.urllib3.exceptions import InsecureRequestWarning -requests.packages.urllib3.disable_warnings(InsecureRequestWarning) +from requests import Session +from requests_ntlm import HttpNtlmAuth +from urllib3.exceptions import InsecureRequestWarning + +urllib3.disable_warnings(InsecureRequestWarning) -class AuthenticationFailure(Exception): + +class AuthenticationFailure(Exception): def __init__(self, message): self.message = message -class Authenticator(requests.sessions.Session): - def __auth_sonic(self, url): - loginUrl = "https://webapp.coventry.ac.uk/Sonic" + +class Authenticator(Session): + def __auth_sonic(self, _url): + # login_url = "https://webapp.coventry.ac.uk/Sonic" self.auth = HttpNtlmAuth("COVENTRY\\{}".format(self.username), self.password) - #response = requests.sessions.Session.get(self, url) - #self.auth = None + # response = Session.get(self, url) + # self.auth = None - #return response + # return response def __auth_kuali(self, url): - 
#print(f"__auth_kuali for: {url}") - #print("Authing Kuali") - kualiUrl = "https://coventry.kuali.co/auth?return_to=https%3A%2F%2Fcoventry.kuali.co%2Fapps%2F" - shibbolethUrl = "https://idp2.coventry.ac.uk/idp/Authn/UserPassword" - + # print(f"__auth_kuali for: {url}") + # print("Authing Kuali") + kuali_url = "https://coventry.kuali.co/auth?return_to=https%3A%2F%2Fcoventry.kuali.co%2Fapps%2F" + shibboleth_url = "https://idp2.coventry.ac.uk/idp/Authn/UserPassword" + # shibboleth wont let us connect unless it looks like we've been redirected from an approved site - response = requests.sessions.Session.get(self, kualiUrl) - if response.status_code != 200: - raise AuthenticationFailure("Failed to load Kuali, HTTP {}".format(response.status_code)) - - + res = Session.get(self, kuali_url) + if res.status_code != 200: + raise AuthenticationFailure( + "Failed to load Kuali, HTTP {}".format(res.status_code) + ) + # post the auth data to shibboleth data = {"j_username": self.username, "j_password": self.password} - response = requests.sessions.Session.post(self, shibbolethUrl, data=data) - if response.status_code != 200: - raise AuthenticationFailure("Failed to load shibboleth, HTTP {}".format(response.status_code)) + res = Session.post(self, shibboleth_url, data=data) + if res.status_code != 200: + raise AuthenticationFailure( + "Failed to load shibboleth, HTTP {}".format(res.status_code) + ) # extract the auth key and post it - soup = BeautifulSoup( response.text, "lxml" ) - samlUrl = soup.find( "form", {"method": "post"} )["action"] - key = soup.find( "input", {"name": "SAMLResponse"} )["value"] - #print(f"samlUrl: {samlUrl}\nkey: {key}") - response = requests.sessions.Session.post(self, samlUrl, data={"SAMLResponse": key}) - #print(f"samlUrl response: {response.text}") - if response.status_code != 200: - raise AuthenticationFailure("Failed to post auth code, HTTP {}".format(response.status_code)) + soup = BeautifulSoup(res.text, "lxml") + saml_url = soup.find("form", 
{"method": "post"})["action"] + key = soup.find("input", {"name": "SAMLResponse"})["value"] + # print(f"saml_url: {saml_url}\nkey: {key}") + res = Session.post(self, saml_url, data={"SAMLResponse": key}) + # print(f"saml_url res: {res.text}") + if res.status_code != 200: + raise AuthenticationFailure( + "Failed to post auth code, HTTP {}".format(res.status_code) + ) # get the actual page that we were after all this time - #print(f"Now getting {url}") - response = requests.sessions.Session.get(self, url) - #print(f"Actual response: {response.text}") - #print(response.text) - #return response - - def __auth_engage(self, url): - loginUrl = "https://engagementdashboard.coventry.ac.uk/login" - - response = requests.sessions.Session.get(self, loginUrl) - soup = BeautifulSoup( response.text, "lxml" ) + # print(f"Now getting {url}") + _res = Session.get(self, url) + # print(f"Actual res: {res.text}") + # print(res.text) + # return res + + def __auth_engage(self, _url): + login_url = "https://engagementdashboard.coventry.ac.uk/login" + + res = Session.get(self, login_url) + soup = BeautifulSoup(res.text, "lxml") hidden = soup.find("input", {"name": "_csrf"})["value"] - payload = {"username": self.username, - "password": self.password, - "_csrf": hidden} - self.post(loginUrl, data=payload) - - #response = requests.sessions.Session.get(self, url) - - #return response - - def __auth_moodle(self, response): - loginUrl = "https://cumoodle.coventry.ac.uk/login/index.php" - - response = requests.sessions.Session.get(self, loginUrl) - soup = BeautifulSoup( response.text, "lxml" ) - token = soup.find( "input", {"name": "logintoken"} )["value"] - - data = {"username": self.username, "password": self.password, "logintoken": token} - response = requests.sessions.Session.post(self, loginUrl, data=data) - - if response.status_code != 200: - raise AuthenticationFailure("Failed to load Moodle, HTTP {}".format(response.status_code)) - - #response = requests.sessions.Session.get(self, url) - 
#return response - - - domainRegex = re.compile(r"https{,1}://([\w\.\-]{1,})") - authHandler = {"webapp.coventry.ac.uk": __auth_sonic, \ - "engagementdashboard.coventry.ac.uk": __auth_engage, \ - "coventry.kuali.co": __auth_kuali, \ - "cumoodle.coventry.ac.uk": __auth_moodle } - redirectPages = ["https://engagementdashboard.coventry.ac.uk/login", \ - "https://cumoodle.coventry.ac.uk/login/index.php"] + payload = { + "username": self.username, + "password": self.password, + "_csrf": hidden, + } + self.post(login_url, data=payload) + + # res = Session.get(self, url) + + # return res + + def __auth_moodle(self, _res): + login_url = "https://cumoodle.coventry.ac.uk/login/index.php" + + res = Session.get(self, login_url) + soup = BeautifulSoup(res.text, "lxml") + token = soup.find("input", {"name": "logintoken"})["value"] + + data = { + "username": self.username, + "password": self.password, + "logintoken": token, + } + res = Session.post(self, login_url, data=data) + + if res.status_code != 200: + raise AuthenticationFailure( + "Failed to load Moodle, HTTP {}".format(res.status_code) + ) + + # response = Session.get(self, url) + # return response + + def __auth_aula(self, _): + # It should be possible to have the system just login once and then cache the aula token to disk since it literally never expires. 
+ login_url = "https://api.coventry.aula.education/sso/login?redirect=https://coventry.aula.education/&email={}" + if not self.username.endswith("coventry.ac.uk"): + email = self.username + "@coventry.ac.uk" + elif self.username.endswith("uni.coventry.ac.uk"): + email = self.username.replace("@uni.", "@") + else: + email = self.username + + res = Session.get(self, login_url.format(email)) + if not res.status_code == 200: + raise AuthenticationFailure( + "Unable to begin SAML authentication chain with Aula, HTTP {}".format( + res.status_code + ) + ) + + soup = BeautifulSoup(res.text, "lxml") + saml_url = soup.find("form", {"method": "post", "id": "options"})["action"] + method = soup.find("input", {"name": "AuthMethod"})["value"] + data = {"UserName": email, "Password": self.password, "AuthMethod": method} + res = Session.post(self, saml_url, data=data) + if res.status_code != 200: + raise AuthenticationFailure( + "Failed to post auth code Stage 1, HTTP {}".format(res.status_code) + ) + soup = BeautifulSoup(res.text, "lxml") + saml_url = soup.find("form", {"method": "POST"})["action"] + key = soup.find("input", {"name": "SAMLResponse"})["value"] + state = soup.find("input", {"name": "RelayState"})["value"] + data = {"SAMLResponse": key, "RelayState": state} + res = Session.post( + self, + saml_url, + data=data, + headers={"Referer": "https://federatedauth.coventry.ac.uk/"}, + ) + if res.status_code != 200: + raise AuthenticationFailure( + "Failed to post auth code Stage 2, HTTP {}".format(res.status_code) + ) + soup = BeautifulSoup(res.text, "lxml") + saml_url = soup.find("form", {"method": "post"})["action"] + key = soup.find("input", {"name": "SAMLResponse"})["value"] + state = soup.find("input", {"name": "RelayState"})["value"] + data = {"SAMLResponse": key, "RelayState": state} + res = Session.post( + self, + saml_url, + data=data, + headers={"Referer": "https://federatedauth.coventry.ac.uk/"}, + ) + if res.status_code != 200: + raise AuthenticationFailure( + 
"Failed to transfer to Aula, HTTP {}".format(res.status_code) + ) + self.headers.update({"x-session-token": res.cookies.get("sso-session-t")}) + + domainRegex = re.compile(r"https?://([\w.\-]+)") + authHandler = { + "webapp.coventry.ac.uk": __auth_sonic, + "engagementdashboard.coventry.ac.uk": __auth_engage, + "coventry.kuali.co": __auth_kuali, + "cumoodle.coventry.ac.uk": __auth_moodle, + "apiv2.coventry.aula.education": __auth_aula, + } + redirectPages = [ + "https://engagementdashboard.coventry.ac.uk/login", + "https://cumoodle.coventry.ac.uk/login/index.php", + ] def __init__(self, username, password): - requests.sessions.Session.__init__(self) + Session.__init__(self) self.username = username self.password = password - def __run_handler(self, response): - domain = self.domainRegex.search(response.url) - #print(f"Got domain: {domain}") - if domain: - domain = domain.group(1) - try: - func = self.authHandler[domain] - func(self,response.url) - except KeyError: pass - - + def __run_handler(self, res): + domain = self.domainRegex.search(res.url) + # print(f"Got domain: {domain}") + if domain: + domain = domain.group(1) + try: + func = self.authHandler[domain] + func(self, res.url) + except KeyError: + pass + def get(self, url, *args, **kwargs): - #print( url ) - #certfile = os.path.join('/etc/ssl/certs/','ca-bundle.crt') + # print( url ) + # certfile = os.path.join('/etc/ssl/certs/','ca-bundle.crt') - response = requests.sessions.Session.get(self, url, verify=False, *args, **kwargs) + res = Session.get(self, url, verify=False, *args, **kwargs) - failCondition = lambda response: response.status_code in (401,403,500) or response.url in self.redirectPages - #print(f"response text: {response.text}\ncode: {response.status_code}") - if failCondition(response): # if the page failed or we got redirected to anything in redirectPages - self.__run_handler( response ) - response = requests.sessions.Session.get(self, url, *args, **kwargs) + fail_condition = ( + lambda 
check_res: check_res.status_code in (400, 401, 403, 500) + or check_res.url in self.redirectPages + ) + # print(f"res text: {res.text}\ncode: {res.status_code}") + if fail_condition( + res + ): # if the page failed or we got redirected to anything in redirectPages + self.__run_handler(res) + res = Session.get(self, url, *args, **kwargs) - if failCondition(response): # if it still didn't work give up + if fail_condition(res): # if it still didn't work give up raise AuthenticationFailure("Could not authenticate") - #print("auth.get: ",end="") - #print(response.text) - return response - - def post(self,url, *args, **kwargs): - response = requests.sessions.Session.post(self, url, *args, **kwargs) - - failCondition = lambda response: response.status_code in (401,403) or response.url in self.redirectPages - - if failCondition(response): # if the page failed or we got redirected to anything in redirectPages - self.__run_handler( response ) - response = requests.sessions.Session.post(self, url, *args, **kwargs) - - if failCondition(response): # if it still didn't work give up + # print("auth.get: ",end="") + # print(res.text) + return res + + def post(self, url, *args, **kwargs): + res = Session.post(self, url, *args, **kwargs) + + fail_condition = ( + lambda check_res: check_res.status_code in (401, 403) + or check_res.url in self.redirectPages + ) + + if fail_condition( + res + ): # if the page failed or we got redirected to anything in redirectPages + self.__run_handler(res) + res = Session.post(self, url, *args, **kwargs) + + if fail_condition(res): # if it still didn't work give up raise AuthenticationFailure("Could not authenticate") - return response - + return res + +def url_safe(val): + return parse.quote(val, safe="") -def url_safe( val ): - return urllib.parse.quote(val,safe="") if __name__ == "__main__": auth = Authenticator(sys.argv[1], sys.argv[2]) - response = auth.get("https://cumoodle.coventry.ac.uk/grade/report/grader/index.php?id=47437") + response = 
auth.get( + "https://cumoodle.coventry.ac.uk/grade/report/grader/index.php?id=47437" + ) print(response.text) print() - - #response = auth.get("https://webapp.coventry.ac.uk/Timetable-main") - #print(response) - #response = auth.get("https://engagementdashboard.coventry.ac.uk/attendance/all?id=7203071") - #print(response) - + # response = auth.get("https://webapp.coventry.ac.uk/Timetable-main") + # print(response) + + # response = auth.get("https://engagementdashboard.coventry.ac.uk/attendance/all?id=7203071") + # print(response) diff --git a/requirements.txt b/requirements.txt index 919722c..bea216c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,15 +1,15 @@ -beautifulsoup4==4.9.3 +beautifulsoup4==4.11.2 bs4==0.0.1 certifi==2020.12.5 cffi==1.14.4 chardet==3.0.4 cryptography==3.2.1 idna==2.10 -lxml==4.6.2 +lxml==4.9.2 ntlm-auth==1.5.0 -pkg-resources==0.0.0 +# pkg-resources==0.0.0 pycparser==2.20 -requests==2.25.0 +requests==2.28.2 requests-ntlm==1.1.0 six==1.15.0 soupsieve==2.0.1 From 63beaf45dac36f899e0f3dc1de29c8671b7f4f6d Mon Sep 17 00:00:00 2001 From: dscroft Date: Mon, 15 May 2023 11:27:31 +0100 Subject: [PATCH 2/2] Update aulaapi.py Fix issue where emails will match multiple users. E.g. smith@coventry.ac.uk Will match and return - smith@coventry.ac.uk - ssmith@coventry.ac.uk etc. 
--- covscraper/aulaapi.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/covscraper/aulaapi.py b/covscraper/aulaapi.py index 69ad442..6c53889 100644 --- a/covscraper/aulaapi.py +++ b/covscraper/aulaapi.py @@ -10,11 +10,17 @@ def email_to_id(session: Session, student_email: str): student_email = student_email.replace("@uni.", "@") # Grab internal aula id for student first - aula_id = session.get( - "https://apiv2.coventry.aula.education/search/v2/users/{}?size=1".format( + response = session.get( + "https://apiv2.coventry.aula.education/search/v2/users/{}".format( student_email ), - ).json()["users"][0]["id"] + ) + + aula_id = None + for i in response.json()["users"]: + if i["email"] == student_email: + aula_id = i["id"] + break # Grab student object and extract corresponding student ID data = {"userIds": [aula_id]}