diff --git a/go.mod b/go.mod index b2dcdeb..2ac3e59 100644 --- a/go.mod +++ b/go.mod @@ -79,7 +79,7 @@ require ( golang.org/x/crypto v0.43.0 // indirect golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b // indirect golang.org/x/net v0.46.0 // indirect - golang.org/x/sync v0.17.0 // indirect + golang.org/x/sync v0.17.0 golang.org/x/sys v0.37.0 // indirect golang.org/x/text v0.30.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20251014184007-4626949a642f // indirect diff --git a/publish.go b/publish.go index d5ec92f..3a7e78c 100644 --- a/publish.go +++ b/publish.go @@ -23,6 +23,7 @@ import ( "github.com/go-gst/go-glib/glib" "github.com/go-gst/go-gst/gst" + "golang.org/x/sync/errgroup" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -74,47 +75,56 @@ func (p *Publisher) Start() error { return err } - // TODO: connect at the same time in parallel as spinning up pipeline - cb := lksdk.NewRoomCallback() - cb.OnDisconnected = func() { - // TODO: stop publishing and exit - } - p.room = lksdk.NewRoom(cb) - err := p.room.JoinWithToken(p.params.URL, p.params.Token, - lksdk.WithAutoSubscribe(false), - ) - if err != nil { - return err - } + var g errgroup.Group - // publish tracks if sinks are set up - if p.videoTrack != nil { - pub, err := p.room.LocalParticipant.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{ - Source: livekit.TrackSource_CAMERA, - }) - if err != nil { - return err + g.Go(func() error { + cb := lksdk.NewRoomCallback() + cb.OnDisconnected = func() { + // TODO: stop publishing and exit } - p.videoTrack.publication = pub - p.videoTrack.onEOS = func() { - _ = p.room.LocalParticipant.UnpublishTrack(pub.SID()) + p.room = lksdk.NewRoom(cb) + if err := p.room.JoinWithToken(p.params.URL, p.params.Token, + lksdk.WithAutoSubscribe(false), + ); err != nil { + return err } - } - if p.audioTrack != nil { - pub, err := p.room.LocalParticipant.PublishTrack(p.audioTrack.track, &lksdk.TrackPublicationOptions{ - Source: livekit.TrackSource_MICROPHONE, - }) - if err != nil { - return err + if p.videoTrack != nil { + pub, err := p.room.LocalParticipant.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{ + Source: livekit.TrackSource_CAMERA, + }) + if err != nil { + return err + } + p.videoTrack.publication = pub + onEOS := func() { + _ = p.room.LocalParticipant.UnpublishTrack(pub.SID()) + } + p.videoTrack.onEOS.Store(&onEOS) } - p.audioTrack.publication = pub - p.audioTrack.onEOS = func() { - _ = p.room.LocalParticipant.UnpublishTrack(pub.SID()) + + if p.audioTrack != nil { + pub, err := p.room.LocalParticipant.PublishTrack(p.audioTrack.track, &lksdk.TrackPublicationOptions{ + Source: livekit.TrackSource_MICROPHONE, + }) + if err != nil { + return err + } + p.audioTrack.publication = pub + onEOS := func() { + _ = p.room.LocalParticipant.UnpublishTrack(pub.SID()) + } + p.audioTrack.onEOS.Store(&onEOS) } - } + return nil + }) - if err := p.pipeline.Start(); err != nil { + g.Go(func() error { + return p.pipeline.Start() + }) + + if err := g.Wait(); err != nil { + p.Stop() return err } diff --git a/track.go b/track.go index 5568932..cab7693 100644 --- a/track.go +++ b/track.go @@ -39,7 +39,7 @@ type publisherTrack struct { mimeType string publication *lksdk.LocalTrackPublication isEnded atomic.Bool - onEOS func() + onEOS atomic.Pointer[func()] lastKeyframeRequestNs atomic.Int64 } @@ -94,8 +94,8 @@ func (t *publisherTrack) IsEnded() bool { // callback function when EOS is received func (t *publisherTrack) handleEOS(_ *app.Sink) { t.isEnded.Store(true) - if t.onEOS != nil { - t.onEOS() + if cb := t.onEOS.Load(); cb != nil { + (*cb)() } }