diff --git a/lua-mosquitto.c b/lua-mosquitto.c index 7f298bd..6849fa7 100644 --- a/lua-mosquitto.c +++ b/lua-mosquitto.c @@ -36,6 +36,7 @@ #include #include +#include #include #include @@ -44,27 +45,23 @@ #include #include - +#include #include "compat.h" -/* re-using mqtt3 message types as callback types */ -#define CONNECT 0x10 -#define PUBLISH 0x30 -#define SUBSCRIBE 0x80 -#define UNSUBSCRIBE 0xA0 -#define DISCONNECT 0xE0 -/* add two extra callback types */ -#define MESSAGE 0x01 -#define LOG 0x02 - -enum connect_return_codes { - CONN_ACCEPT, - CONN_REF_BAD_PROTOCOL, - CONN_REF_BAD_ID, - CONN_REF_SERVER_NOAVAIL, - CONN_REF_BAD_LOGIN, - CONN_REF_NO_AUTH, - CONN_REF_BAD_TLS +enum callback_types { + CALLBACK_ON_CONNECT, + CALLBACK_ON_CONNECT_V5, + CALLBACK_ON_DISCONNECT, + CALLBACK_ON_DISCONNECT_V5, + CALLBACK_ON_PUBLISH, + CALLBACK_ON_PUBLISH_V5, + CALLBACK_ON_MESSAGE, + CALLBACK_ON_MESSAGE_V5, + CALLBACK_ON_SUBSCRIBE, + CALLBACK_ON_SUBSCRIBE_V5, + CALLBACK_ON_UNSUBSCRIBE, + CALLBACK_ON_UNSUBSCRIBE_V5, + CALLBACK_ON_LOG, }; /* unique naming for userdata metatables */ @@ -74,16 +71,27 @@ typedef struct { lua_State *L; struct mosquitto *mosq; int on_connect; + int on_connect_v5; int on_disconnect; + int on_disconnect_v5; int on_publish; + int on_publish_v5; int on_message; + int on_message_v5; int on_subscribe; + int on_subscribe_v5; int on_unsubscribe; + int on_unsubscribe_v5; int on_log; } ctx_t; static int mosq_initialized = 0; +static int lua_table_on_stack(lua_State *L, int index); +static int create_property_list_from_lua_stack(lua_State *L, int index, mosquitto_property** proplist, int command); +static int create_lua_stack_from_property_list(lua_State *L, const mosquitto_property *properties); +static void parse_basic_parameter_for_publish(lua_State *L, const char **topic, const void **payload, size_t *payloadlen, int *qos, bool *retain); + /* handle mosquitto lib return codes */ static int mosq__pstatus(lua_State *L, int mosq_errno) { switch (mosq_errno) { @@ -207,22 +215,34 @@ static int mosq_cleanup(lua_State *L) static void ctx__on_init(ctx_t *ctx) { ctx->on_connect = LUA_REFNIL; + ctx->on_connect_v5 = LUA_REFNIL; ctx->on_disconnect = LUA_REFNIL; + ctx->on_disconnect_v5 = LUA_REFNIL; ctx->on_publish = LUA_REFNIL; + ctx->on_publish_v5 = LUA_REFNIL; ctx->on_message = LUA_REFNIL; + ctx->on_message_v5 = LUA_REFNIL; ctx->on_subscribe = LUA_REFNIL; + ctx->on_subscribe_v5 = LUA_REFNIL; ctx->on_unsubscribe = LUA_REFNIL; + ctx->on_unsubscribe_v5 = LUA_REFNIL; ctx->on_log = LUA_REFNIL; } static void ctx__on_clear(ctx_t *ctx) { luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_connect); + luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_connect_v5); luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_disconnect); + luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_disconnect_v5); luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_publish); + luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_publish_v5); luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_message); + luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_message_v5); luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_subscribe); + luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_subscribe_v5); luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_unsubscribe); + luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_unsubscribe_v5); luaL_unref(ctx->L, LUA_REGISTRYINDEX, ctx->on_log); } @@ -348,21 +368,57 @@ static int ctx_reinitialise(lua_State *L) static int ctx_will_set(lua_State *L) { ctx_t *ctx = ctx_check(L, 1); - const char *topic = luaL_checkstring(L, 2); + const char *topic; size_t payloadlen = 0; const void *payload = NULL; + int qos; + bool retain; - if (!lua_isnil(L, 3)) { - payload = lua_tolstring(L, 3, &payloadlen); - }; - - int qos = luaL_optinteger(L, 4, 0); - bool retain = lua_toboolean(L, 5); + parse_basic_parameter_for_publish(L, &topic, &payload, &payloadlen, &qos, &retain); int rc = mosquitto_will_set(ctx->mosq, topic, payloadlen, payload, qos, retain); return mosq__pstatus(L, rc); } +/*** + * Set a Will with v5 properties + * @function will_set + * @tparam string topic as per mosquitto_will_set + * @tparam string payload as per mosquitto_will_set (but a proper lua string) + * @tparam[opt=0] number qos 0, 1 or 2 + * @tparam[opt=false] boolean retain + * @tparam[opt=nil] table properties + * @see mosquitto_will_set + * @return[1] boolean true + * @return[2] nil + * @treturn[2] number error code + * @treturn[2] string error description. + * @raise For some out of memory or illegal states + */ +static int ctx_will_set_v5(lua_State *L) +{ + ctx_t *ctx = ctx_check(L, 1); + const char *topic; + size_t payloadlen = 0; + const void *payload = NULL; + int qos, rc; + bool retain; + mosquitto_property *proplist = NULL; + + parse_basic_parameter_for_publish(L, &topic, &payload, &payloadlen, &qos, &retain); + + if (lua_table_on_stack(L, 6)) { + rc = create_property_list_from_lua_stack(L, 6, &proplist, CMD_WILL); + if (rc != MOSQ_ERR_SUCCESS) { + return mosq__pstatus(L, rc); + } + } + + rc = mosquitto_will_set_v5(ctx->mosq, topic, payloadlen, payload, qos, retain, proplist); + mosquitto_property_free_all(&proplist); + return mosq__pstatus(L, rc); +} + /*** * Clear a will * @function will_clear @@ -575,7 +631,44 @@ static int ctx_connect(lua_State *L) int port = luaL_optinteger(L, 3, 1883); int keepalive = luaL_optinteger(L, 4, 60); - int rc = mosquitto_connect(ctx->mosq, host, port, keepalive); + int rc = mosquitto_connect(ctx->mosq, host, port, keepalive); + return mosq__pstatus(L, rc); +} + +/*** + * Connect to a broker with interface bind and v5 properties + * @function connect + * @tparam[opt=localhost] string host + * @tparam[opt=1883] number port + * @tparam[opt=60] number keepalive in seconds + * @tparam[opt=nil] string bind_address + * @tparam[opt=nil] table properties + * @see mosquitto_connect_bind_v5 + * @return[1] boolean true + * @return[2] nil + * @treturn[2] number error code + * @treturn[2] string error description. + * @raise For some out of memory or illegal states + */ +static int ctx_connect_bind_v5(lua_State *L) +{ + ctx_t *ctx = ctx_check(L, 1); + int rc; + mosquitto_property *proplist = NULL; + const char *host = luaL_optstring(L, 2, "localhost"); + int port = luaL_optinteger(L, 3, 1883); + int keepalive = luaL_optinteger(L, 4, 60); + const char *bind_address = luaL_optstring(L, 5, 0); + + if (lua_table_on_stack(L, 6)) { + rc = create_property_list_from_lua_stack(L, 6, &proplist, CMD_CONNECT); + if (rc != MOSQ_ERR_SUCCESS) { + return mosq__pstatus(L, rc); + } + } + + rc = mosquitto_connect_bind_v5(ctx->mosq, host, port, keepalive, bind_address, proplist); + mosquitto_property_free_all(&proplist); return mosq__pstatus(L, rc); } @@ -654,6 +747,36 @@ static int ctx_disconnect(lua_State *L) return mosq__pstatus(L, rc); } +/*** + * @function disconnect_v5 with v5 properties + * @tparam int reason_code + * @tparam[opt=nil] table properties + * @see mosquitto_disconnect_v5 + * @return[1] boolean true + * @return[2] nil + * @treturn[2] number error code + * @treturn[2] string error description. + * @raise For some out of memory or illegal states + */ +static int ctx_disconnect_v5(lua_State *L) +{ + ctx_t *ctx = ctx_check(L, 1); + int rc; + mosquitto_property *proplist = NULL; + int reason_code = lua_tonumber(L, 2); + + if (lua_table_on_stack(L, 3)) { + rc = create_property_list_from_lua_stack(L, 3, &proplist, CMD_DISCONNECT); + if (rc != MOSQ_ERR_SUCCESS) { + return mosq__pstatus(L, rc); + } + } + + rc = mosquitto_disconnect_v5(ctx->mosq, reason_code, proplist); + mosquitto_property_free_all(&proplist); + return mosq__pstatus(L, rc); +} + /*** * Publish a message * @function publish @@ -661,6 +784,7 @@ static int ctx_disconnect(lua_State *L) * @tparam string payload (may be nil) * @tparam[opt=0] number qos 0, 1 or 2 * @tparam[opt=nil] boolean retain flag + * @tparam[opt=nil] table properties * @return * @see mosquitto_publish * @treturn[1] number MID can be used for correlation with callbacks @@ -672,19 +796,60 @@ static int ctx_disconnect(lua_State *L) static int ctx_publish(lua_State *L) { ctx_t *ctx = ctx_check(L, 1); - int mid; /* message id is referenced in the publish callback */ - const char *topic = luaL_checkstring(L, 2); + int mid, qos; /* message id is referenced in the publish callback */ + const char *topic; size_t payloadlen = 0; const void *payload = NULL; + bool retain; - if (!lua_isnil(L, 3)) { - payload = lua_tolstring(L, 3, &payloadlen); - }; + parse_basic_parameter_for_publish(L, &topic, &payload, &payloadlen, &qos, &retain); + int rc = mosquitto_publish(ctx->mosq, &mid, topic, payloadlen, payload, qos, retain); - int qos = luaL_optinteger(L, 4, 0); - bool retain = lua_toboolean(L, 5); + if (rc != MOSQ_ERR_SUCCESS) { + return mosq__pstatus(L, rc); + } else { + lua_pushinteger(L, mid); + return 1; + } +} - int rc = mosquitto_publish(ctx->mosq, &mid, topic, payloadlen, payload, qos, retain); +/*** + * Publish a message with v5 properties + * @function publish_v5 + * @tparam string topic + * @tparam string payload (may be nil) + * @tparam[opt=0] number qos 0, 1 or 2 + * @tparam[opt=nil] boolean retain flag + * @tparam[opt=nil] table properties + * @return + * @see mosquitto_publish_v5 + * @treturn[1] number MID can be used for correlation with callbacks + * @return[2] nil + * @treturn[2] number error code + * @treturn[2] string error description. + * @raise For some out of memory or illegal states + */ +static int ctx_publish_v5(lua_State *L) +{ + ctx_t *ctx = ctx_check(L, 1); + int mid, qos, rc; /* message id is referenced in the publish callback */ + const char *topic; + size_t payloadlen = 0; + const void *payload = NULL; + bool retain; + mosquitto_property *proplist = NULL; + + parse_basic_parameter_for_publish(L, &topic, &payload, &payloadlen, &qos, &retain); + + if (lua_table_on_stack(L, 6)) { + rc = create_property_list_from_lua_stack(L, 6, &proplist, CMD_PUBLISH); + if (rc != MOSQ_ERR_SUCCESS) { + return mosq__pstatus(L, rc); + } + } + + rc = mosquitto_publish_v5(ctx->mosq, &mid, topic, payloadlen, payload, qos, retain, proplist); + mosquitto_property_free_all(&proplist); if (rc != MOSQ_ERR_SUCCESS) { return mosq__pstatus(L, rc); @@ -723,6 +888,49 @@ static int ctx_subscribe(lua_State *L) } } +/*** + * Subscribe to a topic with v5 properties + * @function subscribe_v5 + * @tparam string topic eg "blah/+/json/#" + * @tparam[opt=0] number qos 0, 1 or 2 + * @tparam[opt=0] number option + * @tparam[opt=nil] table properties + * @treturn[1] number MID can be used for correlation with callbacks + * @return[2] nil + * @treturn[2] number error code + * @treturn[2] string error description. + * @raise For some out of memory or illegal states + * @see mosquitto_subscribe + * @see option_types + */ +static int ctx_subscribe_v5(lua_State *L) +{ + ctx_t *ctx = ctx_check(L, 1); + int mid; + int rc; + mosquitto_property *proplist = NULL; + const char *sub = luaL_checkstring(L, 2); + int qos = luaL_optinteger(L, 3, 0); + int options = luaL_optinteger(L, 4, 0); + + if (lua_table_on_stack(L, 5)) { + rc = create_property_list_from_lua_stack(L, 5, &proplist, CMD_SUBSCRIBE); + if (rc != MOSQ_ERR_SUCCESS) { + return mosq__pstatus(L, rc); + } + } + + rc = mosquitto_subscribe_v5(ctx->mosq, &mid, sub, qos, options, proplist); + mosquitto_property_free_all(&proplist); + + if (rc != MOSQ_ERR_SUCCESS) { + return mosq__pstatus(L, rc); + } else { + lua_pushinteger(L, mid); + return 1; + } +} + /*** * Unsubscribe from a topic * @function unsubscribe @@ -750,6 +958,41 @@ static int ctx_unsubscribe(lua_State *L) } } +/*** + * Unsubscribe from a topic with v5 properties + * @function unsubscribe_v5 + * @tparam string topic to unsubscribe from + * @tparam[opt=nil] table properties + * @see mosquitto_unsubscribe_v5 + * @return[1] boolean true + * @return[2] nil + * @treturn[2] number error code + * @treturn[2] string error description. + * @raise For some out of memory or illegal states + */ +static int ctx_unsubscribe_v5(lua_State *L){ + ctx_t *ctx = ctx_check(L, 1); + int mid, rc; + mosquitto_property *proplist = NULL; + const char *sub = luaL_checkstring(L, 2); + + if (lua_table_on_stack(L, 3)) { + rc = create_property_list_from_lua_stack(L, 3, &proplist, CMD_UNSUBSCRIBE); + if (rc != MOSQ_ERR_SUCCESS) { + return mosq__pstatus(L, rc); + } + } + + rc = mosquitto_unsubscribe_v5(ctx->mosq, &mid, sub, proplist); + mosquitto_property_free_all(&proplist); + if (rc != MOSQ_ERR_SUCCESS) { + return mosq__pstatus(L, rc); + } else { + lua_pushinteger(L, mid); + return 1; + } +} + static int mosq_loop(lua_State *L, bool forever) { ctx_t *ctx = ctx_check(L, 1); @@ -937,41 +1180,56 @@ static void ctx_on_connect( int rc) { ctx_t *ctx = obj; - bool success = false; - char *str = "reserved for future use"; + bool success = rc == 0; + const char *str = mosquitto_connack_string(rc); - switch(rc) { - case CONN_ACCEPT: - success = true; - str = "connection accepted"; - break; + lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_connect); - case CONN_REF_BAD_PROTOCOL: - str = "connection refused - incorrect protocol version"; - break; + lua_pushboolean(ctx->L, success); + lua_pushinteger(ctx->L, rc); + lua_pushstring(ctx->L, str); - case CONN_REF_BAD_ID: - str = "connection refused - invalid client identifier"; - break; + lua_call(ctx->L, 3, 0); +} - case CONN_REF_SERVER_NOAVAIL: - str = "connection refused - server unavailable"; - break; +static void ctx_on_connect_v5( + struct mosquitto *mosq, + void *obj, + int reason_code, + int flags, + const mosquitto_property *props) +{ + ctx_t *ctx = obj; + bool success = reason_code == MQTT_RC_SUCCESS; + const char *str = mosquitto_reason_string(reason_code); - case CONN_REF_BAD_LOGIN: - str = "connection refused - bad username or password"; - break; + lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_connect_v5); - case CONN_REF_NO_AUTH: - str = "connection refused - not authorised"; - break; + lua_pushboolean(ctx->L, success); + lua_pushinteger(ctx->L, reason_code); + lua_pushstring(ctx->L, str); + lua_pushinteger(ctx->L, flags); + create_lua_stack_from_property_list(ctx->L, props); - case CONN_REF_BAD_TLS: - str = "connection refused - TLS error"; - break; + lua_call(ctx->L, 5, 0); +} + + +static void ctx_on_disconnect( + struct mosquitto *mosq, + void *obj, + int rc) +{ + ctx_t *ctx = obj; + bool success = true; + char *str = "client-initiated disconnect"; + + if (rc) { + success = false; + str = "unexpected disconnect"; } - lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_connect); + lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_disconnect); lua_pushboolean(ctx->L, success); lua_pushinteger(ctx->L, rc); @@ -980,11 +1238,11 @@ static void ctx_on_connect( lua_call(ctx->L, 3, 0); } - -static void ctx_on_disconnect( +static void ctx_on_disconnect_v5( struct mosquitto *mosq, void *obj, - int rc) + int rc, + const mosquitto_property *props) { ctx_t *ctx = obj; bool success = true; @@ -995,13 +1253,14 @@ static void ctx_on_disconnect( str = "unexpected disconnect"; } - lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_disconnect); + lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_disconnect_v5); lua_pushboolean(ctx->L, success); lua_pushinteger(ctx->L, rc); lua_pushstring(ctx->L, str); + create_lua_stack_from_property_list(ctx->L, props); - lua_call(ctx->L, 3, 0); + lua_call(ctx->L, 4, 0); } static void ctx_on_publish( @@ -1016,6 +1275,25 @@ static void ctx_on_publish( lua_call(ctx->L, 1, 0); } +static void ctx_on_publish_v5( + struct mosquitto *mosq, + void *obj, + int mid, + int reason_code, + const mosquitto_property *props) +{ + ctx_t *ctx = obj; + const char *str = mosquitto_reason_string(reason_code); + + lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_publish_v5); + lua_pushinteger(ctx->L, mid); + lua_pushinteger(ctx->L, reason_code); + lua_pushstring(ctx->L, str); + create_lua_stack_from_property_list(ctx->L, props); + + lua_call(ctx->L, 4, 0); +} + static void ctx_on_message( struct mosquitto *mosq, void *obj, @@ -1035,6 +1313,27 @@ static void ctx_on_message( lua_call(ctx->L, 5, 0); /* args: mid, topic, payload, qos, retain */ } +static void ctx_on_message_v5( + struct mosquitto *mosq, + void *obj, + const struct mosquitto_message *msg, + const mosquitto_property *props) +{ + ctx_t *ctx = obj; + + /* push registered Lua callback function onto the stack */ + lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_message_v5); + /* push function args */ + lua_pushinteger(ctx->L, msg->mid); + lua_pushstring(ctx->L, msg->topic); + lua_pushlstring(ctx->L, msg->payload, msg->payloadlen); + lua_pushinteger(ctx->L, msg->qos); + lua_pushboolean(ctx->L, msg->retain); + create_lua_stack_from_property_list(ctx->L, props); + + lua_call(ctx->L, 6, 0); /* args: mid, topic, payload, qos, retain, properties */ +} + static void ctx_on_subscribe( struct mosquitto *mosq, void *obj, @@ -1055,6 +1354,29 @@ static void ctx_on_subscribe( lua_call(ctx->L, qos_count + 1, 0); } +static void ctx_on_subscribe_v5( + struct mosquitto *mosq, + void *obj, + int mid, + int qos_count, + const int *granted_qos, + const mosquitto_property *props) +{ + ctx_t *ctx = obj; + int i; + + lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_subscribe_v5); + lua_pushinteger(ctx->L, mid); + + create_lua_stack_from_property_list(ctx->L, props); + + for (i = 0; i < qos_count; i++) { + lua_pushinteger(ctx->L, granted_qos[i]); + } + + lua_call(ctx->L, qos_count + 2, 0); +} + static void ctx_on_unsubscribe( struct mosquitto *mosq, void *obj, @@ -1067,6 +1389,20 @@ static void ctx_on_unsubscribe( lua_call(ctx->L, 1, 0); } +static void ctx_on_unsubscribe_v5( + struct mosquitto *mosq, + void *obj, + int mid, + const mosquitto_property *props) +{ + ctx_t *ctx = obj; + + lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_unsubscribe_v5); + lua_pushinteger(ctx->L, mid); + create_lua_stack_from_property_list(ctx->L, props); + lua_call(ctx->L, 2, 0); +} + static void ctx_on_log( struct mosquitto *mosq, void *obj, @@ -1104,39 +1440,67 @@ static int ctx_callback_set(lua_State *L) int ref = luaL_ref(L, LUA_REGISTRYINDEX); switch (callback_type) { - case CONNECT: + case CALLBACK_ON_CONNECT: ctx->on_connect = ref; mosquitto_connect_callback_set(ctx->mosq, ctx_on_connect); break; + + case CALLBACK_ON_CONNECT_V5: + ctx->on_connect_v5 = ref; + mosquitto_connect_v5_callback_set(ctx->mosq, ctx_on_connect_v5); + break; - case DISCONNECT: + case CALLBACK_ON_DISCONNECT: ctx->on_disconnect = ref; mosquitto_disconnect_callback_set(ctx->mosq, ctx_on_disconnect); break; - case PUBLISH: + case CALLBACK_ON_DISCONNECT_V5: + ctx->on_disconnect_v5 = ref; + mosquitto_disconnect_v5_callback_set(ctx->mosq, ctx_on_disconnect_v5); + break; + + case CALLBACK_ON_PUBLISH: ctx->on_publish = ref; mosquitto_publish_callback_set(ctx->mosq, ctx_on_publish); break; - case MESSAGE: - /* store the reference into the context, to be retrieved by ctx_on_message */ + case CALLBACK_ON_PUBLISH_V5: + ctx->on_publish_v5 = ref; + mosquitto_publish_v5_callback_set(ctx->mosq, ctx_on_publish_v5); + break; + + case CALLBACK_ON_MESSAGE: ctx->on_message = ref; - /* register C callback in mosquitto */ mosquitto_message_callback_set(ctx->mosq, ctx_on_message); break; - case SUBSCRIBE: + case CALLBACK_ON_MESSAGE_V5: + ctx->on_message_v5 = ref; + mosquitto_message_v5_callback_set(ctx->mosq, ctx_on_message_v5); + break; + + case CALLBACK_ON_SUBSCRIBE: ctx->on_subscribe = ref; mosquitto_subscribe_callback_set(ctx->mosq, ctx_on_subscribe); break; - case UNSUBSCRIBE: + case CALLBACK_ON_SUBSCRIBE_V5: + ctx->on_subscribe_v5 = ref; + mosquitto_subscribe_v5_callback_set(ctx->mosq, ctx_on_subscribe_v5); + break; + + case CALLBACK_ON_UNSUBSCRIBE: ctx->on_unsubscribe = ref; mosquitto_unsubscribe_callback_set(ctx->mosq, ctx_on_unsubscribe); break; - case LOG: + case CALLBACK_ON_UNSUBSCRIBE_V5: + ctx->on_unsubscribe_v5 = ref; + mosquitto_unsubscribe_v5_callback_set(ctx->mosq, ctx_on_unsubscribe_v5); + break; + + case CALLBACK_ON_LOG: ctx->on_log = ref; mosquitto_log_callback_set(ctx->mosq, ctx_on_log); break; @@ -1163,11 +1527,17 @@ struct define { /*** Callback ids * @table callback_ids * @field ON_CONNECT + * @field ON_CONNECT_V5 * @field ON_DISCONNECT + * @field ON_DISCONNECT_V5 * @field ON_PUBLISH + * @field ON_PUBLISH_V5 * @field ON_MESSAGE + * @field ON_MESSAGE_V5 * @field ON_SUBSCRIBE + * @field ON_SUBSCRIBE_V5 * @field ON_UNSUBSCRIBE + * @field ON_UNSUBSCRIBE_V5 * @field ON_LOG */ @@ -1204,14 +1574,31 @@ struct define { * @field MQTT_PROTOCOL_V311 * @field MQTT_PROTOCOL_V5 */ + + /*** Sub Option Values + * @see option + * @table sub_option_values + * @field MQTT_SUB_OPT_NO_LOCAL + * @field MQTT_SUB_OPT_RETAIN_AS_PUBLISHED + * @field MQTT_SUB_OPT_SEND_RETAIN_ALWAYS + * @field MQTT_SUB_OPT_SEND_RETAIN_NEW + * @field MQTT_SUB_OPT_SEND_RETAIN_NEVER + */ + static const struct define D[] = { - {"ON_CONNECT", CONNECT}, - {"ON_DISCONNECT", DISCONNECT}, - {"ON_PUBLISH", PUBLISH}, - {"ON_MESSAGE", MESSAGE}, - {"ON_SUBSCRIBE", SUBSCRIBE}, - {"ON_UNSUBSCRIBE", UNSUBSCRIBE}, - {"ON_LOG", LOG}, + {"ON_CONNECT", CALLBACK_ON_CONNECT}, + {"ON_CONNECT_V5", CALLBACK_ON_CONNECT_V5}, + {"ON_DISCONNECT", CALLBACK_ON_DISCONNECT}, + {"ON_DISCONNECT_V5",CALLBACK_ON_DISCONNECT_V5}, + {"ON_PUBLISH", CALLBACK_ON_PUBLISH}, + {"ON_PUBLISH_V5", CALLBACK_ON_PUBLISH_V5}, + {"ON_MESSAGE", CALLBACK_ON_MESSAGE}, + {"ON_MESSAGE_V5", CALLBACK_ON_MESSAGE_V5}, + {"ON_SUBSCRIBE", CALLBACK_ON_SUBSCRIBE}, + {"ON_SUBSCRIBE_V5", CALLBACK_ON_SUBSCRIBE_V5}, + {"ON_UNSUBSCRIBE", CALLBACK_ON_UNSUBSCRIBE}, + {"ON_UNSUBSCRIBE_V5",CALLBACK_ON_UNSUBSCRIBE_V5}, + {"ON_LOG", CALLBACK_ON_LOG}, {"LOG_NONE", MOSQ_LOG_NONE}, {"LOG_INFO", MOSQ_LOG_INFO}, @@ -1236,9 +1623,244 @@ static const struct define D[] = { {"MQTT_PROTOCOL_V311", MQTT_PROTOCOL_V311}, {"MQTT_PROTOCOL_V5", MQTT_PROTOCOL_V5}, + {"MQTT_SUB_OPT_NO_LOCAL", MQTT_SUB_OPT_NO_LOCAL}, + {"MQTT_SUB_OPT_RETAIN_AS_PUBLISHED", MQTT_SUB_OPT_RETAIN_AS_PUBLISHED}, + {"MQTT_SUB_OPT_SEND_RETAIN_ALWAYS", MQTT_SUB_OPT_SEND_RETAIN_ALWAYS}, + {"MQTT_SUB_OPT_SEND_RETAIN_NEW", MQTT_SUB_OPT_SEND_RETAIN_NEW}, + {"MQTT_SUB_OPT_SEND_RETAIN_NEVER", MQTT_SUB_OPT_SEND_RETAIN_NEVER}, + {NULL, 0} }; +static int create_lua_stack_from_property_list(lua_State *L, const mosquitto_property *properties) +{ + int identifier, pushed; + int user_prop_table_created = 0; + uint8_t i8value = 0; + uint16_t i16value = 0; + uint32_t i32value = 0; + char *strname = NULL, *strvalue = NULL; + char *binvalue = NULL; + const mosquitto_property *prop = NULL; + + lua_newtable(L); + + for(prop=properties; prop != NULL; prop = mosquitto_property_next(prop)){ + pushed = 0; + identifier = mosquitto_property_identifier(prop); + switch(identifier){ + case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR: + case MQTT_PROP_REQUEST_PROBLEM_INFORMATION: + case MQTT_PROP_REQUEST_RESPONSE_INFORMATION: + case MQTT_PROP_MAXIMUM_QOS: + case MQTT_PROP_RETAIN_AVAILABLE: + case MQTT_PROP_WILDCARD_SUB_AVAILABLE: + case MQTT_PROP_SUBSCRIPTION_ID_AVAILABLE: + case MQTT_PROP_SHARED_SUB_AVAILABLE: + mosquitto_property_read_byte(prop, identifier, &i8value, false); + lua_pushinteger(L, i8value); + pushed = 1; + break; + + case MQTT_PROP_SERVER_KEEP_ALIVE: + case MQTT_PROP_RECEIVE_MAXIMUM: + case MQTT_PROP_TOPIC_ALIAS_MAXIMUM: + case MQTT_PROP_TOPIC_ALIAS: + mosquitto_property_read_int16(prop, identifier, &i16value, false); + lua_pushinteger(L, i16value); + pushed = 1; + break; + + case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL: + case MQTT_PROP_SESSION_EXPIRY_INTERVAL: + case MQTT_PROP_WILL_DELAY_INTERVAL: + case MQTT_PROP_MAXIMUM_PACKET_SIZE: + mosquitto_property_read_int32(prop, identifier, &i32value, false); + lua_pushinteger(L, i32value); + pushed = 1; + break; + + case MQTT_PROP_SUBSCRIPTION_IDENTIFIER: + mosquitto_property_read_varint(prop, identifier, &i32value, false); + lua_pushinteger(L, i32value); + pushed = 1; + break; + + case MQTT_PROP_CONTENT_TYPE: + case MQTT_PROP_RESPONSE_TOPIC: + case MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER: + case MQTT_PROP_AUTHENTICATION_METHOD: + case MQTT_PROP_RESPONSE_INFORMATION: + case MQTT_PROP_SERVER_REFERENCE: + case MQTT_PROP_REASON_STRING: + mosquitto_property_read_string(prop, identifier, &strvalue, false); + if(strvalue == NULL) return MOSQ_ERR_NOMEM; + lua_pushstring(L, strvalue); + free(strvalue); + strvalue = NULL; + pushed = 1; + break; + + case MQTT_PROP_AUTHENTICATION_DATA: + case MQTT_PROP_CORRELATION_DATA: + mosquitto_property_read_binary(prop, identifier, (void **)&binvalue, &i16value, false); + if(binvalue == NULL) return MOSQ_ERR_NOMEM; + lua_pushlstring(L, binvalue, i16value); + free(binvalue); + binvalue = NULL; + pushed = 1; + break; + + case MQTT_PROP_USER_PROPERTY: + if (user_prop_table_created == 0){ + lua_newtable(L); + user_prop_table_created = 1; + } + + mosquitto_property_read_string_pair(prop, identifier, &strname, &strvalue, false); + if(strname == NULL || strvalue == NULL) return MOSQ_ERR_NOMEM; + + lua_pushstring(L, strvalue); + lua_setfield(L, -2, strname); + + free(strvalue); + free(strname); + + break; + } + if(pushed){ + // when user property table is created, the root table is at index -3 otherwhise -2 + lua_setfield(L, -2 - user_prop_table_created, mosquitto_property_identifier_to_string(identifier)); + } + } + + // the user property table is added after the loop to the main table since the order of props is undefinied + // and the table can be extended until the last loop + if (user_prop_table_created == 1){ + lua_setfield(L, -2, mosquitto_property_identifier_to_string(MQTT_PROP_USER_PROPERTY)); + } + + return MOSQ_ERR_SUCCESS; +} + +static int lua_table_on_stack(lua_State *L, int index) { + return !lua_isnone(L, index) && lua_istable(L, index); +} + +static int create_property_list_from_lua_stack(lua_State *L, int index, mosquitto_property** proplist, int command) +{ + int rc; + lua_pushvalue(L, index); + lua_pushnil(L); /* first key */ + while (lua_next(L, index) != 0) { + int type, identifier; + size_t szt; + + // parse property key + const char *propname = lua_tolstring(L, -2, NULL); + rc = mosquitto_string_to_property_info(propname, &identifier, &type); + if (rc != MOSQ_ERR_SUCCESS) { + // unknown property name detected + break; + } + + // parse property value + int luatype = lua_type(L, -1); + if (luatype == LUA_TNUMBER) { + int value = lua_tonumber(L, -1); + switch(type){ + case MQTT_PROP_TYPE_BYTE: + if(value < 0 || value > UINT8_MAX){ + rc = MOSQ_ERR_INVAL; + break; + } + rc = mosquitto_property_add_byte(proplist, identifier, (uint8_t )value); + break; + case MQTT_PROP_TYPE_INT16: + if(value < 0 || value > UINT16_MAX){ + rc = MOSQ_ERR_INVAL; + break; + } + rc = mosquitto_property_add_int16(proplist, identifier, (uint16_t )value); + break; + case MQTT_PROP_TYPE_INT32: + if(value < 0 || value > UINT32_MAX){ + rc = MOSQ_ERR_INVAL; + break; + } + rc = mosquitto_property_add_int32(proplist, identifier, (uint32_t )value); + break; + case MQTT_PROP_TYPE_VARINT: + if(value < 0 || value > UINT32_MAX){ + rc = MOSQ_ERR_INVAL; + break; + } + rc = mosquitto_property_add_varint(proplist, identifier, (uint32_t )value); + break; + default: + return MOSQ_ERR_INVAL; + } + } else if (luatype == LUA_TSTRING) { + const char* value = lua_tolstring(L, -1, NULL); + + switch(type){ + case MQTT_PROP_TYPE_BINARY: + szt = strlen(value); + if(szt > UINT16_MAX){ + rc = MOSQ_ERR_INVAL; + break; + } + rc = mosquitto_property_add_binary(proplist, identifier, value, (uint16_t )szt); + break; + case MQTT_PROP_TYPE_STRING: + rc = mosquitto_property_add_string(proplist, identifier, value); + break; + default: + return MOSQ_ERR_INVAL; + } + } else if (luatype == LUA_TTABLE) { + int table_idx = lua_absindex (L, -1); + lua_pushnil(L); /* first key */ + while (lua_next(L, table_idx) != 0) { + luaL_checktype(L,-2,LUA_TSTRING); + const char *userkey = lua_tolstring(L, -2, NULL); + const char *uservalue = lua_tolstring(L, -1, NULL); + lua_pop(L, 1); + + rc = mosquitto_property_add_string_pair(proplist, identifier, userkey, uservalue); + } + } else { + // can only parse string, number and table + rc = MOSQ_ERR_INVAL; + } + + // removes 'value'; keeps 'key' for next iteration + lua_pop(L, 1); + + if (rc != MOSQ_ERR_SUCCESS) { + // single prop was not parsed correctly, break property read loop + break; + } + } + + lua_pop(L, 1); + + // if something failed the proplist memory is freed and the error returned + if (rc != MOSQ_ERR_SUCCESS) { + mosquitto_property_free_all(proplist); + return rc; + } + + // check final proplist for correctness with given command + rc = mosquitto_property_check_all(command, *proplist); + if (rc != MOSQ_ERR_SUCCESS) { + mosquitto_property_free_all(proplist); + return MOSQ_ERR_INVAL; + } + + return MOSQ_ERR_SUCCESS; +} + static int callback_type_from_string(const char *typestr) { const struct define *def = D; @@ -1262,6 +1884,18 @@ static void mosq_register_defs(lua_State *L, const struct define *D) } } +static void parse_basic_parameter_for_publish(lua_State *L, const char **topic, const void **payload, size_t *payloadlen, int *qos, bool *retain) +{ + *topic = luaL_checkstring(L, 2); + + if (!lua_isnil(L, 3)) { + *payload = lua_tolstring(L, 3, payloadlen); + }; + + *qos = luaL_optinteger(L, 4, 0); + *retain = lua_toboolean(L, 5); +} + static const struct luaL_Reg R[] = { {"version", mosq_version}, {"init", mosq_init}, @@ -1277,6 +1911,7 @@ static const struct luaL_Reg ctx_M[] = { {"__gc", ctx_destroy}, {"reinitialise", ctx_reinitialise}, {"will_set", ctx_will_set}, + {"will_set_v5", ctx_will_set_v5}, {"will_clear", ctx_will_clear}, {"login_set", ctx_login_set}, {"tls_insecure_set", ctx_tls_insecure_set}, @@ -1286,13 +1921,18 @@ static const struct luaL_Reg ctx_M[] = { {"threaded_set", ctx_threaded_set}, {"option", ctx_option}, {"connect", ctx_connect}, + {"connect_bind_v5", ctx_connect_bind_v5}, {"connect_async", ctx_connect_async}, {"reconnect", ctx_reconnect}, {"reconnect_async", ctx_reconnect_async}, {"disconnect", ctx_disconnect}, + {"disconnect_v5", ctx_disconnect_v5}, {"publish", ctx_publish}, + {"publish_v5", ctx_publish_v5}, {"subscribe", ctx_subscribe}, + {"subscribe_v5", ctx_subscribe_v5}, {"unsubscribe", ctx_unsubscribe}, + {"unsubscribe_v5", ctx_unsubscribe_v5}, {"loop", ctx_loop}, {"loop_forever", ctx_loop_forever}, {"loop_start", ctx_loop_start}, diff --git a/test/ring/properties.lua b/test/ring/properties.lua new file mode 100644 index 0000000..fa38c46 --- /dev/null +++ b/test/ring/properties.lua @@ -0,0 +1,91 @@ +local mosq = require("mosquitto") + +local MOSQ_ID = "flukso_" +local MOSQ_CLEAN_SESSION = true +local MOSQ_HOST = arg[1] +local MOSQ_PORT = 1883 +local MOSQ_KEEPALIVE = 300 +local MOSQ_MAX_READ = 10 -- packets +local MOSQ_MAX_WRITE = 10 -- packets + +function dump(o) + if type(o) == 'table' then + local s = '{ ' + for k,v in pairs(o) do + if type(k) ~= 'number' then k = '"'..k..'"' end + s = s .. '['..k..'] = ' .. dump(v) .. ',' + end + return s .. '} ' + else + return tostring(o) + end +end + +mosq.init() +local mqtt = mosq.new(MOSQ_ID, MOSQ_CLEAN_SESSION) + +mqtt:option(mosq.OPT_PROTOCOL_VERSION, mosq.MQTT_PROTOCOL_V5); + +while not mqtt:connect_bind_v5(MOSQ_HOST, MOSQ_PORT, MOSQ_KEEPALIVE, nil, conncect_props) do + print("trying to connect to broker ...") +end + +mqtt.ON_CONNECT_V5 = function(success, rc, rc_string, flags, properties) + print("ON_CONNECT_V5", success, rc_string, flags, dump(properties)) + mqtt:subscribe_v5("v5") +end + +mqtt.ON_CONNECT = function(success, rc, rc_string) + print("ON_CONNECT", success, rc_string, flags) +end + +mqtt.ON_SUBSCRIBE_V5 = function(mid, properties, ...) + print("ON_SUBSCRIBE_V5", mid, dump(properties)) + + -- for reference + os.execute([[mosquitto_pub \ + -V 5 \ + -t 'v5' \ + -m 'message' \ + -D publish user-property a testA \ + -D publish payload-format-indicator 0 \ + -D publish content-type text/json \ + -D publish user-property b testB \ + -D publish response-topic this/is/my/response/topic \ + -D publish message-expiry-interval 255 \ + ]]) + + -- send own + properties = {} + properties["content-type"] = "text/json" + properties["response-topic"] = "this/is/my/response/topic" + properties["payload-format-indicator"] = 0 + properties["user-property"] = {a = "testA", b = "testB"} + properties["message-expiry-interval"] = 255 + assert(mqtt:publish_v5("v5", "message", 0, false, properties)) + + -- send without properties + assert(mqtt:publish_v5("v5", "message", 0, false, nil)) + + -- unsubscribe + assert(mqtt:unsubscribe_v5("not-subscribed")) + + properties = {} + properties["user-property"] = {a = "testA", b = "testB"} + --assert(mqtt:disconnect_v5(140, properties)) +end + +mqtt.ON_UNSUBSCRIBE_V5 = function(mid, properties) + print("ON_UNSUBSCRIBE_V5", mid, dump(properties)) +end + +mqtt.ON_DISCONNECT_V5 = function(succes, rc, rc_string, properties) + print("ON_DISCONNECT_V5", succes, rc, rc_string, dump(properties)) +end + +mqtt.ON_MESSAGE_V5 = function(mid, topic, payload, qos, retain, properties) + print("ON_MESSAGE_V5", topic, payload, dump(properties)) +end + +mqtt:loop_forever() +