From 63462bbcd5840984ceea2a80344bd3759a305a2b Mon Sep 17 00:00:00 2001 From: EwwPhysics <74149971+EwwPhysics@users.noreply.github.com> Date: Wed, 22 Oct 2025 00:35:20 -0700 Subject: [PATCH 1/9] Preliminary batching (untested) --- apps/datapuller/src/pullers/classes.ts | 107 ++++++++++++++++++------- 1 file changed, 76 insertions(+), 31 deletions(-) diff --git a/apps/datapuller/src/pullers/classes.ts b/apps/datapuller/src/pullers/classes.ts index e53b4fd11..1fe21a610 100644 --- a/apps/datapuller/src/pullers/classes.ts +++ b/apps/datapuller/src/pullers/classes.ts @@ -9,6 +9,7 @@ import { } from "../shared/term-selectors"; const TERMS_PER_API_BATCH = 4; +const CLASSES_PER_BATCH = 5000; const updateClasses = async ( { log, sis: { CLASS_APP_ID, CLASS_APP_KEY } }: Config, @@ -29,54 +30,98 @@ const updateClasses = async ( let totalClasses = 0; let totalInserted = 0; - for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) { - const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH); - const termsBatchIds = termsBatch.map((term) => term.id); - - log.trace( - `Fetching classes for term ${termsBatch.map((term) => term.name).toLocaleString()}...` - ); - - const classes = await getClasses( + + const termsBatchIds = terms.map((term) => term.id); + log.trace( + `Fetching classes for ${terms.length.toLocaleString()} terms...` + ); + const classes = await getClasses( log, CLASS_APP_ID, CLASS_APP_KEY, termsBatchIds ); - log.info(`Fetched ${classes.length.toLocaleString()} classes.`); - if (!classes) { - log.warn(`No classes found, skipping update.`); - return; - } - totalClasses += classes.length; + log.info(`Fetched ${classes.length.toLocaleString()} classes.`); + if (!classes) { + log.warn(`No classes found, skipping update.`); + return; + } + totalClasses += classes.length; - log.trace("Deleting classes to be replaced..."); + log.trace("Deleting classes to be replaced..."); - const { deletedCount } = await ClassModel.deleteMany({ - termId: { $in: termsBatchIds }, - }); + const { deletedCount } = await ClassModel.deleteMany({ + termId: { $in: termsBatchIds }, + }); - log.info(`Deleted ${deletedCount.toLocaleString()} classes.`); + log.info(`Deleted ${deletedCount.toLocaleString()} classes.`); - // Insert classes in batches of 5000 - const insertBatchSize = 5000; - for (let i = 0; i < classes.length; i += insertBatchSize) { - const batch = classes.slice(i, i + insertBatchSize); + // Insert classes in batches of 5000 + for (let i = 0; i < classes.length; i += CLASSES_PER_BATCH) { + const batch = classes.slice(i, i + CLASSES_PER_BATCH); - log.trace(`Inserting batch ${i / insertBatchSize + 1}...`); + log.trace(`Inserting batch ${i / CLASSES_PER_BATCH + 1}...`); - const { insertedCount } = await ClassModel.insertMany(batch, { - ordered: false, - rawResult: true, - }); - totalInserted += insertedCount; - } + const { insertedCount } = await ClassModel.insertMany(batch, { + ordered: false, + rawResult: true, + }); + totalInserted += insertedCount; } log.info( `Completed updating database with ${totalClasses.toLocaleString()} classes, inserted ${totalInserted.toLocaleString()} documents.` ); + + // for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) { + // const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH); + // const termsBatchIds = termsBatch.map((term) => term.id); + + // log.trace( + // `Fetching classes for term ${termsBatch.map((term) => term.name).toLocaleString()}...` + // ); + + // const classes = await getClasses( + // log, + // CLASS_APP_ID, + // CLASS_APP_KEY, + // termsBatchIds + // ); + + // log.info(`Fetched ${classes.length.toLocaleString()} classes.`); + // if (!classes) { + // log.warn(`No classes found, skipping update.`); + // return; + // } + // totalClasses += classes.length; + + // log.trace("Deleting classes to be replaced..."); + + // const { deletedCount } = await ClassModel.deleteMany({ + // termId: { $in: termsBatchIds }, + // }); + + // log.info(`Deleted ${deletedCount.toLocaleString()} classes.`); + + // // Insert classes in batches of 5000 + // const insertBatchSize = 5000; + // for (let i = 0; i < classes.length; i += insertBatchSize) { + // const batch = classes.slice(i, i + insertBatchSize); + + // log.trace(`Inserting batch ${i / insertBatchSize + 1}...`); + + // const { insertedCount } = await ClassModel.insertMany(batch, { + // ordered: false, + // rawResult: true, + // }); + // totalInserted += insertedCount; + // } + // } + + // log.info( + // `Completed updating database with ${totalClasses.toLocaleString()} classes, inserted ${totalInserted.toLocaleString()} documents.` + // ); }; const activeTerms = async (config: Config) => { From fddacbbc49d09f6c41cd31500936e59154099667 Mon Sep 17 00:00:00 2001 From: EwwPhysics <74149971+EwwPhysics@users.noreply.github.com> Date: Mon, 27 Oct 2025 23:41:59 -0700 Subject: [PATCH 2/9] Prevent class deletion when 0 classes are pulled --- apps/datapuller/src/pullers/classes.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/datapuller/src/pullers/classes.ts b/apps/datapuller/src/pullers/classes.ts index 1fe21a610..f3f0923c8 100644 --- a/apps/datapuller/src/pullers/classes.ts +++ b/apps/datapuller/src/pullers/classes.ts @@ -8,7 +8,7 @@ import { getLastFiveYearsTerms, } from "../shared/term-selectors"; -const TERMS_PER_API_BATCH = 4; +// const TERMS_PER_API_BATCH = 4; const CLASSES_PER_BATCH = 5000; const updateClasses = async ( @@ -43,7 +43,7 @@ const updateClasses = async ( ); log.info(`Fetched ${classes.length.toLocaleString()} classes.`); - if (!classes) { + if (!classes || classes.length == 0) { log.warn(`No classes found, skipping update.`); return; } From 53d4d42b2123081a4cea7b291aeb0bda813e645c Mon Sep 17 00:00:00 2001 From: EwwPhysics <74149971+EwwPhysics@users.noreply.github.com> Date: Mon, 10 Nov 2025 13:52:19 -0800 Subject: [PATCH 3/9] Insert classes with mongodb transactions --- apps/datapuller/src/pullers/classes.ts | 81 +++++++------------------- 1 file changed, 22 insertions(+), 59 deletions(-) diff --git a/apps/datapuller/src/pullers/classes.ts b/apps/datapuller/src/pullers/classes.ts index f3f0923c8..7ea327f60 100644 --- a/apps/datapuller/src/pullers/classes.ts +++ b/apps/datapuller/src/pullers/classes.ts @@ -7,6 +7,7 @@ import { getActiveTerms, getLastFiveYearsTerms, } from "../shared/term-selectors"; +import { connection } from "mongoose"; // const TERMS_PER_API_BATCH = 4; const CLASSES_PER_BATCH = 5000; @@ -59,69 +60,31 @@ const updateClasses = async ( // Insert classes in batches of 5000 for (let i = 0; i < classes.length; i += CLASSES_PER_BATCH) { - const batch = classes.slice(i, i + CLASSES_PER_BATCH); - - log.trace(`Inserting batch ${i / CLASSES_PER_BATCH + 1}...`); - - const { insertedCount } = await ClassModel.insertMany(batch, { - ordered: false, - rawResult: true, - }); - totalInserted += insertedCount; + const session = await connection.startSession(); + try { + await session.withTransaction(async () => { + const batch = classes.slice(i, i + CLASSES_PER_BATCH); + //const terms = classes.filter((class) => class.termId === "2258"); + + log.trace(`Inserting batch ${i / CLASSES_PER_BATCH + 1}...`); + + const { insertedCount } = await ClassModel.insertMany(batch, { + ordered: false, + rawResult: true, + }); + totalInserted += insertedCount; + }) + } catch (error: any) { + log.warn(`Error inserting batch: ${error.message}`); + } finally { + await session.endSession(); + } + } log.info( - `Completed updating database with ${totalClasses.toLocaleString()} classes, inserted ${totalInserted.toLocaleString()} documents.` + `Inserted ${totalInserted.toLocaleString()} classes, after fetching ${totalClasses.toLocaleString()} classes.` ); - - // for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) { - // const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH); - // const termsBatchIds = termsBatch.map((term) => term.id); - - // log.trace( - // `Fetching classes for term ${termsBatch.map((term) => term.name).toLocaleString()}...` - // ); - - // const classes = await getClasses( - // log, - // CLASS_APP_ID, - // CLASS_APP_KEY, - // termsBatchIds - // ); - - // log.info(`Fetched ${classes.length.toLocaleString()} classes.`); - // if (!classes) { - // log.warn(`No classes found, skipping update.`); - // return; - // } - // totalClasses += classes.length; - - // log.trace("Deleting classes to be replaced..."); - - // const { deletedCount } = await ClassModel.deleteMany({ - // termId: { $in: termsBatchIds }, - // }); - - // log.info(`Deleted ${deletedCount.toLocaleString()} classes.`); - - // // Insert classes in batches of 5000 - // const insertBatchSize = 5000; - // for (let i = 0; i < classes.length; i += insertBatchSize) { - // const batch = classes.slice(i, i + insertBatchSize); - - // log.trace(`Inserting batch ${i / insertBatchSize + 1}...`); - - // const { insertedCount } = await ClassModel.insertMany(batch, { - // ordered: false, - // rawResult: true, - // }); - // totalInserted += insertedCount; - // } - // } - - // log.info( - // `Completed updating database with ${totalClasses.toLocaleString()} classes, inserted ${totalInserted.toLocaleString()} documents.` - // ); }; const activeTerms = async (config: Config) => { From fe938771d91ee967074c85dc62f0093901e18e42 Mon Sep 17 00:00:00 2001 From: EwwPhysics <74149971+EwwPhysics@users.noreply.github.com> Date: Mon, 10 Nov 2025 14:27:00 -0800 Subject: [PATCH 4/9] Batch by termId + move deletion into mongodb transaction --- apps/datapuller/src/pullers/classes.ts | 52 +++++++++++++------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/apps/datapuller/src/pullers/classes.ts b/apps/datapuller/src/pullers/classes.ts index 7ea327f60..33068dc6b 100644 --- a/apps/datapuller/src/pullers/classes.ts +++ b/apps/datapuller/src/pullers/classes.ts @@ -1,3 +1,5 @@ +import { connection } from "mongoose"; + import { ClassModel } from "@repo/common"; import { getClasses } from "../lib/classes"; @@ -7,10 +9,6 @@ import { getActiveTerms, getLastFiveYearsTerms, } from "../shared/term-selectors"; -import { connection } from "mongoose"; - -// const TERMS_PER_API_BATCH = 4; -const CLASSES_PER_BATCH = 5000; const updateClasses = async ( { log, sis: { CLASS_APP_ID, CLASS_APP_KEY } }: Config, @@ -31,17 +29,16 @@ const updateClasses = async ( let totalClasses = 0; let totalInserted = 0; - + let totalDeleted = 0; + const termsBatchIds = terms.map((term) => term.id); - log.trace( - `Fetching classes for ${terms.length.toLocaleString()} terms...` - ); + log.trace(`Fetching classes for ${terms.length.toLocaleString()} terms...`); const classes = await getClasses( - log, - CLASS_APP_ID, - CLASS_APP_KEY, - termsBatchIds - ); + log, + CLASS_APP_ID, + CLASS_APP_KEY, + termsBatchIds + ); log.info(`Fetched ${classes.length.toLocaleString()} classes.`); if (!classes || classes.length == 0) { @@ -50,38 +47,39 @@ const updateClasses = async ( } totalClasses += classes.length; - log.trace("Deleting classes to be replaced..."); - - const { deletedCount } = await ClassModel.deleteMany({ - termId: { $in: termsBatchIds }, - }); - - log.info(`Deleted ${deletedCount.toLocaleString()} classes.`); - // Insert classes in batches of 5000 - for (let i = 0; i < classes.length; i += CLASSES_PER_BATCH) { + for (let i = 0; i < termsBatchIds.length; i += 1) { const session = await connection.startSession(); + let currentTerm = termsBatchIds[i]; try { await session.withTransaction(async () => { - const batch = classes.slice(i, i + CLASSES_PER_BATCH); - //const terms = classes.filter((class) => class.termId === "2258"); + // const batch = classes.slice(i, i + CLASSES_PER_BATCH); + const batch = classes.filter((x) => x.termId == currentTerm); - log.trace(`Inserting batch ${i / CLASSES_PER_BATCH + 1}...`); + log.trace(`Deleting classes to be replaced in term ${currentTerm}...`); + + const { deletedCount } = await ClassModel.deleteMany({ + termId: currentTerm, + }); + + log.trace(`Inserting term ${currentTerm}...`); const { insertedCount } = await ClassModel.insertMany(batch, { ordered: false, rawResult: true, }); + + totalDeleted += deletedCount; totalInserted += insertedCount; - }) + }); } catch (error: any) { log.warn(`Error inserting batch: ${error.message}`); } finally { await session.endSession(); } - } + log.info(`Deleted ${totalDeleted.toLocaleString()} classes`); log.info( `Inserted ${totalInserted.toLocaleString()} classes, after fetching ${totalClasses.toLocaleString()} classes.` ); From 02c1156679fee692e4c9fe920e4b53325a0b88d9 Mon Sep 17 00:00:00 2001 From: EwwPhysics <74149971+EwwPhysics@users.noreply.github.com> Date: Wed, 22 Oct 2025 00:35:20 -0700 Subject: [PATCH 5/9] Preliminary batching (untested) --- apps/datapuller/src/pullers/classes.ts | 62 ++++++++++++-------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/apps/datapuller/src/pullers/classes.ts b/apps/datapuller/src/pullers/classes.ts index f7619a380..bde6e3761 100644 --- a/apps/datapuller/src/pullers/classes.ts +++ b/apps/datapuller/src/pullers/classes.ts @@ -9,7 +9,8 @@ import { getLastFiveYearsTerms, } from "../shared/term-selectors"; -const TERMS_PER_API_BATCH = 4; +//const TERMS_PER_API_BATCH = 4; +const CLASSES_PER_BATCH = 5000; // Note: We scan ALL terms in the database rather than just the affected ones. // This is intentional to ensure data consistency, as class data may be modified @@ -98,49 +99,44 @@ const updateClasses = async (config: Config, termSelector: TermSelector) => { let totalClasses = 0; let totalInserted = 0; - for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) { - const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH); - const termsBatchIds = termsBatch.map((term) => term.id); - - log.trace( - `Fetching classes for term ${termsBatch.map((term) => term.name).toLocaleString()}...` - ); - - const classes = await getClasses( + + const termsBatchIds = terms.map((term) => term.id); + log.trace( + `Fetching classes for ${terms.length.toLocaleString()} terms...` + ); + const classes = await getClasses( log, CLASS_APP_ID, CLASS_APP_KEY, termsBatchIds - ); + ); - log.info(`Fetched ${classes.length.toLocaleString()} classes.`); - if (classes.length === 0) { - log.error(`No classes found, skipping update.`); - return; - } - totalClasses += classes.length; + log.info(`Fetched ${classes.length.toLocaleString()} classes.`); + if (!classes || classes.length === 0) { + log.error(`No classes found, skipping update.`); + return; + } + totalClasses += classes.length; - log.trace("Deleting classes to be replaced..."); + log.trace("Deleting classes to be replaced..."); - const { deletedCount } = await ClassModel.deleteMany({ - termId: { $in: termsBatchIds }, - }); + const { deletedCount } = await ClassModel.deleteMany({ + termId: { $in: termsBatchIds }, + }); - log.info(`Deleted ${deletedCount.toLocaleString()} classes.`); + log.info(`Deleted ${deletedCount.toLocaleString()} classes.`); - // Insert classes in batches of 5000 - const insertBatchSize = 5000; - for (let i = 0; i < classes.length; i += insertBatchSize) { - const batch = classes.slice(i, i + insertBatchSize); + // Insert classes in batches of 5000 + for (let i = 0; i < classes.length; i += CLASSES_PER_BATCH) { + const batch = classes.slice(i, i + CLASSES_PER_BATCH); - log.trace(`Inserting batch ${i / insertBatchSize + 1}...`); + log.trace(`Inserting batch ${i / CLASSES_PER_BATCH + 1}...`); - const { insertedCount } = await ClassModel.insertMany(batch, { - ordered: false, - rawResult: true, - }); - totalInserted += insertedCount; - } + const { insertedCount } = await ClassModel.insertMany(batch, { + ordered: false, + rawResult: true, + }); + totalInserted += insertedCount; } log.info( From 5e037a2684490f0dbbc78a71e6bbe76f088742c3 Mon Sep 17 00:00:00 2001 From: EwwPhysics <74149971+EwwPhysics@users.noreply.github.com> Date: Mon, 10 Nov 2025 13:52:19 -0800 Subject: [PATCH 6/9] Insert classes with mongodb transactions --- apps/datapuller/src/pullers/classes.ts | 32 ++++++++++++++++++-------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/apps/datapuller/src/pullers/classes.ts b/apps/datapuller/src/pullers/classes.ts index bde6e3761..b0412e210 100644 --- a/apps/datapuller/src/pullers/classes.ts +++ b/apps/datapuller/src/pullers/classes.ts @@ -8,6 +8,7 @@ import { getActiveTerms, getLastFiveYearsTerms, } from "../shared/term-selectors"; +import { connection } from "mongoose"; //const TERMS_PER_API_BATCH = 4; const CLASSES_PER_BATCH = 5000; @@ -128,19 +129,30 @@ const updateClasses = async (config: Config, termSelector: TermSelector) => { // Insert classes in batches of 5000 for (let i = 0; i < classes.length; i += CLASSES_PER_BATCH) { - const batch = classes.slice(i, i + CLASSES_PER_BATCH); - - log.trace(`Inserting batch ${i / CLASSES_PER_BATCH + 1}...`); - - const { insertedCount } = await ClassModel.insertMany(batch, { - ordered: false, - rawResult: true, - }); - totalInserted += insertedCount; + const session = await connection.startSession(); + try { + await session.withTransaction(async () => { + const batch = classes.slice(i, i + CLASSES_PER_BATCH); + //const terms = classes.filter((class) => class.termId === "2258"); + + log.trace(`Inserting batch ${i / CLASSES_PER_BATCH + 1}...`); + + const { insertedCount } = await ClassModel.insertMany(batch, { + ordered: false, + rawResult: true, + }); + totalInserted += insertedCount; + }) + } catch (error: any) { + log.warn(`Error inserting batch: ${error.message}`); + } finally { + await session.endSession(); + } + } log.info( - `Completed updating database with ${totalClasses.toLocaleString()} classes, inserted ${totalInserted.toLocaleString()} documents.` + `Inserted ${totalInserted.toLocaleString()} classes, after fetching ${totalClasses.toLocaleString()} classes.` ); await updateTermsCatalogDataFlags(log); From e4d47fef6130a3190a9f584efe5dd6c8c9211ee8 Mon Sep 17 00:00:00 2001 From: EwwPhysics <74149971+EwwPhysics@users.noreply.github.com> Date: Mon, 10 Nov 2025 14:27:00 -0800 Subject: [PATCH 7/9] Batch by termId + move deletion into mongodb transaction --- apps/datapuller/src/pullers/classes.ts | 46 +++++++++++++------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/apps/datapuller/src/pullers/classes.ts b/apps/datapuller/src/pullers/classes.ts index b0412e210..334f5de6f 100644 --- a/apps/datapuller/src/pullers/classes.ts +++ b/apps/datapuller/src/pullers/classes.ts @@ -11,7 +11,7 @@ import { import { connection } from "mongoose"; //const TERMS_PER_API_BATCH = 4; -const CLASSES_PER_BATCH = 5000; +//const CLASSES_PER_BATCH = 5000; // Note: We scan ALL terms in the database rather than just the affected ones. // This is intentional to ensure data consistency, as class data may be modified @@ -100,16 +100,15 @@ const updateClasses = async (config: Config, termSelector: TermSelector) => { let totalClasses = 0; let totalInserted = 0; - + let totalDeleted = 0; + const termsBatchIds = terms.map((term) => term.id); - log.trace( - `Fetching classes for ${terms.length.toLocaleString()} terms...` - ); + log.trace(`Fetching classes for ${terms.length.toLocaleString()} terms...`); const classes = await getClasses( - log, - CLASS_APP_ID, - CLASS_APP_KEY, - termsBatchIds + log, + CLASS_APP_ID, + CLASS_APP_KEY, + termsBatchIds ); log.info(`Fetched ${classes.length.toLocaleString()} classes.`); @@ -119,38 +118,39 @@ const updateClasses = async (config: Config, termSelector: TermSelector) => { } totalClasses += classes.length; - log.trace("Deleting classes to be replaced..."); - - const { deletedCount } = await ClassModel.deleteMany({ - termId: { $in: termsBatchIds }, - }); - - log.info(`Deleted ${deletedCount.toLocaleString()} classes.`); - // Insert classes in batches of 5000 - for (let i = 0; i < classes.length; i += CLASSES_PER_BATCH) { + for (let i = 0; i < termsBatchIds.length; i += 1) { const session = await connection.startSession(); + let currentTerm = termsBatchIds[i]; try { await session.withTransaction(async () => { - const batch = classes.slice(i, i + CLASSES_PER_BATCH); - //const terms = classes.filter((class) => class.termId === "2258"); + // const batch = classes.slice(i, i + CLASSES_PER_BATCH); + const batch = classes.filter((x) => x.termId == currentTerm); + + log.trace(`Deleting classes to be replaced in term ${currentTerm}...`); - log.trace(`Inserting batch ${i / CLASSES_PER_BATCH + 1}...`); + const { deletedCount } = await ClassModel.deleteMany({ + termId: currentTerm, + }); + + log.trace(`Inserting term ${currentTerm}...`); const { insertedCount } = await ClassModel.insertMany(batch, { ordered: false, rawResult: true, }); + + totalDeleted += deletedCount; totalInserted += insertedCount; - }) + }); } catch (error: any) { log.warn(`Error inserting batch: ${error.message}`); } finally { await session.endSession(); } - } + log.info(`Deleted ${totalDeleted.toLocaleString()} classes`); log.info( `Inserted ${totalInserted.toLocaleString()} classes, after fetching ${totalClasses.toLocaleString()} classes.` ); From 14d2935b97558f722563fc8cfcc2da00447f1d9e Mon Sep 17 00:00:00 2001 From: EwwPhysics <74149971+EwwPhysics@users.noreply.github.com> Date: Thu, 20 Nov 2025 00:17:48 -0800 Subject: [PATCH 8/9] Abort class datapuller if <90% of data is fetched --- apps/datapuller/src/pullers/classes.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/datapuller/src/pullers/classes.ts b/apps/datapuller/src/pullers/classes.ts index 334f5de6f..8681eedd7 100644 --- a/apps/datapuller/src/pullers/classes.ts +++ b/apps/datapuller/src/pullers/classes.ts @@ -10,9 +10,6 @@ import { } from "../shared/term-selectors"; import { connection } from "mongoose"; -//const TERMS_PER_API_BATCH = 4; -//const CLASSES_PER_BATCH = 5000; - // Note: We scan ALL terms in the database rather than just the affected ones. // This is intentional to ensure data consistency, as class data may be modified // by sources other than the datapuller. Since the query is fast, the overhead @@ -127,7 +124,7 @@ const updateClasses = async (config: Config, termSelector: TermSelector) => { // const batch = classes.slice(i, i + CLASSES_PER_BATCH); const batch = classes.filter((x) => x.termId == currentTerm); - log.trace(`Deleting classes to be replaced in term ${currentTerm}...`); + log.trace(`Deleting classes in term ${currentTerm}...`); const { deletedCount } = await ClassModel.deleteMany({ termId: currentTerm, @@ -140,6 +137,11 @@ const updateClasses = async (config: Config, termSelector: TermSelector) => { rawResult: true, }); + // avoid replacing data if a non-negligible amount is deleted + if (insertedCount / deletedCount <= 0.90) { + throw new Error(`Deleted ${deletedCount} classes and inserted only ${insertedCount} in term ${currentTerm}; aborting data insertion process`); + } + totalDeleted += deletedCount; totalInserted += insertedCount; }); From e9ba17116fecfe22e28a1ca16d2b4a92e7dfbbec Mon Sep 17 00:00:00 2001 From: EwwPhysics <74149971+EwwPhysics@users.noreply.github.com> Date: Wed, 3 Dec 2025 14:05:32 -0800 Subject: [PATCH 9/9] Update fault tolerance of datapullers Use mongodb transactions, aborting if we insert less than 95% of the previous data. Add more log messages to show how much data was deleted and inserted. --- apps/datapuller/src/pullers/classes.ts | 55 +++++---- apps/datapuller/src/pullers/courses.ts | 9 ++ .../src/pullers/grade-distributions.ts | 104 +++++++++++------- apps/datapuller/src/pullers/sections.ts | 90 ++++++++------- 4 files changed, 150 insertions(+), 108 deletions(-) diff --git a/apps/datapuller/src/pullers/classes.ts b/apps/datapuller/src/pullers/classes.ts index a98bc3227..cf5c2b82e 100644 --- a/apps/datapuller/src/pullers/classes.ts +++ b/apps/datapuller/src/pullers/classes.ts @@ -11,6 +11,8 @@ import { getLastFiveYearsTerms, } from "../shared/term-selectors"; +const TERMS_PER_API_BATCH = 4; + // Note: We scan ALL terms in the database rather than just the affected ones. // This is intentional to ensure data consistency, as class data may be modified // by sources other than the datapuller. Since the query is fast, the overhead @@ -100,40 +102,36 @@ const updateClasses = async (config: Config, termSelector: TermSelector) => { let totalInserted = 0; let totalDeleted = 0; - const termsBatchIds = terms.map((term) => term.id); - log.trace(`Fetching classes for ${terms.length.toLocaleString()} terms...`); - const classes = await getClasses( - log, - CLASS_APP_ID, - CLASS_APP_KEY, - termsBatchIds - ); - - log.info(`Fetched ${classes.length.toLocaleString()} classes.`); - if (!classes || classes.length === 0) { - log.error(`No classes found, skipping update.`); - return; - } - totalClasses += classes.length; - - // Insert classes in batches of 5000 - for (let i = 0; i < termsBatchIds.length; i += 1) { + // Insert classes in batches of 4 terms + for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) { + const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH); + const termsBatchIds = termsBatch.map((term) => term.id); const session = await connection.startSession(); - let currentTerm = termsBatchIds[i]; try { await session.withTransaction(async () => { - // const batch = classes.slice(i, i + CLASSES_PER_BATCH); - const batch = classes.filter((x) => x.termId == currentTerm); + log.trace( + `Fetching classes for terms ${termsBatch.map((term) => term.name).toLocaleString()}...` + ); + const classes = await getClasses( + log, + CLASS_APP_ID, + CLASS_APP_KEY, + termsBatchIds + ); + + if (classes.length === 0) { + throw new Error(`No classes found, skipping update.`); + } - log.trace(`Deleting classes in term ${currentTerm}...`); + log.trace(`Deleting classes in batch ${i}...`); const { deletedCount } = await ClassModel.deleteMany({ - termId: currentTerm, + termId: { $in: termsBatchIds }, }); - log.trace(`Inserting term ${currentTerm}...`); + log.trace(`Inserting batch ${i}`); - const { insertedCount } = await ClassModel.insertMany(batch, { + const { insertedCount } = await ClassModel.insertMany(classes, { ordered: false, rawResult: true, }); @@ -141,21 +139,22 @@ const updateClasses = async (config: Config, termSelector: TermSelector) => { // avoid replacing data if a non-negligible amount is deleted if (insertedCount / deletedCount <= 0.9) { throw new Error( - `Deleted ${deletedCount} classes and inserted only ${insertedCount} in term ${currentTerm}; aborting data insertion process` + `Deleted ${deletedCount} classes and inserted only ${insertedCount} in batch ${i}; aborting data insertion process` ); } + totalClasses += classes.length; totalDeleted += deletedCount; totalInserted += insertedCount; }); } catch (error: any) { - log.warn(`Error inserting batch: ${error.message}`); + log.error(`Error inserting batch: ${error.message}`); } finally { await session.endSession(); } } - log.info(`Deleted ${totalDeleted.toLocaleString()} classes`); + log.info(`Deleted ${totalDeleted.toLocaleString()} total classes`); log.info( `Inserted ${totalInserted.toLocaleString()} classes, after fetching ${totalClasses.toLocaleString()} classes.` ); diff --git a/apps/datapuller/src/pullers/courses.ts b/apps/datapuller/src/pullers/courses.ts index 78cf71733..48d186e41 100644 --- a/apps/datapuller/src/pullers/courses.ts +++ b/apps/datapuller/src/pullers/courses.ts @@ -21,6 +21,15 @@ const updateCourses = async (config: Config) => { log.trace("Deleting courses no longer in SIS..."); + const previousCourses = await CourseModel.countDocuments(); + + if (courses.length / previousCourses <= 0.95) { + log.error( + `Fetched only ${courses.length} courses, while there were ${previousCourses} previous courses` + ); + return; + } + const { deletedCount } = await CourseModel.deleteMany({ courseId: { $nin: courses.map((course) => course.courseId) }, }); diff --git a/apps/datapuller/src/pullers/grade-distributions.ts b/apps/datapuller/src/pullers/grade-distributions.ts index 6666b26ef..04b1caf74 100644 --- a/apps/datapuller/src/pullers/grade-distributions.ts +++ b/apps/datapuller/src/pullers/grade-distributions.ts @@ -1,3 +1,5 @@ +import { connection } from "mongoose"; + import { CourseModel, GradeCounts, @@ -14,7 +16,7 @@ import { getRecentPastTerms, } from "../shared/term-selectors"; -const TERMS_PER_API_BATCH = 100; +const TERMS_PER_API_BATCH = 50; interface AggregatedCourseGradeSummary extends GradeCounts { _id: { @@ -141,54 +143,72 @@ const updateGradeDistributions = async ( let totalGradeDistributions = 0; let totalInserted = 0; + let totalDeleted = 0; + + // Insert grade distributions in batches of 50 terms for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) { const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH); const termsBatchIds = termsBatch.map((term) => term.id); - - log.trace( - `Fetching grade distributions for term ${termsBatch.map((term) => term.name).toLocaleString()}...` - ); - - const gradeDistributions = await getGradeDistributionDataByTerms( - DATABASE, - S3_OUTPUT, - REGION_NAME, - WORKGROUP, - termsBatchIds - ); - - log.info( - `Fetched ${gradeDistributions.length.toLocaleString()} grade distributions.` - ); - if (gradeDistributions.length === 0) { - log.error("No grade distributions found, skipping update"); - return; - } - - log.trace("Deleting grade distributions to be replaced..."); - - const { deletedCount } = await GradeDistributionModel.deleteMany({ - termId: { $in: termsBatchIds }, - }); - - log.info(`Deleted ${deletedCount.toLocaleString()} grade distributions.`); - - // Insert grade distributions in batches of 5000 - let totalInserted = 0; - const insertBatchSize = 5000; - for (let i = 0; i < gradeDistributions.length; i += insertBatchSize) { - const batch = gradeDistributions.slice(i, i + insertBatchSize); - - log.trace(`Inserting batch ${i / insertBatchSize + 1}`); - - const { insertedCount } = await GradeDistributionModel.insertMany(batch, { - ordered: false, - rawResult: true, + const session = await connection.startSession(); + try { + await session.withTransaction(async () => { + log.trace( + `Fetching grade distributions for terms ${termsBatch.map((term) => term.name).toLocaleString()}...` + ); + + const gradeDistributions = await getGradeDistributionDataByTerms( + DATABASE, + S3_OUTPUT, + REGION_NAME, + WORKGROUP, + termsBatchIds + ); + + log.info( + `Fetched ${gradeDistributions.length.toLocaleString()} grade distributions.` + ); + + if (gradeDistributions.length === 0) { + throw new Error(`No grade distributions found, skipping update`); + } + + log.trace("Deleting grade distributions to be replaced..."); + + const { deletedCount } = await GradeDistributionModel.deleteMany({ + termId: { $in: termsBatchIds }, + }); + + log.trace(`Inserting batch ${i}`); + + const { insertedCount } = await GradeDistributionModel.insertMany( + gradeDistributions, + { + ordered: false, + rawResult: true, + } + ); + + // avoid replacing data if a non-negligible amount is deleted + if (insertedCount / deletedCount <= 0.9) { + throw new Error( + `Deleted ${deletedCount} grade distributions and inserted only ${insertedCount} in batch ${i}; aborting data insertion process` + ); + } + + totalDeleted += deletedCount; + totalInserted += insertedCount; + totalGradeDistributions += gradeDistributions.length; }); - totalInserted += insertedCount; + } catch (error: any) { + log.error(`Error inserting batch: ${error.message}`); + } finally { + await session.endSession(); } } + log.info( + `Deleted ${totalDeleted.toLocaleString()} total grade distributions` + ); log.info( `Completed updating database with ${totalGradeDistributions.toLocaleString()} grade distributions, inserted ${totalInserted.toLocaleString()} documents.` ); diff --git a/apps/datapuller/src/pullers/sections.ts b/apps/datapuller/src/pullers/sections.ts index 65294d543..15c9525a7 100644 --- a/apps/datapuller/src/pullers/sections.ts +++ b/apps/datapuller/src/pullers/sections.ts @@ -1,3 +1,5 @@ +import { connection } from "mongoose"; + import { SectionModel } from "@repo/common"; import { getSections } from "../lib/sections"; @@ -31,51 +33,63 @@ const updateSections = async (config: Config, termSelector: TermSelector) => { let totalSections = 0; let totalInserted = 0; + let totalDeleted = 0; + for (let i = 0; i < terms.length; i += TERMS_PER_API_BATCH) { const termsBatch = terms.slice(i, i + TERMS_PER_API_BATCH); const termsBatchIds = termsBatch.map((term) => term.id); - - log.trace( - `Fetching sections for term ${termsBatch.map((term) => term.name).toLocaleString()}...` - ); - - const sections = await getSections( - log, - CLASS_APP_ID, - CLASS_APP_KEY, - termsBatchIds - ); - - log.info(`Fetched ${sections.length.toLocaleString()} sections.`); - if (sections.length === 0) { - log.error(`No sections found, skipping update.`); - return; - } - totalSections += sections.length; - - log.trace("Deleting sections to be replaced..."); - - const { deletedCount } = await SectionModel.deleteMany({ - termId: { $in: termsBatchIds }, - }); - - log.info(`Deleted ${deletedCount.toLocaleString()} sections.`); - - // Insert sections in batches of 5000 - const insertBatchSize = 5000; - for (let i = 0; i < sections.length; i += insertBatchSize) { - const batch = sections.slice(i, i + insertBatchSize); - - log.trace(`Inserting batch ${i / insertBatchSize + 1}...`); - - const { insertedCount } = await SectionModel.insertMany(batch, { - ordered: false, - rawResult: true, + const session = await connection.startSession(); + try { + await session.withTransaction(async () => { + log.trace( + `Fetching sections for terms ${termsBatch.map((term) => term.name).toLocaleString()}...` + ); + + const sections = await getSections( + log, + CLASS_APP_ID, + CLASS_APP_KEY, + termsBatchIds + ); + + log.info(`Fetched ${sections.length.toLocaleString()} sections.`); + + if (sections.length === 0) { + throw new Error(`No sections found, skipping update`); + } + + log.trace("Deleting sections to be replaced..."); + + const { deletedCount } = await SectionModel.deleteMany({ + termId: { $in: termsBatchIds }, + }); + + log.trace(`Inserting batch ${i}`); + + const { insertedCount } = await SectionModel.insertMany(sections, { + ordered: false, + rawResult: true, + }); + + // avoid replacing data if a non-negligible amount is deleted + if (insertedCount / deletedCount <= 0.9) { + throw new Error( + `Deleted ${deletedCount} sections and inserted only ${insertedCount} in batch ${i}; aborting data insertion process` + ); + } + + totalDeleted += deletedCount; + totalInserted += insertedCount; + totalSections += sections.length; }); - totalInserted += insertedCount; + } catch (error: any) { + log.error(`Error inserting batch: ${error.message}`); + } finally { + await session.endSession(); } } + log.info(`Deleted ${totalDeleted.toLocaleString()} total sections`); log.info( `Completed updating database with ${totalSections.toLocaleString()} sections, inserted ${totalInserted.toLocaleString()} documents.` );