Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
{
"name": "activestack-gateway",
"version": "0.3.0",
"release_date": "2015-09-24",
"version": "0.2.1",
"release_date": "2016-04-28",
"description": "Socket server for ActiveStack",
"main": "src/main.js",
"dependencies": {
"amqp": "^0.1.8",
"injecterooski": "*",
"properties-parser": "^0.3.0",
"node-properties-parser": "^0.0.2",
"redis": "^0.12.1",
"socket.io": "^1.3.7",
"socket.io-client": "^1.3.7",
"socket.io-redis": "^0.1.4",
"socket.io": "git://github.com/curb47/socket.io.git",
"socket.io-client": "^0.9.9",
"webkit-devtools-agent": "^0.3.1",
"winston": "^0.6.2"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion resources/env.default.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ cluster.memoryLimit.rss=2048
cluster.memoryLimit.heapUsed=100
cluster.workerRestartDelay=50
cluster.maxRestartBackoff=10
cluster.workerCount=2
cluster.workerCount=1

frontend.logLevel=info
frontend.shutdownCode=pass123
Expand Down
7 changes: 3 additions & 4 deletions src/config/properties.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
var propParser = require('properties-parser'),
fs = require('fs');
propParser = require('node-properties-parser');

function Properties(fileName) {
this.fileName = fileName;
Expand All @@ -12,13 +11,13 @@ Properties.prototype.inject = function(prefixedLogger){
var properties;
if(this.fileName && this.fileName !== '')
try{
properties = propParser.parse(fs.readFileSync(this.fileName));
properties = propParser.readSync(this.fileName);
}catch(e){
console.log('Could not load properties file from '+this.fileName);
}

if(!properties)
properties = propParser.parse(fs.readFileSync(__dirname + '/../../resources/env.default.properties'));
properties = propParser.readSync(__dirname + '/../../resources/env.default.properties');

for(var key in properties){
this[key] = properties[key];
Expand Down
26 changes: 12 additions & 14 deletions src/factory/redis_store_factory.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//var RedisStore = require('socket.io/lib/stores/redis');
var redis = require('socket.io-redis');
var RedisStore = require('socket.io/lib/stores/redis');

function RedisStoreFactory(){}; module.exports = RedisStoreFactory;

Expand All @@ -13,23 +12,22 @@ RedisStoreFactory.prototype.inject = function(properties){
*/
RedisStoreFactory.prototype.instance = function(){
if(!this.redisStore){
var options = {
var redisOptions = {
host: this.properties['gateway.redis.host'],
port: this.properties['gateway.redis.port'],
max_attempts: this.properties['gateway.redis.max_attempts'],
enable_offline_queue: this.properties['gateway.redis.offline_queue'] == 'true'
};
//
//this.redisStore = new RedisStore({
// redisPub: redisOptions,
// redisSub: redisOptions,
// redisClient: redisOptions
//});
//
//this.redisStore.pub.auth(this.properties['gateway.redis.password']);
//this.redisStore.sub.auth(this.properties['gateway.redis.password']);
//this.redisStore.cmd.auth(this.properties['gateway.redis.password']);
this.redisStore = redis(options);

this.redisStore = new RedisStore({
redisPub: redisOptions,
redisSub: redisOptions,
redisClient: redisOptions
});

this.redisStore.pub.auth(this.properties['gateway.redis.password']);
this.redisStore.sub.auth(this.properties['gateway.redis.password']);
this.redisStore.cmd.auth(this.properties['gateway.redis.password']);
}

return this.redisStore;
Expand Down
31 changes: 17 additions & 14 deletions src/factory/socketio_factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,28 @@ SocketIOFactory.prototype.inject = function(httpServerFactory, properties, redis
SocketIOFactory.prototype.instance = function(){
if(!this.sio){
var httpServer = this.httpServerFactory.instance();

var options = {
logger: this.logger.extendPrefix('socket.io')
};

if (this.ssl.useSsl) {
options.key = this.ssl.key;
options.cert = this.ssl.cert;
options.ca = this.ssl.ca;
this.sio = io.listen(httpServer, {
logger: this.logger.extendPrefix('socket.io'),
key: this.ssl.key,
cert: this.ssl.cert,
ca: this.ssl.ca
});
}
else {
this.sio = io.listen(httpServer, {logger: this.logger.extendPrefix('socket.io')});
}

this.sio = io(httpServer, options);
this.sio.set('heartbeat timeout', this.properties['gateway.socketio.timeout']);
this.sio.set('heartbeat interval', this.properties['gateway.socketio.interval']);
//this.sio.set('transports', ['websocket', 'flashsocket']);
this.sio.adapter(this.redisStoreFactory.instance());
var redisStore = this.redisStoreFactory.instance();
this.sio.configure(function () {
this.sio.set('heartbeat timeout', this.properties['gateway.socketio.timeout']);
this.sio.set('heartbeat interval', this.properties['gateway.socketio.interval']);
this.sio.set('transports', ['websocket', 'flashsocket']);
this.sio.set('store', redisStore);
}.bind(this));

// Start the Flash Policy Server
//this.sio.flashPolicyServer.on('error', this.gatewayWorker.createErrorHandler('Flash Policy Server'));
this.sio.flashPolicyServer.on('error', this.gatewayWorker.createErrorHandler('Flash Policy Server'));

}

Expand Down
55 changes: 34 additions & 21 deletions src/service/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ function GatewayClient(socket, exchange, rabbitmq, logger, properties, sessionFa

this.logger.verbose('New GatewayClient for socket.');

this.session = sessionFactory.create(); // Client's session, which also defines client's session ID (reconnectId)
this.session = sessionFactory.create(); // Client's session, which also defines client's session ID (reconnectId)

this.awaitingResponseAcks = {}; // Holds callback functions for client message ACK's.
this.awaitingResponseAcksInterval = {}; // Message ACK Intervals for client message re-sends.
this.clientQueue = null; // Client RabbitMQ Queue
this.awaitingResponseAcks = {}; // Holds callback functions for client message ACK's.
this.awaitingResponseAcksInterval = {}; // Message ACK Intervals for client message re-sends.
this.clientQueue = null; // Client RabbitMQ Queue
this.disposed = false;

this.init();
Expand Down Expand Up @@ -91,7 +91,7 @@ GatewayClient.prototype.dispose = function() {

GatewayClient.prototype.init = function() {
///////////////////////////////////
// Socket Handlers
// Socket Handlers
///////////////////////////////////

/**
Expand Down Expand Up @@ -142,7 +142,7 @@ GatewayClient.prototype.onAck = function(message) {
var ack = this.awaitingResponseAcks[message.correspondingMessageId];
if (!ack) {
// We typically get here when:
// We re-sent the message because the client did not ACK in time, but in the meantime the client DID ACK and thus the re-send turned out to be superfluous.
// We re-sent the message because the client did not ACK in time, but in the meantime the client DID ACK and thus the re-send turned out to be superfluous.
this.logger.warn('Unexpected response ack: ', message.correspondingMessageId);
}
else {
Expand Down Expand Up @@ -187,9 +187,9 @@ GatewayClient.prototype.onReconnect = function(message) {
}

// The reconnectId is an encoded string that should contain all required details to re-establish the
// session, including the previous ClientID, which we save and then swap out for the new/current ClientID
// session, including the previous ClientID, which we save and then swap out for the new/current ClientID
var newClientId = this.session.clientId;
this.session.load(message.reconnectId); // Decrypts the reconnectId
this.session.load(message.reconnectId); // Decrypts the reconnectId
if (!this.session.existingClientIds) {
this.session.existingClientIds = [];
}
Expand All @@ -205,14 +205,27 @@ GatewayClient.prototype.onReconnect = function(message) {
}
this.session.clientId = newClientId;
this.onConnect(message);

if (this.session.isLoggedIn() && this.session.existingClientId && this.session.existingClientId !== this.session.clientId) {
var reconnectMessage = this.session.populateMessage({
cn: 'com.percero.agents.sync.vo.ReconnectRequest',
existingClientId: this.session.existingClientId,
existingClientIds: this.session.existingClientIds
});
this.logger.verbose('Reconnect Message for session client ' + this.session.clientId
+ ': ' + JSON.stringify(reconnectMessage));
this.sendToAgent('reconnect', reconnectMessage);

}

};

///////////////////////////////////
// Helper Functions
// Helper Functions
///////////////////////////////////
/**
* routeSpecialMessage - If a specialMessageHandler is defined for the message type:
* 1. Pipe this message to that handler
* 1. Pipe this message to that handler
*/
GatewayClient.prototype.routeSpecialMessage = function(type, message) {
if (this.specialMessageHandlers[type]) {
Expand Down Expand Up @@ -293,7 +306,6 @@ GatewayClient.prototype.processResponse = function(response){
* @param response
*/
GatewayClient.prototype.logUserInWithUserToken = function(response) {

if (response.user) {
if (response.user.hasOwnProperty("ID"))
this.session.userId = response.user.ID;
Expand Down Expand Up @@ -384,15 +396,15 @@ GatewayClient.prototype.onClientQueueMessage = function(response, headers, info,
if (response.EOL) {
// This is an End-Of-Life message for this queue.
// If the response clientId does NOT match the current session clientId, then this client has
// already moved on and nothing needs to happen here. This typically happens when a client
// reconnects from the same network/IP address/router.
// already moved on and nothing needs to happen here. This typically happens when a client
// reconnects from the same network/IP address/router.
if (response.clientId && response.clientId !== this.session.clientId) {
this.logger.verbose('Ignoring EOL message for ' + response.clientId + ' -> ' + this.session.clientId);
return;
}
else {
// This client is no longer valid. This typically happens when a client reconnects
// from a different network/IP address/router.
// from a different network/IP address/router.
this.logger.verbose('Received EOL for queue ' + this.session.clientId);
this.isServerTerminated = true;
this.dispose();
Expand All @@ -409,7 +421,7 @@ GatewayClient.prototype.onClientQueueMessage = function(response, headers, info,
}

this.processResponse(response);
this.sendSession(); // Send the updated session to the client.
this.sendSession(); // Send the updated session to the client.

if (response.correspondingMessageId) {
this.setupResendInterval(response, receipt);
Expand Down Expand Up @@ -495,8 +507,8 @@ GatewayClient.prototype.onClientQueueDeleted = function(){

/**
* registerResponseQueue - Sets up a new RabbitMQ Queue for this client, using the session.clientId as the
* name for the queue. Once the queue is setup, subscribes to the queue. This queue is for messages from the
* ActiveStack back-end that are intended for the client.
* name for the queue. Once the queue is setup, subscribes to the queue. This queue is for messages from the
* ActiveStack back-end that are intended for the client.
*/
GatewayClient.prototype.registerResponseQueue = function() {
this.logger.verbose('Setting up rabbit queue ' + this.session.clientId);
Expand All @@ -508,6 +520,7 @@ GatewayClient.UNAUTH_EVENTS = [
'authenticateOAuthAccessToken',
'authenticateOAuthCode',
'authenticateUserAccount',
'register',
'authenticate',
'reauthenticate',
'getAllServiceProviders',
Expand Down Expand Up @@ -547,7 +560,7 @@ GatewayClient.AUTH_EVENTS = [

/**
* registerEvents - Add agent to list of agents, setting up listeners for each event type the agent handles.
* Unregister the agent first in case it is already registered.
* Unregister the agent first in case it is already registered.
*/
GatewayClient.prototype.registerEvents = function() {
GatewayClient.UNAUTH_EVENTS.forEach(function(eventName) {
Expand Down Expand Up @@ -590,8 +603,8 @@ GatewayClient.prototype.onAuthSocketEvent = function(eventName, request){

/**
* sendToAgent - Send the message to the specified agent via RabbitMQ client queue. If the message.clientId
* is different than the session.clientId, update the message.clientId to match the session.clientId. This is
* an enabler for legacy clients that are unable to update their clientId upon reconnect.
* is different than the session.clientId, update the message.clientId to match the session.clientId. This is
* an enabler for legacy clients that are unable to update their clientId upon reconnect.
*/
GatewayClient.prototype.sendToAgent = function(name, message, callback) {
this.logger.verbose( 'Sending to agent (' + name + '): ', JSON.stringify(message) );
Expand All @@ -605,7 +618,7 @@ GatewayClient.prototype.sendToAgent = function(name, message, callback) {
else {
if (message.clientId && message.clientId !== this.session.clientId) {
// The message's clientId does not match the session's clientId. This typically happens after a device has reconnected
// and the client library does not update to it's new clientId.
// and the client library does not update to it's new clientId.
this.logger.verbose('Message client ' + message.clientId + ' is different than Session client ' + this.session.clientId);
message.clientId = this.session.clientId;
}
Expand Down
20 changes: 10 additions & 10 deletions src/service/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

var io = require('socket.io');
var Client = require('./client');
//var RedisStore = require('socket.io/lib/stores/redis');
var RedisStore = require('socket.io/lib/stores/redis');

module.exports = Gateway;

Expand Down Expand Up @@ -52,11 +52,11 @@ Gateway.prototype.initHttpServer = function(){
};

Gateway.prototype.initRedis = function() {
//this.redisStore = this.redisStoreFactory.instance();
//
//this.attachRedisErrorHandlers('Publisher', this.redisStore.pub);
//this.attachRedisErrorHandlers('Subscriber', this.redisStore.sub);
//this.attachRedisErrorHandlers('Client', this.redisStore.cmd);
this.redisStore = this.redisStoreFactory.instance();

this.attachRedisErrorHandlers('Publisher', this.redisStore.pub);
this.attachRedisErrorHandlers('Subscriber', this.redisStore.sub);
this.attachRedisErrorHandlers('Client', this.redisStore.cmd);
};

Gateway.prototype.initSocketIO = function() {
Expand All @@ -80,7 +80,7 @@ Gateway.prototype.onRabbitReady = function () {
this.exchange.on('error', this.gatewayWorker.createErrorHandler('RabbitMQ Exchange'));

// Handles a new client socket connection.
this.sio.on('connection',this.onSocketConnection.bind(this));
this.sio.sockets.on('connection',this.onSocketConnection.bind(this));

this.rabbitmq.on('error', this.gatewayWorker.createErrorHandler('RabbitMQ'));
}
Expand Down Expand Up @@ -172,7 +172,7 @@ Gateway.prototype.handleError = function(error, source) {

this.httpServer.close();
};
Gateway.prototype.onShutdown = function(shutdownType) {
this.logger.info('Closing down Socket IO Server');

Expand All @@ -181,7 +181,7 @@ Gateway.prototype.onShutdown = function(shutdownType) {
}.bind(this));
};

// RedisClient resists proper error handling :-(
// RedisClient resists proper error handling :-(
Gateway.prototype.attachRedisErrorHandlers = function(type, redis) {
var handler = this.gatewayWorker.createErrorHandler('Redis Store ' + type);
redis.on('error', handler);
Expand All @@ -204,7 +204,7 @@ Gateway.prototype.trapRedisCleanup = function(type, redis) {
Gateway.prototype.start = function(){
this.httpServer.listen(
this.properties['frontend.port'],
//this.properties['frontend.host'],
this.properties['frontend.host'],
function () {
this.logger.info('Gateway ready on http' + (this.useSsl ? 's' : '') + '://' +
(this.properties['frontend.host'] || '*') + ':' + (this.properties['frontend.port']));
Expand Down
13 changes: 11 additions & 2 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ GatewayWorker.prototype.checkMemoryUsage = function () {
this.logger.error('Worker exceeded hard ' + type + ' memory limit (' +
memoryUsage[type] + '/' + limit * megabyte + ')!');
}
if (warning && (memoryUsage[type] > warning * megabyte)) {
this.logger.warn('Worker exceeded soft ' + type + ' memory limit (' +
memoryUsage[type] + '/' + warning * megabyte + ')!');
}
}
};

Expand Down Expand Up @@ -129,7 +133,12 @@ GatewayWorker.prototype.onProcessMessage = function (msg) {

GatewayWorker.prototype.createErrorHandler = function createErrorHandler(source) {
return function (error) {
this.logger.error('Fatal ' + source + ' error: ', error.stack);
if (error.stack) {
this.logger.error('Fatal ' + source + ' error: ', error.stack);
}
else {
this.logger.error('Fatal ' + source + ' error: ', JSON.stringify(error));
}

clearInterval(this.heartbeat);
if (process.send && !this.exiting) {
Expand All @@ -148,7 +157,7 @@ GatewayWorker.prototype.catchAndWarn = function catchAndWarn(connection, cleanup
try {
cleanup();
} catch (error) {
if (!this.exiting) {
if (this && !this.exiting) {
this.logger.warn('Error disconnecting ' + connection + ' (' +error.toString() + ')');
}
}
Expand Down