diff --git a/apps/datapuller/src/pullers/classes.ts b/apps/datapuller/src/pullers/classes.ts index f7619a380..cf5c2b82e 100644 --- a/apps/datapuller/src/pullers/classes.ts +++ b/apps/datapuller/src/pullers/classes.ts @@ -1,3 +1,5 @@ +import { connection } from "mongoose"; + import { ClassModel, TermModel } from "@repo/common"; import { warmCatalogCacheForTerms } from "../lib/cache-warming"; @@ -98,53 +100,63 @@ const updateClasses = async (config: Config, termSelector: TermSelector) => { let totalClasses = 0; let totalInserted = 0; + let totalDeleted = 0; + + // Insert classes in batches of 4 terms for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) { const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH); const termsBatchIds = termsBatch.map((term) => term.id); - - log.trace( - `Fetching classes for term ${termsBatch.map((term) => term.name).toLocaleString()}...` - ); - - const classes = await getClasses( - log, - CLASS_APP_ID, - CLASS_APP_KEY, - termsBatchIds - ); - - log.info(`Fetched ${classes.length.toLocaleString()} classes.`); - if (classes.length === 0) { - log.error(`No classes found, skipping update.`); - return; - } - totalClasses += classes.length; - - log.trace("Deleting classes to be replaced..."); - - const { deletedCount } = await ClassModel.deleteMany({ - termId: { $in: termsBatchIds }, - }); - - log.info(`Deleted ${deletedCount.toLocaleString()} classes.`); - - // Insert classes in batches of 5000 - const insertBatchSize = 5000; - for (let i = 0; i < classes.length; i += insertBatchSize) { - const batch = classes.slice(i, i + insertBatchSize); - - log.trace(`Inserting batch ${i / insertBatchSize + 1}...`); - - const { insertedCount } = await ClassModel.insertMany(batch, { - ordered: false, - rawResult: true, + const session = await connection.startSession(); + try { + await session.withTransaction(async () => { + log.trace( + `Fetching classes for terms ${termsBatch.map((term) => term.name).toLocaleString()}...` + ); + const classes = 
await getClasses( + log, + CLASS_APP_ID, + CLASS_APP_KEY, + termsBatchIds + ); + + if (classes.length === 0) { + throw new Error(`No classes found, skipping update.`); + } + + log.trace(`Deleting classes in batch ${i}...`); + + const { deletedCount } = await ClassModel.deleteMany({ + termId: { $in: termsBatchIds }, + }); + + log.trace(`Inserting batch ${i}`); + + const { insertedCount } = await ClassModel.insertMany(classes, { + ordered: false, + rawResult: true, + }); + + // abort this batch when the inserted count is at most 90% of the deleted count, so a much smaller data set does not replace the existing one. NOTE(review): deleteMany/insertMany are not passed { session } here; confirm these writes actually run inside the transaction + if (insertedCount / deletedCount <= 0.9) { + throw new Error( + `Deleted ${deletedCount} classes and inserted only ${insertedCount} in batch ${i}; aborting data insertion process` + ); + } + + totalClasses += classes.length; + totalDeleted += deletedCount; + totalInserted += insertedCount; }); - totalInserted += insertedCount; + } catch (error: any) { + log.error(`Error inserting batch: ${error.message}`); + } finally { + await session.endSession(); } } + log.info(`Deleted ${totalDeleted.toLocaleString()} total classes`); log.info( - `Completed updating database with ${totalClasses.toLocaleString()} classes, inserted ${totalInserted.toLocaleString()} documents.` + `Inserted ${totalInserted.toLocaleString()} classes, after fetching ${totalClasses.toLocaleString()} classes.` ); await updateTermsCatalogDataFlags(log); diff --git a/apps/datapuller/src/pullers/courses.ts b/apps/datapuller/src/pullers/courses.ts index 78cf71733..48d186e41 100644 --- a/apps/datapuller/src/pullers/courses.ts +++ b/apps/datapuller/src/pullers/courses.ts @@ -21,6 +21,15 @@ const updateCourses = async (config: Config) => { log.trace("Deleting courses no longer in SIS..."); + const previousCourses = await CourseModel.countDocuments(); + + if (courses.length / previousCourses <= 0.95) { + log.error( + `Fetched only ${courses.length} courses, while there were ${previousCourses} previous courses` + ); + return; + } + const { deletedCount } = await
CourseModel.deleteMany({ courseId: { $nin: courses.map((course) => course.courseId) }, }); diff --git a/apps/datapuller/src/pullers/grade-distributions.ts b/apps/datapuller/src/pullers/grade-distributions.ts index 6666b26ef..04b1caf74 100644 --- a/apps/datapuller/src/pullers/grade-distributions.ts +++ b/apps/datapuller/src/pullers/grade-distributions.ts @@ -1,3 +1,5 @@ +import { connection } from "mongoose"; + import { CourseModel, GradeCounts, @@ -14,7 +16,7 @@ import { getRecentPastTerms, } from "../shared/term-selectors"; -const TERMS_PER_API_BATCH = 100; +const TERMS_PER_API_BATCH = 50; interface AggregatedCourseGradeSummary extends GradeCounts { _id: { @@ -141,54 +143,72 @@ const updateGradeDistributions = async ( let totalGradeDistributions = 0; let totalInserted = 0; + let totalDeleted = 0; + + // Insert grade distributions in batches of 50 terms for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) { const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH); const termsBatchIds = termsBatch.map((term) => term.id); - - log.trace( - `Fetching grade distributions for term ${termsBatch.map((term) => term.name).toLocaleString()}...` - ); - - const gradeDistributions = await getGradeDistributionDataByTerms( - DATABASE, - S3_OUTPUT, - REGION_NAME, - WORKGROUP, - termsBatchIds - ); - - log.info( - `Fetched ${gradeDistributions.length.toLocaleString()} grade distributions.` - ); - if (gradeDistributions.length === 0) { - log.error("No grade distributions found, skipping update"); - return; - } - - log.trace("Deleting grade distributions to be replaced..."); - - const { deletedCount } = await GradeDistributionModel.deleteMany({ - termId: { $in: termsBatchIds }, - }); - - log.info(`Deleted ${deletedCount.toLocaleString()} grade distributions.`); - - // Insert grade distributions in batches of 5000 - let totalInserted = 0; - const insertBatchSize = 5000; - for (let i = 0; i < gradeDistributions.length; i += insertBatchSize) { - const batch = 
gradeDistributions.slice(i, i + insertBatchSize); - - log.trace(`Inserting batch ${i / insertBatchSize + 1}`); - - const { insertedCount } = await GradeDistributionModel.insertMany(batch, { - ordered: false, - rawResult: true, + const session = await connection.startSession(); + try { + await session.withTransaction(async () => { + log.trace( + `Fetching grade distributions for terms ${termsBatch.map((term) => term.name).toLocaleString()}...` + ); + + const gradeDistributions = await getGradeDistributionDataByTerms( + DATABASE, + S3_OUTPUT, + REGION_NAME, + WORKGROUP, + termsBatchIds + ); + + log.info( + `Fetched ${gradeDistributions.length.toLocaleString()} grade distributions.` + ); + + if (gradeDistributions.length === 0) { + throw new Error(`No grade distributions found, skipping update`); + } + + log.trace("Deleting grade distributions to be replaced..."); + + const { deletedCount } = await GradeDistributionModel.deleteMany({ + termId: { $in: termsBatchIds }, + }); + + log.trace(`Inserting batch ${i}`); + + const { insertedCount } = await GradeDistributionModel.insertMany( + gradeDistributions, + { + ordered: false, + rawResult: true, + } + ); + + // abort this batch when the inserted count is at most 90% of the deleted count, so a much smaller data set does not replace the existing one. NOTE(review): deleteMany/insertMany are not passed { session } here; confirm these writes actually run inside the transaction + if (insertedCount / deletedCount <= 0.9) { + throw new Error( + `Deleted ${deletedCount} grade distributions and inserted only ${insertedCount} in batch ${i}; aborting data insertion process` + ); + } + + totalDeleted += deletedCount; + totalInserted += insertedCount; + totalGradeDistributions += gradeDistributions.length; }); - totalInserted += insertedCount; + } catch (error: any) { + log.error(`Error inserting batch: ${error.message}`); + } finally { + await session.endSession(); } } + log.info( + `Deleted ${totalDeleted.toLocaleString()} total grade distributions` + ); log.info( `Completed updating database with ${totalGradeDistributions.toLocaleString()} grade distributions, inserted ${totalInserted.toLocaleString()} documents.` ); diff --git
a/apps/datapuller/src/pullers/sections.ts b/apps/datapuller/src/pullers/sections.ts index 65294d543..15c9525a7 100644 --- a/apps/datapuller/src/pullers/sections.ts +++ b/apps/datapuller/src/pullers/sections.ts @@ -1,3 +1,5 @@ +import { connection } from "mongoose"; + import { SectionModel } from "@repo/common"; import { getSections } from "../lib/sections"; @@ -31,51 +33,63 @@ const updateSections = async (config: Config, termSelector: TermSelector) => { let totalSections = 0; let totalInserted = 0; + let totalDeleted = 0; + for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) { const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH); const termsBatchIds = termsBatch.map((term) => term.id); - - log.trace( - `Fetching sections for term ${termsBatch.map((term) => term.name).toLocaleString()}...` - ); - - const sections = await getSections( - log, - CLASS_APP_ID, - CLASS_APP_KEY, - termsBatchIds - ); - - log.info(`Fetched ${sections.length.toLocaleString()} sections.`); - if (sections.length === 0) { - log.error(`No sections found, skipping update.`); - return; - } - totalSections += sections.length; - - log.trace("Deleting sections to be replaced..."); - - const { deletedCount } = await SectionModel.deleteMany({ - termId: { $in: termsBatchIds }, - }); - - log.info(`Deleted ${deletedCount.toLocaleString()} sections.`); - - // Insert sections in batches of 5000 - const insertBatchSize = 5000; - for (let i = 0; i < sections.length; i += insertBatchSize) { - const batch = sections.slice(i, i + insertBatchSize); - - log.trace(`Inserting batch ${i / insertBatchSize + 1}...`); - - const { insertedCount } = await SectionModel.insertMany(batch, { - ordered: false, - rawResult: true, + const session = await connection.startSession(); + try { + await session.withTransaction(async () => { + log.trace( + `Fetching sections for terms ${termsBatch.map((term) => term.name).toLocaleString()}...` + ); + + const sections = await getSections( + log, + CLASS_APP_ID, + 
CLASS_APP_KEY, + termsBatchIds + ); + + log.info(`Fetched ${sections.length.toLocaleString()} sections.`); + + if (sections.length === 0) { + throw new Error(`No sections found, skipping update`); + } + + log.trace("Deleting sections to be replaced..."); + + const { deletedCount } = await SectionModel.deleteMany({ + termId: { $in: termsBatchIds }, + }); + + log.trace(`Inserting batch ${i}`); + + const { insertedCount } = await SectionModel.insertMany(sections, { + ordered: false, + rawResult: true, + }); + + // abort this batch when the inserted count is at most 90% of the deleted count, so a much smaller data set does not replace the existing one. NOTE(review): deleteMany/insertMany are not passed { session } here; confirm these writes actually run inside the transaction + if (insertedCount / deletedCount <= 0.9) { + throw new Error( + `Deleted ${deletedCount} sections and inserted only ${insertedCount} in batch ${i}; aborting data insertion process` + ); + } + + totalDeleted += deletedCount; + totalInserted += insertedCount; + totalSections += sections.length; }); - totalInserted += insertedCount; + } catch (error: any) { + log.error(`Error inserting batch: ${error.message}`); + } finally { + await session.endSession(); } } + log.info(`Deleted ${totalDeleted.toLocaleString()} total sections`); log.info( `Completed updating database with ${totalSections.toLocaleString()} sections, inserted ${totalInserted.toLocaleString()} documents.` );