diff --git a/README.md b/README.md index 2e6bba5..78aa2f5 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,31 @@ go build -o choice ./choice ``` +### Testing + +```bash +go test ./... +``` + +### Benchmarks + +```bash +# 全ベンチマーク実行 +go test -bench=. -benchmem ./pkg/buffer/... + +# 特定のベンチマークのみ +go test -bench=BenchmarkBufferWrite -benchmem ./pkg/buffer/... + +# 複数回実行して安定性を確認 +go test -bench=. -benchmem -count=5 ./pkg/buffer/... + +# CPUプロファイル付き +go test -bench=BenchmarkBufferWrite -benchmem -cpuprofile=cpu.prof ./pkg/buffer/... + +# メモリプロファイル付き +go test -bench=BenchmarkBufferWrite -benchmem -memprofile=mem.prof ./pkg/buffer/... +``` + ## License This project is licensed under the MIT License. diff --git a/pkg/buffer/bucket_benchmark_test.go b/pkg/buffer/bucket_benchmark_test.go new file mode 100644 index 0000000..236d80e --- /dev/null +++ b/pkg/buffer/bucket_benchmark_test.go @@ -0,0 +1,132 @@ +package buffer + +import ( + "testing" + + "github.com/pion/rtp" +) + +func newTestBucket() *Bucket { + buf := make([]byte, minBufferSize) + return NewBucket(&buf) +} + +func createTestRTPPacketForBucket(seqNo uint16, timestamp uint32) []byte { + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Padding: false, + Extension: false, + Marker: false, + PayloadType: 96, + SequenceNumber: seqNo, + Timestamp: timestamp, + SSRC: 12345, + }, + Payload: make([]byte, 100), + } + data, _ := pkt.Marshal() + return data +} + +// ベンチマーク: Bucket.AddPacket (連続書き込み) +func BenchmarkBucketAddPacket(b *testing.B) { + bucket := newTestBucket() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + pkt := createTestRTPPacketForBucket(uint16(i), uint32(i)*100) + _, _ = bucket.AddPacket(pkt, uint16(i), true) + } +} + +// ベンチマーク: Bucket.AddPacket (並列) +func BenchmarkBucketAddPacketParallel(b *testing.B) { + bucket := newTestBucket() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + seqNo := uint16(0) + for pb.Next() { + pkt := createTestRTPPacketForBucket(seqNo, uint32(seqNo)*100) + _, _ = bucket.AddPacket(pkt, seqNo, true) + seqNo++ + } + }) +} + +// ベンチマーク: Bucket.GetPacket +func BenchmarkBucketGetPacket(b *testing.B) { + bucket := newTestBucket() + + // 事前にパケットを書き込む + for i := range 1000 { + pkt := createTestRTPPacketForBucket(uint16(i), uint32(i)*100) + _, _ = bucket.AddPacket(pkt, uint16(i), true) + } + + buf := make([]byte, maxPktSize) + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + sn := uint16(i % 1000) + _, _ = bucket.GetPacket(buf, sn) + } +} + +// ベンチマーク: Bucket.AddPacket (非連続シーケンス番号) +func BenchmarkBucketAddPacketWithGap(b *testing.B) { + bucket := newTestBucket() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + // 2つおきにシーケンス番号を進める(パケットロスをシミュレート) + seqNo := uint16(i * 2) + pkt := createTestRTPPacketForBucket(seqNo, uint32(seqNo)*100) + _, _ = bucket.AddPacket(pkt, seqNo, true) + } +} + +// ベンチマーク: Bucket.AddPacket (リオーダー) +func BenchmarkBucketAddPacketReorder(b *testing.B) { + bucket := newTestBucket() + + // 事前に連続パケットを書き込む + for i := range 100 { + pkt := createTestRTPPacketForBucket(uint16(i), uint32(i)*100) + _, _ = bucket.AddPacket(pkt, uint16(i), true) + } + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + // 過去のシーケンス番号で再送パケットをシミュレート + seqNo := uint16(50 + (i % 30)) + pkt := createTestRTPPacketForBucket(seqNo, uint32(seqNo)*100) + _, _ = bucket.AddPacket(pkt, seqNo, false) + } +} + +// ベンチマーク: Bucket 作成 +func BenchmarkBucketNew(b *testing.B) { + b.ResetTimer() + for b.Loop() { + buf := make([]byte, minBufferSize) + _ = NewBucket(&buf) + } +} + +// ベンチマーク: AddPacket と GetPacket の組み合わせ +func BenchmarkBucketAddAndGet(b *testing.B) { + bucket := newTestBucket() + getBuf := make([]byte, maxPktSize) + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + seqNo := uint16(i) + pkt := createTestRTPPacketForBucket(seqNo, uint32(seqNo)*100) + _, _ = bucket.AddPacket(pkt, seqNo, true) + + // 直前に追加したパケットを取得 + _, _ = bucket.GetPacket(getBuf, seqNo) + } +} diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go index 6613d5d..6d62da5 100644 --- a/pkg/buffer/buffer.go +++ b/pkg/buffer/buffer.go @@ -76,7 +76,7 @@ type Buffer struct { twccExt uint8 audioExt uint8 bound bool - closed atomicBool + closed atomic.Bool mime string // パケット到着通知用 @@ -233,7 +233,7 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) { b.mu.Lock() defer b.mu.Unlock() - if b.closed.get() { + if b.closed.Load() { err = io.EOF return } @@ -259,7 +259,7 @@ func (b *Buffer) Read(buff []byte) (int, error) { defer b.mu.Unlock() for { - if b.closed.get() { + if b.closed.Load() { return 0, io.EOF } @@ -283,7 +283,7 @@ func (b *Buffer) ReadExtended() (*ExtPacket, error) { defer b.mu.Unlock() for { - if b.closed.get() { + if b.closed.Load() { return nil, io.EOF } @@ -308,7 +308,7 @@ func (b *Buffer) Close() error { b.audioPool.Put(&b.bucket.buf) } - b.closed.set(true) + b.closed.Store(true) // 待機中のgoroutineを起こす b.extPacketNotifier.broadcast() @@ -645,7 +645,7 @@ func (b *Buffer) GetPacket(buff []byte, sn uint16) (int, error) { b.mu.Lock() defer b.mu.Unlock() - if b.closed.get() { + if b.closed.Load() { return 0, io.EOF } diff --git a/pkg/buffer/buffer_benchmark_test.go b/pkg/buffer/buffer_benchmark_test.go new file mode 100644 index 0000000..9a30673 --- /dev/null +++ b/pkg/buffer/buffer_benchmark_test.go @@ -0,0 +1,231 @@ +package buffer + +import ( + "sync" + "testing" + + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/pion/webrtc/v4" +) + +// テスト用のヘルパー関数 +func newTestBuffer() *Buffer { + videoPool := &sync.Pool{ + New: func() any { + b := make([]byte, minBufferSize) + return &b + }, + } + audioPool := &sync.Pool{ + New: func() any { + b := make([]byte, minBufferSize) + return &b + }, + } + return NewBuffer(12345, videoPool, audioPool) +} + +func bindTestBuffer(b *Buffer, codecType string) { + var mimeType string + switch codecType { + case "video": + mimeType = "video/VP8" + case "audio": + mimeType = "audio/opus" + default: + mimeType = "video/VP8" + } + + // コールバックを設定 + b.OnFeedback(func(fb []rtcp.Packet) {}) + b.OnTransportWideCC(func(sn uint16, timeNS int64, marker bool) {}) + b.OnAudioLevel(func(level uint8) {}) + + params := webrtc.RTPParameters{ + Codecs: []webrtc.RTPCodecParameters{ + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: mimeType, + ClockRate: 90000, + }, + }, + }, + } + b.Bind(params, Options{MaxBitRate: 1_000_000}) +} + +func createTestRTPPacket(seqNo uint16, timestamp uint32) []byte { + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Padding: false, + Extension: false, + Marker: false, + PayloadType: 96, + SequenceNumber: seqNo, + Timestamp: timestamp, + SSRC: 12345, + }, + Payload: make([]byte, 100), + } + data, _ := pkt.Marshal() + return data +} + +// ベンチマーク: Buffer.Write +func BenchmarkBufferWrite(b *testing.B) { + buf := newTestBuffer() + bindTestBuffer(buf, "video") + + pkt := createTestRTPPacket(1, 1000) + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + // シーケンス番号を更新してパケットを作成 + pkt[2] = byte(i >> 8) + pkt[3] = byte(i) + _, _ = buf.Write(pkt) + } +} + +// ベンチマーク: Buffer.Write (並列) +func BenchmarkBufferWriteParallel(b *testing.B) { + buf := newTestBuffer() + bindTestBuffer(buf, "video") + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + seqNo := uint16(0) + for pb.Next() { + pkt := createTestRTPPacket(seqNo, uint32(seqNo)*100) + _, _ = buf.Write(pkt) + seqNo++ + } + }) +} + +// ベンチマーク: Buffer.ReadExtended +// 並行してWriteとReadを行い、Read側の性能を計測 +func BenchmarkBufferReadExtended(b *testing.B) { + buf := newTestBuffer() + bindTestBuffer(buf, "video") + + // Writer goroutine: パケットを継続的に書き込む + stop := make(chan struct{}) + writerDone := make(chan struct{}) + go func() { + defer close(writerDone) + for i := 0; ; i++ { + select { + case <-stop: + return + default: + pkt := createTestRTPPacket(uint16(i), uint32(i)*100) + _, _ = buf.Write(pkt) + } + } + }() + + b.ResetTimer() + for b.Loop() { + _, err := buf.ReadExtended() + if err != nil { + b.Fatal(err) + } + } + b.StopTimer() + + close(stop) + _ = buf.Close() + <-writerDone +} + +// ベンチマーク: Write と ReadExtended の組み合わせ +func BenchmarkBufferWriteRead(b *testing.B) { + buf := newTestBuffer() + bindTestBuffer(buf, "video") + + readCount := 0 + done := make(chan struct{}) + go func() { + defer close(done) + for { + _, err := buf.ReadExtended() + if err != nil { + return + } + readCount++ + if readCount >= b.N { + return + } + } + }() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + pkt := createTestRTPPacket(uint16(i), uint32(i)*100) + _, _ = buf.Write(pkt) + } + b.StopTimer() + + buf.Close() + <-done +} + +// ベンチマーク: GetPacket +func BenchmarkBufferGetPacket(b *testing.B) { + buf := newTestBuffer() + bindTestBuffer(buf, "video") + + // 事前にパケットを書き込む + for i := range 1000 { + pkt := createTestRTPPacket(uint16(i), uint32(i)*100) + _, _ = buf.Write(pkt) + } + + buff := make([]byte, 1500) + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + sn := uint16(i % 1000) + _, _ = buf.GetPacket(buff, sn) + } +} + +// ベンチマーク: GetStats +func BenchmarkBufferGetStats(b *testing.B) { + buf := newTestBuffer() + bindTestBuffer(buf, "video") + + // 事前にパケットを書き込む + for i := range 100 { + pkt := createTestRTPPacket(uint16(i), uint32(i)*100) + _, _ = buf.Write(pkt) + } + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + buf.GetStats() + } +} + +// ベンチマーク: Bind +func BenchmarkBufferBind(b *testing.B) { + params := webrtc.RTPParameters{ + Codecs: []webrtc.RTPCodecParameters{ + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: "video/VP8", + ClockRate: 90000, + }, + }, + }, + } + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + buf := newTestBuffer() + buf.Bind(params, Options{MaxBitRate: 1_000_000}) + } +} diff --git a/pkg/buffer/helpers.go b/pkg/buffer/helpers.go index 25d25b6..85de4f2 100644 --- a/pkg/buffer/helpers.go +++ b/pkg/buffer/helpers.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "errors" "log/slog" - "sync/atomic" ) var ( @@ -12,20 +11,6 @@ var ( errNilPacket = errors.New("invalid nil packet") ) -type atomicBool int32 - -func (a *atomicBool) set(value bool) { - var i int32 - if value { - i = 1 - } - atomic.StoreInt32((*int32)(a), i) -} - -func (a *atomicBool) get() bool { - return atomic.LoadInt32((*int32)(a)) != 0 -} - // VP8 はVP8 RTPペイロード記述子をパースする(RFC 7741) type VP8 struct { TemporalSupported bool diff --git a/pkg/buffer/helpers_benchmark_test.go b/pkg/buffer/helpers_benchmark_test.go new file mode 100644 index 0000000..6512970 --- /dev/null +++ b/pkg/buffer/helpers_benchmark_test.go @@ -0,0 +1,152 @@ +package buffer + +import ( + "testing" +) + +// VP8 ペイロードのテストデータ +var ( + // シンプルなVP8ペイロード(拡張なし) + vp8PayloadSimple = []byte{0x10, 0x00, 0x01, 0x02, 0x03} + + // 拡張付きVP8ペイロード(PictureID, TL0PICIDX, TID) + vp8PayloadExtended = []byte{ + 0x90, // X=1, S=1 + 0xE0, // I=1, L=1, T=1 + 0x81, 0x23, // PictureID (M=1, 15-bit) + 0x45, // TL0PICIDX + 0x80, // TID + 0x00, // キーフレームビット + } + + // キーフレームVP8ペイロード + vp8PayloadKeyframe = []byte{0x10, 0x00} + + // 非キーフレームVP8ペイロード + vp8PayloadNonKeyframe = []byte{0x10, 0x01} +) + +// H.264 ペイロードのテストデータ +var ( + // 単一NALUキーフレーム (IDR, type=5) + h264SingleNALUKeyframe = []byte{0x05, 0x00, 0x01, 0x02} + + // 単一NALU非キーフレーム (non-IDR, type=1) + h264SingleNALUNonKeyframe = []byte{0x01, 0x00, 0x01, 0x02} + + // STAP-A (type=24) キーフレーム含む + h264STAPAKeyframe = []byte{ + 0x18, // STAP-A (24) + 0x00, 0x04, // length + 0x07, 0x00, 0x01, 0x02, // SPS (type=7) + 0x00, 0x03, // length + 0x08, 0x00, 0x01, // PPS (type=8) + } + + // STAP-A (type=24) 非キーフレーム + h264STAPANonKeyframe = []byte{ + 0x18, // STAP-A (24) + 0x00, 0x04, // length + 0x01, 0x00, 0x01, 0x02, // non-IDR (type=1) + } + + // FU-A (type=28) キーフレーム開始 + h264FUAKeyframeStart = []byte{0x1C, 0x87, 0x00, 0x01} + + // FU-A (type=28) 非キーフレーム + h264FUANonKeyframe = []byte{0x1C, 0x81, 0x00, 0x01} +) + +// ベンチマーク: VP8.Unmarshal (シンプル) +func BenchmarkVP8UnmarshalSimple(b *testing.B) { + var vp8 VP8 + b.ResetTimer() + for b.Loop() { + _ = vp8.Unmarshal(vp8PayloadSimple) + } +} + +// ベンチマーク: VP8.Unmarshal (拡張付き) +func BenchmarkVP8UnmarshalExtended(b *testing.B) { + var vp8 VP8 + b.ResetTimer() + for b.Loop() { + _ = vp8.Unmarshal(vp8PayloadExtended) + } +} + +// ベンチマーク: VP8.Unmarshal (キーフレーム) +func BenchmarkVP8UnmarshalKeyframe(b *testing.B) { + var vp8 VP8 + b.ResetTimer() + for b.Loop() { + _ = vp8.Unmarshal(vp8PayloadKeyframe) + } +} + +// ベンチマーク: VP8.Unmarshal (並列) +func BenchmarkVP8UnmarshalParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + var vp8 VP8 + for pb.Next() { + _ = vp8.Unmarshal(vp8PayloadExtended) + } + }) +} + +// ベンチマーク: isH264Keyframe (単一NALUキーフレーム) +func BenchmarkIsH264KeyframeSingleNALU(b *testing.B) { + b.ResetTimer() + for b.Loop() { + _ = isH264Keyframe(h264SingleNALUKeyframe) + } +} + +// ベンチマーク: isH264Keyframe (単一NALU非キーフレーム) +func BenchmarkIsH264KeyframeSingleNALUNonKeyframe(b *testing.B) { + b.ResetTimer() + for b.Loop() { + _ = isH264Keyframe(h264SingleNALUNonKeyframe) + } +} + +// ベンチマーク: isH264Keyframe (STAP-A キーフレーム) +func BenchmarkIsH264KeyframeSTAPA(b *testing.B) { + b.ResetTimer() + for b.Loop() { + _ = isH264Keyframe(h264STAPAKeyframe) + } +} + +// ベンチマーク: isH264Keyframe (STAP-A 非キーフレーム) +func BenchmarkIsH264KeyframeSTAPANonKeyframe(b *testing.B) { + b.ResetTimer() + for b.Loop() { + _ = isH264Keyframe(h264STAPANonKeyframe) + } +} + +// ベンチマーク: isH264Keyframe (FU-A キーフレーム開始) +func BenchmarkIsH264KeyframeFUA(b *testing.B) { + b.ResetTimer() + for b.Loop() { + _ = isH264Keyframe(h264FUAKeyframeStart) + } +} + +// ベンチマーク: isH264Keyframe (FU-A 非キーフレーム) +func BenchmarkIsH264KeyframeFUANonKeyframe(b *testing.B) { + b.ResetTimer() + for b.Loop() { + _ = isH264Keyframe(h264FUANonKeyframe) + } +} + +// ベンチマーク: isH264Keyframe (並列) +func BenchmarkIsH264KeyframeParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _ = isH264Keyframe(h264STAPAKeyframe) + } + }) +} diff --git a/pkg/buffer/nack_benchmark_test.go b/pkg/buffer/nack_benchmark_test.go new file mode 100644 index 0000000..4855032 --- /dev/null +++ b/pkg/buffer/nack_benchmark_test.go @@ -0,0 +1,198 @@ +package buffer + +import ( + "testing" +) + +// ベンチマーク: nackQueue.push (連続) +func BenchmarkNackQueuePush(b *testing.B) { + q := newNACKQueue() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + q.push(uint32(i)) + } +} + +// ベンチマーク: nackQueue.push (ランダム順) +func BenchmarkNackQueuePushRandom(b *testing.B) { + // 疑似ランダムなシーケンス + sequence := []uint32{42, 7, 99, 23, 56, 12, 88, 34, 67, 5} + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + q := newNACKQueue() + for _, sn := range sequence { + q.push(sn + uint32(i)*100) + } + } +} + +// ベンチマーク: nackQueue.remove +func BenchmarkNackQueueRemove(b *testing.B) { + b.ResetTimer() + for i := 0; b.Loop(); i++ { + q := newNACKQueue() + // キューを埋める + for j := range 50 { + q.push(uint32(j)) + } + // 半分を削除 + for j := range 25 { + q.remove(uint32(j * 2)) + } + } +} + +// ベンチマーク: nackQueue.find +func BenchmarkNackQueueFind(b *testing.B) { + q := newNACKQueue() + for i := range 50 { + q.push(uint32(i * 2)) // 偶数のみ + } + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + _ = q.find(uint32(i % 100)) + } +} + +// ベンチマーク: nackQueue.pairs (少数のNACK) +func BenchmarkNackQueuePairsSmall(b *testing.B) { + b.ResetTimer() + for b.Loop() { + q := newNACKQueue() + for i := range 10 { + q.push(uint32(i)) + } + _, _ = q.pairs(100) + } +} + +// ベンチマーク: nackQueue.pairs (多数のNACK) +func BenchmarkNackQueuePairsLarge(b *testing.B) { + b.ResetTimer() + for b.Loop() { + q := newNACKQueue() + for i := range 80 { + q.push(uint32(i)) + } + _, _ = q.pairs(200) + } +} + +// ベンチマーク: nackQueue.pairs (連続シーケンス番号) +func BenchmarkNackQueuePairsConsecutive(b *testing.B) { + b.ResetTimer() + for b.Loop() { + q := newNACKQueue() + // 連続した17パケットのロス(1つのNACKペアに収まらない) + for i := range 17 { + q.push(uint32(i + 10)) + } + _, _ = q.pairs(100) + } +} + +// ベンチマーク: nackQueue.pairs (散発的なロス) +func BenchmarkNackQueuePairsSparse(b *testing.B) { + b.ResetTimer() + for b.Loop() { + q := newNACKQueue() + // 散発的なパケットロス + for _, sn := range []uint32{5, 15, 25, 35, 45, 55, 65, 75} { + q.push(sn) + } + _, _ = q.pairs(100) + } +} + +// ベンチマーク: nackQueue.pairs (キーフレーム要求発生) +func BenchmarkNackQueuePairsWithKeyframe(b *testing.B) { + b.ResetTimer() + for b.Loop() { + q := newNACKQueue() + // 3回以上NACKされたパケットを作成 + for i := range 10 { + q.push(uint32(i)) + } + // 複数回pairsを呼び出してnacked回数を増やす + _, _ = q.pairs(100) + _, _ = q.pairs(100) + _, _ = q.pairs(100) + _, _ = q.pairs(100) // キーフレーム要求が発生 + } +} + +// ベンチマーク: nackPairBuilder.add +func BenchmarkNackPairBuilderAdd(b *testing.B) { + b.ResetTimer() + for i := 0; b.Loop(); i++ { + builder := &nackPairBuilder{} + for j := range 20 { + builder.add(uint32(j)) + } + _ = builder.build() + } +} + +// ベンチマーク: nackPairBuilder (ビットマップ構築) +func BenchmarkNackPairBuilderBitmap(b *testing.B) { + b.ResetTimer() + for b.Loop() { + builder := &nackPairBuilder{} + // 連続した16パケット(1つのNACKペアで表現可能) + for j := range 16 { + builder.add(uint32(j + 100)) + } + _ = builder.build() + } +} + +// ベンチマーク: newNACKQueue +func BenchmarkNewNACKQueue(b *testing.B) { + b.ResetTimer() + for b.Loop() { + _ = newNACKQueue() + } +} + +// ベンチマーク: 現実的なシナリオ (5%パケットロス) +func BenchmarkNackQueueRealisticScenario(b *testing.B) { + b.ResetTimer() + for i := 0; b.Loop(); i++ { + q := newNACKQueue() + headSN := uint32(0) + + // 100パケット中5パケットロス + for j := range 100 { + sn := uint32(j + i*100) + if j%20 == 7 { // 5%ロス + q.push(sn) + } + headSN = sn + } + + _, _ = q.pairs(headSN) + } +} + +// ベンチマーク: push と remove の混合操作 +func BenchmarkNackQueueMixedOperations(b *testing.B) { + b.ResetTimer() + for i := 0; b.Loop(); i++ { + q := newNACKQueue() + base := uint32(i * 100) + + // push + for j := range 20 { + q.push(base + uint32(j)) + } + // remove (再送成功をシミュレート) + for j := range 10 { + q.remove(base + uint32(j*2)) + } + // pairs + _, _ = q.pairs(base + 100) + } +} diff --git a/pkg/buffer/rtcpreader.go b/pkg/buffer/rtcpreader.go index 5cc06d7..8cadf0b 100644 --- a/pkg/buffer/rtcpreader.go +++ b/pkg/buffer/rtcpreader.go @@ -2,14 +2,16 @@ package buffer import ( "io" + "sync" "sync/atomic" ) // RTCPReader は着信RTCPパケットの処理とコールバック通知を提供する type RTCPReader struct { + mu sync.RWMutex ssrc uint32 - closed atomicBool - onPacket atomic.Value // func([]byte) + closed atomic.Bool + onPacket func([]byte) onClose func() } @@ -20,12 +22,16 @@ func NewRTCPReader(ssrc uint32) *RTCPReader { } func (r *RTCPReader) Write(p []byte) (n int, err error) { - if r.closed.get() { + if r.closed.Load() { err = io.EOF return } - if f, ok := r.onPacket.Load().(func([]byte)); ok { + r.mu.RLock() + f := r.onPacket + r.mu.RUnlock() + + if f != nil { f(p) } @@ -37,13 +43,16 @@ func (r *RTCPReader) OnClose(fn func()) { } func (r *RTCPReader) Close() error { - r.closed.set(true) + r.closed.Store(true) r.onClose() return nil } func (r *RTCPReader) OnPacket(f func([]byte)) { - r.onPacket.Store(f) + r.mu.Lock() + defer r.mu.Unlock() + + r.onPacket = f } func (r *RTCPReader) Read(_ []byte) (n int, err error) { diff --git a/pkg/sfu/downtrack_benchmark_test.go b/pkg/sfu/downtrack_benchmark_test.go new file mode 100644 index 0000000..e969eb3 --- /dev/null +++ b/pkg/sfu/downtrack_benchmark_test.go @@ -0,0 +1,302 @@ +package sfu + +import ( + "testing" + + "github.com/HMasataka/choice/pkg/buffer" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/pion/webrtc/v4" +) + +// mockReceiver はベンチマーク用のモックReceiver +type mockReceiver struct { + trackID string + streamID string +} + +func (m *mockReceiver) TrackID() string { return m.trackID } +func (m *mockReceiver) StreamID() string { return m.streamID } +func (m *mockReceiver) SSRC(_ int) uint32 { return 12345 } +func (m *mockReceiver) Codec() webrtc.RTPCodecParameters { return webrtc.RTPCodecParameters{} } +func (m *mockReceiver) Kind() webrtc.RTPCodecType { return webrtc.RTPCodecTypeVideo } +func (m *mockReceiver) AddUpTrack(_ *webrtc.TrackRemote, _ *buffer.Buffer, _ bool) { +} +func (m *mockReceiver) AddDownTrack(_ DownTrack, _ bool) {} +func (m *mockReceiver) SwitchDownTrack(_ DownTrack, _ int) error { return nil } +func (m *mockReceiver) GetBitrate() [3]uint64 { return [3]uint64{100000, 500000, 1000000} } +func (m *mockReceiver) GetMaxTemporalLayer() [3]int32 { return [3]int32{2, 2, 2} } +func (m *mockReceiver) RetransmitPackets(_ DownTrack, _ []packetMeta) error { return nil } +func (m *mockReceiver) DeleteDownTrack(_ int, _ string) {} +func (m *mockReceiver) OnCloseHandler(_ func()) {} +func (m *mockReceiver) SendRTCP(_ []rtcp.Packet) {} +func (m *mockReceiver) SetRTCPCh(_ chan []rtcp.Packet) {} +func (m *mockReceiver) GetSenderReportTime(_ int) (uint32, uint64) { return 0, 0 } +func (m *mockReceiver) ReadRTP(_ []byte, _ int) (int, error) { return 0, nil } +func (m *mockReceiver) SetTrackMeta(_, _ string) {} +func (m *mockReceiver) GetTrackMeta() (string, string) { return "", "" } + +// mockWriteStream はベンチマーク用のモックWriteStream +type mockWriteStream struct{} + +func (m *mockWriteStream) WriteRTP(_ *rtp.Header, _ []byte) (int, error) { return 0, nil } +func (m *mockWriteStream) Write(_ []byte) (int, error) { return 0, nil } + +func newTestDownTrack() *downTrack { + receiver := &mockReceiver{ + trackID: "test-track", + streamID: "test-stream", + } + + bf := buffer.NewBufferFactory(100) + + dt, _ := NewDownTrack( + webrtc.RTPCodecCapability{ + MimeType: "video/VP8", + ClockRate: 90000, + }, + receiver, + bf, + "test-peer", + 500, + ) + + dt.writeStream = &mockWriteStream{} + dt.ssrc = 12345 + dt.payloadType = 96 + dt.mime = "video/vp8" + dt.sequencer = newSequencer(500) + dt.enabled.Store(true) + dt.bound.Store(true) + dt.trackType = SimpleDownTrack + + return dt +} + +func createTestExtPacket(seqNo uint16, timestamp uint32, keyframe bool) *buffer.ExtPacket { + return &buffer.ExtPacket{ + Head: true, + KeyFrame: keyframe, + Packet: rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + SequenceNumber: seqNo, + Timestamp: timestamp, + SSRC: 12345, + }, + Payload: make([]byte, 100), + }, + } +} + +// ベンチマーク: NewDownTrack +func BenchmarkNewDownTrack(b *testing.B) { + receiver := &mockReceiver{ + trackID: "test-track", + streamID: "test-stream", + } + + bf := buffer.NewBufferFactory(100) + + codec := webrtc.RTPCodecCapability{ + MimeType: "video/VP8", + ClockRate: 90000, + } + + b.ResetTimer() + for b.Loop() { + _, _ = NewDownTrack(codec, receiver, bf, "test-peer", 500) + } +} + +// ベンチマーク: writeSimpleRTP +func BenchmarkDownTrackWriteSimpleRTP(b *testing.B) { + dt := newTestDownTrack() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + pkt := createTestExtPacket(uint16(i), uint32(i)*3000, true) + _ = dt.writeSimpleRTP(pkt) + } +} + +// ベンチマーク: writeSimpleRTP (再同期あり) +func BenchmarkDownTrackWriteSimpleRTPWithReSync(b *testing.B) { + dt := newTestDownTrack() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + if i%100 == 0 { + dt.reSync.Store(true) + } + pkt := createTestExtPacket(uint16(i), uint32(i)*3000, true) + _ = dt.writeSimpleRTP(pkt) + } +} + +// ベンチマーク: writeSimulcastRTP +func BenchmarkDownTrackWriteSimulcastRTP(b *testing.B) { + dt := newTestDownTrack() + dt.trackType = SimulcastDownTrack + dt.lastSSRC = 12345 + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + pkt := createTestExtPacket(uint16(i), uint32(i)*3000, true) + pkt.Arrival = int64(i) * 1000000 + _ = dt.writeSimulcastRTP(pkt, 0) + } +} + +// ベンチマーク: UpdateStats +func BenchmarkDownTrackUpdateStats(b *testing.B) { + dt := newTestDownTrack() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + dt.UpdateStats(uint32(100 + i%50)) + } +} + +// ベンチマーク: UpdateStats (並列) +func BenchmarkDownTrackUpdateStatsParallel(b *testing.B) { + dt := newTestDownTrack() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + dt.UpdateStats(uint32(100 + i%50)) + i++ + } + }) +} + +// ベンチマーク: CreateSourceDescriptionChunks +func BenchmarkDownTrackCreateSourceDescriptionChunks(b *testing.B) { + dt := newTestDownTrack() + dt.transceiver = &webrtc.RTPTransceiver{} + + b.ResetTimer() + for b.Loop() { + _ = dt.CreateSourceDescriptionChunks() + } +} + +// ベンチマーク: CreateSenderReport +func BenchmarkDownTrackCreateSenderReport(b *testing.B) { + dt := newTestDownTrack() + + b.ResetTimer() + for b.Loop() { + _ = dt.CreateSenderReport() + } +} + +// ベンチマーク: Kind +func BenchmarkDownTrackKind(b *testing.B) { + dt := newTestDownTrack() + + b.ResetTimer() + for b.Loop() { + _ = dt.Kind() + } +} + +// ベンチマーク: Enabled/Mute +func BenchmarkDownTrackEnabledMute(b *testing.B) { + dt := newTestDownTrack() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + _ = dt.Enabled() + if i%100 == 0 { + dt.Mute(true) + dt.Mute(false) + } + } +} + +// ベンチマーク: buildAdjustedHeader +func BenchmarkDownTrackBuildAdjustedHeader(b *testing.B) { + dt := newTestDownTrack() + hdr := rtp.Header{ + Version: 2, + PayloadType: 96, + SequenceNumber: 1000, + Timestamp: 3000000, + SSRC: 12345, + } + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + _ = dt.buildAdjustedHeader(hdr, uint16(i), uint32(i)*3000) + } +} + +// ベンチマーク: SwitchSpatialLayer +func BenchmarkDownTrackSwitchSpatialLayer(b *testing.B) { + dt := newTestDownTrack() + dt.trackType = SimulcastDownTrack + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + layer := int32(i % 3) + _ = dt.SwitchSpatialLayer(layer, false) + } +} + +// ベンチマーク: SwitchTemporalLayer +func BenchmarkDownTrackSwitchTemporalLayer(b *testing.B) { + dt := newTestDownTrack() + dt.trackType = SimulcastDownTrack + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + layer := int32(i % 3) + dt.SwitchTemporalLayer(layer, false) + } +} + +// ベンチマーク: CurrentSpatialLayer +func BenchmarkDownTrackCurrentSpatialLayer(b *testing.B) { + dt := newTestDownTrack() + + b.ResetTimer() + for b.Loop() { + _ = dt.CurrentSpatialLayer() + } +} + +// ベンチマーク: 現実的なシナリオ (30fps video streaming) +func BenchmarkDownTrackRealistic30fps(b *testing.B) { + dt := newTestDownTrack() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + // 30fpsで各フレーム約3パケット + seqNo := uint16(i) + ts := uint32(i/3) * 3000 + keyframe := i%90 == 0 // 3秒ごとにキーフレーム + + pkt := createTestExtPacket(seqNo, ts, keyframe) + _ = dt.writeSimpleRTP(pkt) + } +} + +// ベンチマーク: 高スループット (60fps video streaming) +func BenchmarkDownTrackHighThroughput60fps(b *testing.B) { + dt := newTestDownTrack() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + // 60fpsで各フレーム約5パケット + seqNo := uint16(i) + ts := uint32(i/5) * 1500 + keyframe := i%300 == 0 // 1秒ごとにキーフレーム + + pkt := createTestExtPacket(seqNo, ts, keyframe) + _ = dt.writeSimpleRTP(pkt) + } +} diff --git a/pkg/sfu/publisher.go b/pkg/sfu/publisher.go index 938c064..b195c5c 100644 --- a/pkg/sfu/publisher.go +++ b/pkg/sfu/publisher.go @@ -130,10 +130,16 @@ func NewPublisher(userID string, session Session, cfg *WebRTCTransportConfig) (* return p, nil } +// relayPeer は他のSFUサーバへメディアを転送するためのリレー接続を管理する。 +// SFUのカスケード構成(複数SFU間でのメディア共有)や、録画・文字起こしなどの外部サービスへのメディア転送に使用される。 type relayPeer struct { - peer *relay.Peer - dataChannels []*webrtc.DataChannel - withSRReports bool + // peer はリモートSFUとのWebRTC接続を管理する + peer *relay.Peer + // dataChannels はリレー経由で共有されるDataChannelのリスト + dataChannels []*webrtc.DataChannel + // withSRReports がtrueの場合、定期的にSenderReportをリレーピアに送信する + withSRReports bool + // relayFanOutDataChannels がtrueの場合、ファンアウトDataChannelをリレーに転送する relayFanOutDataChannels bool } diff --git a/pkg/sfu/sequencer_benchmark_test.go b/pkg/sfu/sequencer_benchmark_test.go new file mode 100644 index 0000000..f0bfcbe --- /dev/null +++ b/pkg/sfu/sequencer_benchmark_test.go @@ -0,0 +1,204 @@ +package sfu + +import ( + "testing" +) + +// ベンチマーク: sequencer.push (連続) +func BenchmarkSequencerPush(b *testing.B) { + seq := newSequencer(500) + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + seq.push(uint16(i), uint16(i), uint32(i)*3000, 0, true) + } +} + +// ベンチマーク: sequencer.push (ギャップあり) +func BenchmarkSequencerPushWithGap(b *testing.B) { + seq := newSequencer(500) + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + // 2パケットおきにギャップ + sn := uint16(i * 3) + seq.push(sn, sn, uint32(sn)*3000, 0, true) + } +} + +// ベンチマーク: sequencer.push (遅延パケット) +func BenchmarkSequencerPushLate(b *testing.B) { + seq := newSequencer(500) + + // 先にパケットを追加 + for i := range 100 { + seq.push(uint16(i), uint16(i), uint32(i)*3000, 0, true) + } + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + // 遅延パケット(過去のシーケンス番号) + sn := uint16(50 + (i % 40)) + seq.push(sn, sn, uint32(sn)*3000, 0, false) + } +} + +// ベンチマーク: sequencer.push (並列) +func BenchmarkSequencerPushParallel(b *testing.B) { + seq := newSequencer(500) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + seq.push(uint16(i), uint16(i), uint32(i)*3000, 0, true) + i++ + } + }) +} + +// ベンチマーク: sequencer.getSeqNoPairs (少数) +func BenchmarkSequencerGetSeqNoPairsSmall(b *testing.B) { + seq := newSequencer(500) + + // パケットを追加 + for i := range 200 { + seq.push(uint16(i), uint16(i), uint32(i)*3000, 0, true) + } + + seqNos := []uint16{50, 60, 70, 80, 90} + + b.ResetTimer() + for b.Loop() { + _ = seq.getSeqNoPairs(seqNos) + } +} + +// ベンチマーク: sequencer.getSeqNoPairs (最大バッチ) +func BenchmarkSequencerGetSeqNoPairsMaxBatch(b *testing.B) { + seq := newSequencer(500) + + // パケットを追加 + for i := range 200 { + seq.push(uint16(i), uint16(i), uint32(i)*3000, 0, true) + } + + // maxNackBatch (17) 個のシーケンス番号 + seqNos := make([]uint16, 17) + for i := range 17 { + seqNos[i] = uint16(50 + i*5) + } + + b.ResetTimer() + for b.Loop() { + _ = seq.getSeqNoPairs(seqNos) + } +} + +// ベンチマーク: sequencer.getSeqNoPairs (存在しないSN) +func BenchmarkSequencerGetSeqNoPairsMissing(b *testing.B) { + seq := newSequencer(500) + + // パケットを追加 + for i := range 200 { + seq.push(uint16(i), uint16(i), uint32(i)*3000, 0, true) + } + + // 存在しないシーケンス番号 + seqNos := []uint16{1000, 1001, 1002, 1003, 1004} + + b.ResetTimer() + for b.Loop() { + _ = seq.getSeqNoPairs(seqNos) + } +} + +// ベンチマーク: packetMeta VP8ペイロード操作 +func BenchmarkPacketMetaVP8(b *testing.B) { + b.ResetTimer() + for i := 0; b.Loop(); i++ { + pm := &packetMeta{} + pm.setVP8PayloadMeta(uint8(i%256), uint16(i)) + _, _ = pm.getVP8PayloadMeta() + } +} + +// ベンチマーク: newSequencer +func BenchmarkNewSequencer(b *testing.B) { + b.ResetTimer() + for b.Loop() { + _ = newSequencer(500) + } +} + +// ベンチマーク: newSequencer (大きいバッファ) +func BenchmarkNewSequencerLarge(b *testing.B) { + b.ResetTimer() + for b.Loop() { + _ = newSequencer(2000) + } +} + +// ベンチマーク: 現実的なシナリオ (30fps video) +func BenchmarkSequencerRealistic30fps(b *testing.B) { + seq := newSequencer(500) + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + // 30fpsで各フレーム約3パケット + sn := uint16(i) + ts := uint32(i/3) * 3000 // 90kHz clock, 30fps + layer := uint8(0) + pm := seq.push(sn, sn, ts, layer, true) + if pm != nil { + pm.setVP8PayloadMeta(uint8(i%256), uint16(i%32768)) + } + } +} + +// ベンチマーク: NACKシナリオ (5%パケットロス) +func BenchmarkSequencerNACKScenario(b *testing.B) { + b.ResetTimer() + for b.Loop() { + seq := newSequencer(500) + + // 100パケット追加 + for i := range 100 { + seq.push(uint16(i), uint16(i), uint32(i)*3000, 0, true) + } + + // 5%のパケットロス(NACK要求をシミュレート) + lostPackets := []uint16{5, 23, 47, 68, 91} + _ = seq.getSeqNoPairs(lostPackets) + } +} + +// ベンチマーク: シーケンス番号ラップアラウンド +func BenchmarkSequencerWrapAround(b *testing.B) { + seq := newSequencer(500) + + // 最大値付近から開始 + startSN := uint16(65530) + for i := range 100 { + sn := startSN + uint16(i) + seq.push(sn, sn, uint32(i)*3000, 0, true) + } + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + sn := startSN + uint16(100+i) + seq.push(sn, sn, uint32(100+i)*3000, 0, true) + } +} + +// ベンチマーク: 複数レイヤー (シミュルキャスト) +func BenchmarkSequencerMultiLayer(b *testing.B) { + seq := newSequencer(500) + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + sn := uint16(i) + layer := uint8(i % 3) // 3レイヤー + seq.push(sn, sn, uint32(i)*3000, layer, true) + } +} diff --git a/pkg/twcc/twcc_benchmark_test.go b/pkg/twcc/twcc_benchmark_test.go new file mode 100644 index 0000000..cea6ac0 --- /dev/null +++ b/pkg/twcc/twcc_benchmark_test.go @@ -0,0 +1,168 @@ +package twcc + +import ( + "testing" + + "github.com/pion/rtcp" +) + +func newTestResponder() *Responder { + r := NewTransportWideCCResponder(12345) + r.OnFeedback(func(p rtcp.RawPacket) {}) + return r +} + +// ベンチマーク: Responder.Push (単一パケット) +func BenchmarkResponderPush(b *testing.B) { + r := newTestResponder() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + r.Push(uint16(i), int64(i)*1000000, false) + } +} + +// ベンチマーク: Responder.Push (マーカービット付き) +func BenchmarkResponderPushWithMarker(b *testing.B) { + r := newTestResponder() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + // 30パケットごとにマーカービットを設定(フレーム境界をシミュレート) + marker := i%30 == 29 + r.Push(uint16(i), int64(i)*1000000, marker) + } +} + +// ベンチマーク: Responder.Push (並列、各goroutineで独立したResponder) +func BenchmarkResponderPushParallel(b *testing.B) { + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + r := newTestResponder() + i := 0 + for pb.Next() { + r.Push(uint16(i), int64(i)*1000000, false) + i++ + } + }) +} + +// ベンチマーク: Responder.Push (フィードバック生成あり) +// 21パケット以上で100ms経過でフィードバック送信 +func BenchmarkResponderPushWithFeedback(b *testing.B) { + r := newTestResponder() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + // 100ms間隔をシミュレート(tccReportDelta = 1e8) + timeNS := int64(i) * 5000000 // 5ms間隔 + r.Push(uint16(i), timeNS, false) + } +} + +// ベンチマーク: Responder.Push (パケットロスあり) +func BenchmarkResponderPushWithLoss(b *testing.B) { + r := newTestResponder() + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + // 10%のパケットロスをシミュレート + if i%10 == 5 { + continue + } + r.Push(uint16(i), int64(i)*1000000, false) + } +} + +// ベンチマーク: Responder.Push (シーケンス番号ラップアラウンド) +func BenchmarkResponderPushWithWrap(b *testing.B) { + r := newTestResponder() + + // シーケンス番号を最大値付近から開始 + startSN := uint16(65530) + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + sn := startSN + uint16(i) + r.Push(sn, int64(i)*1000000, false) + } +} + +// ベンチマーク: buildTransportCCPacket +func BenchmarkBuildTransportCCPacket(b *testing.B) { + b.ResetTimer() + for b.Loop() { + r := newTestResponder() + // フィードバック生成に十分なパケットを追加 + for i := range 50 { + r.Push(uint16(i), int64(i)*1000000, false) + } + // 直接呼び出し + r.mu.Lock() + _ = r.buildTransportCCPacket() + r.mu.Unlock() + } +} + +// ベンチマーク: buildPacketList (ソートとギャップ検出) +func BenchmarkBuildPacketList(b *testing.B) { + b.ResetTimer() + for b.Loop() { + r := newTestResponder() + // パケットを順不同で追加 + for _, sn := range []uint16{5, 2, 8, 1, 9, 3, 7, 4, 6, 0} { + r.extInfo = append(r.extInfo, rtpExtInfo{ + ExtTSN: uint32(sn), + Timestamp: int64(sn) * 1000, + }) + } + r.mu.Lock() + _ = r.buildPacketList() + r.mu.Unlock() + } +} + +// ベンチマーク: setNBitsOfUint16 +func BenchmarkSetNBitsOfUint16(b *testing.B) { + b.ResetTimer() + for i := 0; b.Loop(); i++ { + _ = setNBitsOfUint16(0, 2, 4, uint16(i%4)) + } +} + +// ベンチマーク: clampInt16 +func BenchmarkClampInt16(b *testing.B) { + values := []int64{0, 100, -100, 32767, -32768, 100000, -100000} + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + _ = clampInt16(values[i%len(values)]) + } +} + +// ベンチマーク: NewTransportWideCCResponder +func BenchmarkNewTransportWideCCResponder(b *testing.B) { + b.ResetTimer() + for b.Loop() { + r := NewTransportWideCCResponder(12345) + r.OnFeedback(func(p rtcp.RawPacket) {}) + } +} + +// ベンチマーク: 高スループットシナリオ (60fps video) +func BenchmarkResponderHighThroughput(b *testing.B) { + r := newTestResponder() + + // 60fpsで30パケット/フレームをシミュレート + packetsPerFrame := 30 + frameIntervalNS := int64(16666667) // ~60fps + + b.ResetTimer() + for i := 0; b.Loop(); i++ { + frame := i / packetsPerFrame + packetInFrame := i % packetsPerFrame + timeNS := int64(frame)*frameIntervalNS + int64(packetInFrame)*100000 + marker := packetInFrame == packetsPerFrame-1 + r.Push(uint16(i), timeNS, marker) + } +}