diff --git a/package.json b/package.json index e392853..e730df0 100644 --- a/package.json +++ b/package.json @@ -1,17 +1,17 @@ { "name": "activestack-gateway", - "version": "0.3.0", - "release_date": "2015-09-24", + "version": "0.2.1", + "release_date": "2016-04-28", "description": "Socket server for ActiveStack", "main": "src/main.js", "dependencies": { "amqp": "^0.1.8", "injecterooski": "*", - "properties-parser": "^0.3.0", + "node-properties-parser": "^0.0.2", "redis": "^0.12.1", - "socket.io": "^1.3.7", - "socket.io-client": "^1.3.7", - "socket.io-redis": "^0.1.4", + "socket.io": "git://github.com/curb47/socket.io.git", + "socket.io-client": "^0.9.9", + "webkit-devtools-agent": "^0.3.1", "winston": "^0.6.2" }, "devDependencies": { diff --git a/resources/env.default.properties b/resources/env.default.properties index 8ab3705..920c60c 100644 --- a/resources/env.default.properties +++ b/resources/env.default.properties @@ -7,7 +7,7 @@ cluster.memoryLimit.rss=2048 cluster.memoryLimit.heapUsed=100 cluster.workerRestartDelay=50 cluster.maxRestartBackoff=10 -cluster.workerCount=2 +cluster.workerCount=1 frontend.logLevel=info frontend.shutdownCode=pass123 diff --git a/src/config/properties.js b/src/config/properties.js index f8c6496..676315a 100644 --- a/src/config/properties.js +++ b/src/config/properties.js @@ -1,5 +1,4 @@ -var propParser = require('properties-parser'), - fs = require('fs'); +propParser = require('node-properties-parser'); function Properties(fileName) { this.fileName = fileName; @@ -12,13 +11,13 @@ Properties.prototype.inject = function(prefixedLogger){ var properties; if(this.fileName && this.fileName !== '') try{ - properties = propParser.parse(fs.readFileSync(this.fileName)); + properties = propParser.readSync(this.fileName); }catch(e){ console.log('Could not load properties file from '+this.fileName); } if(!properties) - properties = propParser.parse(fs.readFileSync(__dirname + '/../../resources/env.default.properties')); + properties = propParser.readSync(__dirname + '/../../resources/env.default.properties'); for(var key in properties){ this[key] = properties[key]; diff --git a/src/factory/redis_store_factory.js b/src/factory/redis_store_factory.js index 1cd75e8..082fbab 100644 --- a/src/factory/redis_store_factory.js +++ b/src/factory/redis_store_factory.js @@ -1,5 +1,4 @@ -//var RedisStore = require('socket.io/lib/stores/redis'); -var redis = require('socket.io-redis'); +var RedisStore = require('socket.io/lib/stores/redis'); function RedisStoreFactory(){}; module.exports = RedisStoreFactory; @@ -13,23 +12,22 @@ RedisStoreFactory.prototype.inject = function(properties){ */ RedisStoreFactory.prototype.instance = function(){ if(!this.redisStore){ - var options = { + var redisOptions = { host: this.properties['gateway.redis.host'], port: this.properties['gateway.redis.port'], max_attempts: this.properties['gateway.redis.max_attempts'], enable_offline_queue: this.properties['gateway.redis.offline_queue'] == 'true' }; - // - //this.redisStore = new RedisStore({ - // redisPub: redisOptions, - // redisSub: redisOptions, - // redisClient: redisOptions - //}); - // - //this.redisStore.pub.auth(this.properties['gateway.redis.password']); - //this.redisStore.sub.auth(this.properties['gateway.redis.password']); - //this.redisStore.cmd.auth(this.properties['gateway.redis.password']); - this.redisStore = redis(options); + + this.redisStore = new RedisStore({ + redisPub: redisOptions, + redisSub: redisOptions, + redisClient: redisOptions + }); + + this.redisStore.pub.auth(this.properties['gateway.redis.password']); + this.redisStore.sub.auth(this.properties['gateway.redis.password']); + this.redisStore.cmd.auth(this.properties['gateway.redis.password']); } return this.redisStore; diff --git a/src/factory/socketio_factory.js b/src/factory/socketio_factory.js index c5efc36..8f4a0d4 100644 --- a/src/factory/socketio_factory.js +++ b/src/factory/socketio_factory.js @@ -22,25 +22,28 @@ SocketIOFactory.prototype.inject = function(httpServerFactory, properties, redis SocketIOFactory.prototype.instance = function(){ if(!this.sio){ var httpServer = this.httpServerFactory.instance(); - - var options = { - logger: this.logger.extendPrefix('socket.io') - }; - if (this.ssl.useSsl) { - options.key = this.ssl.key; - options.cert = this.ssl.cert; - options.ca = this.ssl.ca; + this.sio = io.listen(httpServer, { + logger: this.logger.extendPrefix('socket.io'), + key: this.ssl.key, + cert: this.ssl.cert, + ca: this.ssl.ca + }); + } + else { + this.sio = io.listen(httpServer, {logger: this.logger.extendPrefix('socket.io')}); } - this.sio = io(httpServer, options); - this.sio.set('heartbeat timeout', this.properties['gateway.socketio.timeout']); - this.sio.set('heartbeat interval', this.properties['gateway.socketio.interval']); - //this.sio.set('transports', ['websocket', 'flashsocket']); - this.sio.adapter(this.redisStoreFactory.instance()); + var redisStore = this.redisStoreFactory.instance(); + this.sio.configure(function () { + this.sio.set('heartbeat timeout', this.properties['gateway.socketio.timeout']); + this.sio.set('heartbeat interval', this.properties['gateway.socketio.interval']); + this.sio.set('transports', ['websocket', 'flashsocket']); + this.sio.set('store', redisStore); + }.bind(this)); // Start the Flash Policy Server - //this.sio.flashPolicyServer.on('error', this.gatewayWorker.createErrorHandler('Flash Policy Server')); + this.sio.flashPolicyServer.on('error', this.gatewayWorker.createErrorHandler('Flash Policy Server')); } diff --git a/src/service/client.js b/src/service/client.js index 68a428a..a17fcdc 100644 --- a/src/service/client.js +++ b/src/service/client.js @@ -29,11 +29,11 @@ function GatewayClient(socket, exchange, rabbitmq, logger, properties, sessionFa this.logger.verbose('New GatewayClient for socket.'); - this.session = sessionFactory.create(); // Client's session, which also defines client's session ID (reconnectId) + this.session = sessionFactory.create(); // Client's session, which also defines client's session ID (reconnectId) - this.awaitingResponseAcks = {}; // Holds callback functions for client message ACK's. - this.awaitingResponseAcksInterval = {}; // Message ACK Intervals for client message re-sends. - this.clientQueue = null; // Client RabbitMQ Queue + this.awaitingResponseAcks = {}; // Holds callback functions for client message ACK's. + this.awaitingResponseAcksInterval = {}; // Message ACK Intervals for client message re-sends. + this.clientQueue = null; // Client RabbitMQ Queue this.disposed = false; this.init(); @@ -91,7 +91,7 @@ GatewayClient.prototype.dispose = function() { GatewayClient.prototype.init = function() { /////////////////////////////////// - // Socket Handlers + // Socket Handlers /////////////////////////////////// /** @@ -142,7 +142,7 @@ GatewayClient.prototype.onAck = function(message) { var ack = this.awaitingResponseAcks[message.correspondingMessageId]; if (!ack) { // We typically get here when: - // We re-sent the message because the client did not ACK in time, but in the meantime the client DID ACK and thus the re-send turned out to be superfluous. + // We re-sent the message because the client did not ACK in time, but in the meantime the client DID ACK and thus the re-send turned out to be superfluous. this.logger.warn('Unexpected response ack: ', message.correspondingMessageId); } else { @@ -187,9 +187,9 @@ GatewayClient.prototype.onReconnect = function(message) { } // The reconnectId is an encoded string that should contain all required details to re-establish the - // session, including the previous ClientID, which we save and then swap out for the new/current ClientID + // session, including the previous ClientID, which we save and then swap out for the new/current ClientID var newClientId = this.session.clientId; - this.session.load(message.reconnectId); // Decrypts the reconnectId + this.session.load(message.reconnectId); // Decrypts the reconnectId if (!this.session.existingClientIds) { this.session.existingClientIds = []; } @@ -205,14 +205,27 @@ GatewayClient.prototype.onReconnect = function(message) { } this.session.clientId = newClientId; this.onConnect(message); + + if (this.session.isLoggedIn() && this.session.existingClientId && this.session.existingClientId !== this.session.clientId) { + var reconnectMessage = this.session.populateMessage({ + cn: 'com.percero.agents.sync.vo.ReconnectRequest', + existingClientId: this.session.existingClientId, + existingClientIds: this.session.existingClientIds + }); + this.logger.verbose('Reconnect Message for session client ' + this.session.clientId + + ': ' + JSON.stringify(reconnectMessage)); + this.sendToAgent('reconnect', reconnectMessage); + + } + }; /////////////////////////////////// -// Helper Functions +// Helper Functions /////////////////////////////////// /** * routeSpecialMessage - If a specialMessageHandler is defined for the message type: - * 1. Pipe this message to that handler + * 1. Pipe this message to that handler */ GatewayClient.prototype.routeSpecialMessage = function(type, message) { if (this.specialMessageHandlers[type]) { @@ -293,7 +306,6 @@ GatewayClient.prototype.processResponse = function(response){ * @param response */ GatewayClient.prototype.logUserInWithUserToken = function(response) { - if (response.user) { if (response.user.hasOwnProperty("ID")) this.session.userId = response.user.ID; @@ -384,15 +396,15 @@ GatewayClient.prototype.onClientQueueMessage = function(response, headers, info, if (response.EOL) { // This is an End-Of-Life message for this queue. // If the response clientId does NOT match the current session clientId, then this client has - // already moved on and nothing needs to happen here. This typically happens when a client - // reconnects from the same network/IP address/router. + // already moved on and nothing needs to happen here. This typically happens when a client + // reconnects from the same network/IP address/router. if (response.clientId && response.clientId !== this.session.clientId) { this.logger.verbose('Ignoring EOL message for ' + response.clientId + ' -> ' + this.session.clientId); return; } else { // This client is no longer valid. This typically happens when a client reconnects - // from a different network/IP address/router. + // from a different network/IP address/router. this.logger.verbose('Received EOL for queue ' + this.session.clientId); this.isServerTerminated = true; this.dispose(); @@ -409,7 +421,7 @@ GatewayClient.prototype.onClientQueueMessage = function(response, headers, info, } this.processResponse(response); - this.sendSession(); // Send the updated session to the client. + this.sendSession(); // Send the updated session to the client. if (response.correspondingMessageId) { this.setupResendInterval(response, receipt); @@ -495,8 +507,8 @@ GatewayClient.prototype.onClientQueueDeleted = function(){ /** * registerResponseQueue - Sets up a new RabbitMQ Queue for this client, using the session.clientId as the - * name for the queue. Once the queue is setup, subscribes to the queue. This queue is for messages from the - * ActiveStack back-end that are intended for the client. + * name for the queue. Once the queue is setup, subscribes to the queue. This queue is for messages from the + * ActiveStack back-end that are intended for the client. */ GatewayClient.prototype.registerResponseQueue = function() { this.logger.verbose('Setting up rabbit queue ' + this.session.clientId); @@ -508,6 +520,7 @@ GatewayClient.UNAUTH_EVENTS = [ 'authenticateOAuthAccessToken', 'authenticateOAuthCode', 'authenticateUserAccount', + 'register', 'authenticate', 'reauthenticate', 'getAllServiceProviders', @@ -547,7 +560,7 @@ GatewayClient.AUTH_EVENTS = [ /** * registerEvents - Add agent to list of agents, setting up listeners for each event type the agent handles. - * Unregister the agent first in case it is already registered. + * Unregister the agent first in case it is already registered. */ GatewayClient.prototype.registerEvents = function() { GatewayClient.UNAUTH_EVENTS.forEach(function(eventName) { @@ -590,8 +603,8 @@ GatewayClient.prototype.onAuthSocketEvent = function(eventName, request){ /** * sendToAgent - Send the message to the specified agent via RabbitMQ client queue. If the message.clientId - * is different than the session.clientId, update the message.clientId to match the session.clientId. This is - * an enabler for legacy clients that are unable to update their clientId upon reconnect. + * is different than the session.clientId, update the message.clientId to match the session.clientId. This is + * an enabler for legacy clients that are unable to update their clientId upon reconnect. */ GatewayClient.prototype.sendToAgent = function(name, message, callback) { this.logger.verbose( 'Sending to agent (' + name + '): ', JSON.stringify(message) ); @@ -605,7 +618,7 @@ GatewayClient.prototype.sendToAgent = function(name, message, callback) { else { if (message.clientId && message.clientId !== this.session.clientId) { // The message's clientId does not match the session's clientId. This typically happens after a device has reconnected - // and the client library does not update to it's new clientId. + // and the client library does not update to it's new clientId. this.logger.verbose('Message client ' + message.clientId + ' is different than Session client ' + this.session.clientId); message.clientId = this.session.clientId; } diff --git a/src/service/gateway.js b/src/service/gateway.js index c8a57a1..4024a15 100644 --- a/src/service/gateway.js +++ b/src/service/gateway.js @@ -2,7 +2,7 @@ var io = require('socket.io'); var Client = require('./client'); -//var RedisStore = require('socket.io/lib/stores/redis'); +var RedisStore = require('socket.io/lib/stores/redis'); module.exports = Gateway; @@ -52,11 +52,11 @@ Gateway.prototype.initHttpServer = function(){ }; Gateway.prototype.initRedis = function() { - //this.redisStore = this.redisStoreFactory.instance(); - // - //this.attachRedisErrorHandlers('Publisher', this.redisStore.pub); - //this.attachRedisErrorHandlers('Subscriber', this.redisStore.sub); - //this.attachRedisErrorHandlers('Client', this.redisStore.cmd); + this.redisStore = this.redisStoreFactory.instance(); + + this.attachRedisErrorHandlers('Publisher', this.redisStore.pub); + this.attachRedisErrorHandlers('Subscriber', this.redisStore.sub); + this.attachRedisErrorHandlers('Client', this.redisStore.cmd); }; Gateway.prototype.initSocketIO = function() { @@ -80,7 +80,7 @@ Gateway.prototype.onRabbitReady = function () { this.exchange.on('error', this.gatewayWorker.createErrorHandler('RabbitMQ Exchange')); // Handles a new client socket connection. - this.sio.on('connection',this.onSocketConnection.bind(this)); + this.sio.sockets.on('connection',this.onSocketConnection.bind(this)); this.rabbitmq.on('error', this.gatewayWorker.createErrorHandler('RabbitMQ')); } @@ -172,7 +172,7 @@ Gateway.prototype.handleError = function(error, source) { this.httpServer.close(); }; - + Gateway.prototype.onShutdown = function(shutdownType) { this.logger.info('Closing down Socket IO Server'); @@ -181,7 +181,7 @@ Gateway.prototype.onShutdown = function(shutdownType) { }.bind(this)); }; - // RedisClient resists proper error handling :-( + // RedisClient resists proper error handling :-( Gateway.prototype.attachRedisErrorHandlers = function(type, redis) { var handler = this.gatewayWorker.createErrorHandler('Redis Store ' + type); redis.on('error', handler); @@ -204,7 +204,7 @@ Gateway.prototype.trapRedisCleanup = function(type, redis) { Gateway.prototype.start = function(){ this.httpServer.listen( this.properties['frontend.port'], - //this.properties['frontend.host'], + this.properties['frontend.host'], function () { this.logger.info('Gateway ready on http' + (this.useSsl ? 's' : '') + '://' + (this.properties['frontend.host'] || '*') + ':' + (this.properties['frontend.port'])); diff --git a/src/worker.js b/src/worker.js index 2199366..865e0c5 100644 --- a/src/worker.js +++ b/src/worker.js @@ -31,6 +31,10 @@ GatewayWorker.prototype.checkMemoryUsage = function () { this.logger.error('Worker exceeded hard ' + type + ' memory limit (' + memoryUsage[type] + '/' + limit * megabyte + ')!'); } + if (warning && (memoryUsage[type] > warning * megabyte)) { + this.logger.warn('Worker exceeded soft ' + type + ' memory limit (' + + memoryUsage[type] + '/' + warning * megabyte + ')!'); + } } }; @@ -129,7 +133,12 @@ GatewayWorker.prototype.onProcessMessage = function (msg) { GatewayWorker.prototype.createErrorHandler = function createErrorHandler(source) { return function (error) { - this.logger.error('Fatal ' + source + ' error: ', error.stack); + if (error.stack) { + this.logger.error('Fatal ' + source + ' error: ', error.stack); + } + else { + this.logger.error('Fatal ' + source + ' error: ', JSON.stringify(error)); + } clearInterval(this.heartbeat); if (process.send && !this.exiting) { @@ -148,7 +157,7 @@ GatewayWorker.prototype.catchAndWarn = function catchAndWarn(connection, cleanup try { cleanup(); } catch (error) { - if (!this.exiting) { + if (this && !this.exiting) { this.logger.warn('Error disconnecting ' + connection + ' (' +error.toString() + ')'); } }