A minimal Go MQTT control-plane service that subscribes to events from agent nodes and publishes commands back via MQTT.
- MQTT v5 client with automatic reconnection
- Shared subscriptions for horizontal scaling
- QoS 1 for reliable message delivery
- HTTP API for publishing commands to nodes
- TLS/mTLS support
- Graceful shutdown handling
- Structured JSON logging
- Go 1.22+
- MQTT broker (e.g., Mosquitto)
go build -o nr-mqtt ./cmd/nr-mqttConfiguration is done via environment variables:
| Variable | Description | Default |
|---|---|---|
MQTT_URL |
MQTT broker URL (e.g., ssl://broker:8883 or tcp://broker:1883) |
tcp://localhost:1883 |
MQTT_CLIENT_ID |
MQTT client ID | nr-control-<random> |
MQTT_USERNAME |
MQTT username (optional) | - |
MQTT_PASSWORD |
MQTT password (optional) | - |
MQTT_CA_FILE |
Path to CA certificate for TLS (optional) | - |
MQTT_CERT_FILE |
Path to client certificate for mTLS (optional) | - |
MQTT_KEY_FILE |
Path to client key for mTLS (optional) | - |
HTTP_ADDR |
HTTP server address | :8080 |
ORG_ID |
Default organization ID (optional) | - |
The service uses the following topic structure:
- Events (agent → control):
org/{orgId}/node/{nodeId}/events - Replies (agent → control):
org/{orgId}/node/{nodeId}/reply - Commands (control → agent):
org/{orgId}/node/{nodeId}/cmd - Broadcast (control → agents):
org/{orgId}/broadcast/cmd
The service subscribes using shared subscriptions for horizontal scaling:
$share/nr-listeners/org/+/node/+/events$share/nr-listeners/org/+/node/+/reply
Send a command to a specific node:
POST /org/{orgId}/node/{nodeId}/cmdRequest Body (JSON):
{
"type": "rate_limit",
"version": 1,
"correlation_id": "550e8400-e29b-41d4-a716-446655440000",
"issued_at": 1731416400,
"ttl_sec": 60,
"payload": {
"limit": 1000
}
}Response: 202 Accepted on success
Send a command to all nodes in an organization:
POST /org/{orgId}/broadcast/cmdRequest Body: Same as above
Response: 202 Accepted on success
Using Docker:
docker run -it -p 1883:1883 eclipse-mosquitto:latest mosquitto -c /mosquitto-no-auth.confOr install and run locally:
# macOS
brew install mosquitto
mosquitto -c /usr/local/etc/mosquitto/mosquitto.conf
# Linux
sudo apt-get install mosquitto
mosquitto -c /etc/mosquitto/mosquitto.confexport MQTT_URL="tcp://localhost:1883"
export HTTP_ADDR=":8080"
./nr-mqttcurl -X POST http://localhost:8080/org/my-org/node/node-123/cmd \
-H "Content-Type: application/json" \
-d '{
"type": "reload_cfg",
"version": 1,
"correlation_id": "550e8400-e29b-41d4-a716-446655440000",
"issued_at": '$(date +%s)',
"ttl_sec": 60,
"payload": {}
}'curl -X POST http://localhost:8080/org/my-org/broadcast/cmd \
-H "Content-Type: application/json" \
-d '{
"type": "health_check",
"version": 1,
"correlation_id": "550e8400-e29b-41d4-a716-446655440001",
"issued_at": '$(date +%s)',
"ttl_sec": 60,
"payload": {}
}'docker build -t nr-mqtt:latest .docker run -d \
--name nr-mqtt \
-p 8080:8080 \
-e MQTT_URL="tcp://mqtt-broker:1883" \
-e MQTT_USERNAME="user" \
-e MQTT_PASSWORD="pass" \
nr-mqtt:latestdocker run -d \
--name nr-mqtt \
-p 8080:8080 \
-v /path/to/certs:/certs:ro \
-e MQTT_URL="ssl://mqtt-broker:8883" \
-e MQTT_CA_FILE="/certs/ca.crt" \
-e MQTT_CERT_FILE="/certs/client.crt" \
-e MQTT_KEY_FILE="/certs/client.key" \
nr-mqtt:latestCommands must be valid JSON with the following required fields:
type(string): Command type (e.g., "rate_limit", "isolate", "block", "reload_cfg")version(int): Message schema versioncorrelation_id(string): Unique identifier for request/response correlationissued_at(int64): Unix timestamp when command was issuedttl_sec(int): Time-to-live in seconds (commands are rejected if expired)payload(object): Command-specific payload (opaque to control-plane)
The service outputs structured JSON logs to stdout:
{"time":"2024-11-12T10:30:00Z","level":"INFO","msg":"MQTT connection established","client_id":"nr-control-123","url":"tcp://localhost:1883"}
{"time":"2024-11-12T10:30:01Z","level":"INFO","msg":"Received message","topic":"org/my-org/node/node-123/events","length":256}
{"time":"2024-11-12T10:30:02Z","level":"INFO","msg":"Command published","topic":"org/my-org/node/node-123/cmd","length":128}The service handles SIGTERM and SIGINT signals for graceful shutdown:
- Stops accepting new HTTP requests
- Waits for in-flight requests to complete (10s timeout)
- Disconnects from MQTT broker cleanly
- Exits
.
├── cmd/
│ └── nr-mqtt/
│ └── main.go # Application entry point
├── internal/
│ ├── config/
│ │ └── config.go # Configuration loading
│ ├── http/
│ │ └── server.go # HTTP server
│ ├── model/
│ │ └── command.go # Command model and validation
│ └── mqtt/
│ └── client.go # MQTT client
├── Dockerfile # Container image definition
├── README.md # This file
├── go.mod # Go module definition
└── go.sum # Go module checksums
Proprietary