@@ -12,6 +12,12 @@ var Parser = require('redis-parser');
1212var commands = require ( 'redis-commands' ) ;
1313var debug = require ( './lib/debug' ) ;
1414var unifyOptions = require ( './lib/createClient' ) ;
15+ var SUBSCRIBE_COMMANDS = {
16+ subscribe : true ,
17+ unsubscribe : true ,
18+ psubscribe : true ,
19+ punsubscribe : true
20+ } ;
1521
1622// Newer Node.js versions > 0.10 return the EventEmitter right away and using .EventEmitter was deprecated
1723if ( typeof EventEmitter !== 'function' ) {
@@ -615,59 +621,52 @@ function normal_reply (self, reply) {
615621 }
616622}
617623
618- function set_subscribe ( self , type , command_obj , subscribe , reply ) {
619- var i = 0 ;
624+ function set_subscribe ( self , type , subscribe , channel ) {
625+ // Every channel has to be saved / removed one after the other and the type has to be the same too,
626+ // to make sure partly subscribe / unsubscribe works well together
620627 if ( subscribe ) {
621- // The channels have to be saved one after the other and the type has to be the same too,
622- // to make sure partly subscribe / unsubscribe works well together
623- for ( ; i < command_obj . args . length ; i ++ ) {
624- self . subscription_set [ type + '_' + command_obj . args [ i ] ] = command_obj . args [ i ] ;
625- }
628+ self . subscription_set [ type + '_' + channel ] = channel ;
626629 } else {
627630 type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe' ; // Make types consistent
628- for ( ; i < command_obj . args . length ; i ++ ) {
629- delete self . subscription_set [ type + '_' + command_obj . args [ i ] ] ;
630- }
631- if ( reply [ 2 ] === 0 ) { // No channels left that this client is subscribed to
632- var running_command ;
633- i = 0 ;
634- // This should be a rare case and therefor handling it this way should be good performance wise for the general case
635- while ( running_command = self . command_queue . get ( i ++ ) ) {
636- if (
637- running_command . command === 'subscribe' ||
638- running_command . command === 'psubscribe' ||
639- running_command . command === 'unsubscribe' ||
640- running_command . command === 'punsubscribe'
641- ) {
642- self . pub_sub_mode = i ;
643- return ;
644- }
645- }
646- self . pub_sub_mode = 0 ;
647- }
631+ delete self . subscription_set [ type + '_' + channel ] ;
648632 }
649633}
650634
651635function subscribe_unsubscribe ( self , reply , type , subscribe ) {
652636 // Subscribe commands take an optional callback and also emit an event, but only the _last_ response is included in the callback
637+ // The pub sub commands return each argument in a separate return value and have to be handled that way
653638 var command_obj = self . command_queue . get ( 0 ) ;
654- var buffer = self . options . return_buffers || self . options . detect_buffers && command_obj && command_obj . buffer_args || reply [ 1 ] === null ;
655- var channel = buffer ? reply [ 1 ] : reply [ 1 ] . toString ( ) ;
656- var count = reply [ 2 ] ;
639+ var buffer = self . options . return_buffers || self . options . detect_buffers && command_obj . buffer_args ;
640+ var channel = ( buffer || reply [ 1 ] === null ) ? reply [ 1 ] : reply [ 1 ] . toString ( ) ;
641+ var count = + reply [ 2 ] ; // Return the channel counter as number no matter if `string_numbers` is activated or not
657642 debug ( 'Subscribe / unsubscribe command' ) ;
658643
659644 // Emit first, then return the callback
660- if ( channel !== null ) { // Do not emit something if there was no channel to unsubscribe from
645+ if ( channel !== null ) { // Do not emit or "unsubscribe" something if there was no channel to unsubscribe from
661646 self . emit ( type , channel , count ) ;
647+ set_subscribe ( self , type , subscribe , channel ) ;
662648 }
663- // The pub sub commands return each argument in a separate return value and have to be handled that way
664649 if ( command_obj . sub_commands_left <= 1 ) {
665- if ( count !== 0 && ! subscribe && command_obj . args . length === 0 ) {
666- command_obj . sub_commands_left = count ;
667- return ;
650+ if ( count !== 0 ) {
651+ if ( ! subscribe && command_obj . args . length === 0 ) { // Unsubscribe from all channels
652+ command_obj . sub_commands_left = count ;
653+ return ;
654+ }
655+ } else {
656+ var running_command ;
657+ var i = 1 ;
658+ // This should be a rare case and therefor handling it this way should be good performance wise for the general case
659+ while ( running_command = self . command_queue . get ( i ) ) {
660+ if ( SUBSCRIBE_COMMANDS [ running_command . command ] ) {
661+ self . command_queue . shift ( ) ;
662+ self . pub_sub_mode = i ;
663+ return ;
664+ }
665+ i ++ ;
666+ }
667+ self . pub_sub_mode = 0 ;
668668 }
669669 self . command_queue . shift ( ) ;
670- set_subscribe ( self , type , command_obj , subscribe , reply ) ;
671670 if ( typeof command_obj . callback === 'function' ) {
672671 // TODO: The current return value is pretty useless.
673672 // Evaluate to change this in v.3 to return all subscribed / unsubscribed channels in an array including the number of channels subscribed too
@@ -819,12 +818,10 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
819818 var command_obj = new Command ( command , args_copy , callback ) ;
820819 command_obj . buffer_args = buffer_args ;
821820
822- if ( command === 'subscribe' || command === 'psubscribe' || command === 'unsubscribe' || command === 'punsubscribe' ) {
821+ if ( SUBSCRIBE_COMMANDS [ command ] && this . pub_sub_mode === 0 ) {
823822 // If pub sub is already activated, keep it that way, otherwise set the number of commands to resolve until pub sub mode activates
824823 // Deactivation of the pub sub mode happens in the result handler
825- if ( ! this . pub_sub_mode ) {
826- this . pub_sub_mode = this . command_queue . length + 1 ;
827- }
824+ this . pub_sub_mode = this . command_queue . length + 1 ;
828825 }
829826 this . command_queue . push ( command_obj ) ;
830827
0 commit comments