Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@ npm-debug.log
node_modules
node_modules/*
/node_modules

.vscode/
src/amqp_BAK
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- Reduce usage of `var`
- Use timeout instead of interval for resend messages to clients
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "activestack-gateway",
"version": "0.2.5",
"version": "0.2.7",
"release_date": "2016-05-03",
"description": "Socket server for ActiveStack",
"main": "src/main.js",
Expand Down
4 changes: 4 additions & 0 deletions resources/env.default.properties
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ gateway.rabbitmq.port=5672
gateway.rabbitmq.login=guest
gateway.rabbitmq.password=guest
gateway.rabbitmq.durable=false

frontend.clientMessageResendTimeout=7500
frontend.clientMessageResendBackoff=1.5
frontend.clientMessageResendAttempts=7
10 changes: 4 additions & 6 deletions src/console.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
'use strict';

var redis = require("redis");
const redis = require("redis");

function GatewayConsoleApplication(){}
module.exports = GatewayConsoleApplication;

GatewayConsoleApplication.prototype.run = function(configFile) {
configFile = configFile || __dirname + '/../resources/env.default.properties';
var properties = require('node-properties-parser').readSync(configFile);
const properties = require('node-properties-parser').readSync(configFile);

var gatewayControlQueue = properties['gateway.redis.gatewaycontrolqueue'];
let gatewayControlQueue = properties['gateway.redis.gatewaycontrolqueue'];
if (!gatewayControlQueue)
gatewayControlQueue = 'gateway';

var client = redis.createClient(properties['gateway.redis.port'], properties['gateway.redis.host']);
const client = redis.createClient(properties['gateway.redis.port'], properties['gateway.redis.host']);
if (properties['gateway.redis.password']) {
client.auth(properties['gateway.redis.password'], function (error, result) {
if (error)
Expand All @@ -29,10 +29,8 @@ GatewayConsoleApplication.prototype.run = function(configFile) {

process.stdin.resume();
process.stdin.setEncoding('utf8');
var util = require('util');

process.stdin.on('data', function (text) {
//console.log('received data:', util.inspect(text));
if (text === 'quit\n') {
done();
}
Expand Down
58 changes: 29 additions & 29 deletions src/server.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

var cluster = require('cluster'),
const cluster = require('cluster'),
os = require('os'),
redis = require('redis'),
fs = require('fs'),
Expand All @@ -25,7 +25,7 @@ GatewayServer.prototype.inject = function(prefixedLogger, properties, gatewayWor

GatewayServer.prototype.start = function(){

var cpuCount = os.cpus().length;
const cpuCount = os.cpus().length;
this.workerCount = this.properties['cluster.workerCount'] || cpuCount / 2 + 1;

if(cluster.isMaster){
Expand All @@ -52,7 +52,7 @@ GatewayServer.prototype.isMaster = function(){
GatewayServer.prototype.startMaster = function(){
this.setupIPC();

for (var i = 0; i < this.workerCount; ++i) {
for (let i = 0; i < this.workerCount; ++i) {
this.createWorkerProcess();
}
};
Expand All @@ -62,11 +62,11 @@ GatewayServer.prototype.startMaster = function(){
*/
GatewayServer.prototype.setupIPC = function(){
try {
var gatewayControlQueue = this.properties['gateway.redis.gatewaycontrolqueue'];
let gatewayControlQueue = this.properties['gateway.redis.gatewaycontrolqueue'];
if (!gatewayControlQueue)
gatewayControlQueue = 'gateway';

var client = redis.createClient(this.properties['gateway.redis.port'], this.properties['gateway.redis.host']);
const client = redis.createClient(this.properties['gateway.redis.port'], this.properties['gateway.redis.host']);
if (this.properties['gateway.redis.password']) {
client.auth(this.properties['gateway.redis.password'], function(error, result) {
if (error)
Expand All @@ -93,8 +93,8 @@ GatewayServer.prototype.setupIPC = function(){
*/
GatewayServer.prototype.onIPCMessage = function (channel, message) {
try {
var messageName = message.toLowerCase().trim();
var params = message.toLowerCase().trim().split(' ');
const messageName = message.toLowerCase().trim();
const params = message.toLowerCase().trim().split(' ');

// TODO: Add workerCount message
switch (messageName) {
Expand Down Expand Up @@ -129,7 +129,7 @@ GatewayServer.prototype.onIPCMessage = function (channel, message) {
GatewayServer.prototype.onIPCShutdown = function(params){
this.logger.info('Processing SHUTDOWN message');
this.shuttingDown = true;
var stopCode;
let stopCode;
if (params.length > 1) {
stopCode = params[1];
}
Expand All @@ -138,11 +138,11 @@ GatewayServer.prototype.onIPCShutdown = function(params){
this.logger.info('\n************************ INVALID SHUTDOWN CODE RECEIVED!!! ************************\n');
return;
}
var stopType = 'immediate';
let stopType = 'immediate';
if (params.length > 2) {
stopType = params[2];
}
var stopTimeout;
let stopTimeout;
if (params.length > 3)
stopTimeout = parseInt(params[3]);
this.stopWorkerProcesses(stopType, stopTimeout);
Expand All @@ -154,11 +154,11 @@ GatewayServer.prototype.onIPCShutdown = function(params){
*/
GatewayServer.prototype.onIPCRestart = function(params){
this.logger.info('Processing RESTART message');
var restartType = 'immediate';
let restartType = 'immediate';
if (params.length > 1) {
restartType = params[1];
}
var restartInterval;
let restartInterval;
if (params.length > 2)
restartInterval = parseInt(params[2]);
this.restartWorkerProcesses(restartType, restartInterval);
Expand All @@ -179,7 +179,7 @@ GatewayServer.prototype.onIPCClientCount = function(){
GatewayServer.prototype.onIPCClientMessageResendInterval = function(params){
this.logger.info('Processing CLIENTMESSAGERESENDINTERVAL message');

var clientMessageResendInterval = this.properties['frontend.clientMessageResendInterval'];
let clientMessageResendInterval = this.properties['frontend.clientMessageResendInterval'];
if (params.length > 1) {
clientMessageResendInterval = params[1];
}
Expand All @@ -192,7 +192,7 @@ GatewayServer.prototype.onIPCClientMessageResendInterval = function(params){
*/
GatewayServer.prototype.onIPCLogLevel = function(params){
this.logger.info('Processing LOGLEVEL message');
var logLevel = this.properties['frontend.logLevel'];
let logLevel = this.properties['frontend.logLevel'];
if (params.length > 1) {
logLevel = params[1];
}
Expand All @@ -204,29 +204,29 @@ GatewayServer.prototype.startWorker = function(){
};

GatewayServer.prototype.restartWorkerProcesses = function(restartType, restartInterval) {
var oldWorkers = this.workers;
const oldWorkers = this.workers;
this.workers = [];
this.logger.info('Restarting processes - ' + restartType.toUpperCase());

if (restartType.toLowerCase() === 'immediate') {
if (!restartInterval)
restartInterval = 500;
this.logger.info('Restarting processes at ' + restartInterval + 'ms intervals');
for(var j=0; j<oldWorkers.length; j++) {
for(let j=0; j<oldWorkers.length; j++) {
// Force it to stop. When the onEnd function runs it will restart it
this.hardStopWorkerProcess(oldWorkers[j],j * restartInterval);
}
}
else {
for(var j=0; j<oldWorkers.length; j++) {
for(let j=0; j<oldWorkers.length; j++) {
this.logger.info('sending timeout ' + restartInterval);
oldWorkers[j].send({cmd: 'restart', type: 'on_client_queue_empty', data: restartInterval});
}
}
}

GatewayServer.prototype.hardStopWorkerProcess = function(worker, delay){
var hardStop = function(){
const hardStop = function(){
try {
worker.disconnect();
worker.destroy();
Expand All @@ -242,7 +242,7 @@ GatewayServer.prototype.hardStopWorkerProcess = function(worker, delay){
};

GatewayServer.prototype.stopAndRemoveWorkerProcess = function(worker, delay){
for(var i = 0; i < this.workers.length; i++) {
for(let i = 0; i < this.workers.length; i++) {
if (this.workers[i] === worker) {
this.workers.splice(i, 1);
break;
Expand All @@ -252,7 +252,7 @@ GatewayServer.prototype.stopAndRemoveWorkerProcess = function(worker, delay){
};

GatewayServer.prototype.sendMessageToWorkers = function(message){
for(var j = 0; j < this.workers.length; j++) {
for(let j = 0; j < this.workers.length; j++) {
try {
if (this.workers[j]) {
this.workers[j].send(message);
Expand All @@ -267,7 +267,7 @@ GatewayServer.prototype.stopWorkerProcesses = function(stopType, stopTimeout) {
this.logger.info('Stopping processes - ' + stopType.toUpperCase());

if (stopType.toLowerCase() === 'immediate') {
for(var j = 0; j < this.workers.length; j++) {
for(let j = 0; j < this.workers.length; j++) {
this.hardStopWorkerProcess(this.workers[j]);
}
process.exit();
Expand Down Expand Up @@ -297,16 +297,16 @@ GatewayServer.prototype.resetWorkerRestartDelay = function() {
};

GatewayServer.prototype.getNextWorkerRestartDelay = function() {
var failureCount = Math.min(this.consecutiveFailures++, this.properties['cluster.maxRestartBackoff']);
var factor = Math.random() * (Math.pow(2, failureCount) - 1);
const failureCount = Math.min(this.consecutiveFailures++, this.properties['cluster.maxRestartBackoff']);
const factor = Math.random() * (Math.pow(2, failureCount) - 1);
return factor * this.properties['cluster.workerRestartDelay'];
};

GatewayServer.prototype.onWorkerProcessOnline = function(worker) {
this.logger.info('Worker online: ' + worker.process.pid);
worker.lastHeartbeat = Date.now();
worker.watchdog = setInterval(function() {
var time = Date.now();
const time = Date.now();
if (worker.lastHeartbeat + this.properties['cluster.workerTimeout'] < time) {
this.logger.info('Worker heartbeat stopped, destroying worker: ' + worker.process.pid);

Expand All @@ -322,9 +322,9 @@ GatewayServer.prototype.onWorkerProcessMessageHeartbeat = function(worker, messa
}

worker.lastHeartbeat = Date.now();
for (var type in message.memory) {
var limit = this.properties['cluster.memoryLimit.' + type];
var warning = this.properties['cluster.memoryWarning.' + type];
for (let type in message.memory) {
const limit = this.properties['cluster.memoryLimit.' + type];
const warning = this.properties['cluster.memoryWarning.' + type];
if (limit && (message.memory[type] > limit * MEGABYTE)) {
this.logger.info('Worker exceeded hard ' + type + ' memory limit (' +
message.memory[type] + '/' + limit * MEGABYTE + ')!');
Expand Down Expand Up @@ -393,7 +393,7 @@ GatewayServer.prototype.onWorkerProcessExit = function(worker, code, signal) {
(signal || 'none') + '). Starting replacement...');
clearInterval(worker.watchdog);

for(var i = 0; i < this.workers.length; i++) {
for(let i = 0; i < this.workers.length; i++) {
if (this.workers[i] === worker) {
this.workers.splice(i, 1);
break;
Expand All @@ -420,7 +420,7 @@ GatewayServer.prototype.createWorkerProcess = function(forceCreate) {
}
}

var worker = cluster.fork();
const worker = cluster.fork();
worker.lastHeartbeat = null;
worker.watchdog = null;
worker.gotFirstHeartbeat = false;
Expand Down
6 changes: 3 additions & 3 deletions src/server_application.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
var GatewayServer = require('./server'),
const GatewayServer = require('./server'),
AppContext = require('injecterooski').AppContext,
PrefixedLogger = require('./logging/prefixed_logger'),
HttpServerFactory = require('./factory/http_server_factory'),
Expand All @@ -14,9 +14,9 @@ var GatewayServer = require('./server'),
function GatewayServerApplication(){}

GatewayServerApplication.prototype.run = function(configFile){
var appContext = new AppContext();
const appContext = new AppContext();

var server = new GatewayServer();
const server = new GatewayServer();

appContext.register([
server,
Expand Down
Loading