Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const defaultOptions = {
authorizeSubscribe: defaultAuthorizeSubscribe,
authorizeForward: defaultAuthorizeForward,
published: defaultPublished,
sharedTopics: false,
trustProxy: false,
trustedProxies: [],
queueLimit: 42
Expand Down Expand Up @@ -65,11 +66,13 @@ function Aedes (opts) {
this.published = opts.published

this.decodeProtocol = opts.decodeProtocol
this.sharedTopics = opts.sharedTopics
this.trustProxy = opts.trustProxy
this.trustedProxies = opts.trustedProxies

this.clients = {}
this.brokers = {}
this.groups = {}

const heartbeatTopic = '$SYS/' + that.id + '/heartbeat'
this._heartbeatInterval = setInterval(heartbeat, opts.heartbeatInterval)
Expand Down
62 changes: 62 additions & 0 deletions lib/group.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
'use-strict'

module.exports = Group

function SharedSubscription (group, topic) {
this.clients = []
this.group = group
this.topic = topic
this.subscriptions = {} // clientid: qos
this._next = 0
this.isShared = true
}

function Group (id, broker) {
this.id = id
this.broker = broker
this.subscriptions = {} // topic: SharedSubscription

broker.groups[id] = this
}

Group.prototype.subscribe = function (client, qos, topic, done) {
var shared = this.subscriptions[topic]

if (!shared) {
shared = new SharedSubscription(this, topic)
this.subscriptions[topic] = shared
}

var clientSubQoS = shared.subscriptions[client.id]

if (clientSubQoS === undefined) {
shared.subscriptions[client.id] = qos
client.subscriptions[this.id + '/' + topic] = shared
shared.clients.push(client)
this.broker.subscribe(topic, this.deliverMessage.bind(this), done)
} else if (clientSubQoS !== qos) {
shared.subscriptions[client.id] = qos
} else {
done()
}
}

Group.prototype.unsubscribe = function (shared, client, done) {
delete shared.subscriptions[client.id]
delete client.subscriptions[shared.group.id + '/' + shared.topic]
var index = shared.clients.indexOf(client)

if (index < shared._next && shared._next > 0) shared._next--

shared.clients.splice(index, 1)
this.broker.unsubscribe(shared.topic, this.deliverMessage, done)
}

Group.prototype.deliverMessage = function (_packet, cb) {
var shared = this.subscriptions[_packet.topic]

if (shared._next === shared.clients.length) shared._next = 0
var client = shared.clients[shared._next++]

if (client) client.deliverQoS(_packet, cb)
}
15 changes: 12 additions & 3 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ const write = require('../write')
const fastfall = require('fastfall')
const Packet = require('aedes-packet')
const through = require('through2')
const Group = require('../group')
const validateTopic = require('./validations').validateTopic
const sharedTopic = require('./validations').sharedTopic

const topicActions = fastfall([
authorize,
storeSubscriptions,
Expand Down Expand Up @@ -112,14 +115,20 @@ function subTopic (sub, done) {
return done()
}

const client = this.client
var client = this.client
const broker = client.broker
const topic = sub.topic
const qos = sub.qos
var func = qos > 0 ? client.deliverQoS : client.deliver0

// [MQTT-4.7.2-1]
if (isStartsWithWildcard(topic)) {
if (broker.sharedTopics) {
const parsedTopic = sharedTopic(topic)
if (parsedTopic) {
var group = broker.groups[parsedTopic.group] || new Group(parsedTopic.group, broker)
group.subscribe(client, qos, parsedTopic.topic, done)
return
}
} else if (isStartsWithWildcard(topic)) { // [MQTT-4.7.2-1]
func = blockDollarSignTopics(func)
}

Expand Down
24 changes: 18 additions & 6 deletions lib/handlers/unsubscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const write = require('../write')
const validateTopic = require('./validations').validateTopic
const sharedTopic = require('./validations').sharedTopic

function UnsubscribeState (client, packet, finish) {
this.client = client
Expand Down Expand Up @@ -46,15 +47,26 @@ function actualUnsubscribe (client, packet, done) {
function doUnsubscribe (sub, done) {
const client = this.client
const broker = client.broker

if (broker.sharedTopics) {
const parsedTopic = sharedTopic(sub)
if (parsedTopic) {
sub = parsedTopic.group + '/' + parsedTopic.topic
}
}
const s = client.subscriptions[sub]

if (s) {
var func = s.func
delete client.subscriptions[sub]
broker.unsubscribe(
sub,
func,
done)
if (s.isShared) {
s.group.unsubscribe(s, client, done)
} else {
var func = s.func
delete client.subscriptions[sub]
broker.unsubscribe(
sub,
func,
done)
}
} else {
done()
}
Expand Down
16 changes: 15 additions & 1 deletion lib/handlers/validations.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,18 @@ function validateTopic (topic, message) {
}
}

module.exports.validateTopic = validateTopic
function sharedTopic (topic) {
if (!topic || !topic.startsWith('$share/')) return null

var group = topic.substring(7, topic.indexOf('/', 7))

return {
group: group,
topic: topic.substring(8 + group.length)
}
}

module.exports = {
validateTopic: validateTopic,
sharedTopic: sharedTopic
}