diff --git a/docs/index.restdown b/docs/index.restdown index b5d1955..41a476c 100644 --- a/docs/index.restdown +++ b/docs/index.restdown @@ -124,9 +124,9 @@ connecting to ZK via once(). If the client driver has an unexpected error, it is sent here. Specifically, if the client has lost its session to ZK. -### Event: 'connection_interrupted' +### Event: 'not_connected' - function onConnectionInterrupted() { } + function onNotConnected() { } If the underlying native client loses connection to ZK, but not its session, it is sent here. Notably, for most applications, this should be a no-op, since diff --git a/lib/client.js b/lib/client.js index 126034e..6811223 100644 --- a/lib/client.js +++ b/lib/client.js @@ -6,6 +6,7 @@ var util = require('util'); var assert = require('assert-plus'); var vasync = require('vasync'); +var once = require('once'); var ZK = require('zookeeper'); var ZKError = require('./error').ZKError; @@ -19,8 +20,7 @@ var sprintf = util.format; var PROXY_EVENTS = [ 'connect', 'not_connected', - 'close', - 'session_expired' + 'close' ]; @@ -189,13 +189,13 @@ ZKClient.prototype.close = function close() { }); this.removeAllListeners('connect'); - this.removeAllListeners('connection_interrupted'); - this.removeAllListeners('session_expired'); + this.removeAllListeners('not_connected'); this.zk.once('close', function () { self.watchers.forEach(function (w) { w.stop(); }); + self.watchers = []; self.zk.removeAllListeners('error'); self.removeAllListeners('error'); @@ -314,6 +314,8 @@ ZKClient.prototype.mkdirp = function mkdirp(p, callback) { assert.string(p, 'path'); assert.func(callback, 'callback'); + callback = once(callback); + var dirs = path.normalize(p).split('/').slice(1); var log = this.log; var self = this; @@ -328,6 +330,7 @@ ZKClient.prototype.mkdirp = function mkdirp(p, callback) { tasks.push(function checkIfExists(_, cb) { log.trace('mkdirp: checking %s', dir); self.stat(dir, function (err, _stat) { + cb = once(cb); if (err && err.code !== ZK.ZNONODE) { cb(); } @@ -338,6 +341,7 @@ ZKClient.prototype.mkdirp = function mkdirp(p, callback) { }); tasks.push(function mkdirIfNotExists(_, cb) { + cb = once(cb); if (exists) { cb(); } else { @@ -368,12 +372,15 @@ ZKClient.prototype.put = function put(p, object, options, callback) { assert.object(options, 'options'); assert.func(callback, 'callback'); + callback = once(callback); + var exists; var log = this.log; var _p = path.normalize(p); var self = this; var tasks = [ function checkIfExists(_, cb) { + cb = once(cb); log.trace('put: checking %s', _p); self.stat(_p, function (err, _stat) { if (err && err.code !== ZK.ZNONODE) { @@ -386,6 +393,7 @@ ZKClient.prototype.put = function put(p, object, options, callback) { }, function putIfNotExists(_, cb) { + cb = once(cb); if (exists) { cb(); } else { @@ -395,7 +403,7 @@ ZKClient.prototype.put = function put(p, object, options, callback) { }, function set(_, cb) { - self.update(_p, object, cb); + self.update(_p, object, once(cb)); } ]; @@ -424,6 +432,8 @@ ZKClient.prototype.readdir = function readdir(p, callback) { var _p = path.normalize(p); var zk = this.zk; + callback = once(callback); + log.trace({path: p}, 'readdir: entered'); zk.a_get_children(_p, false, function (rc, msg, nodes) { if (rc !== 0) { @@ -442,6 +452,8 @@ ZKClient.prototype.rmr = function rmr(p, callback) { assert.string(p, 'path'); assert.func(callback, 'callback'); + callback = once(callback); + var _done = false; var inflight = 0; var log = this.log; @@ -489,7 +501,7 @@ ZKClient.prototype.rmr = function rmr(p, callback) { nodes.forEach(function (n) { tasks.push(function (_, cb) { - self.unlink(n, cb); + self.unlink(n, once(cb)); }); }); @@ -505,6 +517,8 @@ ZKClient.prototype.stat = function stat(p, callback) { assert.string(p, 'path'); assert.func(callback, 'callback'); + callback = once(callback); + var log = this.log; var zk = this.zk; @@ -542,6 +556,8 @@ ZKClient.prototype.unlink = function unlink(p, options, callback) { assert.object(options, 'options'); assert.func(callback, 'callback'); + callback = once(callback); + var log = this.log; var _p = path.normalize(p); var zk = this.zk; @@ -578,12 +594,15 @@ ZKClient.prototype.update = function update(p, object, options, callback) { assert.object(options, 'options'); assert.func(callback, 'callback'); + callback = once(callback); + var data = JSON.stringify(object); var log = this.log; var _p = path.normalize(p); var self = this; var tasks = [ function getVersion(_, cb) { + cb = once(cb); if (version !== undefined) return (cb()); self.stat(_p, function (err, _stat) { @@ -597,6 +616,7 @@ ZKClient.prototype.update = function update(p, object, options, callback) { return (undefined); }, function write(_, cb) { + cb = once(cb); zk.a_set(_p, data, version, function (rc, msg) { if (rc !== 0) { cb(new ZKError(rc, msg)); @@ -634,6 +654,8 @@ ZKClient.prototype.watch = function watch(p, options, callback) { assert.object(options, 'options'); assert.func(callback, 'callback'); + callback = once(callback); + var log = this.log; var _p = path.normalize(p); var self = this; @@ -738,7 +760,7 @@ ZKClient.prototype.watch = function watch(p, options, callback) { } catch (e) { log.error({err: e}, 'error while parsing data', e); - return (self.emit('error', e)); + return (emitter.emit('error', e)); } log.trace({ @@ -759,7 +781,7 @@ ZKClient.prototype.watch = function watch(p, options, callback) { path: _p }, 'watch: notification fired'); - if (!done) { + if (!done && event != "session" && event != "unknown") { emitter.emit(event); register(); } diff --git a/package.json b/package.json index 57251e4..a41ce6d 100644 --- a/package.json +++ b/package.json @@ -17,10 +17,11 @@ "main": "./lib/index.js", "dependencies": { "assert-plus": "0.1.2", - "bunyan": "0.16.5", + "bunyan": "0.21.1", "node-uuid": "1.4.0", - "vasync": "1.3.2", - "zookeeper": "git://github.com/yunong/node-zookeeper.git#3a0545d" + "once": "1.1.1", + "vasync": "1.3.3", + "zookeeper": "git://github.com/yunong/node-zookeeper.git#ef48f8c24b42d6c1b1c3dbab82278666bed37b70" }, "devDependencies": { "cover": "0.2.8",