Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion data-app/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse
import pandas as pd
import numpy as np
import asyncio
from db.Models import Session, Professor, DepartmentDistribution, TermDistribution

from src.generation.process import Process
Expand Down Expand Up @@ -87,7 +88,7 @@
session = Session()
dept_dists = session.query(DepartmentDistribution).all()
session.close()
CourseInfoEnhance().enhance(dept_dists)
asyncio.run(CourseInfoEnhance().enhance(dept_dists))
print("[MAIN] Finished CourseInfo Updating")

if not args.DisableRMP:
Expand Down
12 changes: 7 additions & 5 deletions data-app/src/enhance/abstract.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from abc import ABC, abstractmethod
from db.Models import DepartmentDistribution
from multiprocessing import Pool
import asyncio

class EnhanceBase(ABC):
"""Base class for data enhancement operations."""

def __init__(self):
pass

Expand All @@ -13,7 +12,10 @@ def enhance_helper(self, dept_dist: DepartmentDistribution) -> None:
"""Abstract method to be implemented by subclasses for enhancing data."""
pass

def enhance(self, dept_dists: list[DepartmentDistribution]) -> None:
async def enhance(self, dept_dists: list[DepartmentDistribution]) -> None:
"""Enhance the data for a list of department distributions in a multiprocessing pool."""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update comment.

with Pool() as pool:
pool.map(self.enhance_helper, dept_dists)

semaphore = asyncio.Semaphore(9) # Limit concurrent tasks to something under 5 due to rate limiting
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment conflicts with the implementation: the semaphore allows 9 concurrent tasks, but the comment says the limit is under 5.


tasks = [self.enhance_helper(dept, semaphore) for dept in dept_dists]
await asyncio.gather(*tasks)
124 changes: 77 additions & 47 deletions data-app/src/enhance/courseInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

from .abstract import EnhanceBase
from db.Models import DepartmentDistribution, ClassDistribution, Libed, Session, and_
import requests
import httpx
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

httpx has not been added to the pipenv dependencies.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we already use aiohttp for RMP. I would encourage using that so we keep our library surface area low. Check the RMP folder to see what that looks like.

import datetime
import json
from mapping.mappings import libed_mapping

import asyncio

class CourseInfoEnhance(EnhanceBase):

Expand All @@ -29,53 +30,82 @@ def _calculate_current_term(self) -> str:
sterm = f"{year - 1900}{semester_code}"
return sterm

def enhance_helper(self, dept_dist: DepartmentDistribution) -> None:
dept = dept_dist.dept_abbr
campus = dept_dist.campus
campus_str = str(campus)
def _process_course_data(self, courses: list[dict], dept: str, campus: str) -> None:
session = Session()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sessions should be made for every granular change. You're batching a massive amount of data in one commit.

try:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If even one course fails, the entire batch will be considered faulty.

for course in courses:
course_nbr = course["catalog_number"]
class_dist = session.query(ClassDistribution).filter(and_(ClassDistribution.dept_abbr == dept, ClassDistribution.course_num == course_nbr, ClassDistribution.campus == campus)).first()
if class_dist:
class_dist.libeds.clear()

for attribute in course.get("course_attributes", []):
family = attribute.get("family", "")
attr_id = attribute.get("attribute_id", "")
api_key = f"{family}_{attr_id}"

if api_key in libed_mapping:
libed_name = libed_mapping[api_key]
elif attr_id in libed_mapping:
libed_name = libed_mapping[attr_id]
else:
continue
# Removed logging for libed not found due to many irrelevant unmapped attributes making it hard to monitor
# [CI Enhanced] Libed mapping not found for attribute: ONL_ONLINE / ONLINE
# [CI Enhanced] Libed mapping not found for attribute: DELM_08 / 08
# There are more examples like this but they are not useful to log

libed_dist = session.query(Libed).filter(Libed.name == libed_name).first()
if libed_dist and class_dist not in libed_dist.class_dists:
libed_dist.class_dists.append(class_dist)

print(f"[CI Enhance] Updated [{class_dist.campus}] {class_dist.dept_abbr} {class_dist.course_num} : Libeds: ({class_dist.libeds})")
session.commit()
except Exception as e:
print(f"[CI ERROR] Error processing course data for {dept} at {campus}: {e}")
session.rollback()
finally:
session.close()

async def enhance_helper(self, dept_dist: DepartmentDistribution, semaphore: asyncio.Semaphore) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

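This does not fit the abstract class structure: the abstract enhance_helper takes only dept_dist, but this override adds a semaphore parameter.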


async with semaphore:
dept = dept_dist.dept_abbr
campus = dept_dist.campus
campus_str = str(campus)

# Only process UMNTC and UMNRO campuses
if campus_str not in ["UMNTC", "UMNRO"]:
return

current_term = self._calculate_current_term()
link = f"https://courses.umn.edu/campuses/{campus_str.lower()}/terms/{current_term}/courses.json?q=subject_id={dept}"

# Only process UMNTC and UMNRO campuses
if campus_str not in ["UMNTC", "UMNRO"]:
return
courses = []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think this is necessary? courses will be assigned a value if the try succeeds; if it fails, the function should simply return as a no-op.

async with httpx.AsyncClient() as client:
try:
response = await client.get(link)
response.raise_for_status()
req = response.json()
courses = req.get("courses", [])
except httpx.HTTPStatusError as e:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like our logs to show whether this is a 4xx error (client/prohibited) or a 5xx error (server).

print(f"[CI HTTP ERROR] Failed for {dept}. Status: {e.response.status_code}. URL: {e.request.url}")
return
except json.JSONDecodeError as e:
print(f"[CI PARSE ERROR] Failed to parse JSON for {dept}. URL: {link}. Error: {repr(e)}")
return
except httpx.RequestError as e:
print(f"[CI REQUEST ERROR] Failed to fetch data for {dept}. Error: {repr(e)}")
return

current_term = self._calculate_current_term()
link = f"https://courses.umn.edu/campuses/{campus_str.lower()}/terms/{current_term}/courses.json?q=subject_id={dept}"
if not courses:
print(f"[CI INFO] No courses found for {dept} at {campus_str}")
return

with requests.get(link) as url:
try:
req = url.json()
courses = req.get("courses", [])
except ValueError:
print("Json malformed, icky!")
await asyncio.to_thread(self._process_course_data, courses, dept, campus)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This spawns a separate thread to run a synchronous function. Can you make _process_course_data async and just await it instead? That would reduce thread overhead.

except Exception as e:
print(f"[CI ERROR] Error in processing thread for {dept}: {e}")
return

for course in courses:
course_nbr = course["catalog_number"]
session = Session()
class_dist = session.query(ClassDistribution).filter(and_(ClassDistribution.dept_abbr == dept, ClassDistribution.course_num == course_nbr, ClassDistribution.campus == campus)).first()
if class_dist:
class_dist.libeds.clear()

for attribute in course.get("course_attributes", []):
family = attribute.get("family", "")
attr_id = attribute.get("attribute_id", "")
api_key = f"{family}_{attr_id}"

if api_key in libed_mapping:
libed_name = libed_mapping[api_key]
elif attr_id in libed_mapping:
libed_name = libed_mapping[attr_id]
else:
continue
# Removed logging for libed not found due to many irrelevant unmapped attributes making it hard to monitor
# [CI Enhanced] Libed mapping not found for attribute: ONL_ONLINE / ONLINE
# [CI Enhanced] Libed mapping not found for attribute: DELM_08 / 08
# There are more examples like this but they are not useful to log

libed_dist = session.query(Libed).filter(Libed.name == libed_name).first()
if class_dist not in libed_dist.class_dists:
libed_dist.class_dists.append(class_dist)

print(f"[CI Enhance] Updated [{class_dist.campus}] {class_dist.dept_abbr} {class_dist.course_num} : Libeds: ({class_dist.libeds})")
session.commit()
session.close()