From a270bcfdafe91724c618c693a1d64478003ddfad Mon Sep 17 00:00:00 2001 From: petruki <31597636+petruki@users.noreply.github.com> Date: Sun, 31 Aug 2025 14:47:05 -0700 Subject: [PATCH] Added cache Worker Manager to handle cache update --- sonar-project.properties | 2 +- src/helpers/cache/index.js | 57 +++++++++- src/helpers/cache/worker-manager.js | 163 ++++++++++++++++++++++++++++ src/helpers/cache/worker.js | 158 +++++++++++++++++++++++++++ tests/unit-test/cache.test.js | 54 ++++++++- 5 files changed, 431 insertions(+), 3 deletions(-) create mode 100644 src/helpers/cache/worker-manager.js create mode 100644 src/helpers/cache/worker.js diff --git a/sonar-project.properties b/sonar-project.properties index dace4fc..2d8ebb5 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -9,7 +9,7 @@ sonar.javascript.lcov.reportPaths=coverage/lcov.info # Path is relative to the sonar-project.properties file. Replace "\" by "/" on Windows. sonar.sources=src -sonar.exclusions=src/api-docs/**, src/app-server.js, src/helpers/timed-match/match-proc.js +sonar.exclusions=src/api-docs/**, src/app-server.js, src/helpers/timed-match/match-proc.js, src/helpers/cache/worker.js sonar.tests=tests sonar.language=js diff --git a/src/helpers/cache/index.js b/src/helpers/cache/index.js index 30a1bfa..03cf099 100644 --- a/src/helpers/cache/index.js +++ b/src/helpers/cache/index.js @@ -2,18 +2,23 @@ import { graphql } from 'graphql'; import schema from '../../aggregator/schema.js'; import { getAllDomains } from '../../services/domain.js'; import { domainQuery, reduceSnapshot } from './query.js'; +import { CacheWorkerManager } from './worker-manager.js'; +import Logger from '../logger.js'; export default class Cache { #instance; + #workerManager; constructor() { this.#instance = new Map(); + this.#workerManager = null; } static getInstance() { if (!Cache.instance) { Cache.instance = new Cache(); } + return Cache.instance; } @@ -25,6 +30,29 @@ export default class Cache { } } + async startScheduledUpdates(options = {}) { + this.#workerManager = new CacheWorkerManager(options); + + this.#workerManager.setOnCacheUpdates((updates) => + this.#handleCacheUpdates(updates)); + + this.#workerManager.setOnCacheVersionRequest((domainId) => + this.#handleCacheVersionRequest(domainId)); + + this.#workerManager.setOnError((error) => { + Logger.error('Cache worker error:', error); + }); + + await this.#workerManager.start(); + } + + async stopScheduledUpdates() { + if (this.#workerManager) { + await this.#workerManager.stop(); + this.#workerManager = null; + } + } + async #updateCache(domain) { const result = await graphql({ schema, @@ -32,12 +60,39 @@ export default class Cache { contextValue: { domain: domain._id } }); - this.#set(domain._id, reduceSnapshot(result.data.domain)); + this.#set(domain._id, { + data: reduceSnapshot(result.data.domain), + lastUpdate: domain.lastUpdate, + version: result.data.domain.version + }); + } + + #handleCacheUpdates(updates) { + for (const update of updates) { + this.#set(update.domainId, { + data: update.data, + lastUpdate: update.lastUpdate, + version: update.version + }); + } + } + + #handleCacheVersionRequest(domainId) { + const cached = this.#instance.get(String(domainId)); + const cachedVersion = cached?.lastUpdate || null; + + if (this.#workerManager) { + this.#workerManager.sendCacheVersionResponse(domainId, cachedVersion); + } } #set(key, value) { this.#instance.set(String(key), value); } + + status() { + return this.#workerManager.getStatus(); + } get(key) { return this.#instance.get(String(key)); diff --git a/src/helpers/cache/worker-manager.js b/src/helpers/cache/worker-manager.js new file mode 100644 index 0000000..7054db7 --- /dev/null +++ b/src/helpers/cache/worker-manager.js @@ -0,0 +1,163 @@ +import { Worker } from 'node:worker_threads'; +import { fileURLToPath } from 'node:url'; +import { dirname, join } from 'node:path'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = dirname(__filename); + +export const EVENT_TYPE = { + START: 'start', + STARTED: 'started', + STOP: 'stop', + STOPPED: 'stopped', + READY: 'ready', + CACHE_UPDATES: 'cache-updates', + REQUEST_CACHE_VERSION: 'request-cache-version', + CACHE_VERSION_RESPONSE: 'cache-version-response', + ERROR: 'error' +}; + +export const STATUS_TYPE = { + RUNNING: 'running', + STOPPED: 'stopped', + STOPPING: 'stopping', + ERROR: 'error' +}; + +export class CacheWorkerManager { + DEFAULT_INTERVAL = 5000; + + constructor(options = {}) { + this.worker = null; + this.status = STATUS_TYPE.STOPPED; + this.options = { + interval: this.DEFAULT_INTERVAL, + ...options + }; + this.onCacheUpdates = null; + this.onError = null; + } + + start() { + return new Promise((resolve, reject) => { + const workerPath = join(__dirname, 'worker.js'); + this.worker = new Worker(workerPath, { + workerData: this.options + }); + + this.worker.on('message', (message) => { + switch (message.type) { + case EVENT_TYPE.READY: + this.worker.postMessage({ type: EVENT_TYPE.START }); + break; + + case EVENT_TYPE.STARTED: + this.status = STATUS_TYPE.RUNNING; + resolve(); + break; + + case EVENT_TYPE.STOPPED: + this.status = STATUS_TYPE.STOPPED; + break; + + case EVENT_TYPE.CACHE_UPDATES: + if (this.onCacheUpdates) { + this.onCacheUpdates(message.updates); + } + break; + + case EVENT_TYPE.REQUEST_CACHE_VERSION: + if (this.onCacheVersionRequest) { + this.onCacheVersionRequest(message.domainId); + } + break; + + case EVENT_TYPE.ERROR: + if (this.onError) { + this.onError(new Error(message.error)); + } + break; + } + }); + + this.worker.on('error', (error) => { + this.status = STATUS_TYPE.ERROR; + if (this.onError) { + this.onError(error); + } + reject(error); + }); + + this.worker.on('exit', (code) => { + const wasTerminating = this.status === STATUS_TYPE.STOPPING; + this.status = STATUS_TYPE.STOPPED; + this.worker = null; + + // Only report error if exit code is not 0 and worker wasn't being intentionally stopped + if (code !== 0 && !wasTerminating) { + const error = new Error(`Worker stopped with exit code ${code}`); + if (this.onError) { + this.onError(error); + } + } + }); + }); + } + + stop() { + this.status = STATUS_TYPE.STOPPING; + + return new Promise((resolve) => { + const cleanup = () => { + if (this.worker) { + this.worker.terminate(); + this.worker = null; + } + this.status = STATUS_TYPE.STOPPED; + resolve(); + }; + + // Set up listeners for graceful shutdown + const onMessage = (message) => { + if (message.type === STATUS_TYPE.STOPPED) { + if (this.worker) { + this.worker.off('message', onMessage); + } + // Give worker a moment to exit gracefully + setTimeout(cleanup, 100); + } + }; + + this.worker.on('message', onMessage); + + // Send stop message + this.worker.postMessage({ type: EVENT_TYPE.STOP }); + }); + } + + getStatus() { + return this.status; + } + + setOnCacheUpdates(callback) { + this.onCacheUpdates = callback; + } + + setOnCacheVersionRequest(callback) { + this.onCacheVersionRequest = callback; + } + + sendCacheVersionResponse(domainId, cachedVersion) { + if (this.worker && this.status === STATUS_TYPE.RUNNING) { + this.worker.postMessage({ + type: EVENT_TYPE.CACHE_VERSION_RESPONSE, + domainId, + cachedVersion + }); + } + } + + setOnError(callback) { + this.onError = callback; + } +} diff --git a/src/helpers/cache/worker.js b/src/helpers/cache/worker.js new file mode 100644 index 0000000..2573b08 --- /dev/null +++ b/src/helpers/cache/worker.js @@ -0,0 +1,158 @@ +import { parentPort, workerData } from 'node:worker_threads'; +import Logger from '../logger.js'; +import { CacheWorkerManager, EVENT_TYPE } from './worker-manager.js'; + +const { interval = CacheWorkerManager.DEFAULT_INTERVAL } = workerData; + +let isRunning = false; +let intervalId = null; +let dbInitialized = false; +let getAllDomains = null; + +// Initialize worker and send ready signal +(async () => { + await initializeWorker(); + if (dbInitialized) { + parentPort.postMessage({ type: EVENT_TYPE.READY }); + } +})(); + +async function initializeWorker() { + try { + await import('../../db/mongoose.js'); + const domainService = await import('../../services/domain.js'); + getAllDomains = domainService.getAllDomains; + dbInitialized = true; + Logger.info('Worker database connection initialized'); + } catch (error) { + Logger.error('Failed to initialize worker database connection:', error); + parentPort.postMessage({ + type: EVENT_TYPE.ERROR, + error: `Database initialization failed: ${error.message}` + }); + } +} + +async function checkForUpdates() { + if (isRunning || !dbInitialized) { + return; + } + + isRunning = true; + + try { + const domains = await getAllDomains(); + const updates = []; + + for (const domain of domains) { + try { + const cacheCheckResult = await fetchCacheVersion(domain); + const dbVersion = domain.lastUpdate; + const cachedVersion = cacheCheckResult; + + if (isCacheOutdated(cachedVersion, dbVersion)) { + await updateDomainCacheSnapshot(domain, updates); + } + } catch (domainError) { + Logger.error(`Error processing domain ${domain._id}:`, domainError.message); + // Continue with next domain instead of failing completely + } + } + + if (updates.length > 0) { + parentPort.postMessage({ + type: EVENT_TYPE.CACHE_UPDATES, + updates + }); + } + } catch (error) { + Logger.error('Worker checkForUpdates error:', error); + parentPort.postMessage({ + type: EVENT_TYPE.ERROR, + error: error.message + }); + } finally { + isRunning = false; + } +} + +function isCacheOutdated(cachedVersion, dbVersion) { + return !cachedVersion || dbVersion !== cachedVersion; +} + +async function updateDomainCacheSnapshot(domain, updates) { + const { graphql } = await import('graphql'); + const { domainQuery, reduceSnapshot } = await import('./query.js'); + const schemaModule = await import('../../aggregator/schema.js'); + + const result = await graphql({ + schema: schemaModule.default, + source: domainQuery(domain._id), + contextValue: { domain: domain._id } + }); + + if (result.data?.domain) { + updates.push({ + domainId: domain._id.toString(), + data: reduceSnapshot(result.data.domain), + lastUpdate: domain.lastUpdate, + version: result.data.domain.version + }); + } +} + +async function fetchCacheVersion(domain) { + return await new Promise((resolve) => { + const timeout = setTimeout(() => { + parentPort.off('message', messageHandler); + resolve(null); + }, 1000); + + const messageHandler = (message) => { + if (message.type === EVENT_TYPE.CACHE_VERSION_RESPONSE && message.domainId === domain._id.toString()) { + clearTimeout(timeout); + parentPort.off('message', messageHandler); + resolve(message.cachedVersion); + } + }; + + parentPort.on('message', messageHandler); + parentPort.postMessage({ + type: EVENT_TYPE.REQUEST_CACHE_VERSION, + domainId: domain._id.toString() + }); + }); +} + +// Handle messages from main thread +parentPort.on('message', async (message) => { + try { + switch (message.type) { + case EVENT_TYPE.START: + if (!intervalId && dbInitialized) { + intervalId = setInterval(() => checkForUpdates(), interval); + parentPort.postMessage({ type: EVENT_TYPE.STARTED }); + } else if (!dbInitialized) { + parentPort.postMessage({ + type: EVENT_TYPE.ERROR, + error: 'Database not initialized' + }); + } + break; + + case EVENT_TYPE.STOP: + if (intervalId) { + clearInterval(intervalId); + intervalId = null; + } + parentPort.postMessage({ type: EVENT_TYPE.STOPPED }); + break; + } + } catch (error) { + Logger.error('Worker message handler error:', error); + parentPort.postMessage({ + type: EVENT_TYPE.ERROR, + error: error.message + }); + } +}); \ No newline at end of file diff --git a/tests/unit-test/cache.test.js b/tests/unit-test/cache.test.js index 786228f..ae2e88e 100644 --- a/tests/unit-test/cache.test.js +++ b/tests/unit-test/cache.test.js @@ -2,6 +2,7 @@ import mongoose from 'mongoose'; import '../../src/db/mongoose'; import { domainId, setupDatabase } from '../fixtures/db_api'; +import Domain from '../../src/models/domain'; import Cache from '../../src/helpers/cache'; let cache; @@ -19,6 +20,10 @@ describe('Test cache', () => { cache = Cache.getInstance(); }); + afterEach(async () => { + await cache.stopScheduledUpdates(); + }); + test('UNIT_SUITE - Should initialize cache', async () => { // test cache = Cache.getInstance(); @@ -32,4 +37,51 @@ describe('Test cache', () => { expect(cacheSingle).toBeDefined(); }); -}); \ No newline at end of file + test('UNIT_SUITE - Should initialize schduled cache update', async () => { + // test + cache = Cache.getInstance(); + await cache.initializeCache(); + await cache.startScheduledUpdates(); + + // assert + expect(cache.status()).toBe('running'); + }); + + test('UNIT_SUITE - Should update cache when new domain version is available', async () => { + // test + cache = Cache.getInstance(); + await cache.initializeCache(); + await cache.startScheduledUpdates(); + + // assert + expect(cache.status()).toBe('running'); + const domain = cache.get(domainId); + + // update DB Domain version + await Domain.findByIdAndUpdate(domainId, { $inc: { lastUpdate: 1 } }); + const { updatedSuccessfully, domainFromCache } = await waitForDomainUpdate(domain.version, 10, 1000); + + expect(domainFromCache).toBeDefined(); + expect(updatedSuccessfully).toBe(true); + }, 20000); + +}); + +// Helpers + +async function waitForDomainUpdate(currentDomainVersion, maxAttempts, delay) { + let domainFromCache; + let attempt = 0; + let updatedSuccessfully = false; + + while (!updatedSuccessfully && attempt < maxAttempts) { + await new Promise(resolve => setTimeout(resolve, delay)); + attempt++; + + domainFromCache = cache.get(domainId); + if (domainFromCache.version != currentDomainVersion) { + updatedSuccessfully = true; + } + } + return { updatedSuccessfully, domainFromCache }; +}