Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sonar-project.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ sonar.javascript.lcov.reportPaths=coverage/lcov.info

# Path is relative to the sonar-project.properties file. Replace "\" by "/" on Windows.
sonar.sources=src
sonar.exclusions=src/api-docs/**, src/app-server.js, src/helpers/timed-match/match-proc.js
sonar.exclusions=src/api-docs/**, src/app-server.js, src/helpers/timed-match/match-proc.js, src/helpers/cache/worker.js
sonar.tests=tests
sonar.language=js

Expand Down
57 changes: 56 additions & 1 deletion src/helpers/cache/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ import { graphql } from 'graphql';
import schema from '../../aggregator/schema.js';
import { getAllDomains } from '../../services/domain.js';
import { domainQuery, reduceSnapshot } from './query.js';
import { CacheWorkerManager } from './worker-manager.js';
import Logger from '../logger.js';

export default class Cache {
#instance;
#workerManager;

constructor() {
this.#instance = new Map();
this.#workerManager = null;
}

static getInstance() {
if (!Cache.instance) {
Cache.instance = new Cache();
}

return Cache.instance;
}

Expand All @@ -25,19 +30,69 @@ export default class Cache {
}
}

async startScheduledUpdates(options = {}) {
this.#workerManager = new CacheWorkerManager(options);

this.#workerManager.setOnCacheUpdates((updates) =>
this.#handleCacheUpdates(updates));

this.#workerManager.setOnCacheVersionRequest((domainId) =>
this.#handleCacheVersionRequest(domainId));

this.#workerManager.setOnError((error) => {
Logger.error('Cache worker error:', error);
});

await this.#workerManager.start();
}

async stopScheduledUpdates() {
if (this.#workerManager) {
await this.#workerManager.stop();
this.#workerManager = null;
}
}

async #updateCache(domain) {
const result = await graphql({
schema,
source: domainQuery(domain._id),
contextValue: { domain: domain._id }
});

this.#set(domain._id, reduceSnapshot(result.data.domain));
this.#set(domain._id, {
data: reduceSnapshot(result.data.domain),
lastUpdate: domain.lastUpdate,
version: result.data.domain.version
});
}

#handleCacheUpdates(updates) {
for (const update of updates) {
this.#set(update.domainId, {
data: update.data,
lastUpdate: update.lastUpdate,
version: update.version
});
}
}

#handleCacheVersionRequest(domainId) {
const cached = this.#instance.get(String(domainId));
const cachedVersion = cached?.lastUpdate || null;

if (this.#workerManager) {
this.#workerManager.sendCacheVersionResponse(domainId, cachedVersion);
}
}

#set(key, value) {
this.#instance.set(String(key), value);
}

status() {
return this.#workerManager.getStatus();
}

get(key) {
return this.#instance.get(String(key));
Expand Down
163 changes: 163 additions & 0 deletions src/helpers/cache/worker-manager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import { Worker } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
import { dirname, join } from 'node:path';

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);

export const EVENT_TYPE = {
START: 'start',
STARTED: 'started',
STOP: 'stop',
STOPPED: 'stopped',
READY: 'ready',
CACHE_UPDATES: 'cache-updates',
REQUEST_CACHE_VERSION: 'request-cache-version',
CACHE_VERSION_RESPONSE: 'cache-version-response',
ERROR: 'error'
};

export const STATUS_TYPE = {
RUNNING: 'running',
STOPPED: 'stopped',
STOPPING: 'stopping',
ERROR: 'error'
};

export class CacheWorkerManager {
DEFAULT_INTERVAL = 5000;

constructor(options = {}) {
this.worker = null;
this.status = STATUS_TYPE.STOPPED;
this.options = {
interval: this.DEFAULT_INTERVAL,
...options
};
this.onCacheUpdates = null;
this.onError = null;
}

start() {
return new Promise((resolve, reject) => {
const workerPath = join(__dirname, 'worker.js');
this.worker = new Worker(workerPath, {
workerData: this.options
});

this.worker.on('message', (message) => {
switch (message.type) {
case EVENT_TYPE.READY:
this.worker.postMessage({ type: EVENT_TYPE.START });
break;

case EVENT_TYPE.STARTED:
this.status = STATUS_TYPE.RUNNING;
resolve();
break;

case EVENT_TYPE.STOPPED:
this.status = STATUS_TYPE.STOPPED;
break;

case EVENT_TYPE.CACHE_UPDATES:
if (this.onCacheUpdates) {
this.onCacheUpdates(message.updates);
}
break;

case EVENT_TYPE.REQUEST_CACHE_VERSION:
if (this.onCacheVersionRequest) {
this.onCacheVersionRequest(message.domainId);
}
break;

case EVENT_TYPE.ERROR:
if (this.onError) {
this.onError(new Error(message.error));
}
break;
}
});

this.worker.on('error', (error) => {
this.status = STATUS_TYPE.ERROR;
if (this.onError) {
this.onError(error);
}
reject(error);
});

this.worker.on('exit', (code) => {
const wasTerminating = this.status === STATUS_TYPE.STOPPING;
this.status = STATUS_TYPE.STOPPED;
this.worker = null;

// Only report error if exit code is not 0 and worker wasn't being intentionally stopped
if (code !== 0 && !wasTerminating) {
const error = new Error(`Worker stopped with exit code ${code}`);
if (this.onError) {
this.onError(error);
}
}
});
});
}

stop() {
this.status = STATUS_TYPE.STOPPING;

return new Promise((resolve) => {
const cleanup = () => {
if (this.worker) {
this.worker.terminate();
this.worker = null;
}
this.status = STATUS_TYPE.STOPPED;
resolve();
};

// Set up listeners for graceful shutdown
const onMessage = (message) => {
if (message.type === STATUS_TYPE.STOPPED) {
if (this.worker) {
this.worker.off('message', onMessage);
}
// Give worker a moment to exit gracefully
setTimeout(cleanup, 100);
}
};

this.worker.on('message', onMessage);

// Send stop message
this.worker.postMessage({ type: EVENT_TYPE.STOP });
});
}

getStatus() {
return this.status;
}

setOnCacheUpdates(callback) {
this.onCacheUpdates = callback;
}

setOnCacheVersionRequest(callback) {
this.onCacheVersionRequest = callback;
}

sendCacheVersionResponse(domainId, cachedVersion) {
if (this.worker && this.status === STATUS_TYPE.RUNNING) {
this.worker.postMessage({
type: EVENT_TYPE.CACHE_VERSION_RESPONSE,
domainId,
cachedVersion
});
}
}

setOnError(callback) {
this.onError = callback;
}
}
Loading