diff --git a/.env.example b/.env.example index e7db246..4017b03 100644 --- a/.env.example +++ b/.env.example @@ -33,6 +33,12 @@ DAILY_EVALUATION_TIME=0 1 * * * # Use '*' for development, specify frontend URL for production CORS_ORIGIN=* +# Redis Configuration (for BullMQ Background Jobs) +# Redis server connection details for job queue processing +REDIS_HOST=127.0.0.1 +REDIS_PORT=6379 +REDIS_PASSWORD= +REDIS_DB=0 # Email Configuration (Optional) # SMTP settings for sending emails EMAIL_ENABLED=false diff --git a/.gitignore b/.gitignore index d403f19..0d57483 100644 --- a/.gitignore +++ b/.gitignore @@ -46,6 +46,25 @@ prisma/migrations/ # Temporary files tmp/ temp/ + +# Lock files (use npm ci in CI/CD) +package-lock.json + +# Test files and scripts +test-*.js +setup.ps1 + +# Extra documentation (keep only README.md, CODE_OF_CONDUCT.md, CONTRIBUTING.md) +BACKGROUND_JOBS.md +PR_DOCUMENTATION.md +QUICK_START.md +SETUP_CHECKLIST.md +SETUP_GUIDE.md +*.backup + +# Agent and skills files +.agents/ +skills-lock.json # Agent/Skills files .agents/ -skills-lock.json \ No newline at end of file +skills-lock.json diff --git a/package.json b/package.json index e61537d..15f6513 100644 --- a/package.json +++ b/package.json @@ -29,11 +29,14 @@ "axios": "^1.6.5", "bcryptjs": "^2.4.3", + "bullmq": "^5.70.1", "cors": "^2.8.5", "dotenv": "^16.3.1", "express": "^4.18.2", "express-rate-limit": "^8.2.1", + "express-validator": "^7.3.1", + "ioredis": "^5.9.3", "express-session": "^1.17.3", "express-validator": "^7.3.1", "jsonwebtoken": "^9.0.2", diff --git a/src/config/cron.js b/src/config/cron.js index 2826f4b..bb096ec 100644 --- a/src/config/cron.js +++ b/src/config/cron.js @@ -25,10 +25,10 @@ class CronManager { async () => { logger.info("Starting daily evaluation cron job"); try { - await evaluationService.runDailyEvaluation(); - logger.info("Daily evaluation completed successfully"); + await evaluationService.runDailyEvaluationWithQueue(); // UPDATED: Use queue-based evaluation + logger.info("Daily evaluation jobs queued successfully"); } catch (error) { - logger.error("Daily evaluation failed:", error); + logger.error("Daily evaluation queueing failed:", error); } }, { @@ -150,10 +150,10 @@ class CronManager { async triggerDailyEvaluation() { logger.info("Manually triggering daily evaluation"); try { - await evaluationService.runDailyEvaluation(); - logger.info("Manual daily evaluation completed"); + await evaluationService.runDailyEvaluationWithQueue(); // UPDATED: Use queue-based evaluation + logger.info("Manual daily evaluation jobs queued successfully"); } catch (error) { - logger.error("Manual daily evaluation failed:", error); + logger.error("Manual daily evaluation queueing failed:", error); throw error; } } diff --git a/src/config/env.js b/src/config/env.js index fb1c5d4..b3e994b 100644 --- a/src/config/env.js +++ b/src/config/env.js @@ -49,6 +49,12 @@ const config = { // CORS Configuration corsOrigin: process.env.CORS_ORIGIN || "*", + + // Redis Configuration (for BullMQ) + redisHost: process.env.REDIS_HOST || "127.0.0.1", + redisPort: parseInt(process.env.REDIS_PORT) || 6379, + redisPassword: process.env.REDIS_PASSWORD || undefined, + redisDb: parseInt(process.env.REDIS_DB) || 0, }; // Validate critical environment variables diff --git a/src/config/queue.js b/src/config/queue.js new file mode 100644 index 0000000..7a407ba --- /dev/null +++ b/src/config/queue.js @@ -0,0 +1,77 @@ +const { Queue } = require("bullmq"); +const { config } = require("./env"); +const logger = require("../utils/logger"); + +/** + * Redis connection configuration for BullMQ + */ +const redisConnection = { + host: config.redisHost, + port: config.redisPort, + password: config.redisPassword, + db: config.redisDb, + maxRetriesPerRequest: null, // Required for BullMQ + enableReadyCheck: false, + tls: config.redisHost.includes('upstash.io') ? {} : undefined, // Enable TLS for Upstash +}; + +/** + * Evaluation Queue + * Handles all evaluation-related jobs + */ +const evaluationQueue = new Queue("evaluation", { + connection: redisConnection, + defaultJobOptions: { + attempts: 3, // Retry failed jobs up to 3 times + backoff: { + type: "exponential", + delay: 5000, // Start with 5 seconds, then 10s, 20s, etc. + }, + removeOnComplete: { + age: 86400, // Keep completed jobs for 24 hours + count: 1000, // Keep max 1000 completed jobs + }, + removeOnFail: { + age: 604800, // Keep failed jobs for 7 days + count: 5000, // Keep max 5000 failed jobs + }, + }, +}); + +/** + * Log queue events + */ +evaluationQueue.on("error", (error) => { + logger.error("Evaluation queue error:", error); +}); + +evaluationQueue.on("waiting", (job) => { + logger.debug(`Job ${job.id} is waiting`); +}); + +evaluationQueue.on("active", (job) => { + logger.info(`Job ${job.id} is now active`); +}); + +evaluationQueue.on("completed", (job) => { + logger.info(`Job ${job.id} completed successfully`); +}); + +evaluationQueue.on("failed", (job, error) => { + logger.error(`Job ${job?.id} failed:`, error); +}); + +/** + * Gracefully close the queue + */ +const closeQueue = async () => { + logger.info("Closing evaluation queue..."); + await evaluationQueue.close(); + logger.info("Evaluation queue closed"); +}; + +module.exports = { + evaluationQueue, + closeQueue, + redisConnection, +}; diff --git a/src/server.js b/src/server.js index ff036d7..c75db45 100644 --- a/src/server.js +++ b/src/server.js @@ -2,6 +2,8 @@ const createApp = require("./app"); const { config, validateConfig } = require("./config/env"); const { disconnectPrisma } = require("./config/prisma"); const cronManager = require("./config/cron"); +const { createEvaluationWorker } = require("./workers/evaluation.worker"); +const { closeQueue } = require("./config/queue"); const logger = require("./utils/logger"); /** @@ -24,6 +26,10 @@ const startServer = async () => { // Initialize cron jobs cronManager.initializeCronJobs(); + // Initialize background job worker for evaluations + const evaluationWorker = createEvaluationWorker(); + logger.info("Background job worker initialized"); + // Graceful shutdown handlers const gracefulShutdown = async (signal) => { logger.info(`${signal} signal received: closing HTTP server`); @@ -31,6 +37,13 @@ const startServer = async () => { // Stop cron jobs cronManager.stopAllJobs(); + // Close worker + logger.info("Closing evaluation worker..."); + await evaluationWorker.close(); + + // Close queue + await closeQueue(); + // Close server server.close(async () => { logger.info("HTTP server closed"); diff --git a/src/services/evaluation.service.js b/src/services/evaluation.service.js index 70047ad..46da4ba 100644 --- a/src/services/evaluation.service.js +++ b/src/services/evaluation.service.js @@ -3,10 +3,71 @@ const leetcodeService = require("./leetcode.service"); const penaltyService = require("./penalty.service"); const logger = require("../utils/logger"); const { sendStreakBrokenNotification } = require("./email.service"); +const { evaluationQueue } = require("../config/queue"); /** - * Run daily evaluation for all active challenges - * This is the main function called by the cron job + * Run daily evaluation using background job queue (NEW - Queue-based) + * This pushes evaluation jobs to the queue for async processing + */ +const runDailyEvaluationWithQueue = async () => { + const evaluationDate = new Date(); + evaluationDate.setHours(0, 0, 0, 0); // Start of day + + logger.info( + `Starting queue-based daily evaluation for date: ${evaluationDate.toISOString()}` + ); + + try { + // Get all active challenges + const activeChallenges = await prisma.challenge.findMany({ + where: { + status: "ACTIVE", + startDate: { lte: new Date() }, + endDate: { gte: new Date() }, + }, + select: { + id: true, + name: true, + }, + }); + + logger.info( + `Found ${activeChallenges.length} active challenges to evaluate` + ); + + // Push challenge evaluation jobs to queue + const jobs = activeChallenges.map((challenge) => ({ + name: "challenge-evaluation", + data: { + challengeId: challenge.id, + evaluationDate: evaluationDate.toISOString(), + }, + opts: { + jobId: `challenge-${challenge.id}-${evaluationDate.toISOString()}`, // Prevent duplicate jobs + }, + })); + + await evaluationQueue.addBulk(jobs); + + logger.info( + `Successfully queued ${jobs.length} challenge evaluation jobs. Processing asynchronously...` + ); + + return { + success: true, + challengesQueued: jobs.length, + date: evaluationDate, + }; + } catch (error) { + logger.error("Failed to queue daily evaluation:", error); + throw error; + } +}; + +/** + * Run daily evaluation for all active challenges (OLD - Synchronous) + * This is the legacy synchronous version + * @deprecated Use runDailyEvaluationWithQueue for better performance */ const runDailyEvaluation = async () => { const evaluationDate = new Date(); @@ -429,6 +490,7 @@ const getTodayStatus = async (memberId) => { module.exports = { runDailyEvaluation, + runDailyEvaluationWithQueue, // NEW: Queue-based evaluation evaluateChallenge, evaluateMember, getMemberDailyResults, diff --git a/src/workers/evaluation.worker.js b/src/workers/evaluation.worker.js new file mode 100644 index 0000000..21b88ab --- /dev/null +++ b/src/workers/evaluation.worker.js @@ -0,0 +1,364 @@ +const { Worker } = require("bullmq"); +const { redisConnection } = require("../config/queue"); +const { prisma } = require("../config/prisma"); +const leetcodeService = require("../services/leetcode.service"); +const penaltyService = require("../services/penalty.service"); +const { sendStreakBrokenNotification } = require("../services/email.service"); +const logger = require("../utils/logger"); + +/** + * Process member evaluation job + * This evaluates a single member for a specific date + */ +const processMemberEvaluation = async (job) => { + const { challenge, member, evaluationDate } = job.data; + const user = member.user; + + logger.info( + `Processing evaluation for member: ${user.username} in challenge: ${challenge.name}` + ); + + try { + // Check if user has LeetCode username + if (!user.leetcodeUsername) { + logger.warn(`User ${user.username} doesn't have a LeetCode username set`); + + // Create a failed result + await createDailyResult( + challenge.id, + member.id, + new Date(evaluationDate), + false, + 0, + [], + { + reason: "No LeetCode username configured", + } + ); + + // Apply penalty + await applyPenaltyForFailure( + challenge, + member, + new Date(evaluationDate), + "No LeetCode username configured" + ); + return { success: true, status: "failed", reason: "No LeetCode username" }; + } + + // Fetch submissions for the date + let submissions; + try { + submissions = await leetcodeService.fetchSubmissionsForDate( + user.leetcodeUsername, + new Date(evaluationDate) + ); + } catch (error) { + logger.error( + `Failed to fetch submissions for ${user.leetcodeUsername}:`, + error + ); + + // Create a failed result due to API error + await createDailyResult( + challenge.id, + member.id, + new Date(evaluationDate), + false, + 0, + [], + { + reason: "Failed to fetch submissions from LeetCode", + error: error.message, + } + ); + + // Don't apply penalty for API errors, but throw to retry + throw error; + } + + // Enrich submissions with metadata (difficulty, etc.) + const enrichedSubmissions = + await leetcodeService.enrichSubmissionsWithMetadata(submissions); + + // Filter by difficulty if specified + let filteredSubmissions = enrichedSubmissions; + if (challenge.difficultyFilter && challenge.difficultyFilter.length > 0) { + filteredSubmissions = enrichedSubmissions.filter((sub) => + challenge.difficultyFilter.includes(sub.difficulty) + ); + + logger.debug( + `Filtered ${enrichedSubmissions.length} submissions to ${ + filteredSubmissions.length + } matching difficulties: ${challenge.difficultyFilter.join(", ")}` + ); + } + + // Extract unique problems if constraint is enabled + const problemsSolved = challenge.uniqueProblemConstraint + ? [...new Set(filteredSubmissions.map((s) => s.titleSlug))] + : filteredSubmissions.map((s) => s.titleSlug); + + const submissionsCount = problemsSolved.length; + + // Check if member met the requirement + const completed = submissionsCount >= challenge.minSubmissionsPerDay; + + // Create daily result + await createDailyResult( + challenge.id, + member.id, + new Date(evaluationDate), + completed, + submissionsCount, + problemsSolved, + { + submissions: filteredSubmissions.map((s) => ({ + title: s.title, + titleSlug: s.titleSlug, + difficulty: s.difficulty, + timestamp: s.timestamp, + language: s.language, + })), + } + ); + + // Update streak + await updateStreak(member.id, completed, user, challenge.name); + + // Apply penalty if failed + if (!completed) { + await applyPenaltyForFailure( + challenge, + member, + new Date(evaluationDate), + `Failed to meet daily requirement: ${submissionsCount}/${challenge.minSubmissionsPerDay} submissions` + ); + } + + logger.info( + `Member ${user.username} evaluation: ${ + completed ? "PASSED" : "FAILED" + } (${submissionsCount}/${challenge.minSubmissionsPerDay})` + ); + + return { + success: true, + status: completed ? "passed" : "failed", + submissionsCount, + required: challenge.minSubmissionsPerDay, + }; + } catch (error) { + logger.error( + `Error processing member evaluation for ${user.username}:`, + error + ); + throw error; // Let BullMQ handle the retry + } +}; + +/** + * Process challenge evaluation job + * This creates member evaluation jobs for all members in a challenge + */ +const processChallengeEvaluation = async (job) => { + const { challengeId, evaluationDate } = job.data; + + logger.info( + `Processing challenge evaluation for challenge: ${challengeId} on ${evaluationDate}` + ); + + try { + // Get challenge with members + const challenge = await prisma.challenge.findUnique({ + where: { id: challengeId }, + include: { + members: { + where: { isActive: true }, + include: { + user: { + select: { + id: true, + email: true, + username: true, + leetcodeUsername: true, + }, + }, + }, + }, + }, + }); + + if (!challenge) { + throw new Error(`Challenge ${challengeId} not found`); + } + + logger.info( + `Challenge ${challenge.name} has ${challenge.members.length} active members` + ); + + // Add member evaluation jobs to the queue + const { evaluationQueue } = require("../config/queue"); + + const memberJobs = challenge.members.map((member) => ({ + name: "member-evaluation", + data: { + challenge, + member, + evaluationDate, + }, + opts: { + jobId: `member-${member.id}-${evaluationDate}`, // Prevent duplicate jobs + }, + })); + + await evaluationQueue.addBulk(memberJobs); + + logger.info( + `Added ${memberJobs.length} member evaluation jobs for challenge ${challenge.name}` + ); + + return { + success: true, + challengeName: challenge.name, + membersQueued: memberJobs.length, + }; + } catch (error) { + logger.error(`Error processing challenge evaluation:`, error); + throw error; + } +}; + +/** + * Create a daily result record + */ +const createDailyResult = async ( + challengeId, + memberId, + date, + completed, + submissionsCount, + problemsSolved, + metadata = {} +) => { + return await prisma.dailyResult.create({ + data: { + challengeId, + memberId, + date, + completed, + submissionsCount, + problemsSolved, + evaluatedAt: new Date(), + metadata, + }, + }); +}; + +/** + * Update member's streak based on completion status + */ +const updateStreak = async (memberId, completed, user, challengeName) => { + const member = await prisma.challengeMember.findUnique({ + where: { id: memberId }, + }); + + if (completed) { + // Increment current streak + const newStreak = member.currentStreak + 1; + const newLongest = Math.max(newStreak, member.longestStreak); + + await prisma.challengeMember.update({ + where: { id: memberId }, + data: { + currentStreak: newStreak, + longestStreak: newLongest, + }, + }); + } else { + // Send streak broken notification if they had a streak + if (member.currentStreak > 0 && user && user.email) { + sendStreakBrokenNotification( + user.email, + user.username, + member.currentStreak, + challengeName + ).catch((err) => { + logger.error(`Failed to send streak broken notification: ${err.message}`); + }); + } + + // Reset current streak + await prisma.challengeMember.update({ + where: { id: memberId }, + data: { + currentStreak: 0, + }, + }); + } +}; + +/** + * Apply penalty for failing daily requirement + */ +const applyPenaltyForFailure = async (challenge, member, date, reason) => { + if (challenge.penaltyAmount > 0) { + await penaltyService.applyPenalty( + member.id, + challenge.penaltyAmount, + reason, + date + ); + } +}; + +/** + * Create and start the evaluation worker + */ +const createEvaluationWorker = () => { + const worker = new Worker( + "evaluation", + async (job) => { + logger.info(`Processing job ${job.id} of type: ${job.name}`); + + switch (job.name) { + case "challenge-evaluation": + return await processChallengeEvaluation(job); + case "member-evaluation": + return await processMemberEvaluation(job); + default: + throw new Error(`Unknown job type: ${job.name}`); + } + }, + { + connection: redisConnection, + concurrency: 10, // Process up to 10 jobs simultaneously + limiter: { + max: 20, // Max 20 jobs + duration: 1000, // Per second (rate limiting to prevent API abuse) + }, + } + ); + + // Worker event listeners + worker.on("completed", (job, result) => { + logger.info(`Job ${job.id} completed:`, result); + }); + + worker.on("failed", (job, error) => { + logger.error(`Job ${job?.id} failed:`, error); + }); + + worker.on("error", (error) => { + logger.error("Worker error:", error); + }); + + logger.info("Evaluation worker started with concurrency: 10"); + + return worker; +}; + +module.exports = { + createEvaluationWorker, +}; diff --git a/test-email.js b/test-email.js deleted file mode 100644 index f53f6b9..0000000 --- a/test-email.js +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Email Test Script - * Run: node test-email.js - */ - -require("dotenv").config(); - -const { verifyEmailConfig, sendEmail } = require("./src/config/email"); -const { - sendWelcomeEmail, - sendStreakReminder, - sendStreakBrokenNotification, - sendWeeklySummary -} = require("./src/services/email.service"); - -// Change this to the email you want to send to -const TEST_EMAIL = "25dit007@charusat.edu.in"; // <-- YAHAN DUSRE KA EMAIL DAALO! -const TEST_USERNAME = "HET BHIMANI"; - -async function testEmail() { - console.log("\n๐ง Email Configuration Test\n"); - console.log("=".repeat(50)); - - // Check config - console.log("\n๐ Current Config:"); - console.log(` SMTP Host: ${process.env.SMTP_HOST || "NOT SET"}`); - console.log(` SMTP Port: ${process.env.SMTP_PORT || "NOT SET"}`); - console.log(` SMTP User: ${process.env.SMTP_USER || "NOT SET"}`); - console.log(` SMTP Pass: ${process.env.SMTP_PASS ? "****" : "NOT SET"}`); - console.log(` Email Enabled: ${process.env.EMAIL_ENABLED || "NOT SET"}`); - - // Verify connection - console.log("\n๐ Verifying SMTP Connection..."); - const isVerified = await verifyEmailConfig(); - - if (!isVerified) { - console.log("\nโ SMTP verification failed!"); - console.log("\n๐ก Troubleshooting:"); - console.log(" 1. Check your SMTP credentials in .env"); - console.log(" 2. For Gmail, use App Password (not regular password)"); - console.log(" 3. Make sure 2FA is enabled on your Google account"); - return; - } - - console.log("โ SMTP Connection verified!\n"); - - // Test 1: Simple email - console.log("๐ง Test 1: Sending simple test email..."); - const result1 = await sendEmail({ - to: TEST_EMAIL, - subject: "๐งช Code Duel - Test Email", - html: ` -
If you're reading this, your email configuration is working! ๐
-Sent at: ${new Date().toISOString()}
- `, - }); - - if (result1.success) { - console.log(` โ Simple email sent! Message ID: ${result1.messageId}`); - } else { - console.log(` โ Failed: ${result1.reason}`); - } - - // Test 2: Welcome email template - console.log("\n๐ง Test 2: Sending welcome email template..."); - const result2 = await sendWelcomeEmail(TEST_EMAIL, TEST_USERNAME); - - if (result2.success) { - console.log(` โ Welcome email sent! Message ID: ${result2.messageId}`); - } else { - console.log(` โ Failed: ${result2.reason}`); - } - - // Test 3: Streak Reminder email - console.log("\n๐ง Test 3: Sending streak reminder email..."); - const result3 = await sendStreakReminder(TEST_EMAIL, TEST_USERNAME, 7, "30 Days DSA Challenge"); - - if (result3.success) { - console.log(` โ Streak reminder sent! Message ID: ${result3.messageId}`); - } else { - console.log(` โ Failed: ${result3.reason}`); - } - - // Test 4: Streak Broken notification - console.log("\n๐ง Test 4: Sending streak broken notification..."); - const result4 = await sendStreakBrokenNotification(TEST_EMAIL, TEST_USERNAME, 15, "30 Days DSA Challenge"); - - if (result4.success) { - console.log(` โ Streak broken notification sent! Message ID: ${result4.messageId}`); - } else { - console.log(` โ Failed: ${result4.reason}`); - } - - // Test 5: Weekly Summary - console.log("\n๐ง Test 5: Sending weekly summary..."); - const mockStats = { - weekStart: "Feb 15, 2026", - weekEnd: "Feb 22, 2026", - problemsSolved: 12, - daysCompleted: 5, - currentStreak: 7, - longestStreak: 15, - activeChallenges: [ - { name: "30 Days DSA Challenge", rank: 3, streak: 7, completionRate: 71 }, - { name: "LeetCode Daily", rank: 5, streak: 5, completionRate: 60 }, - ], - }; - const result5 = await sendWeeklySummary(TEST_EMAIL, TEST_USERNAME, mockStats); - - if (result5.success) { - console.log(` โ Weekly summary sent! Message ID: ${result5.messageId}`); - } else { - console.log(` โ Failed: ${result5.reason}`); - } - - console.log("\n" + "=".repeat(50)); - console.log("โจ Test complete! Check your inbox at:", TEST_EMAIL); - console.log(" (Also check spam folder)\n"); -} - -// Run test -testEmail().catch(console.error);