Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ DAILY_EVALUATION_TIME=0 1 * * *
# Use '*' for development, specify frontend URL for production
CORS_ORIGIN=*

# Redis Configuration (for BullMQ Background Jobs)
# Redis server connection details for job queue processing
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
# Email Configuration (Optional)
# SMTP settings for sending emails
EMAIL_ENABLED=false
Expand Down
21 changes: 20 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,25 @@ prisma/migrations/
# Temporary files
tmp/
temp/

# Lock files (use npm ci in CI/CD)
package-lock.json

# Test files and scripts
test-*.js
setup.ps1

# Extra documentation (keep only README.md, CODE_OF_CONDUCT.md, CONTRIBUTING.md)
BACKGROUND_JOBS.md
PR_DOCUMENTATION.md
QUICK_START.md
SETUP_CHECKLIST.md
SETUP_GUIDE.md
*.backup

# Agent and skills files
.agents/
skills-lock.json
# Agent/Skills files
.agents/
skills-lock.json
skills-lock.json
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@

"axios": "^1.6.5",
"bcryptjs": "^2.4.3",
"bullmq": "^5.70.1",
"cors": "^2.8.5",
"dotenv": "^16.3.1",
"express": "^4.18.2",

"express-rate-limit": "^8.2.1",
"express-validator": "^7.3.1",
"ioredis": "^5.9.3",
"express-session": "^1.17.3",
"express-validator": "^7.3.1",
"jsonwebtoken": "^9.0.2",
Expand Down
12 changes: 6 additions & 6 deletions src/config/cron.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class CronManager {
async () => {
logger.info("Starting daily evaluation cron job");
try {
await evaluationService.runDailyEvaluation();
logger.info("Daily evaluation completed successfully");
await evaluationService.runDailyEvaluationWithQueue(); // UPDATED: Use queue-based evaluation
logger.info("Daily evaluation jobs queued successfully");
} catch (error) {
logger.error("Daily evaluation failed:", error);
logger.error("Daily evaluation queueing failed:", error);
}
},
{
Expand Down Expand Up @@ -150,10 +150,10 @@ class CronManager {
async triggerDailyEvaluation() {
logger.info("Manually triggering daily evaluation");
try {
await evaluationService.runDailyEvaluation();
logger.info("Manual daily evaluation completed");
await evaluationService.runDailyEvaluationWithQueue(); // UPDATED: Use queue-based evaluation
logger.info("Manual daily evaluation jobs queued successfully");
} catch (error) {
logger.error("Manual daily evaluation failed:", error);
logger.error("Manual daily evaluation queueing failed:", error);
throw error;
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/config/env.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ const config = {

// CORS Configuration
corsOrigin: process.env.CORS_ORIGIN || "*",

// Redis Configuration (for BullMQ)
redisHost: process.env.REDIS_HOST || "127.0.0.1",
redisPort: parseInt(process.env.REDIS_PORT) || 6379,
redisPassword: process.env.REDIS_PASSWORD || undefined,
redisDb: parseInt(process.env.REDIS_DB) || 0,
};

// Validate critical environment variables
Expand Down
77 changes: 77 additions & 0 deletions src/config/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
const { Queue } = require("bullmq");
const { config } = require("./env");
const logger = require("../utils/logger");

/**
* Redis connection configuration for BullMQ
*/
const redisConnection = {
host: config.redisHost,
port: config.redisPort,
password: config.redisPassword,
db: config.redisDb,
maxRetriesPerRequest: null, // Required for BullMQ
enableReadyCheck: false,
tls: config.redisHost.includes('upstash.io') ? {} : undefined, // Enable TLS for Upstash
};

/**
* Evaluation Queue
* Handles all evaluation-related jobs
*/
const evaluationQueue = new Queue("evaluation", {
connection: redisConnection,
defaultJobOptions: {
attempts: 3, // Retry failed jobs up to 3 times
backoff: {
type: "exponential",
delay: 5000, // Start with 5 seconds, then 10s, 20s, etc.
},
removeOnComplete: {
age: 86400, // Keep completed jobs for 24 hours
count: 1000, // Keep max 1000 completed jobs
},
removeOnFail: {
age: 604800, // Keep failed jobs for 7 days
count: 5000, // Keep max 5000 failed jobs
},
},
});

/**
* Log queue events
*/
evaluationQueue.on("error", (error) => {
logger.error("Evaluation queue error:", error);
});

evaluationQueue.on("waiting", (job) => {
logger.debug(`Job ${job.id} is waiting`);
});

evaluationQueue.on("active", (job) => {
logger.info(`Job ${job.id} is now active`);
});

evaluationQueue.on("completed", (job) => {
logger.info(`Job ${job.id} completed successfully`);
});

evaluationQueue.on("failed", (job, error) => {
logger.error(`Job ${job?.id} failed:`, error);
});

/**
* Gracefully close the queue
*/
const closeQueue = async () => {
logger.info("Closing evaluation queue...");
await evaluationQueue.close();
logger.info("Evaluation queue closed");
};

module.exports = {
evaluationQueue,
closeQueue,
redisConnection,
};
13 changes: 13 additions & 0 deletions src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ const createApp = require("./app");
const { config, validateConfig } = require("./config/env");
const { disconnectPrisma } = require("./config/prisma");
const cronManager = require("./config/cron");
const { createEvaluationWorker } = require("./workers/evaluation.worker");
const { closeQueue } = require("./config/queue");
const logger = require("./utils/logger");

/**
Expand All @@ -24,13 +26,24 @@ const startServer = async () => {
// Initialize cron jobs
cronManager.initializeCronJobs();

// Initialize background job worker for evaluations
const evaluationWorker = createEvaluationWorker();
logger.info("Background job worker initialized");

// Graceful shutdown handlers
const gracefulShutdown = async (signal) => {
logger.info(`${signal} signal received: closing HTTP server`);

// Stop cron jobs
cronManager.stopAllJobs();

// Close worker
logger.info("Closing evaluation worker...");
await evaluationWorker.close();

// Close queue
await closeQueue();

// Close server
server.close(async () => {
logger.info("HTTP server closed");
Expand Down
66 changes: 64 additions & 2 deletions src/services/evaluation.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,71 @@ const leetcodeService = require("./leetcode.service");
const penaltyService = require("./penalty.service");
const logger = require("../utils/logger");
const { sendStreakBrokenNotification } = require("./email.service");
const { evaluationQueue } = require("../config/queue");

/**
* Run daily evaluation for all active challenges
* This is the main function called by the cron job
* Run daily evaluation using background job queue (NEW - Queue-based)
* This pushes evaluation jobs to the queue for async processing
*/
const runDailyEvaluationWithQueue = async () => {
const evaluationDate = new Date();
evaluationDate.setHours(0, 0, 0, 0); // Start of day

logger.info(
`Starting queue-based daily evaluation for date: ${evaluationDate.toISOString()}`
);

try {
// Get all active challenges
const activeChallenges = await prisma.challenge.findMany({
where: {
status: "ACTIVE",
startDate: { lte: new Date() },
endDate: { gte: new Date() },
},
select: {
id: true,
name: true,
},
});

logger.info(
`Found ${activeChallenges.length} active challenges to evaluate`
);

// Push challenge evaluation jobs to queue
const jobs = activeChallenges.map((challenge) => ({
name: "challenge-evaluation",
data: {
challengeId: challenge.id,
evaluationDate: evaluationDate.toISOString(),
},
opts: {
jobId: `challenge-${challenge.id}-${evaluationDate.toISOString()}`, // Prevent duplicate jobs
},
}));

await evaluationQueue.addBulk(jobs);

logger.info(
`Successfully queued ${jobs.length} challenge evaluation jobs. Processing asynchronously...`
);

return {
success: true,
challengesQueued: jobs.length,
date: evaluationDate,
};
} catch (error) {
logger.error("Failed to queue daily evaluation:", error);
throw error;
}
};

/**
* Run daily evaluation for all active challenges (OLD - Synchronous)
* This is the legacy synchronous version
* @deprecated Use runDailyEvaluationWithQueue for better performance
*/
const runDailyEvaluation = async () => {
const evaluationDate = new Date();
Expand Down Expand Up @@ -429,6 +490,7 @@ const getTodayStatus = async (memberId) => {

module.exports = {
runDailyEvaluation,
runDailyEvaluationWithQueue, // NEW: Queue-based evaluation
evaluateChallenge,
evaluateMember,
getMemberDailyResults,
Expand Down
Loading