diff --git a/lib/channel_model.js b/lib/channel_model.js index 2097fd14..ea652e2f 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -4,264 +4,259 @@ 'use strict'; -var defs = require('./defs'); -var Promise = require('bluebird'); -var inherits = require('util').inherits; -var EventEmitter = require('events').EventEmitter; -var BaseChannel = require('./channel').BaseChannel; -var acceptMessage = require('./channel').acceptMessage; -var Args = require('./api_args'); - -function ChannelModel(connection) { - if (!(this instanceof ChannelModel)) - return new ChannelModel(connection); - EventEmitter.call( this ); - this.connection = connection; - var self = this; - ['error', 'close', 'blocked', 'unblocked'].forEach(function(ev) { - connection.on(ev, self.emit.bind(self, ev)); - }); +const EventEmitter = require('events'); +const Promise = require('bluebird'); +const defs = require('./defs'); +const {BaseChannel} = require('./channel'); +const {acceptMessage} = require('./channel'); +const Args = require('./api_args'); +const {inspect} = require('./format'); + +class ChannelModel extends EventEmitter { + constructor(connection) { + super(); + this.connection = connection; + + ['error', 'close', 'blocked', 'unblocked'].forEach(ev => { + connection.on(ev, this.emit.bind(this, ev)); + }); + } + + close() { + return Promise.fromCallback(this.connection.close.bind(this.connection)); + } + + async createChannel() { + const channel = new Channel(this.connection); + await channel.open(); + return channel; + } + + async createConfirmChannel() { + const channel = new ConfirmChannel(this.connection); + await channel.open(); + await channel.rpc(defs.ConfirmSelect, {nowait: false}, defs.ConfirmSelectOk); + return channel; + } } -inherits(ChannelModel, EventEmitter); - -module.exports.ChannelModel = ChannelModel; - -var CM = ChannelModel.prototype; - -CM.close = function() { - return Promise.fromCallback(this.connection.close.bind(this.connection)); -}; // Channels -function Channel(connection) { - BaseChannel.call(this, connection); - this.on('delivery', this.handleDelivery.bind(this)); - this.on('cancel', this.handleCancel.bind(this)); -} -inherits(Channel, BaseChannel); - -module.exports.Channel = Channel; +class Channel extends BaseChannel { + constructor(connection) { + super(connection); + this.on('delivery', this.handleDelivery.bind(this)); + this.on('cancel', this.handleCancel.bind(this)); + } + + // An RPC that returns a 'proper' promise, which resolves to just the + // response's fields; this is intended to be suitable for implementing + // API procedures. + async rpc(method, fields, expect) { + const f = await Promise.fromCallback(cb => { + return this._rpc(method, fields, expect, cb); + }) -CM.createChannel = function() { - var c = new Channel(this.connection); - return c.open().then(function(openOk) { return c; }); -}; - -var C = Channel.prototype; - -// An RPC that returns a 'proper' promise, which resolves to just the -// response's fields; this is intended to be suitable for implementing -// API procedures. -C.rpc = function(method, fields, expect) { - var self = this; - return Promise.fromCallback(function(cb) { - return self._rpc(method, fields, expect, cb); - }) - .then(function(f) { return f.fields; - }); -}; - -// Do the remarkably simple channel open handshake -C.open = function() { - return Promise.try(this.allocate.bind(this)).then( - function(ch) { - return ch.rpc(defs.ChannelOpen, {outOfBand: ""}, - defs.ChannelOpenOk); + } + + // Do the remarkably simple channel open handshake + open() { + return Promise.try(this.allocate.bind(this)).then( + ch => { + return ch.rpc(defs.ChannelOpen, {outOfBand: ""}, + defs.ChannelOpenOk); + }); + } + + close() { + return Promise.fromCallback(cb => { + return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, + cb); + }); + } + + // === Public API, declaring queues and stuff === + + assertQueue(queue, options) { + return this.rpc(defs.QueueDeclare, + Args.assertQueue(queue, options), + defs.QueueDeclareOk); + } + + checkQueue(queue) { + return this.rpc(defs.QueueDeclare, + Args.checkQueue(queue), + defs.QueueDeclareOk); + } + + deleteQueue(queue, options) { + return this.rpc(defs.QueueDelete, + Args.deleteQueue(queue, options), + defs.QueueDeleteOk); + } + + purgeQueue(queue) { + return this.rpc(defs.QueuePurge, + Args.purgeQueue(queue), + defs.QueuePurgeOk); + } + + bindQueue(queue, source, pattern, argt) { + return this.rpc(defs.QueueBind, + Args.bindQueue(queue, source, pattern, argt), + defs.QueueBindOk); + } + + unbindQueue(queue, source, pattern, argt) { + return this.rpc(defs.QueueUnbind, + Args.unbindQueue(queue, source, pattern, argt), + defs.QueueUnbindOk); + } + + assertExchange(exchange, type, options) { + // The server reply is an empty set of fields, but it's convenient + // to have the exchange name handed to the continuation. + return this.rpc(defs.ExchangeDeclare, + Args.assertExchange(exchange, type, options), + defs.ExchangeDeclareOk) + .then(_ok => { return { exchange }; }); + } + + checkExchange(exchange) { + return this.rpc(defs.ExchangeDeclare, + Args.checkExchange(exchange), + defs.ExchangeDeclareOk); + } + + deleteExchange(name, options) { + return this.rpc(defs.ExchangeDelete, + Args.deleteExchange(name, options), + defs.ExchangeDeleteOk); + } + + bindExchange(dest, source, pattern, argt) { + return this.rpc(defs.ExchangeBind, + Args.bindExchange(dest, source, pattern, argt), + defs.ExchangeBindOk); + } + + unbindExchange(dest, source, pattern, argt) { + return this.rpc(defs.ExchangeUnbind, + Args.unbindExchange(dest, source, pattern, argt), + defs.ExchangeUnbindOk); + } + + // Working with messages + + publish(exchange, routingKey, content, options) { + const fieldsAndProps = Args.publish(exchange, routingKey, options); + return this.sendMessage(fieldsAndProps, fieldsAndProps, content); + } + + sendToQueue(queue, content, options) { + return this.publish('', queue, content, options); + } + + consume(queue, callback, options) { + // NB we want the callback to be run synchronously, so that we've + // registered the consumerTag before any messages can arrive. + const fields = Args.consume(queue, options); + return Promise.fromCallback(cb => { + this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); + }) + .then(ok => { + this.registerConsumer(ok.fields.consumerTag, callback); + return ok.fields; + }); + } + + async cancel(consumerTag) { + const ok = await Promise.fromCallback(cb => { + this._rpc(defs.BasicCancel, Args.cancel(consumerTag), + defs.BasicCancelOk, + cb); + }) + .then(ok => { + this.unregisterConsumer(consumerTag); + return ok.fields; }); -}; - -C.close = function() { - var self = this; - return Promise.fromCallback(function(cb) { - return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, - cb); - }); -}; - -// === Public API, declaring queues and stuff === - -C.assertQueue = function(queue, options) { - return this.rpc(defs.QueueDeclare, - Args.assertQueue(queue, options), - defs.QueueDeclareOk); -}; - -C.checkQueue = function(queue) { - return this.rpc(defs.QueueDeclare, - Args.checkQueue(queue), - defs.QueueDeclareOk); -}; - -C.deleteQueue = function(queue, options) { - return this.rpc(defs.QueueDelete, - Args.deleteQueue(queue, options), - defs.QueueDeleteOk); -}; - -C.purgeQueue = function(queue) { - return this.rpc(defs.QueuePurge, - Args.purgeQueue(queue), - defs.QueuePurgeOk); -}; - -C.bindQueue = function(queue, source, pattern, argt) { - return this.rpc(defs.QueueBind, - Args.bindQueue(queue, source, pattern, argt), - defs.QueueBindOk); -}; - -C.unbindQueue = function(queue, source, pattern, argt) { - return this.rpc(defs.QueueUnbind, - Args.unbindQueue(queue, source, pattern, argt), - defs.QueueUnbindOk); -}; - -C.assertExchange = function(exchange, type, options) { - // The server reply is an empty set of fields, but it's convenient - // to have the exchange name handed to the continuation. - return this.rpc(defs.ExchangeDeclare, - Args.assertExchange(exchange, type, options), - defs.ExchangeDeclareOk) - .then(function(_ok) { return { exchange: exchange }; }); -}; - -C.checkExchange = function(exchange) { - return this.rpc(defs.ExchangeDeclare, - Args.checkExchange(exchange), - defs.ExchangeDeclareOk); -}; - -C.deleteExchange = function(name, options) { - return this.rpc(defs.ExchangeDelete, - Args.deleteExchange(name, options), - defs.ExchangeDeleteOk); -}; - -C.bindExchange = function(dest, source, pattern, argt) { - return this.rpc(defs.ExchangeBind, - Args.bindExchange(dest, source, pattern, argt), - defs.ExchangeBindOk); -}; - -C.unbindExchange = function(dest, source, pattern, argt) { - return this.rpc(defs.ExchangeUnbind, - Args.unbindExchange(dest, source, pattern, argt), - defs.ExchangeUnbindOk); -}; - -// Working with messages - -C.publish = function(exchange, routingKey, content, options) { - var fieldsAndProps = Args.publish(exchange, routingKey, options); - return this.sendMessage(fieldsAndProps, fieldsAndProps, content); -}; - -C.sendToQueue = function(queue, content, options) { - return this.publish('', queue, content, options); -}; - -C.consume = function(queue, callback, options) { - var self = this; - // NB we want the callback to be run synchronously, so that we've - // registered the consumerTag before any messages can arrive. - var fields = Args.consume(queue, options); - return Promise.fromCallback(function(cb) { - self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); - }) - .then(function(ok) { - self.registerConsumer(ok.fields.consumerTag, callback); - return ok.fields; - }); -}; - -C.cancel = function(consumerTag) { - var self = this; - return Promise.fromCallback(function(cb) { - self._rpc(defs.BasicCancel, Args.cancel(consumerTag), - defs.BasicCancelOk, - cb); - }) - .then(function(ok) { - self.unregisterConsumer(consumerTag); - return ok.fields; - }); -}; - -C.get = function(queue, options) { - var self = this; - var fields = Args.get(queue, options); - return Promise.fromCallback(function(cb) { - return self.sendOrEnqueue(defs.BasicGet, fields, cb); - }) - .then(function(f) { - if (f.id === defs.BasicGetEmpty) { - return false; - } - else if (f.id === defs.BasicGetOk) { - var fields = f.fields; - return new Promise(function(resolve) { - self.handleMessage = acceptMessage(function(m) { - m.fields = fields; - resolve(m); + } + + get(queue, options) { + const fields = Args.get(queue, options); + return Promise.fromCallback(cb => { + return this.sendOrEnqueue(defs.BasicGet, fields, cb); + }) + .then(f => { + if (f.id === defs.BasicGetEmpty) { + return false; + } + else if (f.id === defs.BasicGetOk) { + const fields = f.fields; + return new Promise(resolve => { + this.handleMessage = acceptMessage(m => { + m.fields = fields; + resolve(m); + }); }); - }); - } - else { - throw new Error("Unexpected response to BasicGet: " + - inspect(f)); - } - }) -}; - -C.ack = function(message, allUpTo) { - this.sendImmediately( - defs.BasicAck, - Args.ack(message.fields.deliveryTag, allUpTo)); -}; - -C.ackAll = function() { - this.sendImmediately(defs.BasicAck, Args.ack(0, true)); -}; - -C.nack = function(message, allUpTo, requeue) { - this.sendImmediately( - defs.BasicNack, - Args.nack(message.fields.deliveryTag, allUpTo, requeue)); -}; - -C.nackAll = function(requeue) { - this.sendImmediately(defs.BasicNack, - Args.nack(0, true, requeue)); -}; - -// `Basic.Nack` is not available in older RabbitMQ versions (or in the -// AMQP specification), so you have to use the one-at-a-time -// `Basic.Reject`. This is otherwise synonymous with -// `#nack(message, false, requeue)`. -C.reject = function(message, requeue) { - this.sendImmediately( - defs.BasicReject, - Args.reject(message.fields.deliveryTag, requeue)); -}; + } + else { + throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`); + } + }); + } + + ack(message, allUpTo) { + this.sendImmediately( + defs.BasicAck, + Args.ack(message.fields.deliveryTag, allUpTo)); + } + + ackAll() { + this.sendImmediately(defs.BasicAck, Args.ack(0, true)); + } + + nack(message, allUpTo, requeue) { + this.sendImmediately( + defs.BasicNack, + Args.nack(message.fields.deliveryTag, allUpTo, requeue)); + } + + nackAll(requeue) { + this.sendImmediately(defs.BasicNack, + Args.nack(0, true, requeue)); + } + + // `Basic.Nack` is not available in older RabbitMQ versions (or in the + // AMQP specification), so you have to use the one-at-a-time + // `Basic.Reject`. This is otherwise synonymous with + // `#nack(message, false, requeue)`. + reject(message, requeue) { + this.sendImmediately( + defs.BasicReject, + Args.reject(message.fields.deliveryTag, requeue)); + } + + recover() { + return this.rpc(defs.BasicRecover, + Args.recover(), + defs.BasicRecoverOk); + } + + qos(count, global) { + return this.rpc(defs.BasicQos, + Args.prefetch(count, global), + defs.BasicQosOk); + } +} // There are more options in AMQP than exposed here; RabbitMQ only // implements prefetch based on message count, and only for individual // channels or consumers. RabbitMQ v3.3.0 and after treat prefetch // (without `global` set) as per-consumer (for consumers following), // and prefetch with `global` set as per-channel. -C.prefetch = C.qos = function(count, global) { - return this.rpc(defs.BasicQos, - Args.prefetch(count, global), - defs.BasicQosOk); -}; - -C.recover = function() { - return this.rpc(defs.BasicRecover, - Args.recover(), - defs.BasicRecoverOk); -}; +Channel.prototype.prefetch = Channel.prototype.qos // Confirm channel. This is a channel with confirms 'switched on', // meaning sent messages will provoke a responding 'ack' or 'nack' @@ -270,49 +265,35 @@ C.recover = function() { // with `null` as its argument to signify 'ack', or an exception as // its argument to signify 'nack'. -function ConfirmChannel(connection) { - Channel.call(this, connection); +class ConfirmChannel extends Channel { + publish(exchange, routingKey, content, options, cb) { + this.pushConfirmCallback(cb); + return Channel.prototype.publish.call(this, exchange, routingKey, content, options); + } + + sendToQueue(queue, content, options, cb) { + return this.publish('', queue, content, options, cb); + } + + waitForConfirms() { + const awaiting = []; + const unconfirmed = this.unconfirmed; + unconfirmed.forEach((val, index) => { + if (val !== null) { + const confirmed = new Promise((resolve, reject) => { + unconfirmed[index] = err => { + if (val) val(err); + if (err === null) resolve(); + else reject(err); + }; + }); + awaiting.push(confirmed); + } + }); + return Promise.all(awaiting); + } } -inherits(ConfirmChannel, Channel); module.exports.ConfirmChannel = ConfirmChannel; - -CM.createConfirmChannel = function() { - var c = new ConfirmChannel(this.connection); - return c.open() - .then(function(openOk) { - return c.rpc(defs.ConfirmSelect, {nowait: false}, - defs.ConfirmSelectOk) - }) - .then(function() { return c; }); -}; - -var CC = ConfirmChannel.prototype; - -CC.publish = function(exchange, routingKey, content, options, cb) { - this.pushConfirmCallback(cb); - return C.publish.call(this, exchange, routingKey, content, options); -}; - -CC.sendToQueue = function(queue, content, options, cb) { - return this.publish('', queue, content, options, cb); -}; - -CC.waitForConfirms = function() { - var awaiting = []; - var unconfirmed = this.unconfirmed; - unconfirmed.forEach(function(val, index) { - if (val === null); // already confirmed - else { - var confirmed = new Promise(function(resolve, reject) { - unconfirmed[index] = function(err) { - if (val) val(err); - if (err === null) resolve(); - else reject(err); - }; - }); - awaiting.push(confirmed); - } - }); - return Promise.all(awaiting); -}; +module.exports.Channel = Channel; +module.exports.ChannelModel = ChannelModel;