Join room and start pipeline #2
Conversation
Walkthrough

The changes update Go dependencies, refactor the publisher's startup method to use error groups for concurrent room setup and pipeline initialization, and convert the track's end-of-stream callback to use atomic pointers for thread-safe concurrent access.

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 5 passed
murilo-teleo
left a comment
I have no idea what's going on with the onEOS changes, but it sounds right. I don't know enough about Go internals to catch the nuances here.
```go
g.Go(func() error {
    cb := lksdk.NewRoomCallback()
    cb.OnDisconnected = func() {
        // TODO: stop publishing and exit
    }
    p.room = lksdk.NewRoom(cb)
    if err := p.room.JoinWithToken(p.params.URL, p.params.Token,
        lksdk.WithAutoSubscribe(false),
    ); err != nil {
        return err
    }

    if p.videoTrack != nil {
        pub, err := p.room.LocalParticipant.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{
            Source: livekit.TrackSource_CAMERA,
        })
        if err != nil {
            return err
        }
        p.videoTrack.publication = pub
        onEOS := func() {
            _ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
        }
        p.videoTrack.onEOS.Store(&onEOS)
    }

    if p.audioTrack != nil {
        pub, err := p.room.LocalParticipant.PublishTrack(p.audioTrack.track, &lksdk.TrackPublicationOptions{
            Source: livekit.TrackSource_MICROPHONE,
        })
        if err != nil {
            return err
        }
        p.audioTrack.publication = pub
        onEOS := func() {
            _ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
        }
        p.audioTrack.onEOS.Store(&onEOS)
    }
    return nil
})
```
🟡 Race: pipeline EOS can fire before onEOS callback is registered, causing per-track unpublish to be skipped
The parallelization of room connection and pipeline startup introduces a race where handleEOS (track.go:95-99) can fire from a GStreamer streaming thread before the room goroutine stores the onEOS callback via atomic.Pointer.Store (publish.go:103 / publish.go:117). In the old sequential code, onEOS was always set before pipeline.Start(), so this race didn't exist.
When handleEOS fires with onEOS still nil, isEnded is set to true but the per-track UnpublishTrack callback never executes. If the room goroutine later publishes the track and stores onEOS, that callback will never fire because GStreamer won't deliver EOS again. In a multi-track pipeline where one stream ends before the other (e.g., video ends but audio continues), the ended track remains published (producing no data) instead of being explicitly unpublished. Room-level cleanup in Stop() eventually handles it when the entire pipeline ends, but this is a behavioral regression from the sequential code.
Prompt for agents
The race exists because the pipeline starts producing data (and can reach EOS) concurrently with the room goroutine that registers the onEOS callback. One approach to fix this is to use errgroup.WithContext to get a context, but that alone won't solve the logical ordering issue.
A cleaner fix would be to keep the parallel startup but defer publishing/onEOS registration until after both goroutines succeed. For example:
1. Run room.JoinWithToken and pipeline.Start in parallel via errgroup
2. After g.Wait succeeds, publish the tracks and register onEOS callbacks sequentially
This preserves the parallelism benefit (room connect and pipeline startup overlap) while ensuring tracks are only published after the pipeline is confirmed running, and onEOS is set before any EOS can arrive from GStreamer (since the pipeline is in PLAYING state but any queued EOS messages would be processed on the main loop which hasn't started yet at that point).
Alternatively, if EOS can fire from streaming threads even after Start returns, you could have handleEOS check isEnded and then have the room goroutine check isEnded after storing onEOS, firing it manually if EOS already happened. This would close the race window.
Relevant code paths:
- publish.go: Start() method, the two g.Go closures
- track.go: handleEOS method, onEOS atomic pointer
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
publish.go (1)
142-156: ⚠️ Potential issue | 🟠 Major

`Stop()` is not safe against concurrent invocation. With this PR, `Stop()` can now be called from at least three sites: the errgroup error path (line 127), the signal goroutine (line 135), and `messageWatch` on EOS/error (lines 163, 169). The current implementation reads/writes `p.pipeline`, `p.room`, and `p.loop` without any guard or `sync.Once`, so two concurrent callers can race (e.g., one observes `p.pipeline != nil` while another nils it, leading to a nil deref on `BlockSetState`, or `Disconnect` being called twice).

Consider wrapping the body in a `sync.Once`:

🛡️ Proposed fix

```diff
 type Publisher struct {
 	params     PublisherParams
 	pipeline   *gst.Pipeline
 	loop       *glib.MainLoop
 	videoTrack *publisherTrack
 	audioTrack *publisherTrack
 	room       *lksdk.Room
+	stopOnce   sync.Once
 }

 func (p *Publisher) Stop() {
-	logger.Infow("stopping publisher..")
-	if p.pipeline != nil {
-		p.pipeline.BlockSetState(gst.StateNull)
-		p.pipeline = nil
-	}
-	if p.room != nil {
-		p.room.Disconnect()
-		p.room = nil
-	}
-	if p.loop != nil {
-		p.loop.Quit()
-		p.loop = nil
-	}
+	p.stopOnce.Do(func() {
+		logger.Infow("stopping publisher..")
+		if p.pipeline != nil {
+			p.pipeline.BlockSetState(gst.StateNull)
+			p.pipeline = nil
+		}
+		if p.room != nil {
+			p.room.Disconnect()
+			p.room = nil
+		}
+		if p.loop != nil {
+			p.loop.Quit()
+			p.loop = nil
+		}
+	})
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@publish.go` around lines 142 - 156, Stop() is not concurrency-safe: multiple goroutines may concurrently read/write Publisher.pipeline, Publisher.room and Publisher.loop causing nil-derefs or double-close; modify the Publisher type to include a sync.Once (or a dedicated mutex + closed flag) and wrap the entire body of Stop() so its teardown logic (references to p.pipeline.BlockSetState, p.room.Disconnect, p.loop.Quit and subsequent nil assignments) runs exactly once; ensure any callers that previously relied on idempotent behavior still work by making Stop() safe to call repeatedly and document/maintain idempotence for Stop().
🧹 Nitpick comments (2)
publish.go (1)
82-84: TODO: `OnDisconnected` is a no-op.

If the SDK signals disconnect (e.g., server-initiated kick, network drop after retries), the publisher will keep the pipeline running and silently produce into a dead room. At minimum this should call `p.Stop()` so the process exits cleanly.

```diff
 cb.OnDisconnected = func() {
-	// TODO: stop publishing and exit
+	logger.Infow("room disconnected, stopping publisher")
+	p.Stop()
 }
```

Want me to wire this up in a follow-up commit / open a tracking issue?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@publish.go` around lines 82 - 84, The cb.OnDisconnected handler is currently a no-op and must stop the publisher so the process exits cleanly when the SDK signals a disconnect; update the cb.OnDisconnected assignment to call the publisher's Stop method (p.Stop()) and perform any necessary cleanup (e.g., cancel contexts, close streams) inside that handler so the pipeline doesn't keep producing into a dead room; locate the cb.OnDisconnected closure in publish.go and invoke p.Stop() (and related shutdown helpers already present) from there.

go.mod (1)

82-82: LGTM — direct dependency promotion is correct.

`golang.org/x/sync/errgroup` is now imported directly by `publish.go`, so dropping the `// indirect` marker is appropriate. Note that the `require` block at lines 23–91 mixes direct and indirect deps; consider running `go mod tidy` so the formatter splits them into separate `require` blocks for clarity.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@go.mod` at line 82, The go.mod currently promotes golang.org/x/sync to a direct dependency because publish.go now imports golang.org/x/sync/errgroup; run `go mod tidy` to recompute and clean up module requirements so direct and indirect dependencies are separated and formatted into appropriate require blocks, which will also remove stale `// indirect` markers and tidy the require block mixing lines 23–91; verify publish.go's import of errgroup is present and then commit the updated go.mod and go.sum.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@publish.go`:
- Around line 100-103: The onEOS closures assigned via p.videoTrack.onEOS.Store
and p.audioTrack.onEOS.Store capture p.room which Stop() can nil, risking a
nil-pointer race; fix by capturing the LocalParticipant (e.g., lp :=
p.room.LocalParticipant) into a local variable before creating the closure and
use lp.UnpublishTrack(pub.SID()) inside the closure (or check for nil lp inside
the closure) so the callback is self-contained and safe against concurrent
Stop() niling p.room; apply the same change to both the video and audio onEOS
closures and ensure handleEOS in track.go/messageWatch interactions remain safe.
- Around line 80-124: The pipeline is started concurrently with
JoinWithToken/PublishTrack causing early samples to be emitted before tracks are
published, a race on the onEOS closures accessing p.room after p.Stop clears it,
and lack of cancellation propagation; fix by sequencing Start after successful
room join and track publications (move the call to p.pipeline.Start() to run
only after JoinWithToken and both PublishTrack calls complete), change how you
store onEOS closures so they capture a safe reference (e.g., capture pub.SID()
and call Unpublish via a safe helper that checks p.room != nil) instead of
directly dereferencing p.room.LocalParticipant, and if you retain parallelism
replace g := errgroup.Group with errgroup.WithContext to propagate cancellation
from JoinWithToken failures to the pipeline start/operation.
---
Outside diff comments:
In `@publish.go`:
- Around line 142-156: Stop() is not concurrency-safe: multiple goroutines may
concurrently read/write Publisher.pipeline, Publisher.room and Publisher.loop
causing nil-derefs or double-close; modify the Publisher type to include a
sync.Once (or a dedicated mutex + closed flag) and wrap the entire body of
Stop() so its teardown logic (references to p.pipeline.BlockSetState,
p.room.Disconnect, p.loop.Quit and subsequent nil assignments) runs exactly
once; ensure any callers that previously relied on idempotent behavior still
work by making Stop() safe to call repeatedly and document/maintain idempotence
for Stop().
---
Nitpick comments:
In `@go.mod`:
- Line 82: The go.mod currently promotes golang.org/x/sync to a direct
dependency because publish.go now imports golang.org/x/sync/errgroup; run `go
mod tidy` to recompute and clean up module requirements so direct and indirect
dependencies are separated and formatted into appropriate require blocks, which
will also remove stale `// indirect` markers and tidy the require block mixing
lines 23–91; verify publish.go's import of errgroup is present and then commit
the updated go.mod and go.sum.
In `@publish.go`:
- Around line 82-84: The cb.OnDisconnected handler is currently a no-op and must
stop the publisher so the process exits cleanly when the SDK signals a
disconnect; update the cb.OnDisconnected assignment to call the publisher's Stop
method (p.Stop()) and perform any necessary cleanup (e.g., cancel contexts,
close streams) inside that handler so the pipeline doesn't keep producing into a
dead room—locate the cb.OnDisconnected closure in publish.go and invoke p.Stop()
(and related shutdown helpers already present) from there.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 82ed8951-63e6-4d3b-8d85-bd72f85d434d
📒 Files selected for processing (3)
go.mod, publish.go, track.go
(Same hunk as quoted above, plus the pipeline start moved into the errgroup:)

```diff
-	if err := p.pipeline.Start(); err != nil {
+	g.Go(func() error {
+		return p.pipeline.Start()
+	})
```
🧩 Analysis chain
🌐 Web query:
golang.org/x/sync errgroup error cancellation behavior without context
💡 Result:
In golang.org/x/sync/errgroup, a plain `errgroup.Group` (created as a zero value, without calling `WithContext`) has no context cancellation capability. When using `g.Go(f)`, all submitted functions run to completion in their goroutines regardless of errors returned by other functions. The first non-nil error is captured and returned by `g.Wait` after all goroutines finish, but no cancellation occurs and remaining goroutines continue executing without interruption. This differs from `errgroup.WithContext(ctx)`, where the group has an associated context that gets canceled (via the internal cancel func) when the first error is encountered; goroutines can then check `ctx.Err` or use context-aware operations to exit early.

Key evidence from the source code:

- The `Group` struct has a `cancel func(error)` field, which is nil in a plain `Group`.
- In `Go`/`TryGo`: if `err != nil`, `g.errOnce.Do` sets `g.err` and calls `g.cancel(g.err)` only if `g.cancel != nil`.
- `Wait` calls `g.wg.Wait` (blocking until all goroutines are done), then calls `g.cancel(g.err)` if non-nil, then returns `g.err`.

A plain `Group` behaves like `sync.WaitGroup` plus error collection: it always waits for all goroutines, with no cancellation on error. Use `WithContext` explicitly for cancellation behavior.
Citations:
- 1: https://github.com/golang/sync/blob/master/errgroup/errgroup.go
- 2: https://golang.bg/pkg/cmd/vendor/golang.org/x/sync/errgroup/
- 3: proposal: x/sync/errgroup: Additional goroutines should not be started after the context is cancelled golang/go#61611
🏁 Scripts executed against the MainSquare/gstreamer-publisher repository: located `publish.go` and `track.go`, read the `Start()` and `Stop()` implementations (lines 60–200 of `publish.go`), and traced all `onEOS` usages.
Reconsider concurrent ordering of pipeline start vs. room join/publish.

`p.pipeline.Start()` is launched in parallel with `JoinWithToken` + `PublishTrack`. Three issues:

1. `pipeline.Start()` returns quickly after issuing an async state change to `PLAYING`, so the concurrency buys very little; the room-join goroutine is the only one that actually takes time. Meanwhile the pipeline begins producing samples forwarded to `t.track.WriteSample()` before the track is published, so initial encoded frames (including any keyframe at the head of the stream) are emitted into a track with no subscribers yet. For video this can extend time-to-first-frame for early subscribers until the next keyframe interval.
2. Race condition in the `onEOS` callback: the closure stored via `p.videoTrack.onEOS.Store()` and `p.audioTrack.onEOS.Store()` accesses `p.room.LocalParticipant` when invoked later (in `handleEOS`). Concurrently, `p.Stop()` sets `p.room = nil` from the signal handler, message watch, or error path. If `handleEOS` executes after `Stop()` has cleared `p.room`, a nil pointer dereference occurs.
3. `errgroup.Group` without a context does not propagate cancellation: if `JoinWithToken` fails, `p.pipeline.Start()` continues transitioning to `PLAYING` until `g.Wait()` returns and `p.Stop()` runs, leaving a window where the pipeline produces into discarded state.

Consider starting the pipeline only after the room is joined and tracks are published, and use `errgroup.WithContext` if you keep the parallel structure for future extension.
♻️ Sketch of a sequential ordering
```diff
-	var g errgroup.Group
-
-	g.Go(func() error {
-		cb := lksdk.NewRoomCallback()
-		...
-	})
-
-	g.Go(func() error {
-		return p.pipeline.Start()
-	})
-
-	if err := g.Wait(); err != nil {
-		p.Stop()
-		return err
-	}
+	if err := p.joinAndPublish(); err != nil {
+		p.Stop()
+		return err
+	}
+	if err := p.pipeline.Start(); err != nil {
+		p.Stop()
+		return err
+	}
```
```go
onEOS := func() {
    _ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
}
p.videoTrack.onEOS.Store(&onEOS)
```
Possible nil-pointer race: onEOS closure dereferences p.room which Stop() nils.
The onEOS closure calls p.room.LocalParticipant.UnpublishTrack(pub.SID()), but Stop() (lines 148–151) sets p.room = nil after disconnecting. EOS can be triggered from the GStreamer thread (handleEOS in track.go) concurrently with Stop() being invoked from messageWatch (on MessageEOS/MessageError) or the signal goroutine, leaving a window where the closure dereferences a nil p.room.
Capture p.room (or just p.room.LocalParticipant) in the closure to make the callback self-contained, or guard the access:
🛡️ Proposed fix

```diff
 if p.videoTrack != nil {
-	pub, err := p.room.LocalParticipant.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{
+	lp := p.room.LocalParticipant
+	pub, err := lp.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{
 		Source: livekit.TrackSource_CAMERA,
 	})
 	if err != nil {
 		return err
 	}
 	p.videoTrack.publication = pub
 	onEOS := func() {
-		_ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
+		_ = lp.UnpublishTrack(pub.SID())
 	}
 	p.videoTrack.onEOS.Store(&onEOS)
 }
```

(Apply the same pattern to the audio block at lines 106–118.)

Also applies to: 114-117
Summary by CodeRabbit
Refactor
Chores