From d11c3eb92c47bb139302bfba5354eee8df22f4d9 Mon Sep 17 00:00:00 2001 From: Francis Tang Date: Sun, 25 Sep 2022 19:11:14 +0100 Subject: [PATCH 1/2] Add an ugly HTML handler for / --- cmd/ndt7-prometheus-exporter/main.go | 6 + cmd/ndt7-prometheus-exporter/web.go | 153 +++++++++++++++++++++++ cmd/ndt7-prometheus-exporter/web_test.go | 61 +++++++++ 3 files changed, 220 insertions(+) create mode 100644 cmd/ndt7-prometheus-exporter/web.go create mode 100644 cmd/ndt7-prometheus-exporter/web_test.go diff --git a/cmd/ndt7-prometheus-exporter/main.go b/cmd/ndt7-prometheus-exporter/main.go index e843a7e..ba55963 100644 --- a/cmd/ndt7-prometheus-exporter/main.go +++ b/cmd/ndt7-prometheus-exporter/main.go @@ -90,6 +90,7 @@ var ( flagPeriodMax = flag.Duration("period_max", 15 * time.Hour, "maximum period, e.g. 15h, between speed tests, when running in daemon mode") flagPort = flag.Int("port", 0, "if non-zero, start an HTTP server on this port to export prometheus metrics") + flagMaxHistory = flag.Int("max_history", 10, "Number of results to keep in memory") ) func init() { @@ -216,6 +217,11 @@ func main() { e = emitter.NewPrometheus(e, dlThroughput, dlLatency, ulThroughput, ulLatency, lastResultGauge) http.Handle("/metrics", promhttp.Handler()) + + h := newStatusHandler(e, *flagMaxHistory) + e = h + http.Handle("/", h.handler()) + go func() { log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *flagPort), nil)) }() diff --git a/cmd/ndt7-prometheus-exporter/web.go b/cmd/ndt7-prometheus-exporter/web.go new file mode 100644 index 0000000..6ced9c4 --- /dev/null +++ b/cmd/ndt7-prometheus-exporter/web.go @@ -0,0 +1,153 @@ +package main + +import ( + "fmt" + "net/http" + "io" + "strings" + "sync" + "time" + + "github.com/m-lab/ndt7-client-go/internal/emitter" + "github.com/m-lab/ndt7-client-go/spec" +) + +type entry struct { + completion time.Time + summary emitter.Summary +} + +type circularQueue struct { + sync.Mutex + entries []entry + start, count int +} + +func newCircularQueue(maxSize int) *circularQueue { + return &circularQueue{ + entries: make([]entry, maxSize), + start: 0, + count: 0, + } +} + +func (q *circularQueue) internalPop() { + if q.count == 0 { + return + } + + q.start = (q.start + 1) % len(q.entries) + q.count-- +} + +func (q *circularQueue) push(s entry) { + q.Lock() + defer q.Unlock() + + if q.count >= len(q.entries) { + q.internalPop() + } + + i := (q.start + q.count) % len(q.entries) + q.entries[i] = s + q.count++ +} + +func (q *circularQueue) forEachReversed(f func(entry)) { + var copy []entry + func() { + q.Lock() + defer q.Unlock() + + copy = make([]entry, q.count) + for i := 0; i < q.count; i++ { + j := (i + q.start) % len(q.entries) + copy[q.count - i - 1] = q.entries[j] + } + }() + + for _, entry := range copy { + f(entry) + } +} + +type statusHandler struct { + emitter emitter.Emitter + results *circularQueue +} + +func newStatusHandler(e emitter.Emitter, maxSize int) *statusHandler { + return &statusHandler{ + emitter: e, + results: newCircularQueue(maxSize), + } +} + +// OnStarting emits the starting event +func (h *statusHandler) OnStarting(test spec.TestKind) error { + return h.emitter.OnStarting(test) +} + +// OnError emits the error event +func (h *statusHandler) OnError(test spec.TestKind, err error) error { + return h.emitter.OnError(test, err) +} + +// OnConnected emits the connected event +func (h *statusHandler) OnConnected(test spec.TestKind, fqdn string) error { + return h.emitter.OnConnected(test, fqdn) +} + +// OnDownloadEvent handles an event emitted during the download +func (h *statusHandler) OnDownloadEvent(m *spec.Measurement) error { + return h.emitter.OnDownloadEvent(m) +} + +// OnUploadEvent handles an event emitted during the upload +func (h *statusHandler) OnUploadEvent(m *spec.Measurement) error { + return h.emitter.OnUploadEvent(m) +} + +// OnComplete is the event signalling the end of the test +func (h *statusHandler) OnComplete(test spec.TestKind) error { + return h.emitter.OnComplete(test) +} + +// OnSummary handles the summary event, emitted after the test is over. +func (h *statusHandler) OnSummary(s *emitter.Summary) error { + h.results.push(entry{time.Now(), *s}) + + return h.emitter.OnSummary(s) +} + +// Part of http.Handler interface +func (h *statusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + io.WriteString(w, ` + +NDT7 Prometheus Exporter + +

A non-interactive NDT7 client

+

Metrics

+ +`) + + h.results.forEachReversed(func(entry entry) { + b := &strings.Builder{} + io.WriteString(b, "\n") + fmt.Fprintf(b, "\n") + io.WriteString(b, "\n") + io.WriteString(w, b.String()) + }) + + io.WriteString(w, `
%s
\n", entry.completion.Format(time.RFC3339))
+		e := emitter.NewHumanReadableWithWriter(b)
+		e.OnSummary(&entry.summary)
+		io.WriteString(b, "\n
+ + +`) +} + +func (h *statusHandler) handler() http.Handler { + return h +} diff --git a/cmd/ndt7-prometheus-exporter/web_test.go b/cmd/ndt7-prometheus-exporter/web_test.go new file mode 100644 index 0000000..1ec40b6 --- /dev/null +++ b/cmd/ndt7-prometheus-exporter/web_test.go @@ -0,0 +1,61 @@ +package main + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/m-lab/ndt7-client-go/internal/emitter" +) + +func TestCircularQueue(t * testing.T) { + fakeSummary := func(fqdn string) emitter.Summary { + return emitter.Summary{ + ServerFQDN: fqdn, + } + } + + tests := []struct{ + size int + input []emitter.Summary + want []string + }{ + { + 2, + []emitter.Summary{}, + []string{}, + }, + { + 2, + []emitter.Summary{fakeSummary("zero")}, + []string{"0 zero"}, + }, + { + 2, + []emitter.Summary{fakeSummary("zero"), fakeSummary("one")}, + []string{"1 one", "0 zero"}, + }, + { + 2, + []emitter.Summary{fakeSummary("zero"), fakeSummary("one"), fakeSummary("two")}, + []string{"2 two", "1 one"}, + }, + } + + for _, tc := range(tests) { + q := newCircularQueue(tc.size) + for i, s := range(tc.input) { + q.push(entry{time.Unix(int64(i), 0), s}) + } + + got := make([]string, 0) + q.forEachReversed(func(e entry) { + got = append(got, fmt.Sprintf("%d %s", e.completion.Unix(), e.summary.ServerFQDN)) + }) + + if !reflect.DeepEqual(tc.want, got) { + t.Fatalf("want %v; got %v", tc.want, got) + } + } +} From c34ff965fdabcf8bc0f43c323c1857af9e46a938 Mon Sep 17 00:00:00 2001 From: Francis Tang Date: Sun, 25 Sep 2022 19:26:28 +0100 Subject: [PATCH 2/2] Small readability improvements --- cmd/ndt7-prometheus-exporter/web.go | 14 ++++++++++- cmd/ndt7-prometheus-exporter/web_test.go | 32 ++++++++++++------------ 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/cmd/ndt7-prometheus-exporter/web.go b/cmd/ndt7-prometheus-exporter/web.go index 6ced9c4..15c53aa 100644 --- a/cmd/ndt7-prometheus-exporter/web.go +++ b/cmd/ndt7-prometheus-exporter/web.go @@ -19,8 +19,15 @@ type entry struct { type circularQueue struct { sync.Mutex + + // Underlying storage for queue elements entries []entry - start, count int + + // Index into current start of the queue + start int + + // Number of elements in the queue + count int } func newCircularQueue(maxSize int) *circularQueue { @@ -36,6 +43,7 @@ func (q *circularQueue) internalPop() { return } + // Assumes mutex is locked q.start = (q.start + 1) % len(q.entries) q.count-- } @@ -71,8 +79,12 @@ func (q *circularQueue) forEachReversed(f func(entry)) { } } +// statusHandler implements both emitter.Emitter and http.Handler interfaces type statusHandler struct { + // Chained emitter.Emitter emitter emitter.Emitter + + // A cache of recent test results results *circularQueue } diff --git a/cmd/ndt7-prometheus-exporter/web_test.go b/cmd/ndt7-prometheus-exporter/web_test.go index 1ec40b6..c339783 100644 --- a/cmd/ndt7-prometheus-exporter/web_test.go +++ b/cmd/ndt7-prometheus-exporter/web_test.go @@ -16,37 +16,37 @@ func TestCircularQueue(t * testing.T) { } } - tests := []struct{ + cases := []struct{ size int - input []emitter.Summary + input []string want []string }{ { - 2, - []emitter.Summary{}, - []string{}, + size: 2, + input: []string{}, + want: []string{}, }, { - 2, - []emitter.Summary{fakeSummary("zero")}, - []string{"0 zero"}, + size: 2, + input: []string{"zero"}, + want: []string{"0 zero"}, }, { - 2, - []emitter.Summary{fakeSummary("zero"), fakeSummary("one")}, - []string{"1 one", "0 zero"}, + size: 2, + input: []string{"zero", "one"}, + want: []string{"1 one", "0 zero"}, }, { - 2, - []emitter.Summary{fakeSummary("zero"), fakeSummary("one"), fakeSummary("two")}, - []string{"2 two", "1 one"}, + size: 2, + input: []string{"zero", "one", "two"}, + want: []string{"2 two", "1 one"}, }, } - for _, tc := range(tests) { + for _, tc := range(cases) { q := newCircularQueue(tc.size) for i, s := range(tc.input) { - q.push(entry{time.Unix(int64(i), 0), s}) + q.push(entry{time.Unix(int64(i), 0), fakeSummary(s)}) } got := make([]string, 0)