Skip to content

Connect to room and start pipeline in parallel#53

Open
TomFanella4 wants to merge 1 commit intolivekit:mainfrom
MainSquare:join-room-and-start-pipeline-2
Open

Connect to room and start pipeline in parallel#53
TomFanella4 wants to merge 1 commit intolivekit:mainfrom
MainSquare:join-room-and-start-pipeline-2

Conversation

@TomFanella4
Copy link
Copy Markdown

This PR parallelizes the LiveKit room connection/track publishing and GStreamer pipeline startup using errgroup.Group. Previously these ran sequentially; now they execute concurrently:

// Before (sequential):
room.JoinWithToken(...)
publishTracks()
pipeline.Start()
// After (parallel):
var g errgroup.Group
g.Go(func() {
  room.JoinWithToken(...)
  publishTracks()
})
g.Go(func() {
  pipeline.Start()
})
g.Wait()

Since onEOS can now be written (after room join) while the pipeline is already running, the onEOS field is changed from a plain func() to an atomic.Pointer[func()] for thread safety.

Comment thread publish.go
Comment on lines +120 to 130
})

if err := p.pipeline.Start(); err != nil {
g.Go(func() error {
return p.pipeline.Start()
})

if err := g.Wait(); err != nil {
p.Stop()
return err
}

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidzhao Any concerns with this issue Devin raised?

🟡 Race: pipeline EOS can fire before onEOS callback is registered, causing per-track unpublish to be skipped

The parallelization of room connection and pipeline startup introduces a race where handleEOS (track.go:95-99) can fire from a GStreamer streaming thread before the room goroutine stores the onEOS callback via atomic.Pointer.Store (publish.go:103 / publish.go:117). In the old sequential code, onEOS was always set before pipeline.Start(), so this race didn't exist.

When handleEOS fires with onEOS still nil, isEnded is set to true but the per-track UnpublishTrack callback never executes. If the room goroutine later publishes the track and stores onEOS, that callback will never fire because GStreamer won't deliver EOS again. In a multi-track pipeline where one stream ends before the other (e.g., video ends but audio continues), the ended track remains published (producing no data) instead of being explicitly unpublished. Room-level cleanup in Stop() eventually handles it when the entire pipeline ends, but this is a behavioral regression from the sequential code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant