Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions python/destinations/MQTT/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,18 @@ You'll need to have an MQTT broker either locally or in the cloud
The connector uses the following environment variables:

- **input**: Name of the input topic to listen to.
- **mqtt_topic_root**: The root for messages in MQTT, this can be anything.
- **mqtt_server**: The address of your MQTT server.
- **mqtt_port**: The port of your MQTT server.
- **mqtt_username**: Username of your MQTT user.
- **mqtt_password**: Password for the MQTT user.
- **MQTT_CLIENT_ID**: A client ID for the sink.
**Default**: `mqtt-sink`
- **MQTT_TOPIC_ROOT**: The root for messages in MQTT, this can be anything.
- **MQTT_SERVER**: The address of your MQTT server.
- **MQTT_PORT**: The port of your MQTT server.
**Default**: `8883`
- **MQTT_USERNAME**: Username of your MQTT user.
- **MQTT_PASSWORD**: Password for the MQTT user.
- **MQTT_VERSION**: MQTT protocol version; choose 3.1, 3.1.1, or 5.
**Default**: `3.1.1`
- **MQTT_USE_TLS**: Set to true if the server uses TLS.
**Default**: `True`

## Contribute

Expand Down
42 changes: 28 additions & 14 deletions python/destinations/MQTT/library.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,63 +18,77 @@
"Type": "EnvironmentVariable",
"InputType": "InputTopic",
"Description": "Name of the input topic to listen to.",
"DefaultValue": "",
"Required": true
},
{
"Name": "mqtt_topic_root",
"Name": "MQTT_CLIENT_ID",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "A client ID for the sink",
"DefaultValue": "mqtt-sink",
"Required": true
},
{
"Name": "MQTT_TOPIC_ROOT",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The root for messages in MQTT, this can be anything",
"Required": true
},
{
"Name": "mqtt_server",
"Name": "MQTT_SERVER",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The address of your MQTT server",
"Required": true
},
{
"Name": "mqtt_port",
"Name": "MQTT_PORT",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The port of your MQTT server",
"DefaultValue": "8883",
"Required": true
},
{
"Name": "mqtt_username",
"Name": "MQTT_USERNAME",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Username of your MQTT user",
"Required": false
"Required": true
},
{
"Name": "mqtt_password",
"Name": "MQTT_PASSWORD",
"Type": "EnvironmentVariable",
"InputType": "Secret",
"Description": "Password for the MQTT user",
"Required": false
"Required": true
},
{
"Name": "mqtt_version",
"Name": "MQTT_VERSION",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "MQTT protocol version: 3.1, 3.1.1, 5",
"DefaultValue": "3.1.1",
"Description": "MQTT protocol version; choose 3.1, 3.1.1, or 5",
"Required": true
},
{
"Name": "mqtt_tls_enabled",
"Name": "MQTT_USE_TLS",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Enable TLS for MQTT connection (true/false)",
"Description": "Set to true if the server uses TLS",
"DefaultValue": "true",
"Required": true
},
{
"Name": "MQTT_RETAIN_MESSAGES",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Whether to retain/persist the latest message (per key) on the MQTT topic.",
"DefaultValue": "false",
"Required": false
},
{
"Name": "consumer_group_name",
"Name": "CONSUMER_GROUP",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Kafka consumer group name for the MQTT destination",
Expand Down
133 changes: 29 additions & 104 deletions python/destinations/MQTT/main.py
Original file line number Diff line number Diff line change
@@ -1,107 +1,32 @@
from quixstreams import Application, context
import paho.mqtt.client as paho
from paho import mqtt
import json
from quixstreams import Application
from quixstreams.sinks.community.mqtt import MQTTSink
import os

# Load environment variables (useful when working locally)
from dotenv import load_dotenv
load_dotenv()

def mqtt_protocol_version():
if os.environ["mqtt_version"] == "3.1":
print("Using MQTT version 3.1")
return paho.MQTTv31
if os.environ["mqtt_version"] == "3.1.1":
print("Using MQTT version 3.1.1")
return paho.MQTTv311
if os.environ["mqtt_version"] == "5":
print("Using MQTT version 5")
return paho.MQTTv5
print("Defaulting to MQTT version 3.1.1")
return paho.MQTTv311

def configure_authentication(mqtt_client):
mqtt_username = os.getenv("mqtt_username", "")
if mqtt_username != "":
mqtt_password = os.getenv("mqtt_password", "")
if mqtt_password == "":
raise ValueError('mqtt_password must set when mqtt_username is set')
print("Using username & password authentication")
mqtt_client.username_pw_set(os.environ["mqtt_username"], os.environ["mqtt_password"])
return
print("Using anonymous authentication")

mqtt_port = os.environ["mqtt_port"]
mqtt_tls_enabled = os.getenv("mqtt_tls_enabled", "true").lower() == "true"

# Validate the config
if not mqtt_port.isnumeric():
raise ValueError('mqtt_port must be a numeric value')

client_id = os.getenv("Quix__Deployment__Id", "default")
mqtt_client = paho.Client(callback_api_version=paho.CallbackAPIVersion.VERSION2,
client_id = client_id, userdata = None, protocol = mqtt_protocol_version())

if mqtt_tls_enabled:
print("TLS enabled")
mqtt_client.tls_set(tls_version = mqtt.client.ssl.PROTOCOL_TLS)
else:
print("TLS disabled")

mqtt_client.reconnect_delay_set(5, 60)
configure_authentication(mqtt_client)

# Create a Quix platform-specific application instead
consumer_group = os.getenv("consumer_group_name", "mqtt_consumer_group")
app = Application(consumer_group=consumer_group, auto_offset_reset='earliest')
# initialize the topic, this will combine the topic name with the environment details to produce a valid topic identifier
input_topic_name = os.environ["input"]
input_topic = app.topic(input_topic_name)

# setting callbacks for different events to see if it works, print the message etc.
def on_connect_cb(client: paho.Client, userdata: any, connect_flags: paho.ConnectFlags,
reason_code: paho.ReasonCode, properties: paho.Properties):
if reason_code == 0:
print("CONNECTED!") # required for Quix to know this has connected
else:
print(f"ERROR! - ({reason_code.value}). {reason_code.getName()}")

def on_disconnect_cb(client: paho.Client, userdata: any, disconnect_flags: paho.DisconnectFlags,
reason_code: paho.ReasonCode, properties: paho.Properties):
print(f"DISCONNECTED! Reason code ({reason_code.value}) {reason_code.getName()}!")

mqtt_client.on_connect = on_connect_cb
mqtt_client.on_disconnect = on_disconnect_cb

mqtt_topic_root = os.environ["mqtt_topic_root"]

# connect to MQTT Cloud on port 8883 (default for MQTT)
mqtt_client.connect(os.environ["mqtt_server"], int(mqtt_port))

# Hook up to termination signal (for docker image) and CTRL-C
print("Listening to streams. Press CTRL-C to exit.")

sdf = app.dataframe(input_topic)

def publish_to_mqtt(data, key, timestamp, headers):
json_data = json.dumps(data)
message_key_string = key.decode('utf-8') # Convert to string using utf-8 encoding
mqtt_topic = mqtt_topic_root + "/" + message_key_string
# publish to MQTT with retain=True so messages are available for late subscribers
mqtt_client.publish(mqtt_topic, payload = json_data, qos = 1, retain=True)
return data

sdf = sdf.apply(publish_to_mqtt, metadata=True)


# start the background process to handle MQTT messages
mqtt_client.loop_start()

print("Starting application")
# run the data processing pipeline
app.run()

# stop handling MQTT messages
mqtt_client.loop_stop()
print("Exiting")
# from dotenv import load_dotenv
# load_dotenv()

app = Application(
consumer_group=os.getenv("CONSUMER_GROUP", "mqtt_consumer_group"),
auto_offset_reset="earliest"
)
input_topic = app.topic(os.environ["input"])

sink = MQTTSink(
client_id=os.environ["MQTT_CLIENT_ID"],
server=os.environ["MQTT_SERVER"],
port=int(os.environ["MQTT_PORT"]),
topic_root=os.environ["MQTT_TOPIC_ROOT"],
username=os.environ["MQTT_USERNAME"],
password=os.environ["MQTT_PASSWORD"],
version=os.environ["MQTT_VERSION"],
retain=os.getenv("MQTT_RETAIN_MESSAGES", "false").lower() == "true",
tls_enabled=os.environ["MQTT_USE_TLS"].lower() == "true"
)

sdf = app.dataframe(topic=input_topic)
sdf.sink(sink)


if __name__ == '__main__':
app.run()
3 changes: 1 addition & 2 deletions python/destinations/MQTT/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
quixstreams==3.23.1
paho-mqtt==2.1.0
quixstreams[mqtt]==3.23.1
python-dotenv
25 changes: 18 additions & 7 deletions tests/destinations/MQTT/docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
services:
mqtt-broker:
image: eclipse-mosquitto:latest
environment:
- MQTT_USERNAME=testuser
- MQTT_PASSWORD=testpass
entrypoint: ["/bin/sh", "/init-mqtt.sh"]
networks:
- test-network
volumes:
- ./mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
- ./init-mqtt.sh:/init-mqtt.sh:ro
healthcheck:
test: ["CMD-SHELL", "mosquitto_sub -t '$$SYS/#' -C 1 -i healthcheck -W 3 || exit 1"]
test: ["CMD-SHELL", "mosquitto_sub -t '$$SYS/#' -C 1 -i healthcheck -W 3 -u testuser -P testpass || exit 1"]
interval: 3s
timeout: 5s
retries: 10
Expand Down Expand Up @@ -35,13 +40,17 @@ services:
dockerfile: Dockerfile
environment:
- Quix__Broker__Address=kafka:9092
- consumer_group_name=mqtt-test-consumer
- input=test-mqtt-input
- mqtt_server=mqtt-broker
- mqtt_port=1883
- mqtt_topic_root=test/output
- mqtt_version=3.1.1
- mqtt_tls_enabled=false
- CONSUMER_GROUP=mqtt-test-consumer
- MQTT_CLIENT_ID=test_client
- MQTT_SERVER=mqtt-broker
- MQTT_PORT=1883
- MQTT_TOPIC_ROOT=test/output
- MQTT_VERSION=3.1.1
- MQTT_USE_TLS=false
- MQTT_USERNAME=testuser
- MQTT_PASSWORD=testpass
- MQTT_RETAIN_MESSAGES=true
networks:
- test-network
depends_on:
Expand All @@ -61,6 +70,8 @@ services:
- MQTT_BROKER=mqtt-broker
- MQTT_PORT=1883
- MQTT_TOPIC=test/output/#
- MQTT_USERNAME=testuser
- MQTT_PASSWORD=testpass
command: >
sh -c "
echo 'Installing MQTT client...' &&
Expand Down
21 changes: 21 additions & 0 deletions tests/destinations/MQTT/init-mqtt.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/sh
# Initialize MQTT broker with authentication

MQTT_USERNAME="${MQTT_USERNAME:-testuser}"
MQTT_PASSWORD="${MQTT_PASSWORD:-testpass}"
CONFIG_DIR="/app/state/mosquitto/config"
LOG_DIR="/app/state/mosquitto/log"
DATA_DIR="/app/state/mosquitto/data"

echo "Setting up MQTT broker directories..."
mkdir -p "$CONFIG_DIR" "$LOG_DIR" "$DATA_DIR"

echo "Creating password file for user: $MQTT_USERNAME"
mosquitto_passwd -b -c "$CONFIG_DIR/passwd" "$MQTT_USERNAME" "$MQTT_PASSWORD"

echo "Setting permissions for mosquitto user..."
chown -R mosquitto:mosquitto /app/state/mosquitto
chmod -R 755 /app/state/mosquitto

echo "Starting Mosquitto MQTT broker..."
exec mosquitto -c /mosquitto/config/mosquitto.conf
6 changes: 5 additions & 1 deletion tests/destinations/MQTT/mosquitto.conf
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
listener 1883
allow_anonymous true
allow_anonymous false
persistence true
persistence_location /app/state/mosquitto/data/
log_dest file /app/state/mosquitto/log/mosquitto.log
password_file /app/state/mosquitto/config/passwd
10 changes: 8 additions & 2 deletions tests/destinations/MQTT/verify_output.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import paho.mqtt.client as mqtt
import paho.mqtt.client as paho
import os
import time
import sys
Expand All @@ -8,20 +8,26 @@
mqtt_broker = os.getenv("MQTT_BROKER", "mqtt-broker")
mqtt_port = int(os.getenv("MQTT_PORT", "1883"))
mqtt_topic = os.getenv("MQTT_TOPIC", "test/output/#")
mqtt_username = os.environ["MQTT_USERNAME"]
mqtt_password = os.environ["MQTT_PASSWORD"]


def on_connect(client, userdata, flags, reason_code, properties):
print(f"Connected to MQTT broker with result code {reason_code}")
client.subscribe(mqtt_topic)
print(f"Subscribed to topic: {mqtt_topic}")


def on_message(client, userdata, msg):
print(f"Received message on topic {msg.topic}: {msg.payload.decode()}")
messages_received.append(msg.payload)


def main():
client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client = paho.Client(callback_api_version=paho.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(mqtt_username, mqtt_password)

print(f"Connecting to MQTT broker at {mqtt_broker}:{mqtt_port}...")
client.connect(mqtt_broker, mqtt_port, 60)
Expand Down