diff --git a/.gitignore b/.gitignore index 9441393..77850e9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ node_modules -lib coverage .c9 \ No newline at end of file diff --git a/lib/queue.js b/lib/queue.js new file mode 100644 index 0000000..1f63a91 --- /dev/null +++ b/lib/queue.js @@ -0,0 +1,398 @@ +// Generated by CoffeeScript 1.10.0 +(function() { + var EventEmitter, URL, mongodb, + extend = function(child, parent) { for (var key in parent) { if (hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; }, + hasProp = {}.hasOwnProperty, + slice = [].slice; + + mongodb = require('mongodb'); + + EventEmitter = require('events').EventEmitter; + + URL = require('url'); + + exports.Connection = (function(superClass) { + extend(Connection, superClass); + + function Connection(options) { + options || (options = {}); + this.expires = options.expires || 60 * 60 * 1000; + this.timeout = options.timeout || 10 * 1000; + this.maxAttempts = options.maxAttempts || 5; + this.queue = []; + setImmediate((function(_this) { + return function() { + return _this.ensureConnection(options); + }; + })(this)); + } + + Connection.prototype.ensureConnection = function(opt) { + var afterConnectionEstablished, db, url; + afterConnectionEstablished = (function(_this) { + return function(err) { + if (err) { + return _this.emit('error', err); + } + return db.collections(function(err) { + if (err) { + return _this.emit('error', err); + } + return db.collection('queue', function(err, collection) { + var fn, i, len, ref; + if (err) { + return _this.emit('error', err); + } + _this.collection = collection; + if (_this.queue) { + ref = _this.queue; + for (i = 0, len = ref.length; i < len; i++) { + fn = ref[i]; + fn(collection); + } + } + delete _this.queue; + return collection.ensureIndex([['expires'], ['owner'], ['queue']], function(err) { + if (err) { + return _this.emit('error', err); + } else { + return _this.emit('connected'); + } + }); + }); + }); + }; + })(this); + if (opt.db instanceof mongodb.Db) { + db = opt.db; + if (!db.serverConfig.isConnected()) { + return db.once('open', afterConnectionEstablished); + } + return afterConnectionEstablished(null); + } + url = URL.format({ + protocol: 'mongodb', + slashes: true, + auth: opt.username && opt.password ? opt.username + ":" + opt.password : void 0, + host: (opt.host || '127.0.0.1') + ":" + (opt.port || 27017), + pathname: "/" + (opt.db || 'queue'), + search: '?w=1' + }); + return mongodb.MongoClient.connect(url, (function(_this) { + return function(err, _db) { + if (err) { + _this.emit('error', err); + } + if (_db) { + db = _db; + } + return afterConnectionEstablished(null); + }; + })(this)); + }; + + Connection.prototype.exec = function(fn) { + return this.queue && this.queue.push(fn) || fn(this.collection); + }; + + Connection.prototype.clear = function(queue, callback) { + return this.exec(function(collection) { + return collection.remove({ + queue: queue + }, callback); + }); + }; + + Connection.prototype.enqueue = function() { + var args, callback, i, queue; + queue = arguments[0], args = 3 <= arguments.length ? slice.call(arguments, 1, i = arguments.length - 1) : (i = 1, []), callback = arguments[i++]; + return this.exec((function(_this) { + return function(collection) { + var attempts, expires, scheduledDate, startDate, task; + if (typeof callback !== 'function') { + return callback(new Error('Last argument must be a callback')); + } + if (queue.startDate) { + scheduledDate = queue.startDate; + } + startDate = scheduledDate || Date.now(); + expires = new Date(+startDate + (queue.expires || _this.expires)); + attempts = 0; + queue = queue.queue || queue; + task = { + queue: queue, + expires: expires, + args: args, + attempts: attempts + }; + if (scheduledDate) { + task.startDate = scheduledDate; + } + return collection.insertOne(task, callback); + }; + })(this)); + }; + + Connection.prototype.next = function(queue, owner, callback) { + var now, options, query, timeout, update; + now = new Date; + timeout = new Date(now.getTime() + this.timeout); + query = { + expires: { + $gt: now + }, + $or: [ + { + startDate: { + $lte: now + } + }, { + startDate: { + $exists: false + } + } + ], + owner: null, + attempts: { + $lt: this.maxAttempts + } + }; + update = { + $set: { + timeout: timeout, + owner: owner + } + }; + options = { + sort: { + expires: 1 + }, + returnOriginal: false + }; + if (queue) { + query.queue = queue; + } + return this.exec(function(collection) { + return collection.findOneAndUpdate(query, update, options, function(err, result) { + return callback(err, result != null ? result.value : void 0); + }); + }); + }; + + Connection.prototype.complete = function(doc, callback) { + return this.exec(function(collection) { + var options, query; + query = { + _id: doc._id + }; + options = { + sort: { + expires: 1 + } + }; + return collection.findOneAndDelete(query, options, function(err, result) { + return callback(err, result != null ? result.value : void 0); + }); + }); + }; + + Connection.prototype.release = function(doc, callback) { + return this.exec(function(collection) { + var options, query, update; + query = { + _id: doc._id + }; + update = { + $unset: { + timeout: 1, + owner: 1 + }, + $inc: { + attempts: 1 + } + }; + options = { + sort: { + expires: 1 + }, + returnOriginal: false + }; + return collection.findOneAndUpdate(query, update, options, function(err, result) { + return callback(err, result != null ? result.value : void 0); + }); + }); + }; + + Connection.prototype.cleanup = function(callback) { + return this.exec(function(collection) { + var options, query, update; + query = { + timeout: { + $lt: new Date + } + }; + update = { + $unset: { + timeout: 1, + owner: 1 + } + }; + options = { + multi: 1 + }; + return collection.update(query, update, options, callback); + }); + }; + + return Connection; + + })(EventEmitter); + + exports.Template = (function() { + function Template(worker, doc1) { + this.worker = worker; + this.doc = doc1; + } + + Template.prototype.invoke = function() { + var err, error; + try { + return this.perform.apply(this, this.doc.args); + } catch (error) { + err = error; + return this.complete(err); + } + }; + + Template.prototype.perform = function() { + var args; + args = 1 <= arguments.length ? slice.call(arguments, 0) : []; + throw new Error('Yo, you need to implement me!'); + }; + + Template.prototype.complete = function(err) { + return this.worker.complete(err, this.doc); + }; + + return Template; + + })(); + + exports.Worker = (function(superClass) { + extend(Worker, superClass); + + function Worker(connection, templates, options) { + this.connection = connection; + this.templates = templates; + options || (options = {}); + this.name = [require('os').hostname(), process.pid].join(':'); + this.timeout = options.timeout || 1000; + this.rotate = options.rotate || false; + this.workers = options.workers || 3; + this.pending = 0; + } + + Worker.prototype.poll = function() { + var Template, templateName; + if (this.stopped) { + return; + } + if (this.pending >= this.workers) { + return this.sleep(); + } + Template = this.getTemplate(); + templateName = Template ? Template.name : void 0; + return this.connection.next(templateName, this.name, (function(_this) { + return function(err, doc) { + if ((err != null) && err.message !== 'No matching object found') { + _this.emit('error', err); + } else if (doc != null) { + ++_this.pending; + if (!Template) { + Template = _this.getTemplate(doc.queue); + } + if (Template) { + new Template(_this, doc).invoke(); + } else { + _this.emit('error', new Error("Unknown template '" + _this.name + "'")); + } + process.nextTick(function() { + return _this.poll(); + }); + } else { + if (_this.pending === 0) { + _this.emit('drained'); + } + } + return _this.sleep(); + }; + })(this)); + }; + + Worker.prototype.getTemplate = function(name) { + var Template; + Template = null; + if (name) { + this.templates.some((function(_this) { + return function(_Template) { + if (_Template.name === name) { + Template = _Template; + return true; + } + }; + })(this)); + } else if (this.rotate) { + Template = this.templates.shift(); + this.templates.push(Template); + } + return Template; + }; + + Worker.prototype.sleep = function() { + if (this.pollTimeout) { + clearTimeout(this.pollTimeout); + } + if (!this.stopped) { + return this.pollTimeout = setTimeout((function(_this) { + return function() { + _this.pollTimeout = null; + return _this.poll(); + }; + })(this), this.timeout); + } + }; + + Worker.prototype.complete = function(err, doc) { + var cb; + cb = (function(_this) { + return function() { + --_this.pending; + if (!_this.stopped) { + _this.poll(); + } + if (_this.pending === 0) { + return _this.emit('stopped'); + } + }; + })(this); + if (err != null) { + this.emit('error', err); + return this.connection.release(doc, cb); + } else { + return this.connection.complete(doc, cb); + } + }; + + Worker.prototype.stop = function() { + this.stopped = true; + clearTimeout(this.pollTimeout); + if (this.pending === 0) { + return this.emit('stopped'); + } + }; + + return Worker; + + })(require('events').EventEmitter); + +}).call(this); diff --git a/package.json b/package.json index 668518b..f75a7c1 100644 --- a/package.json +++ b/package.json @@ -45,6 +45,9 @@ }, { "name": "Hannes Gassert" + }, + { + "name": "Sean Cunningham" } ] } diff --git a/src/queue.coffee b/src/queue.coffee index 391c1be..a5c5c20 100644 --- a/src/queue.coffee +++ b/src/queue.coffee @@ -12,6 +12,7 @@ # mongo-queue is backed by MongoDB mongodb = require 'mongodb' EventEmitter = require('events').EventEmitter +URL = require 'url' # The **Connection** class wraps the connection to MongoDB. It includes # methods to manipulate (add, remove, clear, ...) jobs in the queues. @@ -58,12 +59,13 @@ class exports.Connection extends EventEmitter # TODO: support replica sets # TODO: support connection URIs - url = "mongodb://" - - if opt.username and opt.password - url += encodeURIComponent("#{opt.username}:#{opt.password}") + '@' - - url += "#{opt.host || '127.0.0.1'}:#{opt.port || 27017}/#{opt.db || 'queue'}?w=1" + url = URL.format + protocol: 'mongodb' + slashes: true + auth: "#{opt.username}:#{opt.password}" if (opt.username && opt.password) + host: "#{opt.host || '127.0.0.1'}:#{opt.port || 27017}" + pathname: "/#{opt.db || 'queue'}" + search: '?w=1' mongodb.MongoClient.connect url, (err, _db) => @emit('error', err) if err