Skip to content

Commit e76324c

Browse files
committed
update tests and adjust a couple env var things
1 parent 458135f commit e76324c

File tree

8 files changed

+64
-272
lines changed

8 files changed

+64
-272
lines changed

python/destinations/MQTT/library.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,15 @@
8080
"Required": true
8181
},
8282
{
83-
"Name": "mqtt_tls_enabled",
83+
"Name": "MQTT_RETAIN_MESSAGES",
8484
"Type": "EnvironmentVariable",
8585
"InputType": "FreeText",
86-
"Description": "Enable TLS for MQTT connection (true/false)",
87-
"DefaultValue": "true",
86+
"Description": "Whether to retain/persist the latest message (per key) on the MQTT topic.",
87+
"DefaultValue": "false",
8888
"Required": false
8989
},
9090
{
91-
"Name": "consumer_group_name",
91+
"Name": "CONSUMER_GROUP",
9292
"Type": "EnvironmentVariable",
9393
"InputType": "FreeText",
9494
"Description": "Kafka consumer group name for the MQTT destination",

python/destinations/MQTT/main.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1-
from mqtt import MQTTSink
21
from quixstreams import Application
2+
from quixstreams.sinks.community.mqtt import MQTTSink
33
import os
44

55
# Load environment variables (useful when working locally)
66
# from dotenv import load_dotenv
77
# load_dotenv()
88

9-
app = Application(consumer_group="mqtt_consumer_group", auto_offset_reset="earliest")
10-
input_topic = app.topic(os.environ["input"], value_deserializer="double")
9+
app = Application(
10+
consumer_group=os.getenv("CONSUMER_GROUP", "mqtt_consumer_group"),
11+
auto_offset_reset="earliest"
12+
)
13+
input_topic = app.topic(os.environ["input"])
1114

1215
sink = MQTTSink(
1316
client_id=os.environ["MQTT_CLIENT_ID"],
@@ -17,6 +20,7 @@
1720
username=os.environ["MQTT_USERNAME"],
1821
password=os.environ["MQTT_PASSWORD"],
1922
version=os.environ["MQTT_VERSION"],
23+
retain=os.getenv("MQTT_RETAIN_MESSAGES", "false").lower() == "true",
2024
tls_enabled=os.environ["MQTT_USE_TLS"].lower() == "true"
2125
)
2226

python/destinations/MQTT/mqtt.py

Lines changed: 0 additions & 254 deletions
This file was deleted.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
quixstreams==3.23.1
1+
quixstreams[mqtt]==3.23.1
22
python-dotenv

tests/destinations/MQTT/docker-compose.test.yml

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,17 @@
22
services:
33
mqtt-broker:
44
image: eclipse-mosquitto:latest
5+
environment:
6+
- MQTT_USERNAME=testuser
7+
- MQTT_PASSWORD=testpass
8+
entrypoint: ["/bin/sh", "/init-mqtt.sh"]
59
networks:
610
- test-network
711
volumes:
812
- ./mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
13+
- ./init-mqtt.sh:/init-mqtt.sh:ro
914
healthcheck:
10-
test: ["CMD-SHELL", "mosquitto_sub -t '$$SYS/#' -C 1 -i healthcheck -W 3 || exit 1"]
15+
test: ["CMD-SHELL", "mosquitto_sub -t '$$SYS/#' -C 1 -i healthcheck -W 3 -u testuser -P testpass || exit 1"]
1116
interval: 3s
1217
timeout: 5s
1318
retries: 10
@@ -35,13 +40,17 @@ services:
3540
dockerfile: Dockerfile
3641
environment:
3742
- Quix__Broker__Address=kafka:9092
38-
- consumer_group_name=mqtt-test-consumer
3943
- input=test-mqtt-input
40-
- mqtt_server=mqtt-broker
41-
- mqtt_port=1883
42-
- mqtt_topic_root=test/output
43-
- mqtt_version=3.1.1
44-
- mqtt_tls_enabled=false
44+
- CONSUMER_GROUP=mqtt-test-consumer
45+
- MQTT_CLIENT_ID=test_client
46+
- MQTT_SERVER=mqtt-broker
47+
- MQTT_PORT=1883
48+
- MQTT_TOPIC_ROOT=test/output
49+
- MQTT_VERSION=3.1.1
50+
- MQTT_USE_TLS=false
51+
- MQTT_USERNAME=testuser
52+
- MQTT_PASSWORD=testpass
53+
- MQTT_RETAIN_MESSAGES=true
4554
networks:
4655
- test-network
4756
depends_on:
@@ -61,6 +70,8 @@ services:
6170
- MQTT_BROKER=mqtt-broker
6271
- MQTT_PORT=1883
6372
- MQTT_TOPIC=test/output/#
73+
- MQTT_USERNAME=testuser
74+
- MQTT_PASSWORD=testpass
6475
command: >
6576
sh -c "
6677
echo 'Installing MQTT client...' &&
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#!/bin/sh
2+
# Initialize MQTT broker with authentication
3+
4+
MQTT_USERNAME="${MQTT_USERNAME:-testuser}"
5+
MQTT_PASSWORD="${MQTT_PASSWORD:-testpass}"
6+
CONFIG_DIR="/app/state/mosquitto/config"
7+
LOG_DIR="/app/state/mosquitto/log"
8+
DATA_DIR="/app/state/mosquitto/data"
9+
10+
echo "Setting up MQTT broker directories..."
11+
mkdir -p "$CONFIG_DIR" "$LOG_DIR" "$DATA_DIR"
12+
13+
echo "Creating password file for user: $MQTT_USERNAME"
14+
mosquitto_passwd -b -c "$CONFIG_DIR/passwd" "$MQTT_USERNAME" "$MQTT_PASSWORD"
15+
16+
echo "Setting permissions for mosquitto user..."
17+
chown -R mosquitto:mosquitto /app/state/mosquitto
18+
chmod -R 755 /app/state/mosquitto
19+
20+
echo "Starting Mosquitto MQTT broker..."
21+
exec mosquitto -c /mosquitto/config/mosquitto.conf
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,6 @@
11
listener 1883
2-
allow_anonymous true
2+
allow_anonymous false
3+
persistence true
4+
persistence_location /app/state/mosquitto/data/
5+
log_dest file /app/state/mosquitto/log/mosquitto.log
6+
password_file /app/state/mosquitto/config/passwd

0 commit comments

Comments
 (0)