Skip to content
90 changes: 51 additions & 39 deletions apps/datapuller/src/pullers/classes.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { connection } from "mongoose";

import { ClassModel, TermModel } from "@repo/common";

import { warmCatalogCacheForTerms } from "../lib/cache-warming";
Expand Down Expand Up @@ -98,53 +100,63 @@ const updateClasses = async (config: Config, termSelector: TermSelector) => {

let totalClasses = 0;
let totalInserted = 0;
let totalDeleted = 0;

// Insert classes in batches of 4 terms
for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) {
const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH);
const termsBatchIds = termsBatch.map((term) => term.id);

log.trace(
`Fetching classes for term ${termsBatch.map((term) => term.name).toLocaleString()}...`
);

const classes = await getClasses(
log,
CLASS_APP_ID,
CLASS_APP_KEY,
termsBatchIds
);

log.info(`Fetched ${classes.length.toLocaleString()} classes.`);
if (classes.length === 0) {
log.error(`No classes found, skipping update.`);
return;
}
totalClasses += classes.length;

log.trace("Deleting classes to be replaced...");

const { deletedCount } = await ClassModel.deleteMany({
termId: { $in: termsBatchIds },
});

log.info(`Deleted ${deletedCount.toLocaleString()} classes.`);

// Insert classes in batches of 5000
const insertBatchSize = 5000;
for (let i = 0; i < classes.length; i += insertBatchSize) {
const batch = classes.slice(i, i + insertBatchSize);

log.trace(`Inserting batch ${i / insertBatchSize + 1}...`);

const { insertedCount } = await ClassModel.insertMany(batch, {
ordered: false,
rawResult: true,
const session = await connection.startSession();
try {
await session.withTransaction(async () => {
log.trace(
`Fetching classes for terms ${termsBatch.map((term) => term.name).toLocaleString()}...`
);
const classes = await getClasses(
log,
CLASS_APP_ID,
CLASS_APP_KEY,
termsBatchIds
);

if (classes.length === 0) {
throw new Error(`No classes found, skipping update.`);
}

log.trace(`Deleting classes in batch ${i}...`);

const { deletedCount } = await ClassModel.deleteMany({
termId: { $in: termsBatchIds },
});

log.trace(`Inserting batch ${i}`);

const { insertedCount } = await ClassModel.insertMany(classes, {
ordered: false,
rawResult: true,
});

// avoid replacing data if a non-negligible amount is deleted
if (insertedCount / deletedCount <= 0.9) {
throw new Error(
`Deleted ${deletedCount} classes and inserted only ${insertedCount} in batch ${i}; aborting data insertion process`
);
}

totalClasses += classes.length;
totalDeleted += deletedCount;
totalInserted += insertedCount;
});
totalInserted += insertedCount;
} catch (error: any) {
log.error(`Error inserting batch: ${error.message}`);
} finally {
await session.endSession();
}
}

log.info(`Deleted ${totalDeleted.toLocaleString()} total classes`);
log.info(
`Completed updating database with ${totalClasses.toLocaleString()} classes, inserted ${totalInserted.toLocaleString()} documents.`
`Inserted ${totalInserted.toLocaleString()} classes, after fetching ${totalClasses.toLocaleString()} classes.`
);

await updateTermsCatalogDataFlags(log);
Expand Down
9 changes: 9 additions & 0 deletions apps/datapuller/src/pullers/courses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ const updateCourses = async (config: Config) => {

log.trace("Deleting courses no longer in SIS...");

const previousCourses = await CourseModel.countDocuments();

if (courses.length / previousCourses <= 0.95) {
log.error(
`Fetched only ${courses.length} courses, while there were ${previousCourses} previous courses`
);
return;
}

const { deletedCount } = await CourseModel.deleteMany({
courseId: { $nin: courses.map((course) => course.courseId) },
});
Expand Down
104 changes: 62 additions & 42 deletions apps/datapuller/src/pullers/grade-distributions.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { connection } from "mongoose";

import {
CourseModel,
GradeCounts,
Expand All @@ -14,7 +16,7 @@ import {
getRecentPastTerms,
} from "../shared/term-selectors";

const TERMS_PER_API_BATCH = 100;
const TERMS_PER_API_BATCH = 50;

interface AggregatedCourseGradeSummary extends GradeCounts {
_id: {
Expand Down Expand Up @@ -141,54 +143,72 @@ const updateGradeDistributions = async (

let totalGradeDistributions = 0;
let totalInserted = 0;
let totalDeleted = 0;

// Insert grade distributions in batches of 50 terms
for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) {
const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH);
const termsBatchIds = termsBatch.map((term) => term.id);

log.trace(
`Fetching grade distributions for term ${termsBatch.map((term) => term.name).toLocaleString()}...`
);

const gradeDistributions = await getGradeDistributionDataByTerms(
DATABASE,
S3_OUTPUT,
REGION_NAME,
WORKGROUP,
termsBatchIds
);

log.info(
`Fetched ${gradeDistributions.length.toLocaleString()} grade distributions.`
);
if (gradeDistributions.length === 0) {
log.error("No grade distributions found, skipping update");
return;
}

log.trace("Deleting grade distributions to be replaced...");

const { deletedCount } = await GradeDistributionModel.deleteMany({
termId: { $in: termsBatchIds },
});

log.info(`Deleted ${deletedCount.toLocaleString()} grade distributions.`);

// Insert grade distributions in batches of 5000
let totalInserted = 0;
const insertBatchSize = 5000;
for (let i = 0; i < gradeDistributions.length; i += insertBatchSize) {
const batch = gradeDistributions.slice(i, i + insertBatchSize);

log.trace(`Inserting batch ${i / insertBatchSize + 1}`);

const { insertedCount } = await GradeDistributionModel.insertMany(batch, {
ordered: false,
rawResult: true,
const session = await connection.startSession();
try {
await session.withTransaction(async () => {
log.trace(
`Fetching grade distributions for terms ${termsBatch.map((term) => term.name).toLocaleString()}...`
);

const gradeDistributions = await getGradeDistributionDataByTerms(
DATABASE,
S3_OUTPUT,
REGION_NAME,
WORKGROUP,
termsBatchIds
);

log.info(
`Fetched ${gradeDistributions.length.toLocaleString()} grade distributions.`
);

if (gradeDistributions.length === 0) {
throw new Error(`No grade distributions found, skipping update`);
}

log.trace("Deleting grade distributions to be replaced...");

const { deletedCount } = await GradeDistributionModel.deleteMany({
termId: { $in: termsBatchIds },
});

log.trace(`Inserting batch ${i}`);

const { insertedCount } = await GradeDistributionModel.insertMany(
gradeDistributions,
{
ordered: false,
rawResult: true,
}
);

// avoid replacing data if a non-negligible amount is deleted
if (insertedCount / deletedCount <= 0.9) {
throw new Error(
`Deleted ${deletedCount} grade distributions and inserted only ${insertedCount} in batch ${i}; aborting data insertion process`
);
}

totalDeleted += deletedCount;
totalInserted += insertedCount;
totalGradeDistributions += gradeDistributions.length;
});
totalInserted += insertedCount;
} catch (error: any) {
log.error(`Error inserting batch: ${error.message}`);
} finally {
await session.endSession();
}
}

log.info(
`Deleted ${totalDeleted.toLocaleString()} total grade distributions`
);
log.info(
`Completed updating database with ${totalGradeDistributions.toLocaleString()} grade distributions, inserted ${totalInserted.toLocaleString()} documents.`
);
Expand Down
90 changes: 52 additions & 38 deletions apps/datapuller/src/pullers/sections.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { connection } from "mongoose";

import { SectionModel } from "@repo/common";

import { getSections } from "../lib/sections";
Expand Down Expand Up @@ -31,51 +33,63 @@ const updateSections = async (config: Config, termSelector: TermSelector) => {

let totalSections = 0;
let totalInserted = 0;
let totalDeleted = 0;

for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) {
const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH);
const termsBatchIds = termsBatch.map((term) => term.id);

log.trace(
`Fetching sections for term ${termsBatch.map((term) => term.name).toLocaleString()}...`
);

const sections = await getSections(
log,
CLASS_APP_ID,
CLASS_APP_KEY,
termsBatchIds
);

log.info(`Fetched ${sections.length.toLocaleString()} sections.`);
if (sections.length === 0) {
log.error(`No sections found, skipping update.`);
return;
}
totalSections += sections.length;

log.trace("Deleting sections to be replaced...");

const { deletedCount } = await SectionModel.deleteMany({
termId: { $in: termsBatchIds },
});

log.info(`Deleted ${deletedCount.toLocaleString()} sections.`);

// Insert sections in batches of 5000
const insertBatchSize = 5000;
for (let i = 0; i < sections.length; i += insertBatchSize) {
const batch = sections.slice(i, i + insertBatchSize);

log.trace(`Inserting batch ${i / insertBatchSize + 1}...`);

const { insertedCount } = await SectionModel.insertMany(batch, {
ordered: false,
rawResult: true,
const session = await connection.startSession();
try {
await session.withTransaction(async () => {
log.trace(
`Fetching sections for terms ${termsBatch.map((term) => term.name).toLocaleString()}...`
);

const sections = await getSections(
log,
CLASS_APP_ID,
CLASS_APP_KEY,
termsBatchIds
);

log.info(`Fetched ${sections.length.toLocaleString()} sections.`);

if (sections.length === 0) {
throw new Error(`No sections found, skipping update`);
}

log.trace("Deleting sections to be replaced...");

const { deletedCount } = await SectionModel.deleteMany({
termId: { $in: termsBatchIds },
});

log.trace(`Inserting batch ${i}`);

const { insertedCount } = await SectionModel.insertMany(sections, {
ordered: false,
rawResult: true,
});

// avoid replacing data if a non-negligible amount is deleted
if (insertedCount / deletedCount <= 0.9) {
throw new Error(
`Deleted ${deletedCount} sections and inserted only ${insertedCount} in batch ${i}; aborting data insertion process`
);
}

totalDeleted += deletedCount;
totalInserted += insertedCount;
totalSections += sections.length;
});
totalInserted += insertedCount;
} catch (error: any) {
log.error(`Error inserting batch: ${error.message}`);
} finally {
await session.endSession();
}
}

log.info(`Deleted ${totalDeleted.toLocaleString()} total sections`);
log.info(
`Completed updating database with ${totalSections.toLocaleString()} sections, inserted ${totalInserted.toLocaleString()} documents.`
);
Expand Down