Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 10 additions & 82 deletions sender/frame_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"image"
"sync"
"time"
)

// Static errors for err113 compliance.
Expand All @@ -17,64 +16,21 @@ var (
ErrFailedToAddFrameAfterDrop = errors.New("failed to add frame after dropping oldest")
)

// Global shared black frame to avoid repeated allocations.
var (
sharedBlackFrame *image.YCbCr //nolint:gochecknoglobals // Performance optimization for shared black frame
blackFrameOnce sync.Once //nolint:gochecknoglobals // Required for thread-safe initialization
)

// getBlackFrame returns a shared black frame for the given dimensions
// The black frame is created once on first call and reused for all subsequent calls.
func getBlackFrame(width, height int) *image.YCbCr {
blackFrameOnce.Do(func() {
// Create YUV420 black frame
ySize := width * height
uvSize := ySize / 4
totalSize := ySize + 2*uvSize // Y + U + V planes

// Allocate buffer for YUV420 data
data := make([]byte, totalSize)

// Set Y plane to black (0 = black in YUV)
// Y plane is already zero-initialized (black)

// Set U and V planes to neutral (128 = neutral chroma) - optimized
// Use range-based loop for better performance
uvPlanes := data[ySize:] // Both U and V planes
for i := range uvPlanes {
uvPlanes[i] = 128
}

sharedBlackFrame = &image.YCbCr{
Y: data[:ySize], // Y plane: full resolution
Cb: data[ySize : ySize+uvSize], // U plane: 1/4 resolution
Cr: data[ySize+uvSize:], // V plane: 1/4 resolution
YStride: width, // Y plane stride
CStride: width / 2, // Chroma planes stride
Rect: image.Rect(0, 0, width, height),
SubsampleRatio: image.YCbCrSubsampleRatio420,
}
})

return sharedBlackFrame
}

// FrameBuffer is a simple in-memory frame buffer that implements VideoSource
// It can be used as a virtual video driver for testing or programmatic frame injection.
type FrameBuffer struct {
frameChan chan image.Image
closeChan chan struct{}
closeOnce sync.Once
width int
height int
id string
initialized bool
frameChan chan image.Image
closeChan chan struct{}
closeOnce sync.Once
width int
height int
id string
}

// NewFrameBuffer creates a new frame buffer with the specified dimensions.
func NewFrameBuffer(width, height int) *FrameBuffer {
return &FrameBuffer{
frameChan: make(chan image.Image, 8), // Increased from 2 to 8 for better buffering
frameChan: make(chan image.Image, 8),
closeChan: make(chan struct{}),
width: width,
height: height,
Expand All @@ -96,39 +52,15 @@ func (f *FrameBuffer) Close() error {
return nil
}

// SetInitialized marks the frame buffer as initialized.
func (f *FrameBuffer) SetInitialized() {
f.initialized = true
}

// ResetInitialized temporarily marks the frame buffer as uninitialized.
// This causes Read() to return black frames on timeout instead of ErrNoFrameAvailable,
// which is needed when recreating an encoder (NewEncodedReader reads one frame for
// property detection during initialization).
func (f *FrameBuffer) ResetInitialized() {
f.initialized = false
}

// Read returns the next available frame from the buffer
// If no frame is available within the timeout, it returns a black frame.
// Read returns the next frame from the buffer if one is available.
// Returns ErrNoFrameAvailable immediately if the buffer is empty.
func (f *FrameBuffer) Read() (image.Image, func(), error) {
// Add timeout to prevent indefinite blocking during encoder initialization
timer := time.NewTimer(100 * time.Millisecond)
defer timer.Stop()

select {
case img := <-f.frameChan:
return img, func() {}, nil
case <-f.closeChan:
return nil, func() {}, ErrBufferClosed
case <-timer.C:
// Return black frame if not initialized, nil otherwise
if !f.initialized {
blackFrame := getBlackFrame(f.width, f.height)

return blackFrame, func() {}, nil
}

default:
return nil, func() {}, ErrNoFrameAvailable
}
}
Expand All @@ -144,23 +76,19 @@ func (f *FrameBuffer) SendFrame(frame image.Image) error {

select {
case f.frameChan <- frame:
// Successfully added frame
return nil
default:
// Buffer full - drop oldest frame and add the new one
select {
case <-f.frameChan: // Remove oldest
// Successfully removed old frame
default:
// Buffer was empty (race condition)
}

// Now add the new frame
select {
case f.frameChan <- frame:
return nil
default:
// Still can't add (shouldn't happen)
return ErrFailedToAddFrameAfterDrop
}
}
Expand Down
65 changes: 8 additions & 57 deletions sender/frame_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,12 @@ func TestFrameBuffer_SendAndRead(t *testing.T) {
err := fb.SendFrame(testImg)
require.NoError(t, err)

// Mark as initialized so we don't get black frames
fb.SetInitialized()

// Read the frame back
img, release, err := fb.Read()
require.NoError(t, err)
require.NotNil(t, img)
require.NotNil(t, release)

// Release the frame
release()
}

func TestFrameBuffer_ReadTimeout(t *testing.T) {
fb := NewFrameBuffer(640, 480)
defer func() { _ = fb.Close() }()

// Don't mark as initialized, so we should get a black frame on timeout
img, release, err := fb.Read()
require.NoError(t, err)
require.NotNil(t, img)
require.NotNil(t, release)

// Should be a YCbCr image (black frame)
_, ok := img.(*image.YCbCr)
assert.True(t, ok, "Expected YCbCr image for black frame")

release()
}

Expand All @@ -76,7 +55,7 @@ func TestFrameBuffer_ReadAfterClose(t *testing.T) {
// Try to read after close
img, release, err := fb.Read()
assert.Nil(t, img)
assert.NotNil(t, release) // Release function should still be provided
assert.NotNil(t, release)
require.Error(t, err)
assert.ErrorIs(t, err, ErrBufferClosed)
}
Expand Down Expand Up @@ -110,22 +89,6 @@ func TestFrameBuffer_BufferFull(t *testing.T) {
}
}

func TestFrameBuffer_ReadWithoutFrames(t *testing.T) {
fb := NewFrameBuffer(640, 480)
defer func() { _ = fb.Close() }()

// Mark as initialized so we don't get black frames
fb.SetInitialized()

// Try to read without sending any frames
// This should timeout and return ErrNoFrameAvailable
img, release, err := fb.Read()
assert.Nil(t, img)
assert.NotNil(t, release)
require.Error(t, err)
assert.ErrorIs(t, err, ErrNoFrameAvailable)
}

func TestFrameBuffer_MultipleClose(t *testing.T) {
fb := NewFrameBuffer(640, 480)

Expand All @@ -137,26 +100,21 @@ func TestFrameBuffer_MultipleClose(t *testing.T) {
assert.NoError(t, err2)
}

func TestGetBlackFrame(t *testing.T) {
// Test that getBlackFrame returns a valid black frame
blackFrame := getBlackFrame(640, 480)
require.NotNil(t, blackFrame)

assert.Equal(t, 640, blackFrame.Bounds().Dx())
assert.Equal(t, 480, blackFrame.Bounds().Dy())
func TestFrameBuffer_ReadWithoutFrames(t *testing.T) {
fb := NewFrameBuffer(640, 480)
defer func() { _ = fb.Close() }()

// Test that multiple calls return the same instance (singleton)
blackFrame2 := getBlackFrame(640, 480)
assert.Equal(t, blackFrame, blackFrame2, "getBlackFrame should return the same instance")
img, release, err := fb.Read()
assert.Nil(t, img)
assert.NotNil(t, release)
assert.ErrorIs(t, err, ErrNoFrameAvailable)
}

func TestFrameBufferStaticErrors(t *testing.T) {
// Test that all frame buffer static errors are properly defined
assert.NotNil(t, ErrBufferClosed)
assert.NotNil(t, ErrNoFrameAvailable)
assert.NotNil(t, ErrFailedToAddFrameAfterDrop)

// Test error messages
assert.Contains(t, ErrBufferClosed.Error(), "closed")
assert.Contains(t, ErrNoFrameAvailable.Error(), "no frame")
assert.Contains(t, ErrFailedToAddFrameAfterDrop.Error(), "failed to add")
Expand All @@ -166,9 +124,6 @@ func TestFrameBuffer_ConcurrentAccess(t *testing.T) {
fb := NewFrameBuffer(640, 480)
defer func() { _ = fb.Close() }()

fb.SetInitialized()

// Test concurrent sends and reads
done := make(chan bool)

// Sender goroutine
Expand All @@ -189,13 +144,9 @@ func TestFrameBuffer_ConcurrentAccess(t *testing.T) {
if err == nil && img != nil {
release()
}
time.Sleep(15 * time.Millisecond)
}
}()

// Wait for both goroutines to complete
<-done
<-done

// Test should complete without deadlock or panic
}
13 changes: 0 additions & 13 deletions sender/rtc_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,6 @@ func (s *RTCSender) AddVideoTrack(info VideoTrackInfo) error {
mimeType: mimeType,
}

// Mark the frame buffer as initialized after successful track creation
frameSource.SetInitialized()

// Add track to peer connection if it exists
if s.peerConnection != nil {
rtpSender, err := s.peerConnection.AddTrack(videoTrack)
Expand Down Expand Up @@ -560,8 +557,6 @@ func (s *RTCSender) processEncodedFrames() {
allHaveErrors := true
for result := range results {
if result.error != nil {
// ErrNoFrameAvailable is expected during normal operation (timing gaps between frames)
// Log it at Debug level to reduce noise; other errors remain at Error level
if errors.Is(result.error, ErrNoFrameAvailable) {
s.log.Debugf("No frame available for track %s", result.trackID)
} else {
Expand Down Expand Up @@ -663,14 +658,6 @@ func (s *RTCSender) recreateEncodersForActivatedTracks(newAllocation map[string]
// after a track has been idle for a long time.
// Must be called while holding tracksMu.Lock.
func (s *RTCSender) recreateEncoder(track *EncodedTrack) error {
// Temporarily reset FrameBuffer to uninitialized so NewEncodedReader
// can read a black frame for codec property detection during init.
// Always restore to initialized afterward, regardless of success or failure.
if fb, ok := track.videoSource.(*FrameBuffer); ok {
fb.ResetInitialized()
defer fb.SetInitialized()
}

// Create the new encoder FIRST, before closing the old one.
// If creation fails, the old encoder remains functional.
encodedReader, err := track.mediaTrack.NewEncodedReader(track.mimeType)
Expand Down
Loading