From bdbdb5b026effb3ef08bbde77f413e1d35c9798c Mon Sep 17 00:00:00 2001 From: Tom Fanella Date: Wed, 22 Apr 2026 16:41:17 -0400 Subject: [PATCH 1/4] Handle PLI --- track.go | 49 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/track.go b/track.go index f6494a6..9a535be 100644 --- a/track.go +++ b/track.go @@ -27,16 +27,20 @@ import ( "github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4/pkg/media" + "github.com/livekit/protocol/logger" lksdk "github.com/livekit/server-sdk-go/v2" ) +const minKeyframeRequestInterval = 500 * time.Millisecond + type publisherTrack struct { - track *lksdk.LocalTrack - sink *app.Sink - mimeType string - publication *lksdk.LocalTrackPublication - isEnded atomic.Bool - onEOS func() + track *lksdk.LocalTrack + sink *app.Sink + mimeType string + publication *lksdk.LocalTrackPublication + isEnded atomic.Bool + onEOS func() + lastKeyframeRequestNs atomic.Int64 } func createPublisherTrack(mimeType string) (*publisherTrack, error) { @@ -132,5 +136,36 @@ func (t *publisherTrack) handleSample(sink *app.Sink) gst.FlowReturn { } func (t *publisherTrack) onRTCP(packet rtcp.Packet) { - // TODO: handle PLI by instructing the encoder to send a keyframe + switch packet.(type) { + case *rtcp.PictureLossIndication, *rtcp.FullIntraRequest: + t.forceKeyframe() + } +} + +func (t *publisherTrack) forceKeyframe() { + if t.mimeType == webrtc.MimeTypeOpus { + return + } + + now := time.Now().UnixNano() + last := t.lastKeyframeRequestNs.Load() + if now-last < int64(minKeyframeRequestInterval) { + return + } + if !t.lastKeyframeRequestNs.CompareAndSwap(last, now) { + return + } + + s := gst.NewStructure("GstForceKeyUnit") + if err := s.SetValue("all-headers", true); err != nil { + logger.Debugw("failed to set all-headers on force-keyframe event", "error", err) + } + ev := gst.NewCustomEvent(gst.EventTypeCustomUpstream, s) + pad := t.sink.GetStaticPad("sink") + if pad == nil { + return + } + if ok := pad.SendEvent(ev); !ok { + logger.Debugw("force-keyframe event was not handled by pipeline") + } } From c165a0b9388ed2f3b2d41796dab993e4d7c557f5 Mon Sep 17 00:00:00 2001 From: Tom Fanella Date: Wed, 22 Apr 2026 17:27:19 -0400 Subject: [PATCH 2/4] Logs + potential fix --- track.go | 47 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/track.go b/track.go index 9a535be..afda621 100644 --- a/track.go +++ b/track.go @@ -136,36 +136,67 @@ func (t *publisherTrack) handleSample(sink *app.Sink) gst.FlowReturn { } func (t *publisherTrack) onRTCP(packet rtcp.Packet) { - switch packet.(type) { - case *rtcp.PictureLossIndication, *rtcp.FullIntraRequest: - t.forceKeyframe() + switch p := packet.(type) { + case *rtcp.PictureLossIndication: + logger.Infow("received PLI", "mimeType", t.mimeType, "senderSSRC", p.SenderSSRC, "mediaSSRC", p.MediaSSRC) + t.forceKeyframe("PLI") + case *rtcp.FullIntraRequest: + logger.Infow("received FIR", "mimeType", t.mimeType, "senderSSRC", p.SenderSSRC, "mediaSSRC", p.MediaSSRC) + t.forceKeyframe("FIR") + default: + logger.Debugw("received RTCP packet", "mimeType", t.mimeType, "type", fmt.Sprintf("%T", packet)) } } -func (t *publisherTrack) forceKeyframe() { +func (t *publisherTrack) forceKeyframe(reason string) { if t.mimeType == webrtc.MimeTypeOpus { + logger.Debugw("ignoring keyframe request on audio track", "reason", reason) return } now := time.Now().UnixNano() last := t.lastKeyframeRequestNs.Load() if now-last < int64(minKeyframeRequestInterval) { + logger.Infow("debouncing keyframe request", + "reason", reason, + "sinceLastMs", (now-last)/int64(time.Millisecond), + "minIntervalMs", int64(minKeyframeRequestInterval/time.Millisecond)) return } if !t.lastKeyframeRequestNs.CompareAndSwap(last, now) { + logger.Debugw("another goroutine is handling keyframe request", "reason", reason) return } s := gst.NewStructure("GstForceKeyUnit") if err := s.SetValue("all-headers", true); err != nil { - logger.Debugw("failed to set all-headers on force-keyframe event", "error", err) + logger.Warnw("failed to set all-headers on force-keyframe event", err) } ev := gst.NewCustomEvent(gst.EventTypeCustomUpstream, s) + pad := t.sink.GetStaticPad("sink") if pad == nil { + logger.Warnw("appsink has no sink pad", nil, "reason", reason) return } - if ok := pad.SendEvent(ev); !ok { - logger.Debugw("force-keyframe event was not handled by pipeline") - } + + peer := pad.GetPeer() + peerName := "" + if peer != nil { + if peerParent := peer.GetParentElement(); peerParent != nil { + peerName = peerParent.GetName() + } + } + + // PushEvent forwards the event to the pad's peer. For the appsink's sink pad, + // the peer is the src pad of the upstream element, so the upstream + // force-key-unit event travels upstream toward the encoder. Using + // gst_pad_send_event (our previous approach) is rejected by GStreamer as + // "wrong direction" for a sink pad + upstream event. + ok := pad.PushEvent(ev) + logger.Infow("pushed force-keyframe event upstream", + "reason", reason, + "mimeType", t.mimeType, + "peer", peerName, + "handled", ok) } From 60dcb3ed3bf4fa11b30e308e1ff553e8da4a1325 Mon Sep 17 00:00:00 2001 From: Tom Fanella Date: Wed, 22 Apr 2026 17:47:37 -0400 Subject: [PATCH 3/4] Remove extra logs --- track.go | 51 ++++++++++++--------------------------------------- 1 file changed, 12 insertions(+), 39 deletions(-) diff --git a/track.go b/track.go index afda621..5568932 100644 --- a/track.go +++ b/track.go @@ -136,67 +136,40 @@ func (t *publisherTrack) handleSample(sink *app.Sink) gst.FlowReturn { } func (t *publisherTrack) onRTCP(packet rtcp.Packet) { - switch p := packet.(type) { - case *rtcp.PictureLossIndication: - logger.Infow("received PLI", "mimeType", t.mimeType, "senderSSRC", p.SenderSSRC, "mediaSSRC", p.MediaSSRC) - t.forceKeyframe("PLI") - case *rtcp.FullIntraRequest: - logger.Infow("received FIR", "mimeType", t.mimeType, "senderSSRC", p.SenderSSRC, "mediaSSRC", p.MediaSSRC) - t.forceKeyframe("FIR") - default: - logger.Debugw("received RTCP packet", "mimeType", t.mimeType, "type", fmt.Sprintf("%T", packet)) + switch packet.(type) { + case *rtcp.PictureLossIndication, *rtcp.FullIntraRequest: + t.forceKeyframe() } } -func (t *publisherTrack) forceKeyframe(reason string) { +func (t *publisherTrack) forceKeyframe() { if t.mimeType == webrtc.MimeTypeOpus { - logger.Debugw("ignoring keyframe request on audio track", "reason", reason) return } now := time.Now().UnixNano() last := t.lastKeyframeRequestNs.Load() if now-last < int64(minKeyframeRequestInterval) { - logger.Infow("debouncing keyframe request", - "reason", reason, - "sinceLastMs", (now-last)/int64(time.Millisecond), - "minIntervalMs", int64(minKeyframeRequestInterval/time.Millisecond)) return } if !t.lastKeyframeRequestNs.CompareAndSwap(last, now) { - logger.Debugw("another goroutine is handling keyframe request", "reason", reason) return } s := gst.NewStructure("GstForceKeyUnit") - if err := s.SetValue("all-headers", true); err != nil { - logger.Warnw("failed to set all-headers on force-keyframe event", err) - } + _ = s.SetValue("all-headers", true) ev := gst.NewCustomEvent(gst.EventTypeCustomUpstream, s) pad := t.sink.GetStaticPad("sink") if pad == nil { - logger.Warnw("appsink has no sink pad", nil, "reason", reason) return } - peer := pad.GetPeer() - peerName := "" - if peer != nil { - if peerParent := peer.GetParentElement(); peerParent != nil { - peerName = peerParent.GetName() - } - } - - // PushEvent forwards the event to the pad's peer. For the appsink's sink pad, - // the peer is the src pad of the upstream element, so the upstream - // force-key-unit event travels upstream toward the encoder. Using - // gst_pad_send_event (our previous approach) is rejected by GStreamer as - // "wrong direction" for a sink pad + upstream event. - ok := pad.PushEvent(ev) - logger.Infow("pushed force-keyframe event upstream", - "reason", reason, - "mimeType", t.mimeType, - "peer", peerName, - "handled", ok) + // PushEvent forwards the event to the pad's peer. For the appsink's sink + // pad, the peer is the src pad of the upstream element, so the upstream + // force-key-unit event travels upstream toward the encoder. SendEvent + // would be rejected by GStreamer as "wrong direction" on a sink pad. + if ok := pad.PushEvent(ev); !ok { + logger.Warnw("force-keyframe event was not handled by pipeline", nil) + } } From 5aaded65187a6fc9427b7ce7c0bf612066dc9e03 Mon Sep 17 00:00:00 2001 From: Tom Fanella Date: Wed, 22 Apr 2026 18:14:47 -0400 Subject: [PATCH 4/4] Join room and start pipeline --- go.mod | 2 +- publish.go | 78 ++++++++++++++++++++++++++++++------------------------ track.go | 6 ++--- 3 files changed, 48 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index b2dcdeb..2ac3e59 100644 --- a/go.mod +++ b/go.mod @@ -79,7 +79,7 @@ require ( golang.org/x/crypto v0.43.0 // indirect golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b // indirect golang.org/x/net v0.46.0 // indirect - golang.org/x/sync v0.17.0 // indirect + golang.org/x/sync v0.17.0 golang.org/x/sys v0.37.0 // indirect golang.org/x/text v0.30.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20251014184007-4626949a642f // indirect diff --git a/publish.go b/publish.go index d5ec92f..3a7e78c 100644 --- a/publish.go +++ b/publish.go @@ -23,6 +23,7 @@ import ( "github.com/go-gst/go-glib/glib" "github.com/go-gst/go-gst/gst" + "golang.org/x/sync/errgroup" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -74,47 +75,56 @@ func (p *Publisher) Start() error { return err } - // TODO: connect at the same time in parallel as spinning up pipeline - cb := lksdk.NewRoomCallback() - cb.OnDisconnected = func() { - // TODO: stop publishing and exit - } - p.room = lksdk.NewRoom(cb) - err := p.room.JoinWithToken(p.params.URL, p.params.Token, - lksdk.WithAutoSubscribe(false), - ) - if err != nil { - return err - } + var g errgroup.Group - // publish tracks if sinks are set up - if p.videoTrack != nil { - pub, err := p.room.LocalParticipant.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{ - Source: livekit.TrackSource_CAMERA, - }) - if err != nil { - return err + g.Go(func() error { + cb := lksdk.NewRoomCallback() + cb.OnDisconnected = func() { + // TODO: stop publishing and exit } - p.videoTrack.publication = pub - p.videoTrack.onEOS = func() { - _ = p.room.LocalParticipant.UnpublishTrack(pub.SID()) + p.room = lksdk.NewRoom(cb) + if err := p.room.JoinWithToken(p.params.URL, p.params.Token, + lksdk.WithAutoSubscribe(false), + ); err != nil { + return err } - } - if p.audioTrack != nil { - pub, err := p.room.LocalParticipant.PublishTrack(p.audioTrack.track, &lksdk.TrackPublicationOptions{ - Source: livekit.TrackSource_MICROPHONE, - }) - if err != nil { - return err + if p.videoTrack != nil { + pub, err := p.room.LocalParticipant.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{ + Source: livekit.TrackSource_CAMERA, + }) + if err != nil { + return err + } + p.videoTrack.publication = pub + onEOS := func() { + _ = p.room.LocalParticipant.UnpublishTrack(pub.SID()) + } + p.videoTrack.onEOS.Store(&onEOS) } - p.audioTrack.publication = pub - p.audioTrack.onEOS = func() { - _ = p.room.LocalParticipant.UnpublishTrack(pub.SID()) + + if p.audioTrack != nil { + pub, err := p.room.LocalParticipant.PublishTrack(p.audioTrack.track, &lksdk.TrackPublicationOptions{ + Source: livekit.TrackSource_MICROPHONE, + }) + if err != nil { + return err + } + p.audioTrack.publication = pub + onEOS := func() { + _ = p.room.LocalParticipant.UnpublishTrack(pub.SID()) + } + p.audioTrack.onEOS.Store(&onEOS) } - } + return nil + }) - if err := p.pipeline.Start(); err != nil { + g.Go(func() error { + return p.pipeline.Start() + }) + + if err := g.Wait(); err != nil { + p.Stop() return err } diff --git a/track.go b/track.go index 5568932..cab7693 100644 --- a/track.go +++ b/track.go @@ -39,7 +39,7 @@ type publisherTrack struct { mimeType string publication *lksdk.LocalTrackPublication isEnded atomic.Bool - onEOS func() + onEOS atomic.Pointer[func()] lastKeyframeRequestNs atomic.Int64 } @@ -94,8 +94,8 @@ func (t *publisherTrack) IsEnded() bool { // callback function when EOS is received func (t *publisherTrack) handleEOS(_ *app.Sink) { t.isEnded.Store(true) - if t.onEOS != nil { - t.onEOS() + if cb := t.onEOS.Load(); cb != nil { + (*cb)() } }