From b7c005bb0a565ae0e0cf33019e806ef8d58b5d07 Mon Sep 17 00:00:00 2001 From: hug0 Date: Thu, 23 Oct 2025 15:40:12 +0200 Subject: [PATCH 1/4] streamd: update status in pipeline struct Implement the required infrastructure to read out the pipeline's state from the HTTP API. --- streamd/gstreamer_bus.go | 11 +++++++++++ streamd/gstreamer_pipeline.go | 11 ++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/streamd/gstreamer_bus.go b/streamd/gstreamer_bus.go index c206c75..073b238 100644 --- a/streamd/gstreamer_bus.go +++ b/streamd/gstreamer_bus.go @@ -24,6 +24,16 @@ func (d *daemon) registerBusWatch() bool { return p.GetBus().AddWatch(func(msg *gst.Message) bool { switch msg.Type() { + case gst.MessageStateChanged: + if msg.Source() == d.pipeline.name { + _, newState := msg.ParseStateChanged() + stateString := newState.String() + + d.mu.Lock() + d.pipeline.state = stateString + d.mu.Unlock() + } + klog.Info(msg) case gst.MessageEOS: // When end-of-stream is received stop the main loop p.BlockSetState(gst.StateNull) d.mainloop.Quit() @@ -34,6 +44,7 @@ func (d *daemon) registerBusWatch() bool { d.mu.Lock() d.metrics.pipelineStats.warnings += 1 d.mu.Unlock() + klog.Info(msg) // When buffers arrive late in the sink, i.e. when their running-time is // smaller than that of the clock, we have a QoS problem // https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/qos.html?gi-language=c diff --git a/streamd/gstreamer_pipeline.go b/streamd/gstreamer_pipeline.go index e8aca9c..1b477ff 100644 --- a/streamd/gstreamer_pipeline.go +++ b/streamd/gstreamer_pipeline.go @@ -16,9 +16,16 @@ var capsStereo48Khz = audioCapsFilter{Mimetype: "audio/x-raw", Channels: 2, Rate // pipeline is the main AV processing pipeline type pipeline struct { + name string constructed bool pipeline *gst.Pipeline + // State is updated when a STATE_CHANGE message for this pipeline is + // received on the GStreamer bus. + // + // See gstreamer_bus.go + state string + camSrc *gst.Bin presentSrc *gst.Bin audioSrc *gst.Bin @@ -73,6 +80,8 @@ func getSRTStatistics(srtBin *gst.Bin) (*srtStats, error) { func newPipeline(d *daemonConfig) (*pipeline, error) { p := &pipeline{} + p.name = "Pipeline" + p.outputCaps = caps1920x1080p30 p.presentSrcCaps = caps1920x1080p30 p.camSrcCaps = caps1920x1080p30 @@ -189,7 +198,7 @@ func newPipeline(d *daemonConfig) (*pipeline, error) { } // Create main pipelines and link bins - p.pipeline, err = gst.NewPipeline("Pipeline") + p.pipeline, err = gst.NewPipeline(p.name) if err != nil { return nil, err } From 8a3c4468c6b25db6e83254dcd5358914b3f8ddcb Mon Sep 17 00:00:00 2001 From: hug0 Date: Thu, 23 Oct 2025 15:41:01 +0200 Subject: [PATCH 2/4] streamd: expose state in daemonController --- streamd/main.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/streamd/main.go b/streamd/main.go index 3ae966f..cee508b 100644 --- a/streamd/main.go +++ b/streamd/main.go @@ -72,6 +72,7 @@ type daemonState struct { type daemonController interface { metricsSnapshot() metrics graph(details gst.DebugGraphDetails) string + state() string srtStatistics() ([]*srtStats, error) } @@ -98,6 +99,16 @@ func (d *daemon) srtStatistics() ([]*srtStats, error) { return []*srtStats{combStats, presentStats, camStats}, nil } +func (d *daemon) state() string { + var state string + + d.mu.Lock() + state = d.pipeline.state + d.mu.Unlock() + + return state +} + // get a snapshot of the current metrics func (d *daemon) metricsSnapshot() metrics { d.mu.Lock() From e9fe7a46b169613fbf1d9a0657b3d01d556ba0d6 Mon Sep 17 00:00:00 2001 From: hug0 Date: Thu, 23 Oct 2025 15:41:20 +0200 Subject: [PATCH 3/4] streamd: expose new /state endpoint --- streamd/http.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/streamd/http.go b/streamd/http.go index 8da13d2..e946c42 100644 --- a/streamd/http.go +++ b/streamd/http.go @@ -224,7 +224,24 @@ func (h *httpServer) graph(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "text/vnd.graphviz") } +// Returns the state of the GStreamer pipeline +// +// Possible states are: +// - PLAYING: Pipeline is up and running. Sinks accept and render data. Live sources produce data. +// - PAUSED: Pipeline is set up but not actively processing data. +// - READY +// - NULL: Initial state after construction +// +// When implementing a simple status indicator, treat everything except PLAYING as 'inactive' or +// as a degraded service. A working (live) pipeline should always be in the PLAYING state. +func (h *httpServer) state(w http.ResponseWriter, _ *http.Request) { + state := h.daemonController.state() + w.Write([]byte(state)) + w.Header().Add("Content-Type", "text/plain") +} + func (h *httpServer) setupHTTPHandlers() { http.HandleFunc("/metrics", h.metrics) http.HandleFunc("/graph", h.graph) + http.HandleFunc("/state", h.state) } From d64bc4ac09c284f6ee84e97f55e01e2c2f050dfe Mon Sep 17 00:00:00 2001 From: hug0 Date: Thu, 23 Oct 2025 15:51:37 +0200 Subject: [PATCH 4/4] streamd: set initial pipeline state to NULL --- streamd/gstreamer_pipeline.go | 1 + 1 file changed, 1 insertion(+) diff --git a/streamd/gstreamer_pipeline.go b/streamd/gstreamer_pipeline.go index 1b477ff..b489774 100644 --- a/streamd/gstreamer_pipeline.go +++ b/streamd/gstreamer_pipeline.go @@ -81,6 +81,7 @@ func newPipeline(d *daemonConfig) (*pipeline, error) { p := &pipeline{} p.name = "Pipeline" + p.state = gst.StateNull.String() p.outputCaps = caps1920x1080p30 p.presentSrcCaps = caps1920x1080p30