Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions streamd/gstreamer_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ func (d *daemon) registerBusWatch() bool {

return p.GetBus().AddWatch(func(msg *gst.Message) bool {
switch msg.Type() {
case gst.MessageStateChanged:
if msg.Source() == d.pipeline.name {
_, newState := msg.ParseStateChanged()
stateString := newState.String()

d.mu.Lock()
d.pipeline.state = stateString
d.mu.Unlock()
}
klog.Info(msg)
case gst.MessageEOS: // When end-of-stream is received stop the main loop
p.BlockSetState(gst.StateNull)
d.mainloop.Quit()
Expand All @@ -34,6 +44,7 @@ func (d *daemon) registerBusWatch() bool {
d.mu.Lock()
d.metrics.pipelineStats.warnings += 1
d.mu.Unlock()
klog.Info(msg)
// When buffers arrive late in the sink, i.e. when their running-time is
// smaller than that of the clock, we have a QoS problem
// https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/qos.html?gi-language=c
Expand Down
12 changes: 11 additions & 1 deletion streamd/gstreamer_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@ var capsStereo48Khz = audioCapsFilter{Mimetype: "audio/x-raw", Channels: 2, Rate

// pipeline is the main AV processing pipeline
type pipeline struct {
name string
constructed bool
pipeline *gst.Pipeline

// State is updated when a STATE_CHANGE message for this pipeline is
// received on the GStreamer bus.
//
// See gstreamer_bus.go
state string

camSrc *gst.Bin
presentSrc *gst.Bin
audioSrc *gst.Bin
Expand Down Expand Up @@ -73,6 +80,9 @@ func getSRTStatistics(srtBin *gst.Bin) (*srtStats, error) {
func newPipeline(d *daemonConfig) (*pipeline, error) {
p := &pipeline{}

p.name = "Pipeline"
p.state = gst.StateNull.String()

p.outputCaps = caps1920x1080p30
p.presentSrcCaps = caps1920x1080p30
p.camSrcCaps = caps1920x1080p30
Expand Down Expand Up @@ -189,7 +199,7 @@ func newPipeline(d *daemonConfig) (*pipeline, error) {
}

// Create main pipelines and link bins
p.pipeline, err = gst.NewPipeline("Pipeline")
p.pipeline, err = gst.NewPipeline(p.name)
if err != nil {
return nil, err
}
Expand Down
17 changes: 17 additions & 0 deletions streamd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,24 @@ func (h *httpServer) graph(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "text/vnd.graphviz")
}

// Returns the state of the GStreamer pipeline
//
// Possible states are:
// - PLAYING: Pipeline is up and running. Sinks accept and render data. Live sources produce data.
// - PAUSED: Pipeline is set up but not actively processing data.
// - READY
// - NULL: Initial state after construction
//
// When implementing a simple status indicator, treat everything except PLAYING as 'inactive' or
// as a degraded service. A working (live) pipeline should always be in the PLAYING state.
func (h *httpServer) state(w http.ResponseWriter, _ *http.Request) {
state := h.daemonController.state()
w.Write([]byte(state))
w.Header().Add("Content-Type", "text/plain")
}

func (h *httpServer) setupHTTPHandlers() {
http.HandleFunc("/metrics", h.metrics)
http.HandleFunc("/graph", h.graph)
http.HandleFunc("/state", h.state)
}
11 changes: 11 additions & 0 deletions streamd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type daemonState struct {
type daemonController interface {
metricsSnapshot() metrics
graph(details gst.DebugGraphDetails) string
state() string
srtStatistics() ([]*srtStats, error)
}

Expand All @@ -98,6 +99,16 @@ func (d *daemon) srtStatistics() ([]*srtStats, error) {
return []*srtStats{combStats, presentStats, camStats}, nil
}

func (d *daemon) state() string {
var state string

d.mu.Lock()
state = d.pipeline.state
d.mu.Unlock()

return state
}

// get a snapshot of the current metrics
func (d *daemon) metricsSnapshot() metrics {
d.mu.Lock()
Expand Down