diff --git a/aedes.js b/aedes.js index f71e1b38..454fe4a6 100644 --- a/aedes.js +++ b/aedes.js @@ -26,6 +26,7 @@ const defaultOptions = { authorizeSubscribe: defaultAuthorizeSubscribe, authorizeForward: defaultAuthorizeForward, published: defaultPublished, + sharedTopics: false, trustProxy: false, trustedProxies: [], queueLimit: 42 @@ -65,11 +66,13 @@ function Aedes (opts) { this.published = opts.published this.decodeProtocol = opts.decodeProtocol + this.sharedTopics = opts.sharedTopics this.trustProxy = opts.trustProxy this.trustedProxies = opts.trustedProxies this.clients = {} this.brokers = {} + this.groups = {} const heartbeatTopic = '$SYS/' + that.id + '/heartbeat' this._heartbeatInterval = setInterval(heartbeat, opts.heartbeatInterval) diff --git a/lib/group.js b/lib/group.js new file mode 100644 index 00000000..af1ad9a1 --- /dev/null +++ b/lib/group.js @@ -0,0 +1,62 @@ +'use-strict' + +module.exports = Group + +function SharedSubscription (group, topic) { + this.clients = [] + this.group = group + this.topic = topic + this.subscriptions = {} // clientid: qos + this._next = 0 + this.isShared = true +} + +function Group (id, broker) { + this.id = id + this.broker = broker + this.subscriptions = {} // topic: SharedSubscription + + broker.groups[id] = this +} + +Group.prototype.subscribe = function (client, qos, topic, done) { + var shared = this.subscriptions[topic] + + if (!shared) { + shared = new SharedSubscription(this, topic) + this.subscriptions[topic] = shared + } + + var clientSubQoS = shared.subscriptions[client.id] + + if (clientSubQoS === undefined) { + shared.subscriptions[client.id] = qos + client.subscriptions[this.id + '/' + topic] = shared + shared.clients.push(client) + this.broker.subscribe(topic, this.deliverMessage.bind(this), done) + } else if (clientSubQoS !== qos) { + shared.subscriptions[client.id] = qos + } else { + done() + } +} + +Group.prototype.unsubscribe = function (shared, client, done) { + delete shared.subscriptions[client.id] + delete client.subscriptions[shared.group.id + '/' + shared.topic] + var index = shared.clients.indexOf(client) + + if (index < shared._next && shared._next > 0) shared._next-- + + shared.clients.splice(index, 1) + this.broker.unsubscribe(shared.topic, this.deliverMessage, done) +} + +Group.prototype.deliverMessage = function (_packet, cb) { + var shared = this.subscriptions[_packet.topic] + + if (shared._next === shared.clients.length) shared._next = 0 + var client = shared.clients[shared._next++] + + if (client) client.deliverQoS(_packet, cb) +} diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index d952cbda..c444aa79 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -4,7 +4,10 @@ const write = require('../write') const fastfall = require('fastfall') const Packet = require('aedes-packet') const through = require('through2') +const Group = require('../group') const validateTopic = require('./validations').validateTopic +const sharedTopic = require('./validations').sharedTopic + const topicActions = fastfall([ authorize, storeSubscriptions, @@ -112,14 +115,20 @@ function subTopic (sub, done) { return done() } - const client = this.client + var client = this.client const broker = client.broker const topic = sub.topic const qos = sub.qos var func = qos > 0 ? client.deliverQoS : client.deliver0 - // [MQTT-4.7.2-1] - if (isStartsWithWildcard(topic)) { + if (broker.sharedTopics) { + const parsedTopic = sharedTopic(topic) + if (parsedTopic) { + var group = broker.groups[parsedTopic.group] || new Group(parsedTopic.group, broker) + group.subscribe(client, qos, parsedTopic.topic, done) + return + } + } else if (isStartsWithWildcard(topic)) { // [MQTT-4.7.2-1] func = blockDollarSignTopics(func) } diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 7ee5a32b..092f45a6 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -2,6 +2,7 @@ const write = require('../write') const validateTopic = require('./validations').validateTopic +const sharedTopic = require('./validations').sharedTopic function UnsubscribeState (client, packet, finish) { this.client = client @@ -46,15 +47,26 @@ function actualUnsubscribe (client, packet, done) { function doUnsubscribe (sub, done) { const client = this.client const broker = client.broker + + if (broker.sharedTopics) { + const parsedTopic = sharedTopic(sub) + if (parsedTopic) { + sub = parsedTopic.group + '/' + parsedTopic.topic + } + } const s = client.subscriptions[sub] if (s) { - var func = s.func - delete client.subscriptions[sub] - broker.unsubscribe( - sub, - func, - done) + if (s.isShared) { + s.group.unsubscribe(s, client, done) + } else { + var func = s.func + delete client.subscriptions[sub] + broker.unsubscribe( + sub, + func, + done) + } } else { done() } diff --git a/lib/handlers/validations.js b/lib/handlers/validations.js index f9993aaf..92524e19 100644 --- a/lib/handlers/validations.js +++ b/lib/handlers/validations.js @@ -26,4 +26,18 @@ function validateTopic (topic, message) { } } -module.exports.validateTopic = validateTopic +function sharedTopic (topic) { + if (!topic || !topic.startsWith('$share/')) return null + + var group = topic.substring(7, topic.indexOf('/', 7)) + + return { + group: group, + topic: topic.substring(8 + group.length) + } +} + +module.exports = { + validateTopic: validateTopic, + sharedTopic: sharedTopic +}