diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar deleted file mode 100644 index fd7e590..0000000 Binary files a/gradle/wrapper/gradle-wrapper.jar and /dev/null differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties deleted file mode 100644 index edc91be..0000000 --- a/gradle/wrapper/gradle-wrapper.properties +++ /dev/null @@ -1,6 +0,0 @@ -#Thu Mar 03 16:48:37 CST 2016 -distributionBase=GRADLE_USER_HOME -distributionPath=wrapper/dists -zipStoreBase=GRADLE_USER_HOME -zipStorePath=wrapper/dists -distributionUrl=http\://services.gradle.org/distributions/gradle-2.11-all.zip diff --git a/gradlew b/gradlew index 91a7e26..9d82f78 100644 --- a/gradlew +++ b/gradlew @@ -42,11 +42,6 @@ case "`uname`" in ;; esac -# For Cygwin, ensure paths are in UNIX format before anything is touched. -if $cygwin ; then - [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"` -fi - # Attempt to set APP_HOME # Resolve links: $0 may be a link PRG="$0" @@ -61,9 +56,9 @@ while [ -h "$PRG" ] ; do fi done SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >&- +cd "`dirname \"$PRG\"`/" >/dev/null APP_HOME="`pwd -P`" -cd "$SAVED" >&- +cd "$SAVED" >/dev/null CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar @@ -114,6 +109,7 @@ fi if $cygwin ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` # We build the pattern for arguments to be converted via cygpath ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` diff --git a/mithqtt-api/src/main/java/com/github/longkerdandy/mithqtt/api/internal/InternalMessage.java b/mithqtt-api/src/main/java/com/github/longkerdandy/mithqtt/api/internal/InternalMessage.java index c2397ac..9c41ff9 100644 --- a/mithqtt-api/src/main/java/com/github/longkerdandy/mithqtt/api/internal/InternalMessage.java +++ b/mithqtt-api/src/main/java/com/github/longkerdandy/mithqtt/api/internal/InternalMessage.java @@ -125,6 +125,19 @@ public static InternalMessage fromMqttMessage(MqttVersion version, Stri return msg; } + public static InternalMessage fromMqttMessage(String topic,MqttVersion version, String clientId, String userName, + String brokerId, + MqttPublishMessage mqtt) { + InternalMessage msg = fromMqttMessage(version, clientId, userName, brokerId, mqtt.fixedHeader()); + // forge bytes payload + ByteBuf buf = mqtt.payload().duplicate(); + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + msg.payload = new Publish(topic, mqtt.variableHeader().packetId(), bytes); + return msg; + } + + public static InternalMessage fromMqttMessage(MqttVersion version, String clientId, String userName, String brokerId, boolean cleanSession, boolean cleanExit) { diff --git a/mithqtt-api/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java b/mithqtt-api/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java index df7a3d3..a32f4cc 100644 --- a/mithqtt-api/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java +++ b/mithqtt-api/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java @@ -35,6 +35,7 @@ public static MqttMessage newMessage(MqttFixedHeader mqttFixedHeader, Object var (MqttConnectVariableHeader) variableHeader, (MqttConnectPayload) payload); + case CONNACK: return new MqttConnAckMessage(mqttFixedHeader, (MqttConnAckVariableHeader) variableHeader); diff --git a/mithqtt-authenticator-dummy/build.gradle b/mithqtt-authenticator-dummy/build.gradle index 93d4815..1de3c69 100644 --- a/mithqtt-authenticator-dummy/build.gradle +++ b/mithqtt-authenticator-dummy/build.gradle @@ -1,4 +1,6 @@ dependencies { // project api compile project(':mithqtt-api') + + compile project(':mithqtt-storage-mongo') } \ No newline at end of file diff --git a/mithqtt-authenticator-dummy/src/main/java/com/github/longkerdandy/mithqtt/authenticator/dummy/DummyAuthenticator.java b/mithqtt-authenticator-dummy/src/main/java/com/github/longkerdandy/mithqtt/authenticator/dummy/DummyAuthenticator.java index 108e1f1..719dd08 100644 --- a/mithqtt-authenticator-dummy/src/main/java/com/github/longkerdandy/mithqtt/authenticator/dummy/DummyAuthenticator.java +++ b/mithqtt-authenticator-dummy/src/main/java/com/github/longkerdandy/mithqtt/authenticator/dummy/DummyAuthenticator.java @@ -2,6 +2,8 @@ import com.github.longkerdandy.mithqtt.api.auth.AuthorizeResult; import com.github.longkerdandy.mithqtt.api.auth.Authenticator; +import io.j1st.power.storage.mongo.MongoStorage; +import io.j1st.power.storage.mongo.entity.ProductStatus; import io.netty.handler.codec.mqtt.MqttGrantedQoS; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.commons.configuration.AbstractConfiguration; @@ -19,10 +21,16 @@ public class DummyAuthenticator implements Authenticator { private boolean allowDollar; // allow $ in topic private String deniedTopic; // topic will be rejected + +// protected MongoStorage mongoStorage; + @Override public void init(AbstractConfiguration config) { this.allowDollar = config.getBoolean("allowDollar", true); this.deniedTopic = config.getString("deniedTopic", null); +// mongoStorage = new MongoStorage(); +// mongoStorage.init(config); + } @Override @@ -31,6 +39,19 @@ public void destroy() { @Override public AuthorizeResult authConnect(String clientId, String userName, String password) { + //验证clentId是否有效 +// if(!mongoStorage.isAgentExists(clientId)) { +// return AuthorizeResult.FORBIDDEN; +// } +// //验证用户名密码是否合法 +// if(!mongoStorage.isAgentAuth(userName, password)) { +// return AuthorizeResult.FORBIDDEN; +// } +// //验证product状态是否正常 +// Integer status = this.mongoStorage.getProductStatusByAgentId(clientId); +// if(status == null || !status.equals(ProductStatus.SERVICE.value())){ +// return AuthorizeResult.FORBIDDEN; +// } return AuthorizeResult.OK; } @@ -54,6 +75,6 @@ public List authSubscribe(String clientId, String userName, List @Override public String oauth(String credentials) { - return "dummy"; + return credentials; } } diff --git a/mithqtt-authenticator-dummy/src/main/java/io/j1st/mithqtt/authenticator/power/PowerAuthenticator.java b/mithqtt-authenticator-dummy/src/main/java/io/j1st/mithqtt/authenticator/power/PowerAuthenticator.java new file mode 100644 index 0000000..c0876ba --- /dev/null +++ b/mithqtt-authenticator-dummy/src/main/java/io/j1st/mithqtt/authenticator/power/PowerAuthenticator.java @@ -0,0 +1,118 @@ +package io.j1st.mithqtt.authenticator.power; + +import com.github.longkerdandy.mithqtt.api.auth.Authenticator; +import com.github.longkerdandy.mithqtt.api.auth.AuthorizeResult; +import io.j1st.power.storage.mongo.MongoStorage; +import io.j1st.power.storage.mongo.entity.AgentStatus; +import io.j1st.power.storage.mongo.entity.ProductStatus; +import io.j1st.power.storage.mongo.entity.ServiceType; +import io.netty.handler.codec.mqtt.MqttGrantedQoS; +import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.commons.configuration.AbstractConfiguration; +import org.bson.types.ObjectId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Dummy Authenticator + * This authenticator basically authorize everything, it should only been used for test purpose + */ +@SuppressWarnings("unused") +public class PowerAuthenticator implements Authenticator { + + Logger logger = LoggerFactory.getLogger(PowerAuthenticator.class); + + private boolean allowDollar; // allow $ in topic + private String deniedTopic; // topic will be rejected + protected MongoStorage mongoStorage; + + @Override + public void init(AbstractConfiguration config) { + this.allowDollar = config.getBoolean("allowDollar", true); + this.deniedTopic = config.getString("deniedTopic", null); + mongoStorage = new MongoStorage(); + mongoStorage.init(config); + + } + + @Override + public void destroy() { + } + + @Override + public AuthorizeResult authConnect(String clientId, String userName, String password) { + //验证clentId是否有效 + if (!mongoStorage.isAgentExists(clientId)) { + return AuthorizeResult.FORBIDDEN; + } + //验证用户名密码是否合法 + if (!mongoStorage.isAgentAuth(userName, password)) { + return AuthorizeResult.FORBIDDEN; + } + //验证product状态是否正常 + Integer status = this.mongoStorage.getProductStatusByAgentId(clientId); + if (status == null || !status.equals(ProductStatus.SERVICE.value())) { + return AuthorizeResult.FORBIDDEN; + } + /* //验证所在的service是否链接已满 + String userId = this.mongoStorage.getOperatorIdByAgent(clientId); + if (userId != null) { + //link count + long count = this.mongoStorage.getServiceCountByOperatorId(ServiceType.HARDWARE_MANAGER.value(), new ObjectId(userId)); + if (count <= 0) { + return AuthorizeResult.FORBIDDEN; + } + }*/ + // Validate Agent Connect Privilege + if (this.mongoStorage.isDisableAgent(new ObjectId(clientId), AgentStatus.DISABLED.value())) { + return AuthorizeResult.FORBIDDEN; + } + + return AuthorizeResult.OK; + } + + @Override + public AuthorizeResult authPublish(String clientId, String userName, String topicName, int qos, boolean retain) { + if (!this.allowDollar && topicName.startsWith("$")) return AuthorizeResult.FORBIDDEN; + if (topicName.equals(this.deniedTopic)) return AuthorizeResult.FORBIDDEN; + //判断topic是否包括自己的clientId +// if(topicName.indexOf(clientId) == -1){ +// return AuthorizeResult.FORBIDDEN; +// } +// if(!topicName.endsWith("upstream")){ +// return AuthorizeResult.FORBIDDEN; +// } +// //验证product状态是否正常 +// Integer status = this.mongoStorage.getProductStatusByAgentId(clientId); +// if (status == null || !status.equals(ProductStatus.SERVICE.value())) { +// return AuthorizeResult.FORBIDDEN; +// } + // Validate Agent Connect Privilege + if (this.mongoStorage.isDisableAgent(new ObjectId(clientId), AgentStatus.DISABLED.value())) { + return AuthorizeResult.FORBIDDEN; + } + + return AuthorizeResult.OK; + } + + @Override + public List authSubscribe(String clientId, String userName, List requestSubscriptions) { + List r = new ArrayList<>(); + requestSubscriptions.forEach(subscription -> { + if (!this.allowDollar && subscription.topic().startsWith("$")) r.add(MqttGrantedQoS.FAILURE); + if (subscription.topic().equals(this.deniedTopic)) r.add(MqttGrantedQoS.FAILURE); + if (!subscription.topic().endsWith("downstream")) r.add(MqttGrantedQoS.FAILURE); + if (!subscription.topic().contains(clientId)) r.add(MqttGrantedQoS.FAILURE); + r.add(MqttGrantedQoS.valueOf(subscription.requestedQos().value())); + }); + return r; + } + + @Override + public String oauth(String credentials) { + return mongoStorage.getUserByToken(credentials); + } +} diff --git a/mithqtt-broker/build.gradle b/mithqtt-broker/build.gradle index a41f539..180fca8 100644 --- a/mithqtt-broker/build.gradle +++ b/mithqtt-broker/build.gradle @@ -11,6 +11,8 @@ dependencies { compile project(':mithqtt-api') compile project(':mithqtt-storage-redis') + compile project(':mithqtt-storage-mongo') + // authenticator runtime project(':mithqtt-authenticator-dummy') diff --git a/mithqtt-broker/src/dist/config/authenticator.properties b/mithqtt-broker/src/dist/config/authenticator.properties index 198cb2f..6b22136 100644 --- a/mithqtt-broker/src/dist/config/authenticator.properties +++ b/mithqtt-broker/src/dist/config/authenticator.properties @@ -3,7 +3,10 @@ # Authenticator # Authenticator implementation (full qualified class name) -authenticator.class = com.github.longkerdandy.mithqtt.authenticator.dummy.DummyAuthenticator +#com.github.longkerdandy.mithqtt.authenticator.dummy.DummyAuthenticator + #io.j1st.mithqtt.authenticator.power.PowerAuthenticator +#authenticator.class =com.github.longkerdandy.mithqtt.authenticator.dummy.DummyAuthenticator +authenticator.class=io.j1st.mithqtt.authenticator.power.PowerAuthenticator # Dummy diff --git a/mithqtt-broker/src/dist/config/broker.properties b/mithqtt-broker/src/dist/config/broker.properties index 1c74bc4..ac0294d 100644 --- a/mithqtt-broker/src/dist/config/broker.properties +++ b/mithqtt-broker/src/dist/config/broker.properties @@ -3,7 +3,7 @@ # Broker # This is the broker id, please make sure each broker instance used a different id -broker.id = 1 +broker.id=test-1 # This is the ip address the broker will bind to # Use 0.0.0.0 to bind to all possible ip addresses @@ -11,7 +11,7 @@ mqtt.host = 0.0.0.0 # This is the network port the broker will bind to # The MQTT Protocol Specification recommended using port 1883 -mqtt.port = 1883 +mqtt.port=1884 # To use ssl in the connection, set this to true # Must provide an X.509 certificate chain file in PEM format @@ -23,13 +23,10 @@ mqtt.ssl.enabled = false mqtt.ssl.port = 8883 # X.509 certificate chain file path -# mqtt.ssl.certPath = - -# A PKCS#8 private key file path -# mqtt.ssl.keyPath = +mqtt.ssl.certPath =E://SSL/ze/server.jks # The password of the key File -# mqtt.ssl.keyPassword = +mqtt.ssl.password = zenintec # These are the default and maximum time interval that client is permitted to be idled # Time interval measured in seconds @@ -58,7 +55,7 @@ netty.useEpoll = false netty.soBacklog = 511 # this parameter configures the "TCP keepalive" behavior for the listening socket. -# If this parameter is omitted then the operating systems settings will be in effect for the socket. +# If this parameter is omitted then the operating system��s settings will be in effect for the socket. # If it is set to the value "true", the SO_KEEPALIVE option is turned on for the socket. # If it is set to the value "off", the SO_KEEPALIVE option is turned off for the socket. netty.soKeepAlive = true diff --git a/mithqtt-broker/src/dist/config/communicator.properties b/mithqtt-broker/src/dist/config/communicator.properties index 1251612..a65a207 100644 --- a/mithqtt-broker/src/dist/config/communicator.properties +++ b/mithqtt-broker/src/dist/config/communicator.properties @@ -1,9 +1,9 @@ -# Hazelcast communicator configuration +# RabbitMQ communicator configuration # Communicator # Communicator implementation (full qualified class name) -communicator.class = com.github.longkerdandy.mithqtt.communicator.hazelcast.broker.HazelcastBrokerCommunicator +communicator.class = com.github.longkerdandy.mithqtt.communicator.rabbitmq.broker.RabbitMQBrokerCommunicator # This is the topic prefix that broker instance consume. (full topic is like mithqtt.broker.{brokerId}) communicator.broker.topic = mithqtt.broker @@ -11,6 +11,27 @@ communicator.broker.topic = mithqtt.broker # This is the topic that processor will pass message to 3rd party application communicator.application.topic = mithqtt.application -# Hazelcast +# RabbitMQ + +# User name +rabbitmq.userName=guest + +# Password +rabbitmq.password=guest + +# Virtual host +rabbitmq.virtualHost = / + +# Server addresses +# In the format like host1[:port1],host2[:port2] +rabbitmq.addresses=localhost:5672 + +# Queue name for application communicator +# ONLY APPLIES TO RabbitMQApplicationCommunicator +rabbitmq.app.queueName = appQueue + +# Routing key for application communicator +# ONLY APPLIES TO RabbitMQApplicationCommunicator +rabbitmq.app.routingKey = # diff --git a/mithqtt-broker/src/dist/config/communicator.rabbitmq.properties b/mithqtt-broker/src/dist/config/communicator.rabbitmq.properties index 17c8957..144a558 100644 --- a/mithqtt-broker/src/dist/config/communicator.rabbitmq.properties +++ b/mithqtt-broker/src/dist/config/communicator.rabbitmq.properties @@ -14,17 +14,17 @@ communicator.application.topic = mithqtt.application # RabbitMQ # User name -rabbitmq.userName = guest +rabbitmq.userName = zenin # Password -rabbitmq.password = guest +rabbitmq.password = zenin # Virtual host rabbitmq.virtualHost = / # Server addresses # In the format like host1[:port1],host2[:port2] -rabbitmq.addresses = localhost +rabbitmq.addresses = 127.0.0.1 # Queue name for application communicator # ONLY APPLIES TO RabbitMQApplicationCommunicator diff --git a/mithqtt-broker/src/dist/config/mongo.properties b/mithqtt-broker/src/dist/config/mongo.properties new file mode 100644 index 0000000..a0f68a3 --- /dev/null +++ b/mithqtt-broker/src/dist/config/mongo.properties @@ -0,0 +1,21 @@ +# Redis storage configuration + +# Storage + +# Storage implementation (full qualified class name) +storage.class = io.j1st.power.storage.mongo.MongoStorage + + +# MongoDB storage configuration +mongo.address=localhost:27017 +mongo.database=power +mongo.userName=jack +mongo.password=123456 + + + +# Allow '$' in topic +allowDollar = false + +# Topic will be rejected in PUBLISH and SUBSCRIBE +deniedTopic = nosubscribe \ No newline at end of file diff --git a/mithqtt-broker/src/dist/config/redis.properties b/mithqtt-broker/src/dist/config/redis.properties index 85fc16f..13a12d1 100644 --- a/mithqtt-broker/src/dist/config/redis.properties +++ b/mithqtt-broker/src/dist/config/redis.properties @@ -16,7 +16,7 @@ storage.sync.class = com.github.longkerdandy.mithqtt.storage.redis.sync.RedisSyn mqtt.inflight.queue.size = 0 # The unacknowledged QoS 2 messages' id were stored in order for each client -# Including: +# Including:S # QoS 2 PUBLISH messages received but not acknowledged by PUBREL # If the queue size limit is reached, the oldest QoS 2 message id will be dropped. # Default and 0 means no limit. @@ -27,8 +27,7 @@ mqtt.qos2.queue.size = 0 # If the queue size limit is reached, the oldest retain message will be dropped. # Default and 0 means no limit. mqtt.retain.queue.size = 0 - -# Redis +# RedisS # Redis server type, could be: # 1. 'single' : http://redis.io/topics/config @@ -42,8 +41,9 @@ redis.type = single # 2. 'master_slave' : host[:port][,host2[:port2]] # the 1st should be the master node # 3. 'sentinel' : host[:port][,host2[:port2]] # the 1st should be the master node, this is the sentinel address # 4. 'cluster' : host[:port][,host2[:port2]] -redis.address = 127.0.0.1:6379 +#redis.address = 139.198.0.174:6379 +redis.address=localhost:6379 # Redis database number redis.database = 0 diff --git a/mithqtt-broker/src/dist/ssl/server.jks b/mithqtt-broker/src/dist/ssl/server.jks new file mode 100644 index 0000000..0cce162 Binary files /dev/null and b/mithqtt-broker/src/dist/ssl/server.jks differ diff --git a/mithqtt-broker/src/main/java/com/github/longkerdandy/mithqtt/broker/MqttBroker.java b/mithqtt-broker/src/main/java/com/github/longkerdandy/mithqtt/broker/MqttBroker.java index b8c62bf..c4a6794 100644 --- a/mithqtt-broker/src/main/java/com/github/longkerdandy/mithqtt/broker/MqttBroker.java +++ b/mithqtt-broker/src/main/java/com/github/longkerdandy/mithqtt/broker/MqttBroker.java @@ -12,6 +12,7 @@ import com.github.longkerdandy.mithqtt.broker.session.SessionRegistry; import com.github.longkerdandy.mithqtt.broker.util.Validator; import com.lambdaworks.redis.ValueScanCursor; +import io.j1st.power.storage.mongo.MongoStorage; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.epoll.EpollEventLoopGroup; @@ -32,7 +33,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; import java.io.File; +import java.io.FileInputStream; +import java.security.KeyStore; /** * MQTT Bridge @@ -52,18 +57,21 @@ public static void main(String[] args) throws Exception { PropertiesConfiguration communicatorConfig; PropertiesConfiguration authenticatorConfig; PropertiesConfiguration metricsConfig; + PropertiesConfiguration mongodbConfig; if (args.length >= 5) { brokerConfig = new PropertiesConfiguration(args[0]); redisConfig = new PropertiesConfiguration(args[1]); communicatorConfig = new PropertiesConfiguration(args[2]); authenticatorConfig = new PropertiesConfiguration(args[3]); metricsConfig = new PropertiesConfiguration(args[4]); + mongodbConfig = new PropertiesConfiguration(args[5]); } else { brokerConfig = new PropertiesConfiguration("config/broker.properties"); redisConfig = new PropertiesConfiguration("config/redis.properties"); communicatorConfig = new PropertiesConfiguration("config/communicator.properties"); authenticatorConfig = new PropertiesConfiguration("config/authenticator.properties"); metricsConfig = new PropertiesConfiguration("config/metrics.properties"); + mongodbConfig = new PropertiesConfiguration("config/mongo.properties"); } final String brokerId = brokerConfig.getString("broker.id"); @@ -93,7 +101,7 @@ public static void main(String[] args) throws Exception { // authenticator logger.debug("Initializing authenticator..."); Authenticator authenticator = (Authenticator) Class.forName(authenticatorConfig.getString("authenticator.class")).newInstance(); - authenticator.init(authenticatorConfig); + authenticator.init(mongodbConfig); // metrics logger.debug("Initializing metrics ..."); @@ -105,9 +113,10 @@ public static void main(String[] args) throws Exception { final int keepAlive = brokerConfig.getInt("mqtt.keepalive.default"); final int keepAliveMax = brokerConfig.getInt("mqtt.keepalive.max"); final boolean ssl = brokerConfig.getBoolean("mqtt.ssl.enabled"); - final SslContext sslContext = ssl ? SslContextBuilder.forServer(new File(brokerConfig.getString("mqtt.ssl.certPath")), new File(brokerConfig.getString("mqtt.ssl.keyPath")), brokerConfig.getString("mqtt.ssl.keyPassword")).build() : null; +// final SslContext sslContext = ssl ? SslContextBuilder.forServer(new File(brokerConfig.getString("mqtt.ssl.certPath")), new File(brokerConfig.getString("mqtt.ssl.keyPath")), brokerConfig.getString("mqtt.ssl.keyPassword")).build() : null; final String host = brokerConfig.getString("mqtt.host"); - final int port = ssl ? brokerConfig.getInt("mqtt.ssl.port") : brokerConfig.getInt("mqtt.port"); + final int port = brokerConfig.getInt("mqtt.port"); + // tcp server logger.debug("Initializing tcp server ..."); @@ -144,10 +153,6 @@ public void run() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); - // ssl - if (ssl) { - p.addLast("ssl", sslContext.newHandler(ch.alloc())); - } // idle p.addFirst("idleHandler", new IdleStateHandler(0, 0, keepAlive)); // metrics @@ -171,11 +176,65 @@ public void initChannel(SocketChannel ch) throws Exception { // Bind and start to accept incoming connections. ChannelFuture f = b.bind(host, port).sync(); + + /** + * 打开SSL的端口监听 + */ + if(ssl) { + + String password = brokerConfig.getString("mqtt.ssl.password"); + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(new FileInputStream(new File(brokerConfig.getString("mqtt.ssl.certPath"))), password.toCharArray()); + + TrustManagerFactory tmFactory = TrustManagerFactory.getInstance("SunX509"); + tmFactory.init(ks); + + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, password.toCharArray()); + + final SslContext sslContext = ssl ? SslContextBuilder.forServer(kmf).build() : null; + final int sslport = ssl ? brokerConfig.getInt("mqtt.ssl.port") : 8883; + ServerBootstrap sslb = new ServerBootstrap(); + sslb.group(bossGroup, workerGroup) + .channel(brokerConfig.getBoolean("netty.useEpoll") ? EpollServerSocketChannel.class : NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + // ssl + p.addLast("ssl", sslContext.newHandler(ch.alloc())); + + // idle + p.addFirst("idleHandler", new IdleStateHandler(0, 0, keepAlive)); + // metrics + if (metricsEnabled) { + p.addLast("bytesMetrics", new BytesMetricsHandler(metrics, brokerId)); + } + // mqtt encoder & decoder + p.addLast("encoder", new MqttEncoder()); + p.addLast("decoder", new MqttDecoder()); + // metrics + if (metricsEnabled) { + p.addLast("msgMetrics", new MessageMetricsHandler(metrics, brokerId)); + } + // logic handler + p.addLast(handlerGroup, "logicHandler", new SyncRedisHandler(authenticator, communicator, redis, registry, validator, brokerId, keepAlive, keepAliveMax)); + } + }) + .option(ChannelOption.SO_BACKLOG, brokerConfig.getInt("netty.soBacklog")) + .childOption(ChannelOption.SO_KEEPALIVE, brokerConfig.getBoolean("netty.soKeepAlive")); + + ChannelFuture sf = sslb.bind(host, sslport).sync(); + + } + logger.info("MQTT broker is up and running."); // Wait until the server socket is closed. // Do this to gracefully shut down the server. f.channel().closeFuture().sync(); + } /** diff --git a/mithqtt-broker/src/main/java/com/github/longkerdandy/mithqtt/broker/handler/SyncRedisHandler.java b/mithqtt-broker/src/main/java/com/github/longkerdandy/mithqtt/broker/handler/SyncRedisHandler.java index bfd0342..c719c0c 100644 --- a/mithqtt-broker/src/main/java/com/github/longkerdandy/mithqtt/broker/handler/SyncRedisHandler.java +++ b/mithqtt-broker/src/main/java/com/github/longkerdandy/mithqtt/broker/handler/SyncRedisHandler.java @@ -405,7 +405,6 @@ protected void onPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) { ctx.close(); return; } - // boolean dup = msg.fixedHeader().dup(); MqttQoS qos = msg.fixedHeader().qos(); boolean retain = msg.fixedHeader().retain(); @@ -526,12 +525,16 @@ else if (qos == MqttQoS.EXACTLY_ONCE) { onwardRecipients(msg); } } - + //转换topic为平台标准topic + if(topicName.equals("jsonUp")) { + topicName = "agents/"+userName+"/upstream"; + } // Pass message to 3rd party application - this.communicator.sendToApplication(InternalMessage.fromMqttMessage(this.version, this.clientId, this.userName, this.brokerId, msg)); + this.communicator.sendToApplication(InternalMessage.fromMqttMessage(topicName,this.version, this.clientId, this.userName, this.brokerId, msg)); } else { logger.trace("Authorization failed: Publish to topic {} unauthorized for client {}", topicName, this.clientId); + ctx.close(); } } @@ -963,7 +966,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } } } - /** * Handle connection lost condition * Both when received DISCONNECT message or not @@ -1033,9 +1035,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { if (this.connected) { if (cause instanceof IOException) { - logger.debug("Exception caught: Exception caught from client {} user {}: ", this.clientId, this.userName, ExceptionUtils.getMessage(cause)); + logger.debug("Exception caught: Exception caught from client {} user {}: {}", this.clientId, this.userName, ExceptionUtils.getMessage(cause)); } else { - logger.debug("Exception caught: Exception caught from client {} user {}: ", this.clientId, this.userName, cause); + logger.debug("Exception caught: Exception caught from client {} user {}: {}", this.clientId, this.userName, cause); } } ctx.close(); diff --git a/mithqtt-http/src/dist/config/authenticator.properties b/mithqtt-http/src/dist/config/authenticator.properties index 198cb2f..cc4c02f 100644 --- a/mithqtt-http/src/dist/config/authenticator.properties +++ b/mithqtt-http/src/dist/config/authenticator.properties @@ -3,7 +3,7 @@ # Authenticator # Authenticator implementation (full qualified class name) -authenticator.class = com.github.longkerdandy.mithqtt.authenticator.dummy.DummyAuthenticator +authenticator.class=io.j1st.mithqtt.authenticator.power.PowerAuthenticator # Dummy @@ -13,3 +13,8 @@ allowDollar = false # Topic will be rejected in PUBLISH and SUBSCRIBE deniedTopic = nosubscribe +#Mongodb +mongo.address=localhost:27017 +mongo.database=power +mongo.userName=zenin +mongo.password=zenin \ No newline at end of file diff --git a/mithqtt-http/src/dist/config/communicator.properties b/mithqtt-http/src/dist/config/communicator.properties index cba8d3f..d6c79b7 100644 --- a/mithqtt-http/src/dist/config/communicator.properties +++ b/mithqtt-http/src/dist/config/communicator.properties @@ -1,9 +1,9 @@ -# Hazelcast communicator configuration +# RabbitMQ communicator configuration # Communicator # Communicator implementation (full qualified class name) -communicator.class = com.github.longkerdandy.mithqtt.communicator.hazelcast.http.HazelcastHttpCommunicator +communicator.class = com.github.longkerdandy.mithqtt.communicator.rabbitmq.http.RabbitMQHttpCommunicator # This is the topic prefix that broker instance consume. (full topic is like mithqtt.broker.{brokerId}) communicator.broker.topic = mithqtt.broker @@ -11,6 +11,27 @@ communicator.broker.topic = mithqtt.broker # This is the topic that processor will pass message to 3rd party application communicator.application.topic = mithqtt.application -# Hazelcast +# RabbitMQ + +# User name +rabbitmq.userName = guest + +# Password +rabbitmq.password = guest + +# Virtual host +rabbitmq.virtualHost = / + +# Server addresses +# In the format like host1[:port1],host2[:port2] +rabbitmq.addresses=localhost:5672 + +# Queue name for application communicator +# ONLY APPLIES TO RabbitMQApplicationCommunicator +rabbitmq.app.queueName = appQueue + +# Routing key for application communicator +# ONLY APPLIES TO RabbitMQApplicationCommunicator +rabbitmq.app.routingKey = # diff --git a/mithqtt-http/src/dist/config/http.yml b/mithqtt-http/src/dist/config/http.yml index e92fc18..fa4a042 100644 --- a/mithqtt-http/src/dist/config/http.yml +++ b/mithqtt-http/src/dist/config/http.yml @@ -8,12 +8,32 @@ serverId: 1 # These are regexp validator for MQTT packet field # Fields will validate against the regexp whenever a related request is received # Leave empty to skip the validation -clientIdValidator = ^[ -~]+$ -topicNameValidator = ^[ -~]+$ -topicFilterValidator = ^[ -~]+$ +clientIdValidator : ^[ -~]+$ +topicNameValidator : ^[ -~]+$ +topicFilterValidator : ^[ -~]+$ # DropWizard +logging: + level: INFO + loggers: + "mithqtt-http": DEBUG + appenders: + - type: console + threshold: ALL + target: stdout + timeZone: Asia/Shanghai + logFormat: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{20} - %msg%n %ex{full}" + - type: file + currentLogFilename: log/mqhttp.log + threshold: ALL + archive: true + archivedLogFilenamePattern: /log/http-%d.log + archivedFileCount: 5 + timeZone: Asia/Shanghai + logFormat: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{20} - %msg%n %ex{full}" + + server: type: simple @@ -21,4 +41,4 @@ server: adminContextPath: /admin connector: type: http - port: 8080 \ No newline at end of file + port: 8081 \ No newline at end of file diff --git a/mithqtt-http/src/dist/config/logback.xml b/mithqtt-http/src/dist/config/logback.xml index 1354651..7d34b80 100644 --- a/mithqtt-http/src/dist/config/logback.xml +++ b/mithqtt-http/src/dist/config/logback.xml @@ -7,7 +7,7 @@ - log/http.log + log/mqhttp.log log/archived/http.%d{yyyy-MM-dd}.log @@ -53,6 +53,12 @@ + + + + + + diff --git a/mithqtt-http/src/dist/config/redis.properties b/mithqtt-http/src/dist/config/redis.properties index 85fc16f..1cfd1fc 100644 --- a/mithqtt-http/src/dist/config/redis.properties +++ b/mithqtt-http/src/dist/config/redis.properties @@ -42,7 +42,7 @@ redis.type = single # 2. 'master_slave' : host[:port][,host2[:port2]] # the 1st should be the master node # 3. 'sentinel' : host[:port][,host2[:port2]] # the 1st should be the master node, this is the sentinel address # 4. 'cluster' : host[:port][,host2[:port2]] -redis.address = 127.0.0.1:6379 +redis.address=localhost:6379 # Redis database number redis.database = 0 diff --git a/mithqtt-http/src/main/java/com/github/longkerdandy/mithqtt/http/MqttHttp.java b/mithqtt-http/src/main/java/com/github/longkerdandy/mithqtt/http/MqttHttp.java index 4e43b75..a27fdb2 100644 --- a/mithqtt-http/src/main/java/com/github/longkerdandy/mithqtt/http/MqttHttp.java +++ b/mithqtt-http/src/main/java/com/github/longkerdandy/mithqtt/http/MqttHttp.java @@ -147,6 +147,7 @@ public void stop() throws Exception { environment.jersey().register(new MqttSubscribeResource(configuration.getServerId(), validator, redis, communicator, authenticator, metrics)); environment.jersey().register(new MqttUnsubscribeResource(configuration.getServerId(), validator, redis, communicator, authenticator, metrics)); + // config jackson environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); environment.getObjectMapper().configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); diff --git a/mithqtt-http/src/main/java/com/github/longkerdandy/mithqtt/http/resources/MqttPublishResource.java b/mithqtt-http/src/main/java/com/github/longkerdandy/mithqtt/http/resources/MqttPublishResource.java index 293b82a..e892212 100644 --- a/mithqtt-http/src/main/java/com/github/longkerdandy/mithqtt/http/resources/MqttPublishResource.java +++ b/mithqtt-http/src/main/java/com/github/longkerdandy/mithqtt/http/resources/MqttPublishResource.java @@ -45,13 +45,16 @@ public MqttPublishResource(String serverId, Validator validator, RedisSyncStorag super(serverId, validator, redis, communicator, authenticator, metrics); } - @PermitAll + //@PermitAll @POST public ResultEntity publish(@PathParam("clientId") String clientId, @Auth UserPrincipal user, @QueryParam("protocol") @DefaultValue("4") byte protocol, @QueryParam("dup") @DefaultValue("false") boolean dup, @QueryParam("qos") @DefaultValue("0") int qos, @QueryParam("topicName") String topicName, @QueryParam("packetId") @DefaultValue("0") int packetId, - String body) throws UnsupportedEncodingException { - String userName = user.getName(); + String body) throws UnsupportedEncodingException { + + logger.info("clientId {} publish message to rabbitmq ,topic = {}",clientId,topicName); + + String userName = clientId; MqttVersion version = MqttVersion.fromProtocolLevel(protocol); byte[] payload = body == null ? null : body.getBytes("ISO-8859-1"); @@ -125,6 +128,7 @@ public ResultEntity publish(@PathParam("clientId") String clientId, @Au logger.trace("Communicator sending: Send PUBLISH message to broker {} for client {} subscription", bid, cid); d = true; this.communicator.sendToBroker(bid, m); + logger.info("clientId {} publish message to broker success . message topic = {}",clientId,topicName); } // In the QoS 1 delivery protocol, the Sender @@ -142,9 +146,12 @@ public ResultEntity publish(@PathParam("clientId") String clientId, @Au // Pass message to 3rd party application this.communicator.sendToApplication(msg); + logger.info("clientId {} publish message to rabbitmq success . message topic = {}",clientId,topicName); return new ResultEntity<>(true); } else { + logger.info("clientId {} publish message to rabbitmq Authorize fail publish out . message topic = {}",clientId,topicName); throw new AuthorizeException(new ErrorEntity(ErrorCode.UNAUTHORIZED)); } } + } diff --git a/mithqtt-storage-mongo/build.gradle b/mithqtt-storage-mongo/build.gradle new file mode 100644 index 0000000..1b72f3d --- /dev/null +++ b/mithqtt-storage-mongo/build.gradle @@ -0,0 +1,5 @@ +dependencies { + // mongodb + compile 'org.mongodb:mongodb-driver:3.2.2' + +} diff --git a/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/MongoStorage.java b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/MongoStorage.java new file mode 100644 index 0000000..0cd59d8 --- /dev/null +++ b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/MongoStorage.java @@ -0,0 +1,244 @@ +package io.j1st.power.storage.mongo; + +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoDatabase; +import io.j1st.power.storage.mongo.entity.Permission; +import org.apache.commons.configuration.AbstractConfiguration; +import org.bson.Document; +import org.bson.types.ObjectId; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import static com.mongodb.client.model.Filters.*; +import static com.mongodb.client.model.Projections.exclude; +import static com.mongodb.client.model.Projections.include; + + +/** + * Created by Administrator on 2016/4/27. + */ +public class MongoStorage { + + protected MongoClient client; + protected MongoDatabase database; + + public void init(AbstractConfiguration config) { + // MongoClient + List addresses = parseAddresses(config.getString("mongo.address")); + List credentials = parseCredentials( + config.getString("mongo.userName"), + "admin", + config.getString("mongo.password")); + if (addresses.size() == 1) { + this.client = new MongoClient(addresses.get(0), credentials); + } else { + this.client = new MongoClient(addresses, credentials); + } + this.database = this.client.getDatabase(config.getString("mongo.database")); + } + + + public void destroy() { + if (this.client != null) this.client.close(); + } + + private ServerAddress parseAddress(String address) { + int idx = address.indexOf(':'); + return (idx == -1) ? + new ServerAddress(address) : + new ServerAddress(address.substring(0, idx), Integer.parseInt(address.substring(idx + 1))); + } + + private List parseAddresses(String addresses) { + List result = new ArrayList<>(); + String[] addrs = addresses.split(" *, *"); + for (String addr : addrs) { + result.add(parseAddress(addr)); + } + return result; + } + + private List parseCredentials(String userName, String database, String password) { + List result = new ArrayList<>(); + result.add(MongoCredential.createCredential(userName, database, password.toCharArray())); + return result; + } + + + /* =========================================== Agent Operations ===============================================*/ + + + /** + * 判断 采集器 权限 + * + * @param id 采集器Id + * @param permission 最低权限 + * @return True 权限满足 + */ + public boolean isAgentdByUser(String id, Permission permission) { + if (!ObjectId.isValid(id)) { + return false; + } + return this.database.getCollection("agents") + .find(and(eq("_id", new ObjectId(id)), eq("permissions", new Document("$elemMatch", new Document() + .append("user_id", permission.getUserId()))))) + .first() != null; + } + + + /** + * 判断Agent是否存在 + * + * @param id 采集器Id + * @return 采集器 or Null + */ + public boolean isAgentExists(String id) { + if (!ObjectId.isValid(id)) { + return false; + } + return this.database.getCollection("agents") + .find(eq("_id", new ObjectId(id))).first() != null; + } + + + /** + * 判断Agent是否存在 + * + * @param userName 采集器Id + * @return 采集器 or Null + */ + public boolean isAgentAuth(String userName, String password) { + if (!ObjectId.isValid(userName)) { + return false; + } + return this.database.getCollection("agents") + .find(and(eq("_id", new ObjectId(userName)), eq("token", password))).first() != null; + } + + + /** + * 获取 用户信息,根据 Token + * + * @param token Token + * @return 用户信息 or Null + */ + public String getUserByToken(String token) { + Document d = this.database.getCollection("users") + .find(eq("token", token)) + .projection(exclude("password")) + .first(); + if (d == null) return null; + return d.getObjectId("_id").toString(); + } + + /** + * 获取 产品 是否被激活 + * 激活的定义为:旗下采集器至少有一个被激活 + * + * @param productId 产品Id + * @return True 被激活 + */ + public boolean isProductActivated(String productId) { + if (!ObjectId.isValid(productId)) { + return false; + } + return this.database.getCollection("agents") + .find(and(eq("product_id", new ObjectId(productId)), exists("activated_at", true))) + .projection(include("_id")) + .first() != null; + + } + + /** + * 判断agent是否可连接 + * + * @param agentId agent id + * @param status status + * @return is exist + */ + public boolean isDisableAgent(ObjectId agentId, int status) { + return this.database.getCollection("agents") + .find(and(eq("_id", agentId), eq("status", status))) + .first() != null; + + } + + /** + * 获取 产品 是否被激活 + * 激活的定义为:旗下采集器至少有一个被激活 + * + * @param agentId + * @return True 被激活 + */ + public Integer getProductStatusByAgentId(String agentId) { + Integer status = null; + if (!ObjectId.isValid(agentId)) { + return null; + } + Document agentDocument = this.database.getCollection("agents") + .find(eq("_id", new ObjectId(agentId))) + .first(); + if (agentDocument != null) { + ObjectId productId = agentDocument.getObjectId("product_id"); + if (productId != null) { + Document productDocument = this.database.getCollection("products") + .find(eq("_id", productId)) + .first(); + if (productDocument != null) { + status = productDocument.getInteger("status"); + } + } + } + + return status; + } + + /** + * 根据Operator订购的服务类型来查询可用数量(未过期的) + * + * @param serviceType service type + * @param operatorId operator ID + * @return number of service + */ + public long getServiceCountByOperatorId(Integer serviceType, ObjectId operatorId) { + long count = 0; + Document query = new Document(); + query.append("user_id", operatorId); + query.append("serviceType", serviceType); + query.append("expired_at", new Document("$gte", new Date())); + FindIterable ds = this.database.getCollection("user_services").find(query); + if (ds != null) { + for (Document d : ds) { + if (d.getLong("used") != null) { + count += d.getLong("count") - d.getLong("used"); + } else { + count += d.getLong("count"); + } + } + } + return count; + } + + /** + * Get Operator Id + * + * @param agentId Agent id + * @return Operator id + */ + public String getOperatorIdByAgent(String agentId) { + if (!ObjectId.isValid(agentId)) { + return null; + } + Document doc = this.database.getCollection("user_assets_info").find(eq("agent_id", new ObjectId(agentId))) + .projection(include("user_id")) + .first(); + if (doc == null) return null; + return doc.getObjectId("user_id").toString(); + } + +} diff --git a/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/Agent.java b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/Agent.java new file mode 100644 index 0000000..7a13d79 --- /dev/null +++ b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/Agent.java @@ -0,0 +1,113 @@ +package io.j1st.power.storage.mongo.entity; + +import org.bson.types.ObjectId; + +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * Agent + */ +public class Agent { + + private ObjectId id; + private ObjectId productId; + private String name; + private String token; + private boolean connected; + private long msgCount; + private long msgSizeSum; + private Map attributes; + private List permissions; + private Date activatedAt; + private Date updatedAt; + + public ObjectId getId() { + return id; + } + + public void setId(ObjectId id) { + this.id = id; + } + + public ObjectId getProductId() { + return productId; + } + + public void setProductId(ObjectId productId) { + this.productId = productId; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } + + public boolean isConnected() { + return connected; + } + + public void setConnected(boolean connected) { + this.connected = connected; + } + + public long getMsgCount() { + return msgCount; + } + + public void setMsgCount(long msgCount) { + this.msgCount = msgCount; + } + + public long getMsgSizeSum() { + return msgSizeSum; + } + + public void setMsgSizeSum(long msgSizeSum) { + this.msgSizeSum = msgSizeSum; + } + + public Map getAttributes() { + return attributes; + } + + public void setAttributes(Map attributes) { + this.attributes = attributes; + } + + public List getPermissions() { + return permissions; + } + + public void setPermissions(List permissions) { + this.permissions = permissions; + } + + public Date getActivatedAt() { + return activatedAt; + } + + public void setActivatedAt(Date activatedAt) { + this.activatedAt = activatedAt; + } + + public Date getUpdatedAt() { + return updatedAt; + } + + public void setUpdatedAt(Date updatedAt) { + this.updatedAt = updatedAt; + } +} diff --git a/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/AgentStatus.java b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/AgentStatus.java new file mode 100644 index 0000000..77ee03c --- /dev/null +++ b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/AgentStatus.java @@ -0,0 +1,36 @@ +package io.j1st.power.storage.mongo.entity; + +/** + * Agent status + */ +public enum AgentStatus { + + INIT(1), //初始化 + IN_DEVELOPER_ORDER(2), //初始化 + TO_OPERATOR(3), //分配到operator + TO_INSTALLER(4), //分配到installerd + INSTALL_ING(5), //安装中 + INSTALL_SUCCESS(6), //安装完成 + TEST(7), //测试中() + COMPLETE(8), //验收完成 + DISABLED(10); //停用 + + private final int value; + + AgentStatus(int value) { + this.value = value; + } + + public static AgentStatus valueOf(int value) { + for (AgentStatus t : values()) { + if (t.value == value) { + return t; + } + } + return null; + } + + public int value() { + return value; + } +} diff --git a/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/Permission.java b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/Permission.java new file mode 100644 index 0000000..7ae20be --- /dev/null +++ b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/Permission.java @@ -0,0 +1,28 @@ +package io.j1st.power.storage.mongo.entity; + +import org.bson.types.ObjectId; + +/** + * Permission + */ +public class Permission { + + private ObjectId userId; + private PermissionLevel level; + + public ObjectId getUserId() { + return userId; + } + + public void setUserId(ObjectId userId) { + this.userId = userId; + } + + public PermissionLevel getLevel() { + return level; + } + + public void setLevel(PermissionLevel level) { + this.level = level; + } +} diff --git a/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/PermissionLevel.java b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/PermissionLevel.java new file mode 100644 index 0000000..eb082d7 --- /dev/null +++ b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/PermissionLevel.java @@ -0,0 +1,29 @@ +package io.j1st.power.storage.mongo.entity; + +/** + * Permission Level + */ +public enum PermissionLevel { + OWNER(9), + READ_WRITE(6), + READ(3); + + private final int value; + + PermissionLevel(int value) { + this.value = value; + } + + public static PermissionLevel valueOf(int value) { + for (PermissionLevel l : values()) { + if (l.value == value) { + return l; + } + } + throw new IllegalArgumentException("invalid permission level " + value); + } + + public int value() { + return value; + } +} diff --git a/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/ProductStatus.java b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/ProductStatus.java new file mode 100644 index 0000000..44196ec --- /dev/null +++ b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/ProductStatus.java @@ -0,0 +1,29 @@ +package io.j1st.power.storage.mongo.entity; + +/** + * Product status description + */ +public enum ProductStatus { + SERVICE(1), //服务 + SUSPEND(2), //暂停 + ARREARS(3); //欠费 + + private final int value; + + ProductStatus(int value) { + this.value = value; + } + + public static ProductStatus valueOf(int value) { + for (ProductStatus r : values()) { + if (r.value == value) { + return r; + } + } + throw new IllegalArgumentException("invalid product status: " + value); + } + + public int value() { + return value; + } +} diff --git a/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/ServiceType.java b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/ServiceType.java new file mode 100644 index 0000000..a9cdd58 --- /dev/null +++ b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/ServiceType.java @@ -0,0 +1,32 @@ +package io.j1st.power.storage.mongo.entity; + +/** + * service type 服务类型 + */ +public enum ServiceType { + + HARDWARE_MANAGER(0), + ANALYSIS(1), + AUTO_ENGINE(2), + WEBHOOKER(3); + + private final int value; + + ServiceType(int value) { + this.value = value; + } + + public static ServiceType valueOf(int value) { + for (ServiceType r : values()) { + if (r.value == value) { + return r; + } + } + throw new IllegalArgumentException("invalid service type: " + value); + } + + public int value() { + return value; + } + +} diff --git a/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/User.java b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/User.java new file mode 100644 index 0000000..1104eea --- /dev/null +++ b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/User.java @@ -0,0 +1,84 @@ +package io.j1st.power.storage.mongo.entity; + +import org.bson.types.ObjectId; + +import java.util.Date; + +/** + * User + */ +public class User { + + private ObjectId id; + private String name; + private String password; + private String token; + private String email; + private String mobile; + private UserRole role; + private Date updatedAt; + + public ObjectId getId() { + return id; + } + + public void setId(ObjectId id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } + + public String getEmail() { + return email; + } + + public void setEmail(String email) { + this.email = email; + } + + public String getMobile() { + return mobile; + } + + public void setMobile(String mobile) { + this.mobile = mobile; + } + + public UserRole getRole() { + return role; + } + + public void setRole(UserRole role) { + this.role = role; + } + + public Date getUpdatedAt() { + return updatedAt; + } + + public void setUpdatedAt(Date updatedAt) { + this.updatedAt = updatedAt; + } +} diff --git a/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/UserRole.java b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/UserRole.java new file mode 100644 index 0000000..7fb9fb5 --- /dev/null +++ b/mithqtt-storage-mongo/src/main/java/io/j1st/power/storage/mongo/entity/UserRole.java @@ -0,0 +1,28 @@ +package io.j1st.power.storage.mongo.entity; + +/** + * User Role + */ +public enum UserRole { + DEVELOPER(1), + PLAYER(2); + + private final int value; + + UserRole(int value) { + this.value = value; + } + + public static UserRole valueOf(int value) { + for (UserRole r : values()) { + if (r.value == value) { + return r; + } + } + throw new IllegalArgumentException("invalid user role: " + value); + } + + public int value() { + return value; + } +} diff --git a/settings.gradle b/settings.gradle index 148d2f5..8c65720 100644 --- a/settings.gradle +++ b/settings.gradle @@ -9,4 +9,5 @@ include 'mithqtt-communicator-kafka' include 'mithqtt-communicator-rabbitmq' include 'mithqtt-authenticator-dummy' include 'mithqtt-metrics-influxdb' +include 'mithqtt-storage-mongo'