Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b64e21d
添加对数据库的操作的类
wuyiadmin Apr 27, 2016
cb75bb5
增加mongo的验证基础类
wuyiadmin Apr 27, 2016
045f59a
调整broker启动的配置权限中加入mongo的配置
wuyiadmin Apr 28, 2016
2838f03
添加broker对ssl的支持,放入测试证书
wuyiadmin May 4, 2016
bf3f53c
修改mqtt的验证,http相关下发消息接口的调整,增加数据操作模块
wuyiadmin May 30, 2016
5d1d54c
修复授权验证时userName不是ObjectId导致无返回码的bug
wuyiadmin Jun 20, 2016
48de41c
增加了对agent发布消息和订阅的topic限制,防止agent之间相互通讯
wuyiadmin Jul 15, 2016
c5291c5
调整限制订阅时必须包括自己的agentId。
wuyiadmin Jul 15, 2016
f57dea4
调整了对mqtt连接和订阅,发布消息的验证
wuyiadmin Jul 21, 2016
db3fc41
update push
xiaopeng1995 Jul 25, 2016
656ce00
Merge remote-tracking branch 'origin/master'
xiaopeng1995 Jul 25, 2016
8877743
modify config file in last merge
tianyaxiangdong Jul 26, 2016
61559fe
修改publish时topic为jsonUp的转为标准定义的topic
tianyaxiangdong Sep 9, 2016
ef197f1
修改配置文件,及权限验证类的调整。测试使用修改。
wuyiadmin Sep 12, 2016
3674d8a
优化设备连接服务器验证逻辑
12315jack May 28, 2017
8f318a6
优化设备连接服务器验证逻辑,添加主动断开设备逻辑
12315jack Jun 26, 2017
324ed10
修改部分验证逻辑和基本的数据处理等
wuyiadmin Jun 26, 2017
b0fda3b
优化设备连接服务器验证逻辑,添加主动断开设备逻辑
12315jack Oct 18, 2017
8760ddd
Merge branch 'master' of https://github.com/zenin-tech/mithqtt into #…
12315jack Oct 18, 2017
4ebb49f
Merge branch 'master' of https://github.com/zenin-tech/mithqtt into #…
12315jack Oct 18, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
6 changes: 0 additions & 6 deletions gradle/wrapper/gradle-wrapper.properties

This file was deleted.

10 changes: 3 additions & 7 deletions gradlew

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,19 @@ public static InternalMessage<Publish> fromMqttMessage(MqttVersion version, Stri
return msg;
}

public static InternalMessage<Publish> fromMqttMessage(String topic,MqttVersion version, String clientId, String userName,
String brokerId,
MqttPublishMessage mqtt) {
InternalMessage<Publish> msg = fromMqttMessage(version, clientId, userName, brokerId, mqtt.fixedHeader());
// forge bytes payload
ByteBuf buf = mqtt.payload().duplicate();
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
msg.payload = new Publish(topic, mqtt.variableHeader().packetId(), bytes);
return msg;
}


public static InternalMessage<Disconnect> fromMqttMessage(MqttVersion version, String clientId, String userName,
String brokerId,
boolean cleanSession, boolean cleanExit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public static MqttMessage newMessage(MqttFixedHeader mqttFixedHeader, Object var
(MqttConnectVariableHeader) variableHeader,
(MqttConnectPayload) payload);


case CONNACK:
return new MqttConnAckMessage(mqttFixedHeader, (MqttConnAckVariableHeader) variableHeader);

Expand Down
2 changes: 2 additions & 0 deletions mithqtt-authenticator-dummy/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
dependencies {
// project api
compile project(':mithqtt-api')

compile project(':mithqtt-storage-mongo')
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.github.longkerdandy.mithqtt.api.auth.AuthorizeResult;
import com.github.longkerdandy.mithqtt.api.auth.Authenticator;
import io.j1st.power.storage.mongo.MongoStorage;
import io.j1st.power.storage.mongo.entity.ProductStatus;
import io.netty.handler.codec.mqtt.MqttGrantedQoS;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.commons.configuration.AbstractConfiguration;
Expand All @@ -19,10 +21,16 @@ public class DummyAuthenticator implements Authenticator {
private boolean allowDollar; // allow $ in topic
private String deniedTopic; // topic will be rejected


// protected MongoStorage mongoStorage;

@Override
public void init(AbstractConfiguration config) {
this.allowDollar = config.getBoolean("allowDollar", true);
this.deniedTopic = config.getString("deniedTopic", null);
// mongoStorage = new MongoStorage();
// mongoStorage.init(config);

}

@Override
Expand All @@ -31,6 +39,19 @@ public void destroy() {

@Override
public AuthorizeResult authConnect(String clientId, String userName, String password) {
//验证clentId是否有效
// if(!mongoStorage.isAgentExists(clientId)) {
// return AuthorizeResult.FORBIDDEN;
// }
// //验证用户名密码是否合法
// if(!mongoStorage.isAgentAuth(userName, password)) {
// return AuthorizeResult.FORBIDDEN;
// }
// //验证product状态是否正常
// Integer status = this.mongoStorage.getProductStatusByAgentId(clientId);
// if(status == null || !status.equals(ProductStatus.SERVICE.value())){
// return AuthorizeResult.FORBIDDEN;
// }
return AuthorizeResult.OK;
}

Expand All @@ -54,6 +75,6 @@ public List<MqttGrantedQoS> authSubscribe(String clientId, String userName, List

@Override
public String oauth(String credentials) {
return "dummy";
return credentials;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.j1st.mithqtt.authenticator.power;

import com.github.longkerdandy.mithqtt.api.auth.Authenticator;
import com.github.longkerdandy.mithqtt.api.auth.AuthorizeResult;
import io.j1st.power.storage.mongo.MongoStorage;
import io.j1st.power.storage.mongo.entity.AgentStatus;
import io.j1st.power.storage.mongo.entity.ProductStatus;
import io.j1st.power.storage.mongo.entity.ServiceType;
import io.netty.handler.codec.mqtt.MqttGrantedQoS;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.commons.configuration.AbstractConfiguration;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
* Dummy Authenticator
* This authenticator basically authorize everything, it should only been used for test purpose
*/
@SuppressWarnings("unused")
public class PowerAuthenticator implements Authenticator {

Logger logger = LoggerFactory.getLogger(PowerAuthenticator.class);

private boolean allowDollar; // allow $ in topic
private String deniedTopic; // topic will be rejected
protected MongoStorage mongoStorage;

@Override
public void init(AbstractConfiguration config) {
this.allowDollar = config.getBoolean("allowDollar", true);
this.deniedTopic = config.getString("deniedTopic", null);
mongoStorage = new MongoStorage();
mongoStorage.init(config);

}

@Override
public void destroy() {
}

@Override
public AuthorizeResult authConnect(String clientId, String userName, String password) {
//验证clentId是否有效
if (!mongoStorage.isAgentExists(clientId)) {
return AuthorizeResult.FORBIDDEN;
}
//验证用户名密码是否合法
if (!mongoStorage.isAgentAuth(userName, password)) {
return AuthorizeResult.FORBIDDEN;
}
//验证product状态是否正常
Integer status = this.mongoStorage.getProductStatusByAgentId(clientId);
if (status == null || !status.equals(ProductStatus.SERVICE.value())) {
return AuthorizeResult.FORBIDDEN;
}
/* //验证所在的service是否链接已满
String userId = this.mongoStorage.getOperatorIdByAgent(clientId);
if (userId != null) {
//link count
long count = this.mongoStorage.getServiceCountByOperatorId(ServiceType.HARDWARE_MANAGER.value(), new ObjectId(userId));
if (count <= 0) {
return AuthorizeResult.FORBIDDEN;
}
}*/
// Validate Agent Connect Privilege
if (this.mongoStorage.isDisableAgent(new ObjectId(clientId), AgentStatus.DISABLED.value())) {
return AuthorizeResult.FORBIDDEN;
}

return AuthorizeResult.OK;
}

@Override
public AuthorizeResult authPublish(String clientId, String userName, String topicName, int qos, boolean retain) {
if (!this.allowDollar && topicName.startsWith("$")) return AuthorizeResult.FORBIDDEN;
if (topicName.equals(this.deniedTopic)) return AuthorizeResult.FORBIDDEN;
//判断topic是否包括自己的clientId
// if(topicName.indexOf(clientId) == -1){
// return AuthorizeResult.FORBIDDEN;
// }
// if(!topicName.endsWith("upstream")){
// return AuthorizeResult.FORBIDDEN;
// }
// //验证product状态是否正常
// Integer status = this.mongoStorage.getProductStatusByAgentId(clientId);
// if (status == null || !status.equals(ProductStatus.SERVICE.value())) {
// return AuthorizeResult.FORBIDDEN;
// }
// Validate Agent Connect Privilege
if (this.mongoStorage.isDisableAgent(new ObjectId(clientId), AgentStatus.DISABLED.value())) {
return AuthorizeResult.FORBIDDEN;
}

return AuthorizeResult.OK;
}

@Override
public List<MqttGrantedQoS> authSubscribe(String clientId, String userName, List<MqttTopicSubscription> requestSubscriptions) {
List<MqttGrantedQoS> r = new ArrayList<>();
requestSubscriptions.forEach(subscription -> {
if (!this.allowDollar && subscription.topic().startsWith("$")) r.add(MqttGrantedQoS.FAILURE);
if (subscription.topic().equals(this.deniedTopic)) r.add(MqttGrantedQoS.FAILURE);
if (!subscription.topic().endsWith("downstream")) r.add(MqttGrantedQoS.FAILURE);
if (!subscription.topic().contains(clientId)) r.add(MqttGrantedQoS.FAILURE);
r.add(MqttGrantedQoS.valueOf(subscription.requestedQos().value()));
});
return r;
}

@Override
public String oauth(String credentials) {
return mongoStorage.getUserByToken(credentials);
}
}
2 changes: 2 additions & 0 deletions mithqtt-broker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ dependencies {
compile project(':mithqtt-api')
compile project(':mithqtt-storage-redis')

compile project(':mithqtt-storage-mongo')

// authenticator
runtime project(':mithqtt-authenticator-dummy')

Expand Down
5 changes: 4 additions & 1 deletion mithqtt-broker/src/dist/config/authenticator.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
# Authenticator

# Authenticator implementation (full qualified class name)
authenticator.class = com.github.longkerdandy.mithqtt.authenticator.dummy.DummyAuthenticator
#com.github.longkerdandy.mithqtt.authenticator.dummy.DummyAuthenticator
#io.j1st.mithqtt.authenticator.power.PowerAuthenticator
#authenticator.class =com.github.longkerdandy.mithqtt.authenticator.dummy.DummyAuthenticator
authenticator.class=io.j1st.mithqtt.authenticator.power.PowerAuthenticator


# Dummy
Expand Down
13 changes: 5 additions & 8 deletions mithqtt-broker/src/dist/config/broker.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
# Broker

# This is the broker id, please make sure each broker instance used a different id
broker.id = 1
broker.id=test-1

# This is the ip address the broker will bind to
# Use 0.0.0.0 to bind to all possible ip addresses
mqtt.host = 0.0.0.0

# This is the network port the broker will bind to
# The MQTT Protocol Specification recommended using port 1883
mqtt.port = 1883
mqtt.port=1884

# To use ssl in the connection, set this to true
# Must provide an X.509 certificate chain file in PEM format
Expand All @@ -23,13 +23,10 @@ mqtt.ssl.enabled = false
mqtt.ssl.port = 8883

# X.509 certificate chain file path
# mqtt.ssl.certPath =

# A PKCS#8 private key file path
# mqtt.ssl.keyPath =
mqtt.ssl.certPath =E://SSL/ze/server.jks

# The password of the key File
# mqtt.ssl.keyPassword =
mqtt.ssl.password = zenintec

# These are the default and maximum time interval that client is permitted to be idled
# Time interval measured in seconds
Expand Down Expand Up @@ -58,7 +55,7 @@ netty.useEpoll = false
netty.soBacklog = 511

# this parameter configures the "TCP keepalive" behavior for the listening socket.
# If this parameter is omitted then the operating system��s settings will be in effect for the socket.
# If this parameter is omitted then the operating system��s settings will be in effect for the socket.
# If it is set to the value "true", the SO_KEEPALIVE option is turned on for the socket.
# If it is set to the value "off", the SO_KEEPALIVE option is turned off for the socket.
netty.soKeepAlive = true
27 changes: 24 additions & 3 deletions mithqtt-broker/src/dist/config/communicator.properties
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
# Hazelcast communicator configuration
# RabbitMQ communicator configuration

# Communicator

# Communicator implementation (full qualified class name)
communicator.class = com.github.longkerdandy.mithqtt.communicator.hazelcast.broker.HazelcastBrokerCommunicator
communicator.class = com.github.longkerdandy.mithqtt.communicator.rabbitmq.broker.RabbitMQBrokerCommunicator

# This is the topic prefix that broker instance consume. (full topic is like mithqtt.broker.{brokerId})
communicator.broker.topic = mithqtt.broker

# This is the topic that processor will pass message to 3rd party application
communicator.application.topic = mithqtt.application

# Hazelcast
# RabbitMQ

# User name
rabbitmq.userName=guest

# Password
rabbitmq.password=guest

# Virtual host
rabbitmq.virtualHost = /

# Server addresses
# In the format like host1[:port1],host2[:port2]
rabbitmq.addresses=localhost:5672

# Queue name for application communicator
# ONLY APPLIES TO RabbitMQApplicationCommunicator
rabbitmq.app.queueName = appQueue

# Routing key for application communicator
# ONLY APPLIES TO RabbitMQApplicationCommunicator
rabbitmq.app.routingKey = #


Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ communicator.application.topic = mithqtt.application
# RabbitMQ

# User name
rabbitmq.userName = guest
rabbitmq.userName = zenin

# Password
rabbitmq.password = guest
rabbitmq.password = zenin

# Virtual host
rabbitmq.virtualHost = /

# Server addresses
# In the format like host1[:port1],host2[:port2]
rabbitmq.addresses = localhost
rabbitmq.addresses = 127.0.0.1

# Queue name for application communicator
# ONLY APPLIES TO RabbitMQApplicationCommunicator
Expand Down
21 changes: 21 additions & 0 deletions mithqtt-broker/src/dist/config/mongo.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Redis storage configuration

# Storage

# Storage implementation (full qualified class name)
storage.class = io.j1st.power.storage.mongo.MongoStorage


# MongoDB storage configuration
mongo.address=localhost:27017
mongo.database=power
mongo.userName=jack
mongo.password=123456



# Allow '$' in topic
allowDollar = false

# Topic will be rejected in PUBLISH and SUBSCRIBE
deniedTopic = nosubscribe
Loading