Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 114 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ choice は多人数ビデオ会議を実現する WebRTC SFU サーバーです

### 主な機能

- Simulcast 対応(low/mid/high の3レイヤー
- レイヤー選択によるクライアントへの適応的配信
- Simulcast 対応(low/mid/high の 3 レイヤー
- 帯域幅ベースの自動レイヤー切り替え
- キーフレームベースのレイヤー切り替え
- JSON-RPC 2.0 シグナリング

Expand Down Expand Up @@ -82,6 +82,8 @@ pkg/sfu/
├── layer.go # Layer - 品質レイヤー(low/mid/high)
├── receiver.go # LayerReceiver - RTP パケットの受信
├── downtrack.go # DownTrack - レイヤー選択と RTP 送信
├── bandwidth.go # BandwidthController - 帯域幅ベースのレイヤー自動選択
├── twcc.go # BandwidthEstimator - 帯域幅推定
├── rtp.go # RTP ユーティリティ(キーフレーム検出など)
├── signaling.go # JSON-RPC シグナリングハンドラー
└── transport.go # WebSocket 接続ラッパー(スレッドセーフ)
Expand All @@ -101,8 +103,10 @@ pkg/sfu/
| **TrackReceiver** | 1つのトラックの複数レイヤー(low/mid/high)を管理 |
| **Layer** | 品質レイヤーを表す。LayerReceiver を保持 |
| **LayerReceiver** | リモートトラックから RTP パケットを受信 |
| **DownTrack** | サブスクライバーに RTP パケットを送信。レイヤー選択を担当 |
| **LayerSelector** | 現在のレイヤーと目標レイヤーを管理。キーフレームでレイヤー切り替え |
| **DownTrack** | サブスクライバーに RTP パケットを送信。レイヤー選択を担当 |
| **LayerSelector** | 現在のレイヤーと目標レイヤーを管理。キーフレームでレイヤー切り替え |
| **BandwidthController** | 帯域幅に基づいて各トラックのレイヤーを自動選択 |
| **BandwidthEstimator** | 送信バイト数とパケットロス率から帯域幅を推定 |

## Simulcast とレイヤー選択

Expand All @@ -123,6 +127,112 @@ choice は Simulcast に対応しており、パブリッシャーから複数
| mid | 2 | 中品質 |
| low | 1 | 低品質 |

## 帯域幅ベースの自動レイヤー切り替え

Subscriber ごとに BandwidthController が動作し、帯域幅に応じて自動的にレイヤーを切り替えます。

### 動作フロー

```text
DownTrack ─── 送信バイト数を記録
Subscriber.statsLoop (1秒ごと)
BandwidthController.UpdateBitrate() ─── 帯域幅を更新
BandwidthController.recalculateAllocations() (500msごと)
onLayerChange コールバック
Subscriber.SetLayer() → DownTrack.SetTargetLayer()
```

### レイヤー選択の閾値

| 帯域幅予算 | 選択レイヤー |
| ------------ | ------------ |
| ≥ 2.5 Mbps | high |
| ≥ 500 Kbps | mid |
| < 500 Kbps | low |

### 帯域幅調整(パケットロス率に基づく)

| ロス率 | 調整 |
| -------- | -------------- |
| > 10% | 50% に削減 |
| > 2% | 85% に削減 |
| < 1% | 5% 増加 |

## 高度な輻輳制御 (GCC アルゴリズム)

choice は Google Congestion Control (GCC) アルゴリズムを実装した高度な輻輳制御機能を備えています。

### アーキテクチャ

```text
クライアント (RTCP TWCC フィードバック)
DownTrack.readRTCP()
├─ ReceiverReport → パケットロス率
└─ TransportLayerCC → TWCCReceiver.ProcessTWCCFeedback()
DelayBasedDetector (GCC)
├─ Trendline Filter (遅延勾配検出)
├─ Adaptive Threshold (適応的閾値)
└─ Hysteresis (ヒステリシス制御)
帯域幅推定値 (delay-based)
BandwidthEstimator
├─ Loss-based estimate (ロスベース)
└─ Delay-based estimate (遅延ベース)
min(loss-based, delay-based)
BandwidthController
LayerSelector → レイヤー自動選択
```

### GCC アルゴリズムの概要

1. **Trendline Filter**: パケット間到着時間の変動を指数移動平均でフィルタリング
2. **Adaptive Threshold**: ノイズ分散に基づいて閾値を動的に調整(誤検出防止)
3. **Hysteresis**: 状態変化に複数サンプルを要求(急激な変化を抑制)

### 輻輳状態

| 状態 | 条件 | アクション |
| --- | --- | --- |
| Overusing | 遅延勾配 > 閾値 | 帯域幅を 85% に削減 |
| Normal | -閾値 < 遅延勾配 < 閾値 | 現状維持 |
| Underusing | 遅延勾配 < -閾値 | 帯域幅を 5% 増加 |

### コンポーネント

| コンポーネント | 説明 |
| --- | --- |
| **TWCCReceiver** | TWCC フィードバックを受信し、遅延ベースの帯域幅を推定 |
| **DelayBasedDetector** | GCC の遅延検出アルゴリズムを実装 |
| **BandwidthEstimator** | ロスベースと遅延ベースの推定を統合 |
| **BandwidthController** | 帯域幅に基づいてレイヤーを自動選択 |

## シグナリングプロトコル

WebSocket 上で JSON-RPC 2.0 を使用。
Expand Down
94 changes: 76 additions & 18 deletions pkg/sfu/bandwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (

// LayerAllocation represents the target layer allocation for a subscriber
type LayerAllocation struct {
TrackID string
TargetLayer string
CurrentLayer string
MaxLayer string
Paused bool
TrackID string
TargetLayer string
CurrentLayer string
MaxLayer string
Paused bool
ManualOverrideUntil time.Time // Time until manual override is active (auto control disabled)
}

// BandwidthController manages bandwidth allocation across subscribers
Expand Down Expand Up @@ -107,7 +108,11 @@ func (bc *BandwidthController) SetMaxLayer(trackID, maxLayer string) {
}
}

// ManualOverrideDuration is how long manual layer selection disables auto control
const ManualOverrideDuration = 5 * time.Second

// RequestLayer requests a specific layer (manual override)
// This disables auto control for ManualOverrideDuration
func (bc *BandwidthController) RequestLayer(trackID, layer string) {
bc.mu.Lock()
defer bc.mu.Unlock()
Expand All @@ -118,6 +123,7 @@ func (bc *BandwidthController) RequestLayer(trackID, layer string) {
layer = alloc.MaxLayer
}
alloc.TargetLayer = layer
alloc.ManualOverrideUntil = time.Now().Add(ManualOverrideDuration)
}
}

Expand All @@ -132,11 +138,33 @@ func (bc *BandwidthController) GetTargetLayer(trackID string) string {
return LayerHigh
}

// UpdateBitrate updates the bandwidth estimate
// UpdateBitrate updates the bandwidth estimate (for backwards compatibility)
func (bc *BandwidthController) UpdateBitrate(receivedBytes uint64, duration time.Duration, lossRate float64) {
bc.estimator.Update(receivedBytes, duration, lossRate)
}

// UpdateBitrateWithDelay updates the bandwidth estimate with delay-based estimation
func (bc *BandwidthController) UpdateBitrateWithDelay(receivedBytes uint64, duration time.Duration, lossRate float64, delayEstimate uint64) {
oldEstimate := bc.estimator.GetEstimate()

// Update loss-based estimate
bc.estimator.Update(receivedBytes, duration, lossRate)

// Set delay-based estimate from TWCCReceiver
if delayEstimate > 0 {
bc.estimator.SetDelayBasedEstimate(delayEstimate)
}

newEstimate := bc.estimator.GetEstimate()
if newEstimate != oldEstimate {
slog.Debug("[BandwidthController] Bitrate updated",
slog.Uint64("from", oldEstimate),
slog.Uint64("to", newEstimate),
slog.Float64("lossRate", lossRate),
slog.Uint64("delayEstimate", delayEstimate))
}
}

// onBitrateUpdate handles bitrate updates from the estimator
func (bc *BandwidthController) onBitrateUpdate(bitrate uint64) {
bc.mu.Lock()
Expand All @@ -163,12 +191,19 @@ func (bc *BandwidthController) recalculateAllocations() {
// Calculate per-track budget
perTrackBudget := bc.availableBitrate / uint64(numTracks)

now := time.Now()

// Allocate layers based on budget
for trackID, alloc := range bc.allocations {
if alloc.Paused {
continue
}

// Skip auto control if manual override is active
if now.Before(alloc.ManualOverrideUntil) {
continue
}

newLayer := bc.selectLayerForBudget(perTrackBudget, alloc.MaxLayer)

if newLayer != alloc.TargetLayer {
Expand Down Expand Up @@ -236,14 +271,16 @@ func (bc *BandwidthController) Close() {

// LayerSelector handles layer selection for a single subscriber
type LayerSelector struct {
trackID string
currentLayer string
targetLayer string
pendingSwitch bool
lastSwitchTime time.Time
switchCooldown time.Duration
onSwitch func(layer string)
mu sync.RWMutex
trackID string
currentLayer string
targetLayer string
pendingSwitch bool
lastSwitchTime time.Time
switchCooldown time.Duration
onSwitch func(layer string)
lastKeyframeReqest time.Time
keyframeInterval time.Duration
mu sync.RWMutex
}

// NewLayerSelector creates a new layer selector
Expand All @@ -252,10 +289,11 @@ func NewLayerSelector(trackID string, initialLayer string) *LayerSelector {
initialLayer = LayerHigh
}
return &LayerSelector{
trackID: trackID,
currentLayer: initialLayer,
targetLayer: initialLayer,
switchCooldown: 2 * time.Second, // Minimum time between switches
trackID: trackID,
currentLayer: initialLayer,
targetLayer: initialLayer,
switchCooldown: 2 * time.Second, // Minimum time between switches
keyframeInterval: 500 * time.Millisecond, // Retry keyframe request interval
}
}

Expand Down Expand Up @@ -351,3 +389,23 @@ func (ls *LayerSelector) ForceSwitch(layer string) {
slog.String("to", ls.currentLayer),
)
}

// NeedsKeyframeRequest returns true if a keyframe request should be sent.
// This is used for retrying keyframe requests when switching layers.
func (ls *LayerSelector) NeedsKeyframeRequest() bool {
ls.mu.RLock()
defer ls.mu.RUnlock()

if !ls.pendingSwitch || ls.currentLayer == ls.targetLayer {
return false
}

return time.Since(ls.lastKeyframeReqest) >= ls.keyframeInterval
}

// MarkKeyframeRequested records that a keyframe request was sent.
func (ls *LayerSelector) MarkKeyframeRequested() {
ls.mu.Lock()
defer ls.mu.Unlock()
ls.lastKeyframeReqest = time.Now()
}
Loading