1- from quixstreams import Application , context
2- import paho .mqtt .client as paho
3- from paho import mqtt
4- import json
1+ from mqtt import MQTTSink
2+ from quixstreams import Application
53import os
64
75# Load environment variables (useful when working locally)
8- from dotenv import load_dotenv
9- load_dotenv ()
6+ # from dotenv import load_dotenv
7+ # load_dotenv()
108
11- def mqtt_protocol_version ():
12- if os .environ ["mqtt_version" ] == "3.1" :
13- print ("Using MQTT version 3.1" )
14- return paho .MQTTv31
15- if os .environ ["mqtt_version" ] == "3.1.1" :
16- print ("Using MQTT version 3.1.1" )
17- return paho .MQTTv311
18- if os .environ ["mqtt_version" ] == "5" :
19- print ("Using MQTT version 5" )
20- return paho .MQTTv5
21- print ("Defaulting to MQTT version 3.1.1" )
22- return paho .MQTTv311
9+ app = Application (consumer_group = "mqtt_consumer_group" , auto_offset_reset = "earliest" )
10+ input_topic = app .topic (os .environ ["input" ], value_deserializer = "double" )
2311
24- def configure_authentication ( mqtt_client ):
25- mqtt_username = os .getenv ( "mqtt_username" , "" )
26- if mqtt_username != "" :
27- mqtt_password = os .getenv ( "mqtt_password" , "" )
28- if mqtt_password == "" :
29- raise ValueError ( 'mqtt_password must set when mqtt_username is set' )
30- print ( "Using username & password authentication" )
31- mqtt_client . username_pw_set ( os .environ ["mqtt_username " ], os . environ [ "mqtt_password" ])
32- return
33- print ( "Using anonymous authentication" )
12+ sink = MQTTSink (
13+ client_id = os .environ [ "MQTT_CLIENT_ID" ],
14+ server = os . environ [ "MQTT_SERVER" ],
15+ port = int ( os .environ [ "MQTT_PORT" ]),
16+ topic_root = os . environ [ "MQTT_TOPIC_ROOT" ],
17+ username = os . environ [ "MQTT_USERNAME" ],
18+ password = os . environ [ "MQTT_PASSWORD" ],
19+ version = os .environ ["MQTT_VERSION " ],
20+ tls_enabled = os . environ [ "MQTT_USE_TLS" ]. lower () == "true"
21+ )
3422
35- mqtt_port = os .environ ["mqtt_port" ]
36- # Validate the config
37- if not mqtt_port .isnumeric ():
38- raise ValueError ('mqtt_port must be a numeric value' )
23+ sdf = app .dataframe (topic = input_topic )
24+ sdf .sink (sink )
3925
40- client_id = os .getenv ("Quix__Deployment__Id" , "default" )
41- mqtt_client = paho .Client (callback_api_version = paho .CallbackAPIVersion .VERSION2 ,
42- client_id = client_id , userdata = None , protocol = mqtt_protocol_version ())
43- mqtt_client .tls_set (tls_version = mqtt .client .ssl .PROTOCOL_TLS ) # we'll be using tls
44- mqtt_client .reconnect_delay_set (5 , 60 )
45- configure_authentication (mqtt_client )
4626
47- # Create a Quix platform-specific application instead
48- app = Application (consumer_group = "mqtt_consumer_group" , auto_offset_reset = 'earliest' )
49- # initialize the topic, this will combine the topic name with the environment details to produce a valid topic identifier
50- input_topic = app .topic (os .environ ["input" ])
51-
52- # setting callbacks for different events to see if it works, print the message etc.
53- def on_connect_cb (client : paho .Client , userdata : any , connect_flags : paho .ConnectFlags ,
54- reason_code : paho .ReasonCode , properties : paho .Properties ):
55- if reason_code == 0 :
56- print ("CONNECTED!" ) # required for Quix to know this has connected
57- else :
58- print (f"ERROR! - ({ reason_code .value } ). { reason_code .getName ()} " )
59-
60- def on_disconnect_cb (client : paho .Client , userdata : any , disconnect_flags : paho .DisconnectFlags ,
61- reason_code : paho .ReasonCode , properties : paho .Properties ):
62- print (f"DISCONNECTED! Reason code ({ reason_code .value } ) { reason_code .getName ()} !" )
63-
64- mqtt_client .on_connect = on_connect_cb
65- mqtt_client .on_disconnect = on_disconnect_cb
66-
67- mqtt_topic_root = os .environ ["mqtt_topic_root" ]
68-
69- # connect to MQTT Cloud on port 8883 (default for MQTT)
70- mqtt_client .connect (os .environ ["mqtt_server" ], int (mqtt_port ))
71-
72- # Hook up to termination signal (for docker image) and CTRL-C
73- print ("Listening to streams. Press CTRL-C to exit." )
74-
75- sdf = app .dataframe (input_topic )
76-
77- def publish_to_mqtt (data , key , timestamp , headers ):
78- json_data = json .dumps (data )
79- message_key_string = key .decode ('utf-8' ) # Convert to string using utf-8 encoding
80- # publish to MQTT
81- mqtt_client .publish (mqtt_topic_root + "/" + message_key_string , payload = json_data , qos = 1 )
82-
83- sdf = sdf .apply (publish_to_mqtt , metadata = True )
84-
85-
86- # start the background process to handle MQTT messages
87- mqtt_client .loop_start ()
88-
89- print ("Starting application" )
90- # run the data processing pipeline
91- app .run (sdf )
92-
93- # stop handling MQTT messages
94- mqtt_client .loop_stop ()
95- print ("Exiting" )
27+ if __name__ == '__main__' :
28+ app .run ()
0 commit comments