From b68e66c99e0f27e8fbc557350565b0ab0b1ddfaa Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Wed, 13 Jun 2018 23:46:38 +0300 Subject: [PATCH 01/21] Add separate worker pools * Add LusterMasterError * Add Master.createPool(key) * Add Master.getPool(key) --- lib/errors.js | 11 ++ lib/master.js | 289 ++++++++-------------------- lib/worker-pool.js | 437 ++++++++++++++++++++++++++++++++++++++++++ lib/worker_wrapper.js | 34 ++-- 4 files changed, 553 insertions(+), 218 deletions(-) create mode 100644 lib/worker-pool.js diff --git a/lib/errors.js b/lib/errors.js index 409bcb9..d4bb163 100644 --- a/lib/errors.js +++ b/lib/errors.js @@ -94,4 +94,15 @@ errors.LusterPortError = LusterError.create('LusterPortError', 'Can not unlink unix socket "%socketPath%"' }); +/** + * @constructor + * @class LusterMasterError + * @augments LusterError + */ +errors.LusterMasterError = LusterError.create('LusterMasterError', + { + POOL_KEY_ALREADY_TAKEN: + 'Pool key "%key%" is already taken' + }); + module.exports = errors; diff --git a/lib/master.js b/lib/master.js index c7fb827..1d766fd 100644 --- a/lib/master.js +++ b/lib/master.js @@ -1,10 +1,12 @@ -const os = require('os'), - cluster = require('cluster'), - ClusterProcess = require('./cluster_process'), - WorkerWrapper = require('./worker_wrapper'), - Port = require('./port'), - RestartQueue = require('./restart_queue'), - RPC = require('./rpc'); +const cluster = require('cluster'); + +const ClusterProcess = require('./cluster_process'); +const LusterMasterError = require('./errors').LusterMasterError; +const RPC = require('./rpc'); +const WorkerPool = require('./worker-pool'); +const WorkerWrapper = require('./worker_wrapper'); + +const DEFAULT_POOL_KEY = '__default'; /** * @constructor @@ -15,21 +17,6 @@ class Master extends ClusterProcess { constructor() { super(); - /** - * @type {Object} - * @property {WorkerWrapper} * - * @public - * @todo make it private or public immutable - */ - this.workers = {}; - - /** - * Workers restart queue. - * @type {RestartQueue} - * @private - */ - this._restartQueue = new RestartQueue(); - /** * Configuration object to pass to cluster.setupMaster() * @type {Object} @@ -37,18 +24,37 @@ class Master extends ClusterProcess { */ this._masterOpts = {}; + this.pools = new Map(); + this.createPool(DEFAULT_POOL_KEY); + this.id = 0; this.wid = 0; this.pid = process.pid; - this.on('worker state', this._cleanupUnixSockets.bind(this)); - this.on('worker exit', this._checkWorkersAlive.bind(this)); - // @todo make it optional? process.on('SIGINT', this._onSignalQuit.bind(this)); process.on('SIGQUIT', this._onSignalQuit.bind(this)); } + createPool(key) { + if (this.pools.has(key)) { + throw LusterMasterError.createError( + LusterMasterError.CODES.POOL_KEY_ALREADY_TAKEN, + {key} + ); + } + + const pool = new WorkerPool(key, this); + this._proxyWorkerEvents(pool); + pool.on('shutdown', this._checkPoolsAlive.bind(this)); + this.pools.set(key, pool); + return pool; + } + + getPool(key) { + return this.pools.get(key); + } + /** * Allows same object structure as cluster.setupMaster(). * This function must be used instead of cluster.setupMaster(), @@ -71,103 +77,54 @@ class Master extends ClusterProcess { .shutdown(); } - /** - * Remove not used unix socket before worker will try to listen it. - * @param {WorkerWrapper} worker - * @param {WorkerWrapperState} state - * @private - */ - _cleanupUnixSockets(worker, state) { - const port = worker.options.port; - - if (this._restartQueue.has(worker) || - state !== WorkerWrapper.STATES.LAUNCHING || - port.family !== Port.UNIX) { - return; - } - - const inUse = this.getWorkersArray().some(w => - worker.wid !== w.wid && - w.isRunning() && - port.isEqualTo(w.options.port) - ); - - if (!inUse) { - port.unlink(err => { - if (err) { - this.emit('error', err); - } - }); - } - } - /** * Check for alive workers, if no one here, then emit "shutdown". * @private */ - _checkWorkersAlive() { - const workers = this.getWorkersArray(), - alive = workers.reduce( - (count, w) => w.dead ? count - 1 : count, - workers.length - ); + _checkPoolsAlive() { + let dead = true; + this.forEachPool(pool => dead = dead && pool.dead); - if (alive === 0) { + if (dead) { this.emit('shutdown'); } } /** - * Repeat WorkerWrapper events on Master and add 'worker ' prefix to event names + * Repeat WorkerWrapper events from WorkerPool on Master * so for example 'online' became 'worker online' * @private - * @param {WorkerWrapper} worker + * @param {WorkerPool} pool */ - _proxyWorkerEvents(worker) { - WorkerWrapper.EVENTS - .forEach(eventName => { - const proxyEventName = 'worker ' + eventName; - worker.on(eventName, this.emit.bind(this, proxyEventName, worker)); - }); + _proxyWorkerEvents(pool) { + for (const eventName of WorkerWrapper.EVENTS) { + const proxyEventName = 'worker ' + eventName; + pool.on(proxyEventName, this.emit.bind(this, proxyEventName)); + } } /** * @returns {number[]} workers ids array */ getWorkersIds() { - if (!this._workersIdsCache) { - this._workersIdsCache = this.getWorkersArray().map(w => w.wid); - } - - return this._workersIdsCache; + return this.getWorkersArray().map(w => w.wid); } /** * @returns {WorkerWrapper[]} workers array */ getWorkersArray() { - if (!this._workersArrayCache) { - this._workersArrayCache = Object.values(this.workers); - } - - return this._workersArrayCache; + let result = []; + this.forEachPool( + pool => result = result.concat(pool.getWorkersArray()) + ); + return result; } - /** - * Add worker to the pool - * @param {WorkerWrapper} worker - * @returns {Master} self - * @public - */ - add(worker) { - // invalidate Master#getWorkersIds and Master#getWorkersArray cache - this._workersIdsCache = null; - this._workersArrayCache = null; - - this.workers[worker.wid] = worker; - this._proxyWorkerEvents(worker); - - return this; + forEachPool(fn) { + for (const pool of this.pools.values()) { + fn(pool); + } } /** @@ -180,8 +137,9 @@ class Master extends ClusterProcess { * master.getWorkersArray().forEach(fn); */ forEach(fn) { - this.getWorkersArray() - .forEach(fn); + this.forEachPool(pool => { + pool.forEach(fn); + }); return this; } @@ -212,47 +170,7 @@ class Master extends ClusterProcess { ); } - const // WorkerWrapper options - forkTimeout = this.config.get('control.forkTimeout'), - stopTimeout = this.config.get('control.stopTimeout'), - exitThreshold = this.config.get('control.exitThreshold'), - allowedSequentialDeaths = this.config.get('control.allowedSequentialDeaths'), - - count = this.config.get('workers', os.cpus().length), - isServerPortSet = this.config.has('server.port'), - groups = this.config.get('server.groups', 1), - workersPerGroup = Math.floor(count / groups); - - let port, - // workers and groups count - i = 0, - group = 0, - workersInGroup = 0; - - if (isServerPortSet) { - port = new Port(this.config.get('server.port')); - } - - // create pool of workers - while (count > i++) { - this.add(new WorkerWrapper(this, { - forkTimeout, - stopTimeout, - exitThreshold, - allowedSequentialDeaths, - port: isServerPortSet ? port.next(group) : 0, - maxListeners: this.getMaxListeners(), - })); - - // groups > 1, current group is full and - // last workers can form at least more one group - if (groups > 1 && - ++workersInGroup >= workersPerGroup && - count - (group + 1) * workersPerGroup >= workersPerGroup) { - workersInGroup = 0; - group++; - } - } + this.pools.get(DEFAULT_POOL_KEY).configure(this.config); } /** @@ -262,23 +180,12 @@ class Master extends ClusterProcess { * @returns {Promise} */ waitForWorkers(wids, event) { - const pendingWids = new Set(wids); - - return new Promise(resolve => { - if (pendingWids.size === 0) { - resolve(); - } - - const onWorkerState = worker => { - const wid = worker.wid; - pendingWids.delete(wid); - if (pendingWids.size === 0) { - this.removeListener(event, onWorkerState); - resolve(); - } - }; - this.on(event, onWorkerState); - }); + const promises = []; + this.forEachPool( + pool => promises.push(pool.waitForWorkers(wids, event)) + ); + + return Promise.all(promises); } /** @@ -303,7 +210,7 @@ class Master extends ClusterProcess { async _restart() { // TODO maybe run this after starting waitForAllWorkers - this.forEach(worker => worker.restart()); + this.forEachPool(pool => pool.restart()); await this.waitForAllWorkers('worker ready'); @@ -322,6 +229,16 @@ class Master extends ClusterProcess { return this; } + async _softRestart() { + const promises = []; + this.forEachPool( + pool => promises.push(pool.softRestart()) + ); + + await Promise.all(promises); + this.emit('restarted'); + } + /** * Workers will be restarted one by one using RestartQueue. * If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed @@ -331,21 +248,7 @@ class Master extends ClusterProcess { * @fires Master#restarted when workers spawned and ready. */ softRestart() { - this.forEach(worker => worker.softRestart()); - this._restartQueue.once('drain', this.emit.bind(this, 'restarted')); - return this; - } - - /** - * Schedules one worker restart using RestartQueue. - * If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed - * into the queue, it will emit 'error' on restart. - * @public - * @param {WorkerWrapper} worker - * @returns {Master} self - */ - scheduleWorkerRestart(worker) { - this._restartQueue.push(worker); + this._softRestart(); return this; } @@ -366,15 +269,7 @@ class Master extends ClusterProcess { * @public */ remoteCallToAll(name, ...args) { - this.forEach(worker => { - if (worker.ready) { - worker.remoteCall(name, ...args); - } else { - worker.on('ready', () => { - worker.remoteCall(name, ...args); - }); - } - }); + this.forEachPool(pool => pool.remoteCallToAll(name, ...args)); } /** @@ -385,11 +280,7 @@ class Master extends ClusterProcess { * @public */ broadcastEventToAll(event, ...args) { - this.forEach(worker => { - if (worker.ready) { - worker.broadcastEvent(event, ...args); - } - }); + this.forEachPool(pool => pool.broadcastEventToAll(event, ...args)); } /** @@ -400,8 +291,7 @@ class Master extends ClusterProcess { * @public */ emitToAll(event, ...args) { - this.emit(event, ...args); - this.broadcastEventToAll(event, ...args); + this.forEachPool(pool => pool.emitToAll(event, ...args)); } /** @@ -409,20 +299,13 @@ class Master extends ClusterProcess { */ async _shutdown() { - const stoppedWorkers = []; - - this.forEach(worker => { - if (worker.isRunning()) { - worker.stop(); - stoppedWorkers.push(worker.wid); - } - }); - - await this.waitForWorkers( - stoppedWorkers, - 'worker exit', + const promises = []; + this.forEachPool( + pool => promises.push(pool._shutdown()) ); + await Promise.all(promises); + this.emit('shutdown'); } @@ -446,11 +329,7 @@ class Master extends ClusterProcess { * @public */ remoteCallToAllWithCallback(opts) { - this.forEach(worker => { - if (worker.isRunning()) { - worker.remoteCallWithCallback(opts); - } - }); + this.forEachPool(pool => pool.remoteCallToAllWithCallback(opts)); } async _run() { @@ -459,7 +338,7 @@ class Master extends ClusterProcess { cluster.setupMaster(this._masterOpts); // TODO maybe run this after starting waitForAllWorkers - this.forEach(worker => worker.run()); + this.forEachPool(pool => pool.run()); await this.waitForAllWorkers('worker ready'); diff --git a/lib/worker-pool.js b/lib/worker-pool.js new file mode 100644 index 0000000..5bb70c3 --- /dev/null +++ b/lib/worker-pool.js @@ -0,0 +1,437 @@ +'use strict'; + +const os = require('os'); + +const Configuration = require('./configuration'); +const EventEmitterEx = require('./event_emitter_ex'); +const Port = require('./port'); +const RestartQueue = require('./restart_queue'); +const WorkerWrapper = require('./worker_wrapper'); + +class WorkerPool extends EventEmitterEx { + constructor(key, master) { + super(); + + this.key = key; + this.master = master; + + this.config = null; + + /** + * @type {Object} + * @property {WorkerWrapper} * + * @public + * @todo make it private or public immutable + */ + this.workers = {}; + + /** + * Workers restart queue. + * @type {RestartQueue} + * @private + */ + this._restartQueue = new RestartQueue(); + + this.dead = false; + + this.on('worker state', this._cleanupUnixSockets.bind(this)); + this._checkWorkersAlive = this._checkWorkersAlive.bind(this); + this.on('worker exit', this._checkWorkersAlive); + } + + /** + * Remove not used unix socket before worker will try to listen it. + * @param {WorkerWrapper} worker + * @param {WorkerWrapperState} state + * @private + */ + _cleanupUnixSockets(worker, state) { + const port = worker.options.port; + + if (this._restartQueue.has(worker) || + state !== WorkerWrapper.STATES.LAUNCHING || + port.family !== Port.UNIX) { + return; + } + + const inUse = this.getWorkersArray().some(w => + worker.wid !== w.wid && + w.isRunning() && + port.isEqualTo(w.options.port) + ); + + if (!inUse) { + port.unlink(err => { + if (err) { + this.emit('error', err); + } + }); + } + } + + /** + * Check for alive workers, if no one here, then emit "shutdown". + * @private + */ + _checkWorkersAlive() { + const dead = this + .getWorkersArray() + .every(w => w.dead); + + if (dead) { + this.dead = true; + this.emit('shutdown'); + } + } + + /** + * @returns {number[]} workers ids array + */ + getWorkersIds() { + if (!this._workersIdsCache) { + this._workersIdsCache = this.getWorkersArray().map(w => w.wid); + } + + return this._workersIdsCache; + } + + /** + * @returns {WorkerWrapper[]} workers array + */ + getWorkersArray() { + if (!this._workersArrayCache) { + this._workersArrayCache = Object.values(this.workers); + } + + return this._workersArrayCache; + } + + /** + * Add worker to the pool + * @param {WorkerWrapper} worker + * @returns {WorkerPool} self + * @public + */ + add(worker) { + // invalidate WorkerPool#getWorkersIds and WorkerPool#getWorkersArray cache + this._workersIdsCache = null; + this._workersArrayCache = null; + + this.workers[worker.wid] = worker; + this._proxyWorkerEvents(worker); + + return this; + } + + /** + * Iterate over workers in the pool. + * @param {Function} fn + * @public + * @returns {WorkerPool} self + * + * @description Shortcut for: + * pool.getWorkersArray().forEach(fn); + */ + forEach(fn) { + this.getWorkersArray() + .forEach(fn); + + return this; + } + + /** + * Repeat WorkerWrapper events on WorkerPool and add 'worker ' prefix to event names + * so for example 'online' became 'worker online' + * @private + * @param {WorkerWrapper} worker + */ + _proxyWorkerEvents(worker) { + WorkerWrapper.EVENTS + .forEach(eventName => { + const proxyEventName = 'worker ' + eventName; + worker.on(eventName, this.emit.bind(this, proxyEventName, worker)); + }); + } + + configure(config) { + this.config = new Configuration(config); + + const // WorkerWrapper options + forkTimeout = this.config.get('control.forkTimeout'), + stopTimeout = this.config.get('control.stopTimeout'), + exitThreshold = this.config.get('control.exitThreshold'), + allowedSequentialDeaths = this.config.get('control.allowedSequentialDeaths'), + + count = this.config.get('workers', os.cpus().length), + isServerPortSet = this.config.has('server.port'), + groups = this.config.get('server.groups', 1), + workersPerGroup = Math.floor(count / groups); + + let port, + // workers and groups count + i = 0, + group = 0, + workersInGroup = 0; + + if (isServerPortSet) { + port = new Port(this.config.get('server.port')); + } + + // create pool of workers + while (count > i++) { + this.add(new WorkerWrapper(this, { + forkTimeout, + stopTimeout, + exitThreshold, + allowedSequentialDeaths, + port: isServerPortSet ? port.next(group) : 0, + maxListeners: this.getMaxListeners(), + })); + + // groups > 1, current group is full and + // last workers can form at least more one group + if (groups > 1 && + ++workersInGroup >= workersPerGroup && + count - (group + 1) * workersPerGroup >= workersPerGroup) { + workersInGroup = 0; + group++; + } + } + + return this; + } + + /** + * @param {Number[]} wids Array of `WorkerWrapper#wid` values + * @param {String} event wait for + * @public + * @returns {Promise} + */ + waitForWorkers(wids, event) { + const pendingWids = new Set(wids); + + return new Promise(resolve => { + if (pendingWids.size === 0) { + resolve(); + } + + const onWorkerState = worker => { + const wid = worker.wid; + if (pendingWids.has(wid)) { + pendingWids.delete(wid); + } + if (pendingWids.size === 0) { + this.removeListener(event, onWorkerState); + resolve(); + } + }; + this.on(event, onWorkerState); + }); + } + + /** + * @param {String} event wait for + * @public + * @returns {Promise} + */ + waitForAllWorkers(event) { + return this.waitForWorkers( + this.getWorkersIds(), + event + ); + } + + /** + * @event WorkerPool#running + */ + + async _run() { + // TODO maybe run this after starting waitForAllWorkers + this.forEach(worker => worker.run()); + + await this.waitForAllWorkers('worker ready'); + + this.emit('running'); + } + + /** + * Fork workers. + * Execution will be delayed until WorkerPool became configured + * (`configured` event fired). + * @method + * @returns {WorkerPool} self + * @public + * @fires WorkerPool#running then workers spawned and ready. + * + * TODO : fix example + * @example + * // file: master.js + * var master = require('luster'); + * + * master + * .configure({ app : 'worker' }) + * .run(); + * + * // there is master is still not running anyway + * // it will run immediate once configured and + * // current thread execution done + */ + run() { + this._run(); + return this; + } + + /** + * @event WorkerPool#restarted + */ + + async _restart() { + // TODO maybe run this after starting waitForAllWorkers + this.forEach(worker => worker.restart()); + + await this.waitForAllWorkers('worker ready'); + + this.emit('restarted'); + } + + /** + * Hard workers restart: all workers will be restarted at same time. + * CAUTION: if dead worker is restarted, it will emit 'error' event. + * @public + * @returns {WorkerPool} self + * @fires WorkerPool#restarted when workers spawned and ready. + */ + restart() { + this._restart(); + return this; + } + + /** + * Workers will be restarted one by one using RestartQueue. + * If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed + * into the queue, it will emit 'error' on restart. + * @public + * @returns {WorkerPool} self + * @fires WorkerPool#restarted when workers spawned and ready. + */ + softRestart() { + return new Promise(resolve => { + this.forEach(worker => worker.softRestart()); + this._restartQueue.once('drain', () => { + this.emit('restarted'); + resolve(); + }); + }); + } + + /** + * Schedules one worker restart using RestartQueue. + * If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed + * into the queue, it will emit 'error' on restart. + * @public + * @param {WorkerWrapper} worker + * @returns {WorkerPool} self + */ + scheduleWorkerRestart(worker) { + this._restartQueue.push(worker); + return this; + } + + /** + * RPC to all workers + * @method + * @param {String} name of called command in the worker + * @param {...*} args + * @public + */ + remoteCallToAll(name, ...args) { + this.forEach(worker => { + if (worker.ready) { + worker.remoteCall(name, ...args); + } else { + worker.on('ready', () => { + worker.remoteCall(name, ...args); + }); + } + }); + } + + /** + * Broadcast event to all workers. + * @method + * @param {String} event of called command in the worker + * @param {...*} args + * @public + */ + broadcastEventToAll(event, ...args) { + this.forEach(worker => { + if (worker.ready) { + worker.broadcastEvent(event, ...args); + } + }); + } + + /** + * Emit event on pool and all workers in "ready" state. + * @method + * @param {String} event of called command in the worker + * @param {...*} args + * @public + */ + emitToAll(event, ...args) { + this.emit(event, ...args); + this.broadcastEventToAll(event, ...args); + } + + /** + * @event WorkerPool#shutdown + */ + + async _shutdown() { + const stoppedWorkers = []; + + this.forEach(worker => { + if (worker.isRunning()) { + worker.stop(); + stoppedWorkers.push(worker.wid); + } + }); + + await this.waitForWorkers( + stoppedWorkers, + 'worker exit', + ); + + this.emit('shutdown'); + } + + /** + * Stop all workers and emit `WorkerPool#shutdown` event after successful shutdown of all workers. + * @fires WorkerPool#shutdown + * @returns {WorkerPool} + */ + shutdown() { + this.removeListener('worker exit', this._checkWorkersAlive); + this._shutdown(); + return this; + } + + /** + * Do a remote call to all workers, callbacks are registered and then executed separately for each worker + * @method + * @param {String} opts.command + * @param {Function} opts.callback + * @param {Number} [opts.timeout] in milliseconds + * @param {*} [opts.data] + * @public + */ + remoteCallToAllWithCallback(opts) { + this.forEach(worker => { + if (worker.isRunning()) { + worker.remoteCallWithCallback(opts); + } + }); + } +} + +module.exports = WorkerPool; diff --git a/lib/worker_wrapper.js b/lib/worker_wrapper.js index 415839c..0c5a035 100644 --- a/lib/worker_wrapper.js +++ b/lib/worker_wrapper.js @@ -66,7 +66,7 @@ class WorkerWrapperOptions { * @constructor * @class WorkerWrapper * @augments EventEmitterEx - * @param {Master} master + * @param {WorkerPool} pool * @param {WorkerWrapperOptions} options * * # Worker wrapper state transitions @@ -75,7 +75,7 @@ class WorkerWrapperOptions { * External events can */ class WorkerWrapper extends EventEmitterEx { - constructor(master, options) { + constructor(pool, options) { super(); if (options && @@ -151,10 +151,10 @@ class WorkerWrapper extends EventEmitterEx { this.ready = false; /** - * @type {Master} + * @type {WorkerPool} * @private */ - this._master = master; + this._pool = pool; /** * Listen for cluster#fork and worker events. @@ -166,7 +166,7 @@ class WorkerWrapper extends EventEmitterEx { this.on('exit', this._onExit.bind(this)); WorkerWrapper._RPC_EVENTS.forEach(event => { - master.on('received worker ' + event, WorkerWrapper.createEventTranslator(event).bind(this)); + pool.master.on('received worker ' + event, this.eventTranslator.bind(this, event)); }); this.on('ready', this._onReady.bind(this)); } @@ -203,6 +203,16 @@ class WorkerWrapper extends EventEmitterEx { return this._state; } + /** + * Pool this WorkerWrapper belongs to + * @memberOf {WorkerWrapper} + * @public + * @readonly + */ + get pool() { + return this._pool; + } + /** * @see WorkerWrapper.STATES for possible `state` argument values * @param {WorkerWrapperState} state @@ -217,12 +227,10 @@ class WorkerWrapper extends EventEmitterEx { this.emit('state', state); } - static createEventTranslator(event) { - return /** @this {WorkerWrapper} */function(worker) { - if (this._worker && worker.id === this._worker.id) { - this.emit(event); - } - }; + eventTranslator(event, worker) { + if (this._worker && worker.id === this._worker.id) { + this.emit(event); + } } _onReady() { @@ -549,11 +557,11 @@ class WorkerWrapper extends EventEmitterEx { } /** - * Adds this worker to master's restart queue + * Adds this worker to pool's restart queue * @public */ softRestart() { - this._master.scheduleWorkerRestart(this); + this._pool.scheduleWorkerRestart(this); } } From c65eaa8120b9dabc91d05cf6a187a8855fe9c6ce Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Wed, 27 Jun 2018 16:38:42 +0300 Subject: [PATCH 02/21] Add workerEnv option --- lib/worker-pool.js | 3 +++ lib/worker_wrapper.js | 5 +++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/worker-pool.js b/lib/worker-pool.js index 5bb70c3..dcb96f7 100644 --- a/lib/worker-pool.js +++ b/lib/worker-pool.js @@ -167,6 +167,8 @@ class WorkerPool extends EventEmitterEx { groups = this.config.get('server.groups', 1), workersPerGroup = Math.floor(count / groups); + const workerEnv = this.config.get('workerEnv'); + let port, // workers and groups count i = 0, @@ -186,6 +188,7 @@ class WorkerPool extends EventEmitterEx { allowedSequentialDeaths, port: isServerPortSet ? port.next(group) : 0, maxListeners: this.getMaxListeners(), + workerEnv })); // groups > 1, current group is full and diff --git a/lib/worker_wrapper.js b/lib/worker_wrapper.js index 0c5a035..eee6981 100644 --- a/lib/worker_wrapper.js +++ b/lib/worker_wrapper.js @@ -35,6 +35,7 @@ class WorkerWrapperOptions { this.exitThreshold = options.exitThreshold; this.allowedSequentialDeaths = options.allowedSequentialDeaths || 0; this.port = options.port; + this.workerEnv = options.workerEnv; } get port() { @@ -389,10 +390,10 @@ class WorkerWrapper extends EventEmitterEx { setImmediate(() => { /** @private */ - this._worker = cluster.fork({ + this._worker = cluster.fork(Object.assign({ port: this.options.port, LUSTER_WID: this.wid, - }); + }, this.options.workerEnv)); /** @private */ this._remoteCall = RPC.createCaller(this._worker); From 17b1d8fb9a311f21cbde2ed572d2c8cd37421700 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Thu, 28 Jun 2018 15:46:18 +0300 Subject: [PATCH 03/21] Add raw getter to Configuration --- lib/configuration/index.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/configuration/index.js b/lib/configuration/index.js index af0b664..bc8bf48 100644 --- a/lib/configuration/index.js +++ b/lib/configuration/index.js @@ -20,6 +20,10 @@ class Configuration { Object.assign(this._rawConfig, config); } + get raw() { + return this._rawConfig; + } + /** * @param {String} path * @param {*} [defaultValue] From 94931900a8ad60553c643b5dd11a345ba3fb100b Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Thu, 28 Jun 2018 15:50:46 +0300 Subject: [PATCH 04/21] Emit 'create pool' event from master to help debugging --- lib/master.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/master.js b/lib/master.js index 1d766fd..6964ff4 100644 --- a/lib/master.js +++ b/lib/master.js @@ -44,6 +44,7 @@ class Master extends ClusterProcess { ); } + this.emit('create pool', key); const pool = new WorkerPool(key, this); this._proxyWorkerEvents(pool); pool.on('shutdown', this._checkPoolsAlive.bind(this)); From 7ac6eb976f0b13382c8afab518986100bc66ab79 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Thu, 28 Jun 2018 15:51:40 +0300 Subject: [PATCH 05/21] Refactor id logging in EventEmitterEx to log eexKey --- lib/event_emitter_ex.js | 4 +++- lib/master.js | 1 + lib/restart_queue.js | 4 +++- lib/worker-pool.js | 3 ++- lib/worker.js | 2 ++ lib/worker_wrapper.js | 1 + 6 files changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/event_emitter_ex.js b/lib/event_emitter_ex.js index 2edd923..29e5853 100644 --- a/lib/event_emitter_ex.js +++ b/lib/event_emitter_ex.js @@ -17,7 +17,9 @@ if (process.env.NODE_DEBUG && /luster:eex/i.test(process.env.NODE_DEBUG)) { EventEmitterEx.prototype.emit = function(...args) { const inspectedArgs = args.map(inspect).join(', '); - console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', this.wid, inspectedArgs); + const key = this.eexKey; + + console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', key, inspectedArgs); return EventEmitter.prototype.emit.apply(this, args); }; diff --git a/lib/master.js b/lib/master.js index 6964ff4..145e49d 100644 --- a/lib/master.js +++ b/lib/master.js @@ -29,6 +29,7 @@ class Master extends ClusterProcess { this.id = 0; this.wid = 0; + this.eexKey = 0; this.pid = process.pid; // @todo make it optional? diff --git a/lib/restart_queue.js b/lib/restart_queue.js index ea8292c..6b56d77 100644 --- a/lib/restart_queue.js +++ b/lib/restart_queue.js @@ -15,7 +15,7 @@ const EventEmitterEx = require('./event_emitter_ex'), * @augments EventEmitterEx */ class RestartQueue extends EventEmitterEx { - constructor() { + constructor(eexKey = undefined) { super(); /** @@ -23,6 +23,8 @@ class RestartQueue extends EventEmitterEx { * @private */ this._queue = []; + + this.eexKey = eexKey; } /** diff --git a/lib/worker-pool.js b/lib/worker-pool.js index dcb96f7..2b88018 100644 --- a/lib/worker-pool.js +++ b/lib/worker-pool.js @@ -13,6 +13,7 @@ class WorkerPool extends EventEmitterEx { super(); this.key = key; + this.eexKey = key; this.master = master; this.config = null; @@ -30,7 +31,7 @@ class WorkerPool extends EventEmitterEx { * @type {RestartQueue} * @private */ - this._restartQueue = new RestartQueue(); + this._restartQueue = new RestartQueue(key); this.dead = false; diff --git a/lib/worker.js b/lib/worker.js index f9fc98a..a4c5b4f 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -15,6 +15,8 @@ class Worker extends ClusterProcess { constructor() { super(); + this.eexKey = wid; + const broadcastEvent = this._broadcastEvent; this._foreignPropertiesReceivedPromise = new Promise(resolve => { diff --git a/lib/worker_wrapper.js b/lib/worker_wrapper.js index eee6981..da198d0 100644 --- a/lib/worker_wrapper.js +++ b/lib/worker_wrapper.js @@ -101,6 +101,7 @@ class WorkerWrapper extends EventEmitterEx { this.options = new WorkerWrapperOptions(options); this._wid = ++nextId; + this.eexKey = this._wid; /** * Indicates worker restarting in progress. From 59167a1e1b90c7bd9b0ddc40fbae8fe8ca377791 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Thu, 28 Jun 2018 15:52:27 +0300 Subject: [PATCH 06/21] Add WorkerPool.onceFirstRunning to ease sync --- lib/worker-pool.js | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/lib/worker-pool.js b/lib/worker-pool.js index 2b88018..737ef72 100644 --- a/lib/worker-pool.js +++ b/lib/worker-pool.js @@ -33,6 +33,10 @@ class WorkerPool extends EventEmitterEx { */ this._restartQueue = new RestartQueue(key); + this._runningPromise = new Promise(resolve => { + this.once('running', resolve); + }); + this.dead = false; this.on('worker state', this._cleanupUnixSockets.bind(this)); @@ -258,6 +262,15 @@ class WorkerPool extends EventEmitterEx { this.emit('running'); } + /** + * Resolves when all workers ready on first run. + * @this {ClusterProcess} + * @returns {Promise} + */ + onceFirstRunning() { + return this._runningPromise; + } + /** * Fork workers. * Execution will be delayed until WorkerPool became configured From 4c36f8cc2d77d03a275da334e776591fa0f31bbc Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Thu, 28 Jun 2018 15:54:13 +0300 Subject: [PATCH 07/21] Wrap require in Worker with try-catch Worker._run in async, so throws in it will result in unhandled Promise rejection instead of unhandled exception, and process wont exit. --- lib/worker.js | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/worker.js b/lib/worker.js index a4c5b4f..248ce41 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -144,7 +144,13 @@ class Worker extends ClusterProcess { const workerBase = this.config.resolve('app'); - require(workerBase); + try { + require(workerBase); + } catch (e) { + console.error(`Worker failed on require ${workerBase}`); + console.error(e); + process.exit(1); + } this.emit('loaded', workerBase); if (!this.config.get('control.triggerReadyStateManually', false)) { From d41a957df9cd5c0cbec56f8e42ed45a6090a1fa3 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Tue, 17 Jul 2018 16:55:43 +0300 Subject: [PATCH 08/21] Enable object spread in eslintrc --- .eslintrc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.eslintrc b/.eslintrc index 72aa182..e794122 100644 --- a/.eslintrc +++ b/.eslintrc @@ -18,6 +18,9 @@ "mocha": true }, "parserOptions": { - "ecmaVersion": 2017 + "ecmaVersion": 2017, + "ecmaFeatures": { + "experimentalObjectRestSpread": true + } } } From 45c439d9697fee68bf4af1a54b5516b72682923e Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Tue, 17 Jul 2018 16:56:15 +0300 Subject: [PATCH 09/21] Add extend method to Configuration instance --- lib/configuration/index.js | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/configuration/index.js b/lib/configuration/index.js index bc8bf48..144f55a 100644 --- a/lib/configuration/index.js +++ b/lib/configuration/index.js @@ -97,6 +97,16 @@ class Configuration { config; } + extend(config) { + return new Configuration( + { + ...this.raw, + ...config, + }, + this._resolveBaseDir, + ); + } + /** * Override config properties using `LUSTER_CONF` environment variable. * From 2d0f8be3a2508d9a77189b60ca4794639f3370fc Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Wed, 18 Jul 2018 15:57:00 +0300 Subject: [PATCH 10/21] Refactor LusterInstance helper using delay and p-event --- package.json | 2 + test/func/helpers/luster_instance.js | 61 ++++++++++++---------------- 2 files changed, 27 insertions(+), 36 deletions(-) diff --git a/package.json b/package.json index 6f768e3..73cbe18 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,8 @@ } ], "dependencies": { + "delay": "^3.0.0", + "p-event": "^2.1.0", "terror": "^1.0.0" }, "devDependencies": { diff --git a/test/func/helpers/luster_instance.js b/test/func/helpers/luster_instance.js index c559564..95dd08e 100644 --- a/test/func/helpers/luster_instance.js +++ b/test/func/helpers/luster_instance.js @@ -63,6 +63,9 @@ const fork = require('child_process').fork, path = require('path'); +const delay = require('delay'); +const pEvent = require('p-event'); + /** * A wrapper for `ChildProcess` * @class LusterInstance @@ -94,7 +97,7 @@ class LusterInstance { * @param {boolean} [pipeStderr] * @returns {Promise} */ - static run(name, env, pipeStderr) { + static async run(name, env, pipeStderr) { if (typeof(env) === 'boolean') { pipeStderr = env; } @@ -103,15 +106,12 @@ class LusterInstance { // Promise is resolved when master process replies to ping // Promise is rejected if master was unable to reply to ping within 1 second - return new Promise((resolve, reject) => { - instance.once('message', message => { - if (message === 'ready') { - resolve(res); - } else { - reject(new Error('First message from master should be "ready", got "' + message + '" instead')); - } - }); - }); + const message = await pEvent(instance, 'message'); + if (message === 'ready') { + return res; + } else { + throw new Error('First message from master should be "ready", got "' + message + '" instead'); + } } /** @@ -120,11 +120,9 @@ class LusterInstance { * @param {Number} timeout * @returns {Promise} */ - sendWaitTimeout(message, timeout) { - return new Promise(resolve => { - this._process.send(message); - setTimeout(resolve, timeout); - }); + async sendWaitTimeout(message, timeout) { + this._process.send(message); + await delay(timeout); } /** @@ -134,17 +132,13 @@ class LusterInstance { * @param {String} expectedAnswer * @returns {Promise} */ - sendWaitAnswer(message, expectedAnswer) { - return new Promise((resolve, reject) => { - this._process.send(message); - this._process.once('message', answer => { - if (answer === expectedAnswer) { - resolve(); - } else { - reject('Expected master to send "' + expectedAnswer + '", got "' + answer + '" instead'); - } - }); - }); + async sendWaitAnswer(message, expectedAnswer) { + const p = pEvent(this._process, 'message'); + this._process.send(message); + const answer = await p; + if (answer !== expectedAnswer) { + throw new Error('Expected master to send "' + expectedAnswer + '", got "' + answer + '" instead'); + } } /** @@ -153,16 +147,11 @@ class LusterInstance { * @param {String} expectedAnswer * @returns {Promise} */ - waitAnswer(expectedAnswer) { - return new Promise((resolve, reject) => { - this._process.once('message', answer => { - if (answer === expectedAnswer) { - resolve(); - } else { - reject('Expected master to send "' + expectedAnswer + '", got "' + answer + '" instead'); - } - }); - }); + async waitAnswer(expectedAnswer) { + const answer = await pEvent(this._process, 'message'); + if (answer !== expectedAnswer) { + throw new Error('Expected master to send "' + expectedAnswer + '", got "' + answer + '" instead'); + } } /** From 100208c2d79f43971723bc99b138505d908ad201 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Wed, 18 Jul 2018 15:58:44 +0300 Subject: [PATCH 11/21] Refactor WorkerPool.softRestart using p-event --- lib/worker-pool.js | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/worker-pool.js b/lib/worker-pool.js index 737ef72..5160b00 100644 --- a/lib/worker-pool.js +++ b/lib/worker-pool.js @@ -2,6 +2,8 @@ const os = require('os'); +const pEvent = require('p-event'); + const Configuration = require('./configuration'); const EventEmitterEx = require('./event_emitter_ex'); const Port = require('./port'); @@ -331,14 +333,11 @@ class WorkerPool extends EventEmitterEx { * @returns {WorkerPool} self * @fires WorkerPool#restarted when workers spawned and ready. */ - softRestart() { - return new Promise(resolve => { - this.forEach(worker => worker.softRestart()); - this._restartQueue.once('drain', () => { - this.emit('restarted'); - resolve(); - }); - }); + async softRestart() { + const p = pEvent(this._restartQueue, 'drain'); + this.forEach(worker => worker.softRestart()); + await p; + this.emit('restarted'); } /** From 2a02f11a9abc73878ea7bca332c4e9c38b46081b Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Wed, 18 Jul 2018 16:00:40 +0300 Subject: [PATCH 12/21] Add double shutdown test --- test/func/fixtures/double_shutdown/master.js | 27 +++++++++++++++++++ .../double_shutdown/node_modules/luster | 1 + test/func/fixtures/double_shutdown/worker.js | 2 ++ test/func/helpers/luster_instance.js | 10 +++++++ test/func/test/double_shutdown.js | 26 ++++++++++++++++++ 5 files changed, 66 insertions(+) create mode 100644 test/func/fixtures/double_shutdown/master.js create mode 120000 test/func/fixtures/double_shutdown/node_modules/luster create mode 100644 test/func/fixtures/double_shutdown/worker.js create mode 100644 test/func/test/double_shutdown.js diff --git a/test/func/fixtures/double_shutdown/master.js b/test/func/fixtures/double_shutdown/master.js new file mode 100644 index 0000000..d0a7f6d --- /dev/null +++ b/test/func/fixtures/double_shutdown/master.js @@ -0,0 +1,27 @@ +const proc = require('luster'); + +proc + .configure({ + app: 'worker.js', + workers: 1, + control: { + stopTimeout: 100 + } + }, true, __dirname) + .run(); + +if (proc.isMaster) { + proc.once('running', () => process.send('ready')); + proc.on('shutdown', () => console.log('shutdown')); + proc.on('shutdown', () => process.connected && process.disconnect()); + + process.on('message', command => { + switch (command) { + case 'shutdown': + proc.shutdown(); + break; + default: + throw new Error('Unknown command ' + command); + } + }); +} diff --git a/test/func/fixtures/double_shutdown/node_modules/luster b/test/func/fixtures/double_shutdown/node_modules/luster new file mode 120000 index 0000000..5930783 --- /dev/null +++ b/test/func/fixtures/double_shutdown/node_modules/luster @@ -0,0 +1 @@ +../../../../.. \ No newline at end of file diff --git a/test/func/fixtures/double_shutdown/worker.js b/test/func/fixtures/double_shutdown/worker.js new file mode 100644 index 0000000..e690920 --- /dev/null +++ b/test/func/fixtures/double_shutdown/worker.js @@ -0,0 +1,2 @@ +// Do not let worker quit +setTimeout(() => {}, 10000000000); diff --git a/test/func/helpers/luster_instance.js b/test/func/helpers/luster_instance.js index 95dd08e..057ff70 100644 --- a/test/func/helpers/luster_instance.js +++ b/test/func/helpers/luster_instance.js @@ -88,6 +88,8 @@ class LusterInstance { if (pipeStderr) { this._process.stderr.pipe(process.stderr, {end: false}); } + + this._exited = pEvent(this._process, 'exit'); } /** @@ -114,6 +116,14 @@ class LusterInstance { } } + get exited() { + return this._exited; + } + + send(message) { + this._process.send(message); + } + /** * Sends message to master instance, resolves after timeout * @param {String} message diff --git a/test/func/test/double_shutdown.js b/test/func/test/double_shutdown.js new file mode 100644 index 0000000..fea32c6 --- /dev/null +++ b/test/func/test/double_shutdown.js @@ -0,0 +1,26 @@ +/* globals describe,it,before,after,assert */ +'use strict'; + +const LusterInstance = require('../helpers/luster_instance'); + +describe('shutdown event', () => { + let instance; + + beforeEach(async () => { + instance = await LusterInstance + .run('../fixtures/double_shutdown/master.js'); + }); + + it('should be emitted only once for shutdown method', async () => { + instance.send('shutdown'); + await instance.exited; + assert.equal(instance.output(), 'shutdown\n'); + }); + + afterEach(() => { + if (instance) { + instance.kill(); + instance = null; + } + }); +}); From a2f24ee62d8aaa172e8882ed47d2442b3d98f8b5 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Wed, 18 Jul 2018 16:11:31 +0300 Subject: [PATCH 13/21] Allow paaing Configuration instance to WorkerPool.configure --- lib/worker-pool.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/worker-pool.js b/lib/worker-pool.js index 5160b00..5b7addb 100644 --- a/lib/worker-pool.js +++ b/lib/worker-pool.js @@ -161,7 +161,10 @@ class WorkerPool extends EventEmitterEx { } configure(config) { - this.config = new Configuration(config); + if (! (config instanceof Configuration)) { + config = new Configuration(config); + } + this.config = config; const // WorkerWrapper options forkTimeout = this.config.get('control.forkTimeout'), From b2622b6bc264cf3efc66f2d43ef5e9ceeac68728 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Wed, 18 Jul 2018 16:48:48 +0300 Subject: [PATCH 14/21] Use p-event in ClusterProcess, WorkerPool and Worker --- lib/cluster_process.js | 6 +++--- lib/worker-pool.js | 4 +--- lib/worker.js | 8 +++----- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/lib/cluster_process.js b/lib/cluster_process.js index d51c23f..8c9e241 100644 --- a/lib/cluster_process.js +++ b/lib/cluster_process.js @@ -7,6 +7,8 @@ const cluster = require('cluster'), LusterClusterProcessError = require('./errors').LusterClusterProcessError, LusterConfigurationError = require('./errors').LusterConfigurationError; +const pEvent = require('p-event'); + /** * @param {Object} context * @param {String} propName @@ -59,9 +61,7 @@ class ClusterProcess extends EventEmitterEx { * @type Promise * @private * */ - this._initPromise = new Promise(resolve => { - this.once('initialized', resolve); - }); + this._initPromise = pEvent(this, 'initialized'); /** * @type {Configuration} diff --git a/lib/worker-pool.js b/lib/worker-pool.js index 5b7addb..d6724fc 100644 --- a/lib/worker-pool.js +++ b/lib/worker-pool.js @@ -35,9 +35,7 @@ class WorkerPool extends EventEmitterEx { */ this._restartQueue = new RestartQueue(key); - this._runningPromise = new Promise(resolve => { - this.once('running', resolve); - }); + this._runningPromise = pEvent(this, 'running'); this.dead = false; diff --git a/lib/worker.js b/lib/worker.js index 248ce41..8d961a6 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -4,6 +4,8 @@ const cluster = require('cluster'), ClusterProcess = require('./cluster_process'), LusterWorkerError = require('./errors').LusterWorkerError; +const pEvent = require('p-event'); + const wid = parseInt(process.env.LUSTER_WID, 10); /** @@ -19,11 +21,7 @@ class Worker extends ClusterProcess { const broadcastEvent = this._broadcastEvent; - this._foreignPropertiesReceivedPromise = new Promise(resolve => { - this.once('foreign properties received', () => { - resolve(); - }); - }); + this._foreignPropertiesReceivedPromise = pEvent(this, 'foreign properties received'); this.on('configured', broadcastEvent.bind(this, 'configured')); this.on('extension loaded', broadcastEvent.bind(this, 'extension loaded')); From a8a1054d26e530df31148b945c0193da2beacdf0 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Wed, 18 Jul 2018 17:51:05 +0300 Subject: [PATCH 15/21] Add chai-as-promised, fix unhandled rejection in ClusterProcess unit test --- package.json | 1 + test/setup.js | 1 + test/unit/test/cluster_process.js | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 73cbe18..3934fef 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,7 @@ }, "devDependencies": { "chai": "^3.5.0", + "chai-as-promised": "^7.1.1", "eslint": "^4.19.1", "eslint-config-nodules": "^0.4.0", "istanbul": "^0.4.1", diff --git a/test/setup.js b/test/setup.js index 9ab99cc..17bba54 100644 --- a/test/setup.js +++ b/test/setup.js @@ -6,5 +6,6 @@ global.sinon = require('sinon'); global.assert = chai.assert; chai.use(require('sinon-chai')); +chai.use(require('chai-as-promised')); sinon.assert.expose(chai.assert, { prefix: '' }); diff --git a/test/unit/test/cluster_process.js b/test/unit/test/cluster_process.js index 793a556..af8f9bd 100644 --- a/test/unit/test/cluster_process.js +++ b/test/unit/test/cluster_process.js @@ -38,13 +38,14 @@ describe('ClusterProcess', () => { assert.calledOnce(spy); }); - it('should emit "error" event for malformed config', () => { + it('should emit "error" event for malformed config', async () => { const spy = sandbox.spy(); clusterProcess.on('error', spy); clusterProcess.configure({}); assert.calledOnce(spy); + await assert.isRejected(clusterProcess.whenInitialized()); }); it('should not apply env config if overriding is explicitly turned off', () => { From 7fcedb5c22595e1737b803ef681ca07a9f265158 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Wed, 25 Jul 2018 16:47:09 +0300 Subject: [PATCH 16/21] Release 3.0.0-alpha.0 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 3934fef..c83ac96 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "luster", - "version": "2.0.1", + "version": "3.0.0-alpha.0", "description": "Node.js cluster wrapper", "main": "./lib/luster.js", "bin": { From d0d6c92323ccdf8787238ca13024be31e5f8d1b0 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Fri, 3 Aug 2018 13:47:55 +0300 Subject: [PATCH 17/21] Fix emitToAll not emitting from Master, make emitToAll test check this --- lib/master.js | 1 + test/func/fixtures/emit_to_all/master.js | 3 +++ test/func/test/emit_to_all.js | 4 ++-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/master.js b/lib/master.js index 145e49d..2888218 100644 --- a/lib/master.js +++ b/lib/master.js @@ -293,6 +293,7 @@ class Master extends ClusterProcess { * @public */ emitToAll(event, ...args) { + this.emit(event, ...args); this.forEachPool(pool => pool.emitToAll(event, ...args)); } diff --git a/test/func/fixtures/emit_to_all/master.js b/test/func/fixtures/emit_to_all/master.js index 75eba94..2e294d9 100644 --- a/test/func/fixtures/emit_to_all/master.js +++ b/test/func/fixtures/emit_to_all/master.js @@ -15,4 +15,7 @@ if (proc.isMaster) { process.send('ready'); proc.emitToAll('log', 'test'); }); + proc.on('log', msg => { + console.log(msg); + }); } diff --git a/test/func/test/emit_to_all.js b/test/func/test/emit_to_all.js index f9e56f5..2396d0b 100644 --- a/test/func/test/emit_to_all.js +++ b/test/func/test/emit_to_all.js @@ -11,9 +11,9 @@ describe('emitToAll', () => { .run('../fixtures/emit_to_all/master.js'); }); - it('should deliver message data to all workers', done => { + it('should deliver message data to all workers and master', done => { setTimeout(() => { - assert.equal(instance.output(), 'test\ntest\n'); + assert.equal(instance.output(), 'test\ntest\ntest\n'); done(); }, 100); }); From 5394d2657e6b98585fb2cafacd9677c0db1fed41 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Fri, 3 Aug 2018 13:48:49 +0300 Subject: [PATCH 18/21] Release 3.0.0-alpha.1 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index c83ac96..e21c4d6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "luster", - "version": "3.0.0-alpha.0", + "version": "3.0.0-alpha.1", "description": "Node.js cluster wrapper", "main": "./lib/luster.js", "bin": { From 5049bea81553c0c56d51dda5b994eae2d818f06c Mon Sep 17 00:00:00 2001 From: Danil Kolesnikov Date: Thu, 17 Jan 2019 18:46:34 +0300 Subject: [PATCH 19/21] feature: ability to cleanup pools & workers --- lib/cluster_process.js | 13 +++++++++---- lib/errors.js | 11 +++++++++++ lib/master.js | 43 +++++++++++++++++++++++++++++++++++++++--- lib/worker-pool.js | 16 ++++++++++++++++ 4 files changed, 76 insertions(+), 7 deletions(-) diff --git a/lib/cluster_process.js b/lib/cluster_process.js index 8c9e241..0511148 100644 --- a/lib/cluster_process.js +++ b/lib/cluster_process.js @@ -110,6 +110,15 @@ class ClusterProcess extends EventEmitterEx { * @public */ configure(config, applyEnv, basedir) { + this._setConfig(config, applyEnv, basedir); + if (this.config) { + this.emit('configured'); + } + + return this; + } + + _setConfig(config, applyEnv, basedir) { if (typeof applyEnv === 'undefined' || applyEnv) { Configuration.applyEnvironment(config); } @@ -128,11 +137,7 @@ class ClusterProcess extends EventEmitterEx { // hack to tweak underlying EventEmitter max listeners // if your luster-based app extensively use luster events this.setMaxListeners(this.config.get('maxEventListeners', 100)); - - this.emit('configured'); } - - return this; } /** diff --git a/lib/errors.js b/lib/errors.js index d4bb163..3417858 100644 --- a/lib/errors.js +++ b/lib/errors.js @@ -105,4 +105,15 @@ errors.LusterMasterError = LusterError.create('LusterMasterError', 'Pool key "%key%" is already taken' }); +/** + * @constructor + * @class LusterMasterError + * @augments LusterError + */ +errors.LusterMasterError = LusterError.create('LusterMasterError', + { + POOL_DOES_NOT_EXIST: + 'Pool with key "%key%" does not exist' + }); + module.exports = errors; diff --git a/lib/master.js b/lib/master.js index 2888218..e628aec 100644 --- a/lib/master.js +++ b/lib/master.js @@ -25,7 +25,6 @@ class Master extends ClusterProcess { this._masterOpts = {}; this.pools = new Map(); - this.createPool(DEFAULT_POOL_KEY); this.id = 0; this.wid = 0; @@ -37,6 +36,10 @@ class Master extends ClusterProcess { process.on('SIGQUIT', this._onSignalQuit.bind(this)); } + /** + * @param {*} key + * @returns {WorkerPool} + */ createPool(key) { if (this.pools.has(key)) { throw LusterMasterError.createError( @@ -53,10 +56,32 @@ class Master extends ClusterProcess { return pool; } + /** + * @param {*} key + * @returns {WorkerPool} + */ + removePool(key) { + const pool = this.pools.get(key); + if (!pool) { + throw LusterMasterError.createError( + LusterMasterError.CODES.POOL_DOES_NOT_EXIST, + {key} + ); + } + + this.emit('remove pool', key); + this.pools.delete(key); + return pool; + } + getPool(key) { return this.pools.get(key); } + getDefaultPool() { + return this.getPool(DEFAULT_POOL_KEY); + } + /** * Allows same object structure as cluster.setupMaster(). * This function must be used instead of cluster.setupMaster(), @@ -156,6 +181,20 @@ class Master extends ClusterProcess { this.emit('received worker ' + event, worker, ...args); } + configure(...args) { + this._setConfig(...args); + + if (this.config) { + if (this.config.get('useDefaultPool', true)) { + this.createPool(DEFAULT_POOL_KEY).configure(this.config); + } + + this.emit('configured'); + } + + return this; + } + /** * Configure cluster * @override ClusterProcess @@ -171,8 +210,6 @@ class Master extends ClusterProcess { this.broadcastWorkerEvent.bind(this) ); } - - this.pools.get(DEFAULT_POOL_KEY).configure(this.config); } /** diff --git a/lib/worker-pool.js b/lib/worker-pool.js index d6724fc..364a135 100644 --- a/lib/worker-pool.js +++ b/lib/worker-pool.js @@ -128,6 +128,22 @@ class WorkerPool extends EventEmitterEx { return this; } + /** + * Remove worker from the pool + * @param {WorkerWrapper} worker + * @returns {WorkerPool} self + * @public + */ + remove(worker) { + // invalidate WorkerPool#getWorkersIds and WorkerPool#getWorkersArray cache + this._workersIdsCache = null; + this._workersArrayCache = null; + + delete this.workers[worker.wid]; + + return this; + } + /** * Iterate over workers in the pool. * @param {Function} fn From 41131ecfb420d90fd5d8896f5fe1a84c4eadf846 Mon Sep 17 00:00:00 2001 From: "Ksenia V. Mamich" Date: Fri, 10 Aug 2018 17:34:27 +0300 Subject: [PATCH 20/21] Config path is next argument after luster filename --- bin/luster.js | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bin/luster.js b/bin/luster.js index 680cd65..8a70983 100755 --- a/bin/luster.js +++ b/bin/luster.js @@ -1,7 +1,10 @@ #!/usr/bin/env node const /** @type {ClusterProcess} */ luster = require('../lib/luster'), - path = require('path'), - configFilePath = path.resolve(process.cwd(), process.argv[2] || 'luster.conf'); + path = require('path'); + +// config path is right after this script in process.argv +const scriptArgvIndex = process.argv.findIndex(arg => arg === __filename || path.resolve(arg) === __filename); +const configFilePath = path.resolve(process.cwd(), process.argv[scriptArgvIndex + 1] || 'luster.conf'); luster.configure(require(configFilePath), true, path.dirname(configFilePath)).run(); From 9fa9b16e38c740088cae18b0e3b4d283e4dd665c Mon Sep 17 00:00:00 2001 From: Danil Kolesnikov Date: Tue, 12 Feb 2019 19:27:50 +0300 Subject: [PATCH 21/21] 3.0.0-alpha.3 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index e21c4d6..4ea3cbf 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "luster", - "version": "3.0.0-alpha.1", + "version": "3.0.0-alpha.3", "description": "Node.js cluster wrapper", "main": "./lib/luster.js", "bin": {