From 4e2751d57df5f47db572d1ab06c84bc86cf24f81 Mon Sep 17 00:00:00 2001 From: Atif Ali Date: Tue, 24 Aug 2021 18:53:17 -0700 Subject: [PATCH 1/5] send as json --- pkg/plugin/datasource.go | 12 +++++------- test_broker.js | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index fe0f6ce..59b63f9 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "time" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" @@ -179,13 +178,12 @@ func (ds *MQTTDatasource) SendMessage(msg mqtt.StreamMessage, req *backend.RunSt return nil } - message := mqtt.Message{ - Timestamp: time.Now(), - Value: msg.Value, + messageJSON, err := json.Marshal(msg.Value) + if err != nil { + log.DefaultLogger.Debug(fmt.Sprintf("Marshalling message for topic %s failed", msg.Topic)) + return nil } - frame := ToFrame(msg.Topic, []mqtt.Message{message}) - log.DefaultLogger.Debug(fmt.Sprintf("Sending message to client for topic %s", msg.Topic)) - return sender.SendFrame(frame, data.IncludeAll) + return sender.SendJSON(messageJSON) } diff --git a/test_broker.js b/test_broker.js index fbb057e..6cf5666 100644 --- a/test_broker.js +++ b/test_broker.js @@ -31,7 +31,7 @@ const createPublisher = ({ topic, qos }) => { cmd: 'publish', qos, retain: false, - payload: Math.random().toString(), + payload: Math.random(), }); }, interval); } From 4eacf5c787b8fbcd6f307b7780e30d3fd67ddb13 Mon Sep 17 00:00:00 2001 From: Atif Ali Date: Wed, 25 Aug 2021 01:00:19 -0700 Subject: [PATCH 2/5] pass raw json --- pkg/plugin/datasource.go | 3 ++- test_broker.js | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index 59b63f9..3417dd9 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -178,7 +178,8 @@ func (ds *MQTTDatasource) SendMessage(msg mqtt.StreamMessage, req *backend.RunSt return nil } - messageJSON, err := json.Marshal(msg.Value) + messageJSON, err := json.Marshal(json.RawMessage(msg.Value)) + if err != nil { log.DefaultLogger.Debug(fmt.Sprintf("Marshalling message for topic %s failed", msg.Topic)) return nil diff --git a/test_broker.js b/test_broker.js index 6cf5666..fbb057e 100644 --- a/test_broker.js +++ b/test_broker.js @@ -31,7 +31,7 @@ const createPublisher = ({ topic, qos }) => { cmd: 'publish', qos, retain: false, - payload: Math.random(), + payload: Math.random().toString(), }); }, interval); } From 4c7b39696c6bd9872e84fa4efe85f17842ad943b Mon Sep 17 00:00:00 2001 From: Atif Ali Date: Wed, 25 Aug 2021 14:52:11 -0700 Subject: [PATCH 3/5] no json marshal --- pkg/plugin/datasource.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index 3417dd9..f7e95ac 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -178,12 +178,7 @@ func (ds *MQTTDatasource) SendMessage(msg mqtt.StreamMessage, req *backend.RunSt return nil } - messageJSON, err := json.Marshal(json.RawMessage(msg.Value)) - - if err != nil { - log.DefaultLogger.Debug(fmt.Sprintf("Marshalling message for topic %s failed", msg.Topic)) - return nil - } + messageJSON := json.RawMessage(msg.Value) log.DefaultLogger.Debug(fmt.Sprintf("Sending message to client for topic %s", msg.Topic)) return sender.SendJSON(messageJSON) From 7533d56ba2a8d961eafef0a889b2ef493f9569a6 Mon Sep 17 00:00:00 2001 From: Atif Ali Date: Fri, 22 Oct 2021 15:20:57 -0700 Subject: [PATCH 4/5] return empty frame on query --- pkg/plugin/datasource.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index f7e95ac..623dde4 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -156,12 +156,7 @@ func (ds *MQTTDatasource) Query(query backend.DataQuery) backend.DataResponse { // ensure the client is subscribed to the topic. ds.Client.Subscribe(qm.Topic) - messages, ok := ds.Client.Messages(qm.Topic) - if !ok { - return response - } - - frame := ToFrame(qm.Topic, messages) + frame := data.NewFrame(qm.Topic) if qm.Topic != "" { frame.SetMeta(&data.FrameMeta{ From 19f11e903942a464ab4297152c1fb6ae77145e4e Mon Sep 17 00:00:00 2001 From: Atif Ali Date: Tue, 26 Oct 2021 12:10:40 -0700 Subject: [PATCH 5/5] add imu stub in test broker --- test_broker.js | 50 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/test_broker.js b/test_broker.js index fbb057e..de7f8d7 100644 --- a/test_broker.js +++ b/test_broker.js @@ -12,7 +12,55 @@ const toMillis = { }; const createPublisher = ({ topic, qos }) => { - let i = 0; + if (topic == "imu") { + let i = 0; + console.log(`publishing to topic:`, topic); + publishers[topic] = setInterval(() => { + var j = { + "sample": i, + "properties": ["acceleration", "velocity"], + "sensors": ["accelerometer", "gyroscope"], + "axes": ["x", "y", "z"], + "sensor": { + "accelerometer": { + "x": Math.random(), + "y": Math.random(), + "z": Math.random() + }, + "gyroscope": { + "x": Math.random(), + "y": Math.random(), + "z": Math.random() + } + }, + "axis": { + "x": { + "accelerometer": Math.random(), + "gyroscope": Math.random() + }, + "y": { + "accelerometer": Math.random(), + "gyroscope": Math.random() + }, + "z": { + "accelerometer": Math.random(), + "gyroscope": Math.random() + } + } + }; + var out = JSON.stringify(j); + aedes.publish({ + topic, + cmd: 'publish', + qos, + retain: false, + payload: out, + }); + i+=1; + }, 1000); + return + } + const [duration, value] = topic.split('/'); const fn = toMillis[duration];