From 62ab6521a208b551c911f03bc0992411e500b23d Mon Sep 17 00:00:00 2001 From: AssassinMagic Date: Thu, 23 Oct 2025 17:37:06 -0500 Subject: [PATCH 1/2] updated enhance to use asyncio for multithreading --- data-app/main.py | 3 +- data-app/src/enhance/abstract.py | 12 +-- data-app/src/enhance/courseInfo.py | 117 +++++++++++++++++------------ 3 files changed, 79 insertions(+), 53 deletions(-) diff --git a/data-app/main.py b/data-app/main.py index ba41497..68c696f 100644 --- a/data-app/main.py +++ b/data-app/main.py @@ -1,6 +1,7 @@ import argparse import pandas as pd import numpy as np +import asyncio from db.Models import Session, Professor, DepartmentDistribution, TermDistribution from src.generation.process import Process @@ -87,7 +88,7 @@ session = Session() dept_dists = session.query(DepartmentDistribution).all() session.close() - CourseInfoEnhance().enhance(dept_dists) + asyncio.run(CourseInfoEnhance().enhance(dept_dists)) print("[MAIN] Finished CourseInfo Updating") if not args.DisableRMP: diff --git a/data-app/src/enhance/abstract.py b/data-app/src/enhance/abstract.py index 7c02b84..5d6a5c0 100644 --- a/data-app/src/enhance/abstract.py +++ b/data-app/src/enhance/abstract.py @@ -1,10 +1,9 @@ from abc import ABC, abstractmethod from db.Models import DepartmentDistribution -from multiprocessing import Pool +import asyncio class EnhanceBase(ABC): """Base class for data enhancement operations.""" - def __init__(self): pass @@ -13,7 +12,10 @@ def enhance_helper(self, dept_dist: DepartmentDistribution) -> None: """Abstract method to be implemented by subclasses for enhancing data.""" pass - def enhance(self, dept_dists: list[DepartmentDistribution]) -> None: + async def enhance(self, dept_dists: list[DepartmentDistribution]) -> None: """Enhance the data for a list of department distributions in a multiprocessing pool.""" - with Pool() as pool: - pool.map(self.enhance_helper, dept_dists) \ No newline at end of file + + semaphore = asyncio.Semaphore(10) # Limit concurrent tasks to 10 + + tasks = [self.enhance_helper(dept) for dept in dept_dists] + await asyncio.gather(*tasks) diff --git a/data-app/src/enhance/courseInfo.py b/data-app/src/enhance/courseInfo.py index 279a161..a16cfa4 100644 --- a/data-app/src/enhance/courseInfo.py +++ b/data-app/src/enhance/courseInfo.py @@ -3,10 +3,10 @@ from .abstract import EnhanceBase from db.Models import DepartmentDistribution, ClassDistribution, Libed, Session, and_ -import requests +import httpx import datetime from mapping.mappings import libed_mapping - +import asyncio class CourseInfoEnhance(EnhanceBase): @@ -29,53 +29,76 @@ def _calculate_current_term(self) -> str: sterm = f"{year - 1900}{semester_code}" return sterm - def enhance_helper(self, dept_dist: DepartmentDistribution) -> None: - dept = dept_dist.dept_abbr - campus = dept_dist.campus - campus_str = str(campus) + def _process_course_data(self, courses: list[dict], dept: str, campus: str) -> None: + session = Session() + try: + for course in courses: + course_nbr = course["catalog_number"] + class_dist = session.query(ClassDistribution).filter(and_(ClassDistribution.dept_abbr == dept, ClassDistribution.course_num == course_nbr, ClassDistribution.campus == campus)).first() + if class_dist: + class_dist.libeds.clear() + + for attribute in course.get("course_attributes", []): + family = attribute.get("family", "") + attr_id = attribute.get("attribute_id", "") + api_key = f"{family}_{attr_id}" + + if api_key in libed_mapping: + libed_name = libed_mapping[api_key] + elif attr_id in libed_mapping: + libed_name = libed_mapping[attr_id] + else: + continue + # Removed logging for libed not found due to many irrelevant unmapped attributes making it hard to monitor + # [CI Enhanced] Libed mapping not found for attribute: ONL_ONLINE / ONLINE + # [CI Enhanced] Libed mapping not found for attribute: DELM_08 / 08 + # There are more examples like this but they are not useful to log + + libed_dist = session.query(Libed).filter(Libed.name == libed_name).first() + if libed_dist and class_dist not in libed_dist.class_dists: + libed_dist.class_dists.append(class_dist) + + print(f"[CI Enhance] Updated [{class_dist.campus}] {class_dist.dept_abbr} {class_dist.course_num} : Libeds: ({class_dist.libeds})") + session.commit() + except Exception as e: + print(f"[CI ERROR] Error processing course data for {dept} at {campus}: {e}") + session.rollback() + finally: + session.close() + + async def enhance_helper(self, dept_dist: DepartmentDistribution) -> None: + + async with asyncio.Semaphore(10): + dept = dept_dist.dept_abbr + campus = dept_dist.campus + campus_str = str(campus) + + # Only process UMNTC and UMNRO campuses + if campus_str not in ["UMNTC", "UMNRO"]: + return + + current_term = self._calculate_current_term() + link = f"https://courses.umn.edu/campuses/{campus_str.lower()}/terms/{current_term}/courses.json?q=subject_id={dept}" - # Only process UMNTC and UMNRO campuses - if campus_str not in ["UMNTC", "UMNRO"]: - return + courses = [] + async with httpx.AsyncClient() as client: + try: + response = await client.get(link) + response.raise_for_status() + req = response.json() + courses = req.get("courses", []) + except (httpx.RequestError, ValueError) as e: + print(f"[CI ERROR] Failed to fetch or parse data for {dept} at {campus_str}: {e}") + return - current_term = self._calculate_current_term() - link = f"https://courses.umn.edu/campuses/{campus_str.lower()}/terms/{current_term}/courses.json?q=subject_id={dept}" + if not courses: + print(f"[CI INFO] No courses found for {dept} at {campus_str}") + return - with requests.get(link) as url: try: - req = url.json() - courses = req.get("courses", []) - except ValueError: - print("Json malformed, icky!") + await asyncio.to_thread(self._process_course_data, courses, dept, campus) + except Exception as e: + print(f"[CI ERROR] Error in processing thread for {dept}: {e}") return - - for course in courses: - course_nbr = course["catalog_number"] - session = Session() - class_dist = session.query(ClassDistribution).filter(and_(ClassDistribution.dept_abbr == dept, ClassDistribution.course_num == course_nbr, ClassDistribution.campus == campus)).first() - if class_dist: - class_dist.libeds.clear() - - for attribute in course.get("course_attributes", []): - family = attribute.get("family", "") - attr_id = attribute.get("attribute_id", "") - api_key = f"{family}_{attr_id}" - - if api_key in libed_mapping: - libed_name = libed_mapping[api_key] - elif attr_id in libed_mapping: - libed_name = libed_mapping[attr_id] - else: - continue - # Removed logging for libed not found due to many irrelevant unmapped attributes making it hard to monitor - # [CI Enhanced] Libed mapping not found for attribute: ONL_ONLINE / ONLINE - # [CI Enhanced] Libed mapping not found for attribute: DELM_08 / 08 - # There are more examples like this but they are not useful to log - - libed_dist = session.query(Libed).filter(Libed.name == libed_name).first() - if class_dist not in libed_dist.class_dists: - libed_dist.class_dists.append(class_dist) - - print(f"[CI Enhance] Updated [{class_dist.campus}] {class_dist.dept_abbr} {class_dist.course_num} : Libeds: ({class_dist.libeds})") - session.commit() - session.close() + + \ No newline at end of file From 54e16a1bb1e27097cc37ed3d4d0da74a7036ac1b Mon Sep 17 00:00:00 2001 From: AssassinMagic Date: Thu, 23 Oct 2025 20:42:29 -0500 Subject: [PATCH 2/2] fixed semaphore issue in enhance --- data-app/src/enhance/abstract.py | 4 ++-- data-app/src/enhance/courseInfo.py | 15 +++++++++++---- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/data-app/src/enhance/abstract.py b/data-app/src/enhance/abstract.py index 5d6a5c0..b498130 100644 --- a/data-app/src/enhance/abstract.py +++ b/data-app/src/enhance/abstract.py @@ -15,7 +15,7 @@ def enhance_helper(self, dept_dist: DepartmentDistribution) -> None: async def enhance(self, dept_dists: list[DepartmentDistribution]) -> None: """Enhance the data for a list of department distributions in a multiprocessing pool.""" - semaphore = asyncio.Semaphore(10) # Limit concurrent tasks to 10 + semaphore = asyncio.Semaphore(9) # Limit concurrent tasks to something under 5 due to rate limiting - tasks = [self.enhance_helper(dept) for dept in dept_dists] + tasks = [self.enhance_helper(dept, semaphore) for dept in dept_dists] await asyncio.gather(*tasks) diff --git a/data-app/src/enhance/courseInfo.py b/data-app/src/enhance/courseInfo.py index a16cfa4..450030b 100644 --- a/data-app/src/enhance/courseInfo.py +++ b/data-app/src/enhance/courseInfo.py @@ -5,6 +5,7 @@ from db.Models import DepartmentDistribution, ClassDistribution, Libed, Session, and_ import httpx import datetime +import json from mapping.mappings import libed_mapping import asyncio @@ -66,9 +67,9 @@ def _process_course_data(self, courses: list[dict], dept: str, campus: str) -> N finally: session.close() - async def enhance_helper(self, dept_dist: DepartmentDistribution) -> None: + async def enhance_helper(self, dept_dist: DepartmentDistribution, semaphore: asyncio.Semaphore) -> None: - async with asyncio.Semaphore(10): + async with semaphore: dept = dept_dist.dept_abbr campus = dept_dist.campus campus_str = str(campus) @@ -87,8 +88,14 @@ async def enhance_helper(self, dept_dist: DepartmentDistribution) -> None: response.raise_for_status() req = response.json() courses = req.get("courses", []) - except (httpx.RequestError, ValueError) as e: - print(f"[CI ERROR] Failed to fetch or parse data for {dept} at {campus_str}: {e}") + except httpx.HTTPStatusError as e: + print(f"[CI HTTP ERROR] Failed for {dept}. Status: {e.response.status_code}. URL: {e.request.url}") + return + except json.JSONDecodeError as e: + print(f"[CI PARSE ERROR] Failed to parse JSON for {dept}. URL: {link}. Error: {repr(e)}") + return + except httpx.RequestError as e: + print(f"[CI REQUEST ERROR] Failed to fetch data for {dept}. Error: {repr(e)}") return if not courses: