Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ require (
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b // indirect
golang.org/x/net v0.46.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sync v0.17.0
golang.org/x/sys v0.37.0 // indirect
golang.org/x/text v0.30.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251014184007-4626949a642f // indirect
Expand Down
78 changes: 44 additions & 34 deletions publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"golang.org/x/sync/errgroup"

"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
Expand Down Expand Up @@ -74,47 +75,56 @@ func (p *Publisher) Start() error {
return err
}

// TODO: connect at the same time in parallel as spinning up pipeline
cb := lksdk.NewRoomCallback()
cb.OnDisconnected = func() {
// TODO: stop publishing and exit
}
p.room = lksdk.NewRoom(cb)
err := p.room.JoinWithToken(p.params.URL, p.params.Token,
lksdk.WithAutoSubscribe(false),
)
if err != nil {
return err
}
var g errgroup.Group

// publish tracks if sinks are set up
if p.videoTrack != nil {
pub, err := p.room.LocalParticipant.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{
Source: livekit.TrackSource_CAMERA,
})
if err != nil {
return err
g.Go(func() error {
cb := lksdk.NewRoomCallback()
cb.OnDisconnected = func() {
// TODO: stop publishing and exit
}
p.videoTrack.publication = pub
p.videoTrack.onEOS = func() {
_ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
p.room = lksdk.NewRoom(cb)
if err := p.room.JoinWithToken(p.params.URL, p.params.Token,
lksdk.WithAutoSubscribe(false),
); err != nil {
return err
}
}

if p.audioTrack != nil {
pub, err := p.room.LocalParticipant.PublishTrack(p.audioTrack.track, &lksdk.TrackPublicationOptions{
Source: livekit.TrackSource_MICROPHONE,
})
if err != nil {
return err
if p.videoTrack != nil {
pub, err := p.room.LocalParticipant.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{
Source: livekit.TrackSource_CAMERA,
})
if err != nil {
return err
}
p.videoTrack.publication = pub
onEOS := func() {
_ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
}
p.videoTrack.onEOS.Store(&onEOS)
Comment on lines +100 to +103
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Possible nil-pointer race: onEOS closure dereferences p.room which Stop() nils.

The onEOS closure calls p.room.LocalParticipant.UnpublishTrack(pub.SID()), but Stop() (lines 148–151) sets p.room = nil after disconnecting. EOS can be triggered from the GStreamer thread (handleEOS in track.go) concurrently with Stop() being invoked from messageWatch (on MessageEOS/MessageError) or the signal goroutine, leaving a window where the closure dereferences a nil p.room.

Capture p.room (or just p.room.LocalParticipant) in the closure to make the callback self-contained, or guard the access:

🛡️ Proposed fix
-		if p.videoTrack != nil {
-			pub, err := p.room.LocalParticipant.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{
+		if p.videoTrack != nil {
+			lp := p.room.LocalParticipant
+			pub, err := lp.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{
 				Source: livekit.TrackSource_CAMERA,
 			})
 			if err != nil {
 				return err
 			}
 			p.videoTrack.publication = pub
 			onEOS := func() {
-				_ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
+				_ = lp.UnpublishTrack(pub.SID())
 			}
 			p.videoTrack.onEOS.Store(&onEOS)
 		}

(apply the same pattern to the audio block at lines 106–118.)

Also applies to: 114-117

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@publish.go` around lines 100–103: the onEOS closures assigned via
p.videoTrack.onEOS.Store and p.audioTrack.onEOS.Store capture p.room which
Stop() can nil, risking a nil-pointer race; fix by capturing the
LocalParticipant (e.g., lp := p.room.LocalParticipant) into a local variable
before creating the closure and use lp.UnpublishTrack(pub.SID()) inside the
closure (or check for nil lp inside the closure) so the callback is
self-contained and safe against concurrent Stop() niling p.room; apply the same
change to both the video and audio onEOS closures and ensure handleEOS in
track.go/messageWatch interactions remain safe.

}
p.audioTrack.publication = pub
p.audioTrack.onEOS = func() {
_ = p.room.LocalParticipant.UnpublishTrack(pub.SID())

if p.audioTrack != nil {
pub, err := p.room.LocalParticipant.PublishTrack(p.audioTrack.track, &lksdk.TrackPublicationOptions{
Source: livekit.TrackSource_MICROPHONE,
})
if err != nil {
return err
}
p.audioTrack.publication = pub
onEOS := func() {
_ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
}
p.audioTrack.onEOS.Store(&onEOS)
}
}
return nil
})
Comment on lines +80 to +120
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Race: pipeline EOS can fire before onEOS callback is registered, causing per-track unpublish to be skipped

The parallelization of room connection and pipeline startup introduces a race where handleEOS (track.go:95-99) can fire from a GStreamer streaming thread before the room goroutine stores the onEOS callback via atomic.Pointer.Store (publish.go:103 / publish.go:117). In the old sequential code, onEOS was always set before pipeline.Start(), so this race didn't exist.

When handleEOS fires with onEOS still nil, isEnded is set to true but the per-track UnpublishTrack callback never executes. If the room goroutine later publishes the track and stores onEOS, that callback will never fire because GStreamer won't deliver EOS again. In a multi-track pipeline where one stream ends before the other (e.g., video ends but audio continues), the ended track remains published (producing no data) instead of being explicitly unpublished. Room-level cleanup in Stop() eventually handles it when the entire pipeline ends, but this is a behavioral regression from the sequential code.

Prompt for agents
The race exists because the pipeline starts producing data (and can reach EOS) concurrently with the room goroutine that registers the onEOS callback. One approach to fix this is to use errgroup.WithContext to get a context, but that alone won't solve the logical ordering issue.

A cleaner fix would be to keep the parallel startup but defer publishing/onEOS registration until after both goroutines succeed. For example:
1. Run room.JoinWithToken and pipeline.Start in parallel via errgroup
2. After g.Wait succeeds, publish the tracks and register onEOS callbacks sequentially

This preserves the parallelism benefit (room connect and pipeline startup overlap) while ensuring tracks are only published after the pipeline is confirmed running, and onEOS is set before any EOS can arrive from GStreamer (since the pipeline is in PLAYING state but any queued EOS messages would be processed on the main loop which hasn't started yet at that point).

Alternatively, if EOS can fire from streaming threads even after Start returns, you could have handleEOS check isEnded and then have the room goroutine check isEnded after storing onEOS, firing it manually if EOS already happened. This would close the race window.

Relevant code paths:
- publish.go: Start() method, the two g.Go closures
- track.go: handleEOS method, onEOS atomic pointer
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


if err := p.pipeline.Start(); err != nil {
g.Go(func() error {
return p.pipeline.Start()
})
Comment on lines +80 to +124
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

golang.org/x/sync errgroup error cancellation behavior without context

💡 Result:

In golang.org/x/sync/errgroup, a plain errgroup.Group (created as a zero value, without calling WithContext) has no context cancellation capability. When using g.Go(f), all submitted functions run to completion in their goroutines regardless of errors returned by other functions. The first non-nil error is captured and returned by g.Wait after all goroutines finish, but no cancellation occurs and remaining goroutines continue executing without interruption. This differs from errgroup.WithContext(ctx), where the group has an associated context that gets canceled (via the internal cancel func) when the first error is encountered. Goroutines can then check ctx.Err or use context-aware operations to exit early. Key evidence from source code: - Group struct has cancel func(error), which is nil in plain Group. - In Go/TryGo: if err != nil, g.errOnce.Do sets g.err and calls g.cancel(g.err) only if g.cancel != nil. - Wait calls g.wg.Wait (blocks until all done), then if g.cancel != nil calls g.cancel(g.err), then returns g.err. Plain Group behaves like sync.WaitGroup + error collection: always waits for all, no cancellation on error. Use WithContext explicitly for cancellation behavior.

Citations:


🏁 Script executed:

#!/bin/bash
# First, locate the publish.go file
find . -name "publish.go" -type f | head -5

Repository: MainSquare/gstreamer-publisher

Length of output: 87


🏁 Script executed:

#!/bin/bash
# Read the relevant section of publish.go
sed -n '80,124p' ./publish.go

Repository: MainSquare/gstreamer-publisher

Length of output: 1187


🏁 Script executed:

#!/bin/bash
# Get more context around the lines - see the full function/context
sed -n '60,140p' ./publish.go

Repository: MainSquare/gstreamer-publisher

Length of output: 1750


🏁 Script executed:

#!/bin/bash
# Search for the Stop() method
grep -n "func.*Stop" ./publish.go | head -10

Repository: MainSquare/gstreamer-publisher

Length of output: 107


🏁 Script executed:

#!/bin/bash
# Read the Stop() method
sed -n '142,200p' ./publish.go

Repository: MainSquare/gstreamer-publisher

Length of output: 1408


🏁 Script executed:

#!/bin/bash
# Search for where onEOS is used/called
rg "onEOS" ./publish.go -A 2 -B 2

Repository: MainSquare/gstreamer-publisher

Length of output: 417


🏁 Script executed:

#!/bin/bash
# Search for where onEOS callbacks are loaded and called
rg "onEOS.*Load" ./publish.go -A 3 -B 1

Repository: MainSquare/gstreamer-publisher

Length of output: 56


🏁 Script executed:

#!/bin/bash
# Search more broadly for Load calls on track fields
rg "\.Load\(" ./publish.go -B 3 -A 2

Repository: MainSquare/gstreamer-publisher

Length of output: 56


🏁 Script executed:

#!/bin/bash
# Search for Track type definition
rg "type.*Track" ./publish.go -A 5 | head -40

Repository: MainSquare/gstreamer-publisher

Length of output: 56


🏁 Script executed:

#!/bin/bash
# Find all Go files in the repository
find . -name "*.go" -type f

Repository: MainSquare/gstreamer-publisher

Length of output: 108


🏁 Script executed:

#!/bin/bash
# Read track.go to understand track structure and onEOS
cat ./track.go

Repository: MainSquare/gstreamer-publisher

Length of output: 4466


Reconsider concurrent ordering of pipeline start vs. room join/publish.

p.pipeline.Start() is launched in parallel with JoinWithToken + PublishTrack. Three issues:

  1. pipeline.Start() returns quickly after issuing an async state change to PLAYING, so the concurrency buys very little — the room-join goroutine is the only one that actually takes time. Meanwhile the pipeline begins producing samples forwarded to t.track.WriteSample() before the track is published, so initial encoded frames (including any keyframe at the head of the stream) are emitted into a track with no subscribers yet. For video this can extend time-to-first-frame for early subscribers until the next keyframe interval.

  2. Race condition in onEOS callback: The closure stored via p.videoTrack.onEOS.Store() and p.audioTrack.onEOS.Store() accesses p.room.LocalParticipant when invoked later (in handleEOS). Concurrently, p.Stop() sets p.room = nil from the signal handler, message watch, or error path. If handleEOS executes after Stop() has cleared p.room, a nil pointer dereference occurs.

  3. errgroup.Group without context does not propagate cancellation — if JoinWithToken fails, p.pipeline.Start() continues transitioning to PLAYING until g.Wait() returns and p.Stop() runs, leaving a window where the pipeline produces into discarded state.

Consider starting the pipeline only after the room is joined and tracks are published, and use errgroup.WithContext if you keep the parallel structure for future extension.

♻️ Sketch of a sequential ordering
-	var g errgroup.Group
-
-	g.Go(func() error {
-		cb := lksdk.NewRoomCallback()
-		...
-	})
-
-	g.Go(func() error {
-		return p.pipeline.Start()
-	})
-
-	if err := g.Wait(); err != nil {
-		p.Stop()
-		return err
-	}
+	if err := p.joinAndPublish(); err != nil {
+		p.Stop()
+		return err
+	}
+	if err := p.pipeline.Start(); err != nil {
+		p.Stop()
+		return err
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@publish.go` around lines 80–124: the pipeline is started concurrently with
JoinWithToken/PublishTrack causing early samples to be emitted before tracks are
published, a race on the onEOS closures accessing p.room after p.Stop clears it,
and lack of cancellation propagation; fix by sequencing Start after successful
room join and track publications (move the call to p.pipeline.Start() to run
only after JoinWithToken and both PublishTrack calls complete), change how you
store onEOS closures so they capture a safe reference (e.g., capture pub.SID()
and call Unpublish via a safe helper that checks p.room != nil) instead of
directly dereferencing p.room.LocalParticipant, and if you retain parallelism
replace g := errgroup.Group with errgroup.WithContext to propagate cancellation
from JoinWithToken failures to the pipeline start/operation.


if err := g.Wait(); err != nil {
p.Stop()
return err
}

Expand Down
6 changes: 3 additions & 3 deletions track.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type publisherTrack struct {
mimeType string
publication *lksdk.LocalTrackPublication
isEnded atomic.Bool
onEOS func()
onEOS atomic.Pointer[func()]
lastKeyframeRequestNs atomic.Int64
}

Expand Down Expand Up @@ -94,8 +94,8 @@ func (t *publisherTrack) IsEnded() bool {
// callback function when EOS is received
func (t *publisherTrack) handleEOS(_ *app.Sink) {
t.isEnded.Store(true)
if t.onEOS != nil {
t.onEOS()
if cb := t.onEOS.Load(); cb != nil {
(*cb)()
}
}

Expand Down