class EnhanceBase(ABC):
    """Base class for data enhancement operations.

    Subclasses implement :meth:`enhance_helper` as a coroutine. :meth:`enhance`
    schedules one helper call per department and hands every call the same
    semaphore so the helpers can bound how many of them work at once.
    """

    # Size of the semaphore shared by all helper calls of one ``enhance`` run.
    # NOTE(review): the earlier inline comment said "under 5" while the code
    # used 9; the runtime value 9 is kept here -- confirm the intended limit.
    MAX_CONCURRENT_TASKS = 9

    def __init__(self):
        pass

    @abstractmethod
    async def enhance_helper(
        self,
        dept_dist: "DepartmentDistribution",
        semaphore: asyncio.Semaphore,
    ) -> None:
        """Enhance the data of a single department distribution.

        Declared as a coroutine taking the shared semaphore because that is
        exactly how :meth:`enhance` invokes it.

        Args:
            dept_dist: The department distribution to enhance.
            semaphore: Semaphore shared by every helper call of the current
                run; implementations acquire it around rate-limited work.
        """
        pass

    async def enhance(self, dept_dists: "list[DepartmentDistribution]") -> None:
        """Enhance every department distribution concurrently with asyncio.

        One ``enhance_helper`` coroutine is created per entry and all of them
        are awaited together. An exception raised by any helper propagates to
        the caller.

        Args:
            dept_dists: Department distributions to process; an empty list is
                a no-op.
        """
        # Created inside the coroutine so it belongs to the running loop.
        semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_TASKS)

        await asyncio.gather(
            *(self.enhance_helper(dept, semaphore) for dept in dept_dists)
        )
EnhanceBase from db.Models import DepartmentDistribution, ClassDistribution, Libed, Session, and_ -import requests +import httpx import datetime +import json from mapping.mappings import libed_mapping - +import asyncio class CourseInfoEnhance(EnhanceBase): @@ -29,53 +30,82 @@ def _calculate_current_term(self) -> str: sterm = f"{year - 1900}{semester_code}" return sterm - def enhance_helper(self, dept_dist: DepartmentDistribution) -> None: - dept = dept_dist.dept_abbr - campus = dept_dist.campus - campus_str = str(campus) + def _process_course_data(self, courses: list[dict], dept: str, campus: str) -> None: + session = Session() + try: + for course in courses: + course_nbr = course["catalog_number"] + class_dist = session.query(ClassDistribution).filter(and_(ClassDistribution.dept_abbr == dept, ClassDistribution.course_num == course_nbr, ClassDistribution.campus == campus)).first() + if class_dist: + class_dist.libeds.clear() + + for attribute in course.get("course_attributes", []): + family = attribute.get("family", "") + attr_id = attribute.get("attribute_id", "") + api_key = f"{family}_{attr_id}" + + if api_key in libed_mapping: + libed_name = libed_mapping[api_key] + elif attr_id in libed_mapping: + libed_name = libed_mapping[attr_id] + else: + continue + # Removed logging for libed not found due to many irrelevant unmapped attributes making it hard to monitor + # [CI Enhanced] Libed mapping not found for attribute: ONL_ONLINE / ONLINE + # [CI Enhanced] Libed mapping not found for attribute: DELM_08 / 08 + # There are more examples like this but they are not useful to log + + libed_dist = session.query(Libed).filter(Libed.name == libed_name).first() + if libed_dist and class_dist not in libed_dist.class_dists: + libed_dist.class_dists.append(class_dist) + + print(f"[CI Enhance] Updated [{class_dist.campus}] {class_dist.dept_abbr} {class_dist.course_num} : Libeds: ({class_dist.libeds})") + session.commit() + except Exception as e: + print(f"[CI ERROR] Error 
processing course data for {dept} at {campus}: {e}") + session.rollback() + finally: + session.close() + + async def enhance_helper(self, dept_dist: DepartmentDistribution, semaphore: asyncio.Semaphore) -> None: + + async with semaphore: + dept = dept_dist.dept_abbr + campus = dept_dist.campus + campus_str = str(campus) + + # Only process UMNTC and UMNRO campuses + if campus_str not in ["UMNTC", "UMNRO"]: + return + + current_term = self._calculate_current_term() + link = f"https://courses.umn.edu/campuses/{campus_str.lower()}/terms/{current_term}/courses.json?q=subject_id={dept}" - # Only process UMNTC and UMNRO campuses - if campus_str not in ["UMNTC", "UMNRO"]: - return + courses = [] + async with httpx.AsyncClient() as client: + try: + response = await client.get(link) + response.raise_for_status() + req = response.json() + courses = req.get("courses", []) + except httpx.HTTPStatusError as e: + print(f"[CI HTTP ERROR] Failed for {dept}. Status: {e.response.status_code}. URL: {e.request.url}") + return + except json.JSONDecodeError as e: + print(f"[CI PARSE ERROR] Failed to parse JSON for {dept}. URL: {link}. Error: {repr(e)}") + return + except httpx.RequestError as e: + print(f"[CI REQUEST ERROR] Failed to fetch data for {dept}. 
Error: {repr(e)}") + return - current_term = self._calculate_current_term() - link = f"https://courses.umn.edu/campuses/{campus_str.lower()}/terms/{current_term}/courses.json?q=subject_id={dept}" + if not courses: + print(f"[CI INFO] No courses found for {dept} at {campus_str}") + return - with requests.get(link) as url: try: - req = url.json() - courses = req.get("courses", []) - except ValueError: - print("Json malformed, icky!") + await asyncio.to_thread(self._process_course_data, courses, dept, campus) + except Exception as e: + print(f"[CI ERROR] Error in processing thread for {dept}: {e}") return - - for course in courses: - course_nbr = course["catalog_number"] - session = Session() - class_dist = session.query(ClassDistribution).filter(and_(ClassDistribution.dept_abbr == dept, ClassDistribution.course_num == course_nbr, ClassDistribution.campus == campus)).first() - if class_dist: - class_dist.libeds.clear() - - for attribute in course.get("course_attributes", []): - family = attribute.get("family", "") - attr_id = attribute.get("attribute_id", "") - api_key = f"{family}_{attr_id}" - - if api_key in libed_mapping: - libed_name = libed_mapping[api_key] - elif attr_id in libed_mapping: - libed_name = libed_mapping[attr_id] - else: - continue - # Removed logging for libed not found due to many irrelevant unmapped attributes making it hard to monitor - # [CI Enhanced] Libed mapping not found for attribute: ONL_ONLINE / ONLINE - # [CI Enhanced] Libed mapping not found for attribute: DELM_08 / 08 - # There are more examples like this but they are not useful to log - - libed_dist = session.query(Libed).filter(Libed.name == libed_name).first() - if class_dist not in libed_dist.class_dists: - libed_dist.class_dists.append(class_dist) - - print(f"[CI Enhance] Updated [{class_dist.campus}] {class_dist.dept_abbr} {class_dist.course_num} : Libeds: ({class_dist.libeds})") - session.commit() - session.close() + + \ No newline at end of file