From 6180b51d57599c877f5aae4d42fb7b163edec630 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 20 Feb 2020 14:35:48 +0100 Subject: [PATCH 1/9] WIP: shared subscriptions --- aedes.js | 3 +++ lib/group.js | 34 ++++++++++++++++++++++++++++++++++ lib/handlers/subscribe.js | 14 +++++++++++--- lib/handlers/validations.js | 16 +++++++++++++++- 4 files changed, 63 insertions(+), 4 deletions(-) create mode 100644 lib/group.js diff --git a/aedes.js b/aedes.js index f71e1b38..454fe4a6 100644 --- a/aedes.js +++ b/aedes.js @@ -26,6 +26,7 @@ const defaultOptions = { authorizeSubscribe: defaultAuthorizeSubscribe, authorizeForward: defaultAuthorizeForward, published: defaultPublished, + sharedTopics: false, trustProxy: false, trustedProxies: [], queueLimit: 42 @@ -65,11 +66,13 @@ function Aedes (opts) { this.published = opts.published this.decodeProtocol = opts.decodeProtocol + this.sharedTopics = opts.sharedTopics this.trustProxy = opts.trustProxy this.trustedProxies = opts.trustedProxies this.clients = {} this.brokers = {} + this.groups = {} const heartbeatTopic = '$SYS/' + that.id + '/heartbeat' this._heartbeatInterval = setInterval(heartbeat, opts.heartbeatInterval) diff --git a/lib/group.js b/lib/group.js new file mode 100644 index 00000000..6dab2a60 --- /dev/null +++ b/lib/group.js @@ -0,0 +1,34 @@ +'use-strict' + +module.exports = Group + +function Group (id, broker) { + this.id = id + this.clients = [] + this.broker = broker + this.subscriptions = {} + + broker.groups[id] = this + + this._next = 0 +} + +Group.prototype.add = function (client) { + this.clients.push(client) +} + +Group.prototype.remove = function (client) { + var index = this.clients.indexOf(client) + if (index >= 0) { + this.clients.splice(index, 1) + } else { + this.broker.emit('error', new Error('Unable to remove client ' + client.id + 'from group ' + this.id)) + } +} + +Group.prototype.deliverMessage = function (_packet, cb) { + if (this._next === this.clients.length) this._next = 0 + var client = this.clients[this._next++] + + client.deliverQoS(_packet, cb) +} diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index d952cbda..beff70b4 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -4,7 +4,10 @@ const write = require('../write') const fastfall = require('fastfall') const Packet = require('aedes-packet') const through = require('through2') +const Group = require('../group') const validateTopic = require('./validations').validateTopic +const sharedTopic = require('./validations').sharedTopic + const topicActions = fastfall([ authorize, storeSubscriptions, @@ -112,14 +115,19 @@ function subTopic (sub, done) { return done() } - const client = this.client + var client = this.client const broker = client.broker const topic = sub.topic const qos = sub.qos var func = qos > 0 ? client.deliverQoS : client.deliver0 - // [MQTT-4.7.2-1] - if (isStartsWithWildcard(topic)) { + if (broker.sharedTopics) { + const parsedTopic = sharedTopic(sub) + if (parsedTopic) { + client = broker.groups[parsedTopic.group] || new Group(parsedTopic.group, broker) + func = client.deliverMessage + } + } else if (isStartsWithWildcard(topic)) { // [MQTT-4.7.2-1] func = blockDollarSignTopics(func) } diff --git a/lib/handlers/validations.js b/lib/handlers/validations.js index f9993aaf..92524e19 100644 --- a/lib/handlers/validations.js +++ b/lib/handlers/validations.js @@ -26,4 +26,18 @@ function validateTopic (topic, message) { } } -module.exports.validateTopic = validateTopic +function sharedTopic (topic) { + if (!topic || !topic.startsWith('$share/')) return null + + var group = topic.substring(7, topic.indexOf('/', 7)) + + return { + group: group, + topic: topic.substring(8 + group.length) + } +} + +module.exports = { + validateTopic: validateTopic, + sharedTopic: sharedTopic +} From 8fc8899e94743a47941343f55b5cf269012ccf7c Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 20 Feb 2020 15:37:14 +0100 Subject: [PATCH 2/9] fix: Group subscrube --- lib/group.js | 31 +++++++++++++++++++++++-------- lib/handlers/subscribe.js | 4 ++-- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/lib/group.js b/lib/group.js index 6dab2a60..4866c4c7 100644 --- a/lib/group.js +++ b/lib/group.js @@ -2,22 +2,35 @@ module.exports = Group +function SharedSubscription () { + this.clients = [] + this.subscriptions = {} // clientid: qos + this._next = 0 +} + function Group (id, broker) { this.id = id - this.clients = [] this.broker = broker this.subscriptions = {} broker.groups[id] = this - - this._next = 0 } -Group.prototype.add = function (client) { - this.clients.push(client) +Group.prototype.subscribe = function (client, qos, topic, done) { + var shared = this.subscriptions[topic] || new SharedSubscription() + var clientSubQoS = shared.subscriptions[client.id] + + if (clientSubQoS === undefined) { + shared.subscriptions[client.id] = qos + this.broker.subscribe(topic, this.deliverMessage, done) + } else if (clientSubQoS !== qos) { + shared.subscriptions[client.id] = qos + } else { + done() + } } -Group.prototype.remove = function (client) { +Group.prototype.unsubscribe = function (client, sub, topic, done) { var index = this.clients.indexOf(client) if (index >= 0) { this.clients.splice(index, 1) @@ -27,8 +40,10 @@ Group.prototype.remove = function (client) { } Group.prototype.deliverMessage = function (_packet, cb) { - if (this._next === this.clients.length) this._next = 0 - var client = this.clients[this._next++] + var shared = this.subscriptions[_packet.topic] + + if (shared._next === shared.clients.length) shared._next = 0 + var client = shared.clients[shared._next++] client.deliverQoS(_packet, cb) } diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index beff70b4..24f0c7b2 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -124,8 +124,8 @@ function subTopic (sub, done) { if (broker.sharedTopics) { const parsedTopic = sharedTopic(sub) if (parsedTopic) { - client = broker.groups[parsedTopic.group] || new Group(parsedTopic.group, broker) - func = client.deliverMessage + var group = broker.groups[parsedTopic.group] || new Group(parsedTopic.group, broker) + group.subscribe(client, qos, topic, done) } } else if (isStartsWithWildcard(topic)) { // [MQTT-4.7.2-1] func = blockDollarSignTopics(func) From 67457d0130703f99d7c38680e1605d5decd7312a Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 20 Feb 2020 15:55:50 +0100 Subject: [PATCH 3/9] fix: Group unsubscribe --- lib/group.js | 25 +++++++++++++++---------- lib/handlers/unsubscribe.js | 17 +++++++++++------ 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/lib/group.js b/lib/group.js index 4866c4c7..33a6b838 100644 --- a/lib/group.js +++ b/lib/group.js @@ -2,10 +2,13 @@ module.exports = Group -function SharedSubscription () { +function SharedSubscription (group, topic) { this.clients = [] + this.group = group + this.topic = topic this.subscriptions = {} // clientid: qos this._next = 0 + this.isShared = true } function Group (id, broker) { @@ -17,11 +20,12 @@ function Group (id, broker) { } Group.prototype.subscribe = function (client, qos, topic, done) { - var shared = this.subscriptions[topic] || new SharedSubscription() + var shared = this.subscriptions[topic] || new SharedSubscription(this, topic) var clientSubQoS = shared.subscriptions[client.id] if (clientSubQoS === undefined) { shared.subscriptions[client.id] = qos + client.subscriptions[topic] = shared this.broker.subscribe(topic, this.deliverMessage, done) } else if (clientSubQoS !== qos) { shared.subscriptions[client.id] = qos @@ -30,13 +34,14 @@ Group.prototype.subscribe = function (client, qos, topic, done) { } } -Group.prototype.unsubscribe = function (client, sub, topic, done) { - var index = this.clients.indexOf(client) - if (index >= 0) { - this.clients.splice(index, 1) - } else { - this.broker.emit('error', new Error('Unable to remove client ' + client.id + 'from group ' + this.id)) - } +Group.prototype.unsubscribe = function (shared, client, done) { + delete shared.subscriptions[client.id] + var index = shared.clients.indexOf(client) + + if (index < shared._next && shared._next > 0) shared._next-- + + shared.clients.splice(index, 1) + this.broker.unsubscribe(shared.topic, this.deliverMessage, done) } Group.prototype.deliverMessage = function (_packet, cb) { @@ -45,5 +50,5 @@ Group.prototype.deliverMessage = function (_packet, cb) { if (shared._next === shared.clients.length) shared._next = 0 var client = shared.clients[shared._next++] - client.deliverQoS(_packet, cb) + if (client) client.deliverQoS(_packet, cb) } diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 7ee5a32b..b34226ec 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -2,6 +2,7 @@ const write = require('../write') const validateTopic = require('./validations').validateTopic +const sharedTopic = require('./validations').sharedTopic function UnsubscribeState (client, packet, finish) { this.client = client @@ -49,12 +50,16 @@ function doUnsubscribe (sub, done) { const s = client.subscriptions[sub] if (s) { - var func = s.func - delete client.subscriptions[sub] - broker.unsubscribe( - sub, - func, - done) + if (s.isShared) { + s.group.unsubscribe(s, client, done) + } else { + var func = s.func + delete client.subscriptions[sub] + broker.unsubscribe( + sub, + func, + done) + } } else { done() } From cc31684be2ec2040cd7130a3a21df66c06bd66bf Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 20 Feb 2020 15:58:07 +0100 Subject: [PATCH 4/9] fix: Removed unused import --- lib/handlers/unsubscribe.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index b34226ec..380e43b7 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -2,7 +2,6 @@ const write = require('../write') const validateTopic = require('./validations').validateTopic -const sharedTopic = require('./validations').sharedTopic function UnsubscribeState (client, packet, finish) { this.client = client From b77f5f3a974a8d7c501dc017cc3eb653f9dd3ddb Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 20 Feb 2020 16:02:30 +0100 Subject: [PATCH 5/9] fix: stop after group subscribe --- lib/group.js | 2 +- lib/handlers/subscribe.js | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/group.js b/lib/group.js index 33a6b838..5156ed4c 100644 --- a/lib/group.js +++ b/lib/group.js @@ -14,7 +14,7 @@ function SharedSubscription (group, topic) { function Group (id, broker) { this.id = id this.broker = broker - this.subscriptions = {} + this.subscriptions = {} // topic: SharedSubscription broker.groups[id] = this } diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 24f0c7b2..21e508e6 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -126,6 +126,7 @@ function subTopic (sub, done) { if (parsedTopic) { var group = broker.groups[parsedTopic.group] || new Group(parsedTopic.group, broker) group.subscribe(client, qos, topic, done) + return } } else if (isStartsWithWildcard(topic)) { // [MQTT-4.7.2-1] func = blockDollarSignTopics(func) From 18b0bc71c4e30028de8ea20cb01cc92b8cf2d3d6 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 20 Feb 2020 16:15:14 +0100 Subject: [PATCH 6/9] fix: Typo on subscription --- lib/handlers/subscribe.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 21e508e6..ab014d3a 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -122,7 +122,7 @@ function subTopic (sub, done) { var func = qos > 0 ? client.deliverQoS : client.deliver0 if (broker.sharedTopics) { - const parsedTopic = sharedTopic(sub) + const parsedTopic = sharedTopic(topic) if (parsedTopic) { var group = broker.groups[parsedTopic.group] || new Group(parsedTopic.group, broker) group.subscribe(client, qos, topic, done) From 0a828155fa5cfabcb644a3b1231de7b357447e3f Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 20 Feb 2020 16:32:22 +0100 Subject: [PATCH 7/9] fix: Pass parsed topic on subscribe and add client to shared subscription --- lib/group.js | 11 +++++++++-- lib/handlers/subscribe.js | 2 +- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/lib/group.js b/lib/group.js index 5156ed4c..9a74250e 100644 --- a/lib/group.js +++ b/lib/group.js @@ -20,13 +20,20 @@ function Group (id, broker) { } Group.prototype.subscribe = function (client, qos, topic, done) { - var shared = this.subscriptions[topic] || new SharedSubscription(this, topic) + var shared = this.subscriptions[topic] + + if (!shared) { + shared = new SharedSubscription(this, topic) + this.subscriptions[topic] = shared + } + var clientSubQoS = shared.subscriptions[client.id] if (clientSubQoS === undefined) { shared.subscriptions[client.id] = qos client.subscriptions[topic] = shared - this.broker.subscribe(topic, this.deliverMessage, done) + shared.clients.push(client) + this.broker.subscribe(topic, this.deliverMessage.bind(this), done) } else if (clientSubQoS !== qos) { shared.subscriptions[client.id] = qos } else { diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index ab014d3a..c444aa79 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -125,7 +125,7 @@ function subTopic (sub, done) { const parsedTopic = sharedTopic(topic) if (parsedTopic) { var group = broker.groups[parsedTopic.group] || new Group(parsedTopic.group, broker) - group.subscribe(client, qos, topic, done) + group.subscribe(client, qos, parsedTopic.topic, done) return } } else if (isStartsWithWildcard(topic)) { // [MQTT-4.7.2-1] From d81053fb174c4ed40f711089140df5e30ec06889 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 20 Feb 2020 17:52:24 +0100 Subject: [PATCH 8/9] fix: Delete also client subscription when unsubscribing from group --- lib/group.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/group.js b/lib/group.js index 9a74250e..c4e0968d 100644 --- a/lib/group.js +++ b/lib/group.js @@ -43,6 +43,7 @@ Group.prototype.subscribe = function (client, qos, topic, done) { Group.prototype.unsubscribe = function (shared, client, done) { delete shared.subscriptions[client.id] + delete client.subscriptions[shared.topic] var index = shared.clients.indexOf(client) if (index < shared._next && shared._next > 0) shared._next-- From 82f5d93b6ca461291619886bcfa7cd0ea238d561 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 20 Feb 2020 18:17:36 +0100 Subject: [PATCH 9/9] fix: Add group to client subscription --- lib/group.js | 4 ++-- lib/handlers/unsubscribe.js | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/group.js b/lib/group.js index c4e0968d..af1ad9a1 100644 --- a/lib/group.js +++ b/lib/group.js @@ -31,7 +31,7 @@ Group.prototype.subscribe = function (client, qos, topic, done) { if (clientSubQoS === undefined) { shared.subscriptions[client.id] = qos - client.subscriptions[topic] = shared + client.subscriptions[this.id + '/' + topic] = shared shared.clients.push(client) this.broker.subscribe(topic, this.deliverMessage.bind(this), done) } else if (clientSubQoS !== qos) { @@ -43,7 +43,7 @@ Group.prototype.subscribe = function (client, qos, topic, done) { Group.prototype.unsubscribe = function (shared, client, done) { delete shared.subscriptions[client.id] - delete client.subscriptions[shared.topic] + delete client.subscriptions[shared.group.id + '/' + shared.topic] var index = shared.clients.indexOf(client) if (index < shared._next && shared._next > 0) shared._next-- diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 380e43b7..092f45a6 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -2,6 +2,7 @@ const write = require('../write') const validateTopic = require('./validations').validateTopic +const sharedTopic = require('./validations').sharedTopic function UnsubscribeState (client, packet, finish) { this.client = client @@ -46,6 +47,13 @@ function actualUnsubscribe (client, packet, done) { function doUnsubscribe (sub, done) { const client = this.client const broker = client.broker + + if (broker.sharedTopics) { + const parsedTopic = sharedTopic(sub) + if (parsedTopic) { + sub = parsedTopic.group + '/' + parsedTopic.topic + } + } const s = client.subscriptions[sub] if (s) {