diff --git a/.gitignore b/.gitignore index 1f1d37a8..d1ba4d21 100644 --- a/.gitignore +++ b/.gitignore @@ -88,4 +88,4 @@ _build lib/generated_plugin_registrant.dart # Test files - ignore any binary files in testfiles directory -testfiles/*.bin +testfiles/*.bin \ No newline at end of file diff --git a/lib/src/core/pending_track_queue.dart b/lib/src/core/pending_track_queue.dart index 3450a9aa..87db9047 100644 --- a/lib/src/core/pending_track_queue.dart +++ b/lib/src/core/pending_track_queue.dart @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +import 'dart:math' as math; + import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; import 'package:meta/meta.dart'; @@ -26,26 +28,97 @@ typedef TrackExceptionEmitter = void Function(TrackSubscriptionExceptionEvent ev @internal class PendingTrackQueue { final int maxSize; - Duration ttl; + final int _maxPerParticipant; + Duration _queueTtl; + Duration _metadataTimeout; final TrackExceptionEmitter emitException; // keyed by participant sid final Map> _pending = {}; PendingTrackQueue({ - required this.ttl, + required Duration metadataTimeout, required this.emitException, this.maxSize = 100, - }); + }) : _metadataTimeout = metadataTimeout, + _queueTtl = _deriveQueueTtl(metadataTimeout), + // Keep any single participant from starving others by capping their share + // to roughly 25% of the total queue capacity (at least one slot). + _maxPerParticipant = math.max(1, (maxSize / 4).floor()); + + Duration get ttl => _queueTtl; + + Duration get metadataTimeout => _metadataTimeout; + + @visibleForTesting + PendingTrackQueueStats get stats => _buildStats(); + + @visibleForTesting + List debugPendingFor(String participantSid) => + List.unmodifiable(_pending[participantSid] ?? const []); - void updateTtl(Duration ttl) { - this.ttl = ttl; + bool get hasPending => _pending.values.any((entries) => entries.isNotEmpty); + + void updateTimeouts(Duration metadataTimeout) { + _metadataTimeout = metadataTimeout; + _queueTtl = _deriveQueueTtl(metadataTimeout); + refreshAll(reason: 'timeouts updated'); } - void clear() { + void clear({String reason = 'manual clear'}) { + final currentStats = stats; + if (currentStats.totalEntries > 0) { + logger.finer('Clearing pending track queue reason:$reason total:${currentStats.totalEntries}'); + } _pending.clear(); } + void refreshParticipant(String participantSid, {String reason = 'metadata progress'}) { + final entries = _pending[participantSid]; + if (entries == null || entries.isEmpty) return; + final newExpiry = _newExpiry(); + for (final entry in entries) { + entry.expiresAt = newExpiry; + } + logger.finer('Refreshed ${entries.length} pending tracks for participantSid:$participantSid reason:$reason'); + } + + void refreshAll({String reason = 'manual refresh'}) { + if (!hasPending) return; + final newExpiry = _newExpiry(); + var refreshed = 0; + _pending.forEach((sid, list) { + for (final entry in list) { + entry.expiresAt = newExpiry; + refreshed++; + } + }); + if (refreshed > 0) { + logger.finer('Refreshed ${refreshed} pending tracks reason:$reason'); + } + } + + void removeParticipant(String participantSid, {String reason = 'participant removed'}) { + final removed = _pending.remove(participantSid); + if (removed == null || removed.isEmpty) return; + final now = DateTime.now(); + final maxAge = removed + .map((entry) => now.difference(entry.enqueuedAt)) + .fold(Duration.zero, (acc, value) => value > acc ? value : acc); + logger.finer( + 'Removed ${removed.length} pending tracks sid:$participantSid reason:$reason maxAgeMs:${maxAge.inMilliseconds}', + ); + for (final item in removed) { + emitException( + TrackSubscriptionExceptionEvent( + participant: null, + sid: item.trackSid, + reason: TrackSubscribeFailReason.noParticipantFound, + ), + ); + } + } + void enqueue({ required rtc.MediaStreamTrack track, required rtc.MediaStream stream, @@ -68,68 +141,183 @@ class PendingTrackQueue { _removeExpired(); + final listForParticipant = _pending.putIfAbsent(participantSid, () => []); + if (listForParticipant.length >= _maxPerParticipant) { + _dropWithException( + participantSid: participantSid, + trackSid: trackSid, + reason: 'per-participant capacity reached ($_maxPerParticipant)', + ); + return; + } + final totalPending = _pending.values.fold(0, (sum, list) => sum + list.length); if (totalPending >= maxSize) { - final event = TrackSubscriptionExceptionEvent( - participant: null, - sid: trackSid, - reason: TrackSubscribeFailReason.noParticipantFound, + _dropWithException( + participantSid: participantSid, + trackSid: trackSid, + reason: 'global capacity reached ($maxSize)', ); - logger.severe('Pending track queue full, dropping trackSid:$trackSid participantSid:$participantSid'); - emitException(event); return; } - final expiresAt = DateTime.now().add(ttl); - logger.fine('Queueing pending trackSid:$trackSid participantSid:$participantSid until metadata is ready'); + final now = DateTime.now(); + final expiresAt = now.add(_queueTtl); + logger.fine( + 'Queueing pending trackSid:$trackSid participantSid:$participantSid pending=${totalPending + 1} ' + 'participantPending=${listForParticipant.length + 1}', + ); final entry = PendingTrack( track: track, stream: stream, receiver: receiver, participantSid: participantSid, trackSid: trackSid, + enqueuedAt: now, expiresAt: expiresAt, ); - final list = _pending.putIfAbsent(participantSid, () => []); - list.add(entry); + listForParticipant.add(entry); } @internal - Future flush({ + Future flush({ required bool isConnected, String? participantSid, required PendingTrackSubscriber subscriber, }) async { _removeExpired(); - if (!isConnected) return; + if (!isConnected) { + return PendingTrackQueueFlushResult( + attempted: 0, + succeeded: 0, + transientFailures: 0, + hasPending: hasPending, + skippedForDisconnect: true, + ); + } final Iterable source = participantSid != null ? List.from(_pending[participantSid] ?? const []) : _pending.values.expand((e) => e).toList(); + var attempted = 0; + var succeeded = 0; + var transientFailures = 0; + for (final item in source) { - final success = await subscriber(item); + if (!item.beginProcessing()) { + continue; + } + + attempted++; + var success = false; + try { + success = await subscriber(item); + } catch (error, stack) { + logger.warning( + 'Pending track subscriber threw trackSid:${item.trackSid} participantSid:${item.participantSid}', + error, + stack, + ); + } finally { + item.endProcessing(); + } + if (success) { + succeeded++; _pending[item.participantSid]?.remove(item); + if ((_pending[item.participantSid]?.isEmpty ?? false)) { + _pending.remove(item.participantSid); + } + } else { + transientFailures++; + item.retryCount += 1; } } + + return PendingTrackQueueFlushResult( + attempted: attempted, + succeeded: succeeded, + transientFailures: transientFailures, + hasPending: hasPending, + skippedForDisconnect: false, + ); } void _removeExpired() { final now = DateTime.now(); - _pending.forEach((sid, list) { - final expired = list.where((p) => p.expiresAt.isBefore(now)).toList(); - for (final item in expired) { - list.remove(item); - final event = TrackSubscriptionExceptionEvent( + final expiredEntries = []; + _pending.removeWhere((sid, list) { + list.removeWhere((item) { + if (item.expiresAt.isBefore(now)) { + expiredEntries.add(item); + return true; + } + return false; + }); + return list.isEmpty; + }); + + for (final item in expiredEntries) { + final age = now.difference(item.enqueuedAt); + logger.warning( + 'Pending track expired (ttl) trackSid:${item.trackSid} participantSid:${item.participantSid} ' + 'ageMs:${age.inMilliseconds}', + ); + emitException( + TrackSubscriptionExceptionEvent( participant: null, sid: item.trackSid, reason: TrackSubscribeFailReason.noParticipantFound, - ); - logger.warning('Pending track expired waiting for participant metadata: $event'); - emitException(event); + ), + ); + } + } + + static Duration _deriveQueueTtl(Duration subscribeTimeout) { + final multiplied = Duration(milliseconds: subscribeTimeout.inMilliseconds * 3); + const minTtl = Duration(seconds: 30); + return multiplied >= minTtl ? multiplied : minTtl; + } + + DateTime _newExpiry() => DateTime.now().add(_queueTtl); + + PendingTrackQueueStats _buildStats() { + var total = 0; + Duration? oldest; + final perParticipant = {}; + final now = DateTime.now(); + _pending.forEach((sid, list) { + total += list.length; + perParticipant[sid] = list.length; + for (final entry in list) { + final age = now.difference(entry.enqueuedAt); + if (oldest == null || age > oldest!) { + oldest = age; + } } }); + return PendingTrackQueueStats( + totalEntries: total, + entriesPerParticipant: perParticipant, + oldestEntryAge: oldest, + ); + } + + void _dropWithException({ + required String participantSid, + required String trackSid, + required String reason, + }) { + final event = TrackSubscriptionExceptionEvent( + participant: null, + sid: trackSid, + reason: TrackSubscribeFailReason.noParticipantFound, + ); + logger.severe( + 'Pending track queue drop trackSid:$trackSid participantSid:$participantSid reason:$reason', + ); + emitException(event); } } @@ -140,7 +328,10 @@ class PendingTrack { final rtc.RTCRtpReceiver? receiver; final String participantSid; final String trackSid; - final DateTime expiresAt; + final DateTime enqueuedAt; + DateTime expiresAt; + bool _processing = false; + int retryCount = 0; PendingTrack({ required this.track, @@ -148,6 +339,54 @@ class PendingTrack { required this.receiver, required this.participantSid, required this.trackSid, + required this.enqueuedAt, required this.expiresAt, }); + + bool beginProcessing() { + if (_processing) return false; + _processing = true; + return true; + } + + void endProcessing() { + _processing = false; + } +} + +@internal +class PendingTrackQueueStats { + final int totalEntries; + final Map entriesPerParticipant; + final Duration? oldestEntryAge; + + const PendingTrackQueueStats({ + required this.totalEntries, + required this.entriesPerParticipant, + required this.oldestEntryAge, + }); + + bool get hasEntries => totalEntries > 0; + + @override + String toString() => + 'PendingTrackQueueStats(total:$totalEntries, oldestMs:${oldestEntryAge?.inMilliseconds}, perParticipant:$entriesPerParticipant)'; +} + +class PendingTrackQueueFlushResult { + final int attempted; + final int succeeded; + final int transientFailures; + final bool hasPending; + final bool skippedForDisconnect; + + const PendingTrackQueueFlushResult({ + required this.attempted, + required this.succeeded, + required this.transientFailures, + required this.hasPending, + required this.skippedForDisconnect, + }); + + bool get needsRetry => hasPending && !skippedForDisconnect; } diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 2508063f..41f82f3f 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -55,7 +55,7 @@ import '../types/transcription_segment.dart'; import '../utils.dart' show unpackStreamId; import 'engine.dart'; import 'participant_collection.dart'; -import 'pending_track_queue.dart'; +import 'pending_track_queue.dart' as pending_queue; /// Room is the primary construct for LiveKit conferences. It contains a /// group of [Participant]s, each publishing and subscribing to [Track]s. @@ -137,7 +137,13 @@ class Room extends DisposableChangeNotifier with EventsEmittable { late final PreConnectAudioBuffer preConnectAudioBuffer; // Pending subscriber tracks keyed by participantSid, for tracks arriving before metadata or before the room connected. - late final PendingTrackQueue _pendingTrackQueue; + late final pending_queue.PendingTrackQueue _pendingTrackQueue; + Future _pendingTrackFlushChain = Future.value(); + Timer? _pendingTrackRetryTimer; + static const Duration _pendingTrackRetryDelay = Duration(milliseconds: 200); + + @visibleForTesting + pending_queue.PendingTrackQueue get pendingTrackQueue => _pendingTrackQueue; // for testing @internal @@ -166,9 +172,10 @@ class Room extends DisposableChangeNotifier with EventsEmittable { _signalListener = this.engine.signalClient.createListener(); _setUpSignalListeners(); - _pendingTrackQueue = PendingTrackQueue( - ttl: this.engine.connectOptions.timeouts.subscribe, + _pendingTrackQueue = pending_queue.PendingTrackQueue( + metadataTimeout: this.engine.connectOptions.timeouts.subscribe, emitException: (event) => events.emit(event), + maxSize: roomOptions.pendingTrackQueueMaxSize, ); // Any event emitted will trigger ChangeNotifier @@ -176,8 +183,11 @@ class Room extends DisposableChangeNotifier with EventsEmittable { logger.finer('[RoomEvent] $event, will notifyListeners()'); notifyListeners(); }); - // Keep a connected flush as a fallback in case tracks arrive pre-connected but before metadata. + // Keep lifecycle-based flushing/refresh as fallbacks in case tracks arrive before metadata. events.on((event) => _flushPendingTracks()); + events.on((event) => _flushPendingTracks()); + events.on((event) => _pendingTrackQueue.refreshAll(reason: 'room reconnecting')); + events.on((event) => _pendingTrackQueue.refreshAll(reason: 'attempt reconnect')); _setupRpcListeners(); @@ -244,7 +254,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { }) async { var roomOptions = this.roomOptions; connectOptions ??= ConnectOptions(); - _pendingTrackQueue.updateTtl(connectOptions.timeouts.subscribe); + _pendingTrackQueue.updateTimeouts(connectOptions.timeouts.subscribe); // ignore: deprecated_member_use_from_same_package if ((roomOptions.encryption != null || roomOptions.e2eeOptions != null) && engine.e2eeManager == null) { if (!lkPlatformSupportsE2EE()) { @@ -332,7 +342,26 @@ class Room extends DisposableChangeNotifier with EventsEmittable { } void _setUpSignalListeners() => _signalListener - ..on((event) => _onParticipantUpdateEvent(event.participants)) + ..on((event) async { + await _onParticipantUpdateEvent(event.participants); + + // Flush pending tracks after participant updates are processed. + // This handles the case where tracks arrived before participant metadata, + // got queued, and now the track publications have finally arrived in this update. + // This is the second flush opportunity that fixes the race condition where + // the first flush (in _onParticipantUpdateEvent) happens before publications are ready. + for (final info in event.participants) { + if (info.tracks.isEmpty) continue; + final participant = _remoteParticipants.bySid[info.sid]; + if (participant != null) { + logger.fine( + 'Track publications updated for ${info.identity} ' + '(${info.tracks.length} tracks), flushing pending queue', + ); + await _flushPendingTracks(participant: participant); + } + } + }) ..on((event) => _onSignalSpeakersChangedEvent(event.speakers)) ..on((event) => _onSignalConnectionQualityUpdateEvent(event.updates)) ..on((event) => _onSignalStreamStateUpdateEvent(event.updates)) @@ -518,6 +547,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable { }) ..on((event) async { events.emit(const RoomReconnectingEvent()); + _pendingTrackQueue.clear(reason: 'engine full restart'); + _cancelPendingTrackRetry(); // reset params _name = null; @@ -536,6 +567,11 @@ class Room extends DisposableChangeNotifier with EventsEmittable { notifyListeners(); }) + ..on((event) async { + events.emit(const RoomReconnectingEvent()); + _pendingTrackQueue.refreshAll(reason: 'engine reconnecting'); + notifyListeners(); + }) ..on((event) async { // re-publish all tracks await localParticipant?.rePublishAllTracks(); @@ -548,6 +584,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { } } events.emit(const RoomReconnectedEvent()); + await _flushPendingTracks(); notifyListeners(); }) ..on((event) async { @@ -608,7 +645,13 @@ class Room extends DisposableChangeNotifier with EventsEmittable { ); } - final shouldDefer = connectionState != ConnectionState.connected || participant == null; + // Defer track subscription if: + // 1. Room not connected yet (tracks arrived pre-connection) + // 2. Participant not known yet (tracks arrived before participant metadata) + // 3. Track publication not known yet (tracks arrived before track metadata) + final shouldDefer = connectionState != ConnectionState.connected || + participant == null || + participant.getTrackPublicationBySid(trackSid) == null; if (shouldDefer) { _pendingTrackQueue.enqueue( track: event.track, @@ -672,12 +715,25 @@ class Room extends DisposableChangeNotifier with EventsEmittable { } final participant = _remoteParticipants.byIdentity[info.identity]; + final sidMatch = _remoteParticipants.bySid[info.sid]; + if (sidMatch != null && sidMatch.identity != info.identity) { + _pendingTrackQueue.removeParticipant(info.sid, reason: 'sid reused'); + } if (participant != null) { - // Return existing participant with no new publications; caller handles updates. - return ParticipantCreationResult( - participant: participant, - newPublications: const [], - ); + // If the participant has a different sid, they disconnected and rejoined. + // Remove the old participant and create a new one. + if (participant.sid != info.sid) { + logger.fine('Participant ${info.identity} rejoined with new sid ${info.sid} (old: ${participant.sid})'); + await _handleParticipantDisconnect(info.identity); + // Fall through to create new participant + } else { + _pendingTrackQueue.refreshParticipant(participant.sid, reason: 'participant reused'); + // Return existing participant with no new publications; caller handles updates. + return ParticipantCreationResult( + participant: participant, + newPublications: const [], + ); + } } final result = await RemoteParticipant.createFromInfo( @@ -686,6 +742,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { ); _remoteParticipants.set(result.participant); + _pendingTrackQueue.refreshParticipant(result.participant.sid, reason: 'participant created'); await _flushPendingTracks(participant: result.participant); return result; } @@ -731,11 +788,13 @@ class Room extends DisposableChangeNotifier with EventsEmittable { } } _remoteParticipants.set(result.participant); + _pendingTrackQueue.refreshParticipant(result.participant.sid, reason: 'metadata update (new participant)'); await _flushPendingTracks(participant: result.participant); } else { final wasUpdated = await result.participant.updateFromInfo(info); if (wasUpdated) { _remoteParticipants.set(result.participant); + _pendingTrackQueue.refreshParticipant(result.participant.sid, reason: 'metadata update'); await _flushPendingTracks(participant: result.participant); } } @@ -771,12 +830,28 @@ class Room extends DisposableChangeNotifier with EventsEmittable { emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); } - Future _flushPendingTracks({RemoteParticipant? participant}) => _pendingTrackQueue.flush( + Future _flushPendingTracks({RemoteParticipant? participant}) { + _pendingTrackFlushChain = _pendingTrackFlushChain.then( + (_) => _performPendingTrackFlush(participant: participant), + ); + return _pendingTrackFlushChain; + } + + Future _performPendingTrackFlush({RemoteParticipant? participant}) async { + late final pending_queue.PendingTrackQueueFlushResult result; + try { + result = await _pendingTrackQueue.flush( isConnected: connectionState == ConnectionState.connected, participantSid: participant?.sid, subscriber: (pending) async { final target = participant ?? _remoteParticipants.bySid[pending.participantSid]; - if (target == null) return false; + if (target == null) { + logger.fine( + 'Pending track still waiting for participantSid:${pending.participantSid} ' + 'trackSid:${pending.trackSid}', + ); + return false; + } try { await target.addSubscribedMediaTrack( pending.track, @@ -787,15 +862,56 @@ class Room extends DisposableChangeNotifier with EventsEmittable { ); return true; } on TrackSubscriptionExceptionEvent catch (event) { - logger.severe('Track subscription failed during flush: ${event}'); + final isTransient = event.reason == TrackSubscribeFailReason.notTrackMetadataFound; + + if (isTransient) { + logger.fine( + 'Track subscription temporarily failed: metadata not ready yet for ' + 'trackSid:${pending.trackSid} participantSid:${pending.participantSid}, ' + 'will retry on next flush', + ); + return false; + } + + logger.severe('Track subscription failed permanently during flush: ${event}'); events.emit(event); return true; - } catch (exception) { - logger.warning('Unknown exception during pending track flush: ${exception}'); + } catch (exception, stack) { + logger.warning('Unknown exception during pending track flush: ${exception}', exception, stack); return false; } }, ); + } catch (error, stack) { + logger.severe('Pending track flush failed', error, stack); + return; + } + + if (result.skippedForDisconnect) { + _pendingTrackQueue.refreshAll(reason: 'flush skipped (disconnected)'); + } + + if (result.needsRetry) { + _schedulePendingTrackRetry(); + } else { + _cancelPendingTrackRetry(); + } + } + + void _schedulePendingTrackRetry() { + if (_pendingTrackRetryTimer?.isActive ?? false) { + return; + } + _pendingTrackRetryTimer = Timer(_pendingTrackRetryDelay, () { + _pendingTrackRetryTimer = null; + unawaited(_flushPendingTracks()); + }); + } + + void _cancelPendingTrackRetry() { + _pendingTrackRetryTimer?.cancel(); + _pendingTrackRetryTimer = null; + } // from data channel // updates are sent only when there's a change to speaker ordering @@ -925,6 +1041,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable { validateParticipantHasNoActiveDataStreams(identity); + _pendingTrackQueue.removeParticipant(participant.sid, reason: 'remote disconnect'); + await participant.removeAllPublishedTracks(notify: true); emitWhenConnected(ParticipantDisconnectedEvent(participant: participant)); @@ -969,11 +1087,13 @@ extension RoomPrivateMethods on Room { final participants = _remoteParticipants.toList(); _remoteParticipants.clear(); for (final participant in participants) { + _pendingTrackQueue.removeParticipant(participant.sid, reason: 'room cleanup'); await participant.removeAllPublishedTracks(notify: false); // RemoteParticipant is responsible for disposing resources await participant.dispose(); } - _pendingTrackQueue.clear(); + _pendingTrackQueue.clear(reason: 'room cleanup'); + _cancelPendingTrackRetry(); // clean up LocalParticipant await localParticipant?.unpublishAllTracks(); diff --git a/lib/src/options.dart b/lib/src/options.dart index 765b4e4b..03b5e9bc 100644 --- a/lib/src/options.dart +++ b/lib/src/options.dart @@ -120,6 +120,10 @@ class RoomOptions { /// fast track publication final bool fastPublish; + /// Maximum number of pending subscriber tracks kept while waiting for + /// metadata. Helps balance memory use against resilience to reconnect storms. + final int pendingTrackQueueMaxSize; + /// deprecated, use [createVisualizer] instead /// please refer to example/lib/widgets/sound_waveform.dart @Deprecated('Use createVisualizer instead') @@ -139,6 +143,7 @@ class RoomOptions { this.encryption, this.enableVisualizer = false, this.fastPublish = true, + this.pendingTrackQueueMaxSize = 100, }); RoomOptions copyWith({ @@ -154,6 +159,7 @@ class RoomOptions { E2EEOptions? e2eeOptions, E2EEOptions? encryption, bool? fastPublish, + int? pendingTrackQueueMaxSize, }) { return RoomOptions( defaultCameraCaptureOptions: defaultCameraCaptureOptions ?? this.defaultCameraCaptureOptions, @@ -169,6 +175,7 @@ class RoomOptions { e2eeOptions: e2eeOptions ?? this.e2eeOptions, encryption: encryption ?? this.encryption, fastPublish: fastPublish ?? this.fastPublish, + pendingTrackQueueMaxSize: pendingTrackQueueMaxSize ?? this.pendingTrackQueueMaxSize, ); } } diff --git a/test/core/pending_track_queue_test.dart b/test/core/pending_track_queue_test.dart new file mode 100644 index 00000000..514cc51c --- /dev/null +++ b/test/core/pending_track_queue_test.dart @@ -0,0 +1,262 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:typed_data'; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; + +import 'package:livekit_client/src/core/pending_track_queue.dart'; +import 'package:livekit_client/src/events.dart'; +import 'package:livekit_client/src/types/other.dart'; + +void main() { + group('PendingTrackQueue', () { + late PendingTrackQueue queue; + late List emitted; + + setUp(() { + emitted = []; + queue = PendingTrackQueue( + metadataTimeout: const Duration(seconds: 5), + emitException: emitted.add, + maxSize: 4, + ); + }); + + test('removeParticipant purges pending entries and emits exceptions', () { + queue.enqueue( + track: _FakeMediaStreamTrack(id: 't1', kind: 'audio'), + stream: _FakeMediaStream('s1'), + receiver: null, + participantSid: 'remote_participant', + trackSid: 'sid_t1', + connectionState: ConnectionState.connected, + ); + + expect(queue.stats.totalEntries, 1); + + queue.removeParticipant('remote_participant', reason: 'test cleanup'); + + expect(queue.stats.totalEntries, 0); + expect(emitted, hasLength(1)); + expect(emitted.single.sid, 'sid_t1'); + }); + + test('per-participant capacity prevents starvation', () { + queue.enqueue( + track: _FakeMediaStreamTrack(id: 't1', kind: 'audio'), + stream: _FakeMediaStream('s1'), + receiver: null, + participantSid: 'remote_a', + trackSid: 'sid_t1', + connectionState: ConnectionState.connected, + ); + + // Second track for the same participant should be dropped because per-participant limit is 1. + queue.enqueue( + track: _FakeMediaStreamTrack(id: 't2', kind: 'audio'), + stream: _FakeMediaStream('s2'), + receiver: null, + participantSid: 'remote_a', + trackSid: 'sid_t2', + connectionState: ConnectionState.connected, + ); + + // Another participant can still enqueue successfully. + queue.enqueue( + track: _FakeMediaStreamTrack(id: 't3', kind: 'audio'), + stream: _FakeMediaStream('s3'), + receiver: null, + participantSid: 'remote_b', + trackSid: 'sid_t3', + connectionState: ConnectionState.connected, + ); + + expect(queue.stats.totalEntries, 2); + expect(queue.stats.entriesPerParticipant['remote_a'], 1); + expect(queue.stats.entriesPerParticipant['remote_b'], 1); + expect(emitted.length, 1, reason: 'Exceeded entries should emit failure events.'); + }); + + test('refreshParticipant extends expiration', () { + queue.enqueue( + track: _FakeMediaStreamTrack(id: 't1', kind: 'audio'), + stream: _FakeMediaStream('s1'), + receiver: null, + participantSid: 'remote_participant', + trackSid: 'sid_t1', + connectionState: ConnectionState.connected, + ); + + final pending = queue.debugPendingFor('remote_participant').single; + pending.expiresAt = DateTime.now().subtract(const Duration(seconds: 1)); + + queue.refreshParticipant('remote_participant', reason: 'metadata update'); + + expect(pending.expiresAt.isAfter(DateTime.now()), isTrue); + }); + + test('flush reports transient failures and keeps entries for retry', () async { + queue.enqueue( + track: _FakeMediaStreamTrack(id: 't1', kind: 'audio'), + stream: _FakeMediaStream('s1'), + receiver: null, + participantSid: 'remote_participant', + trackSid: 'sid_t1', + connectionState: ConnectionState.connected, + ); + + final firstResult = await queue.flush( + isConnected: true, + subscriber: (_) async => false, + ); + + expect(firstResult.transientFailures, 1); + expect(firstResult.hasPending, isTrue); + expect(queue.debugPendingFor('remote_participant').single.retryCount, 1); + + final secondResult = await queue.flush( + isConnected: true, + subscriber: (_) async => true, + ); + + expect(secondResult.succeeded, 1); + expect(queue.stats.totalEntries, 0); + }); + + test('flush skips work when disconnected but preserves queue', () async { + queue.enqueue( + track: _FakeMediaStreamTrack(id: 't1', kind: 'audio'), + stream: _FakeMediaStream('s1'), + receiver: null, + participantSid: 'remote_participant', + trackSid: 'sid_t1', + connectionState: ConnectionState.connected, + ); + + final result = await queue.flush( + isConnected: false, + subscriber: (_) async => true, + ); + + expect(result.skippedForDisconnect, isTrue); + expect(queue.stats.totalEntries, 1); + }); + }); +} + +class _FakeMediaStream extends rtc.MediaStream { + final List _tracks = []; + + _FakeMediaStream(String id) : super(id, 'fake-owner'); + + @override + bool? get active => true; + + @override + Future addTrack(rtc.MediaStreamTrack track, {bool addToNative = true}) async { + _tracks.add(track); + } + + @override + Future clone() async => _FakeMediaStream('${id}_clone'); + + @override + List getAudioTracks() => _tracks.where((t) => t.kind == 'audio').toList(); + + @override + Future getMediaTracks() async {} + + @override + List getTracks() => List.from(_tracks); + + @override + List getVideoTracks() => _tracks.where((t) => t.kind == 'video').toList(); + + @override + Future removeTrack(rtc.MediaStreamTrack track, {bool removeFromNative = true}) async { + _tracks.remove(track); + } +} + +class _FakeMediaStreamTrack implements rtc.MediaStreamTrack { + @override + rtc.StreamTrackCallback? onEnded; + + @override + rtc.StreamTrackCallback? onMute; + + @override + rtc.StreamTrackCallback? onUnMute; + + @override + bool enabled; + + @override + final String id; + + @override + final String kind; + + @override + String? get label => '$kind-track'; + + @override + bool? get muted => false; + + _FakeMediaStreamTrack({ + required this.id, + required this.kind, + this.enabled = true, + }); + + @override + Future applyConstraints([Map? constraints]) async {} + + @override + Future clone() async => _FakeMediaStreamTrack(id: '$id-clone', kind: kind, enabled: enabled); + + @override + Future dispose() async {} + + @override + Future adaptRes(int width, int height) async {} + + @override + Map getConstraints() => const {}; + + @override + Map getSettings() => const {}; + + @override + Future stop() async {} + + @override + void enableSpeakerphone(bool enable) {} + + @override + Future captureFrame() { + throw UnimplementedError(); + } + + @override + Future hasTorch() async => false; + + @override + Future setTorch(bool torch) async {} + + @override + Future switchCamera() async => false; +} diff --git a/test/core/room_e2e_test.dart b/test/core/room_e2e_test.dart index 5e21e57e..630c0e40 100644 --- a/test/core/room_e2e_test.dart +++ b/test/core/room_e2e_test.dart @@ -212,6 +212,46 @@ void main() { expect(trackSubscribed.participant.sid, remoteParticipantData.sid); expect(trackSubscribed.publication.track, isNotNull); }); + + test('pending queue is cleared when participant disconnects', () async { + ws.onData(participantJoinResponse.writeToBuffer()); + await room.events.waitFor(duration: const Duration(seconds: 1)); + + room.pendingTrackQueue.enqueue( + track: _FakeMediaStreamTrack(id: 'queued_track', kind: 'audio'), + stream: _FakeMediaStream('${remoteParticipantData.sid}|queued_stream'), + receiver: null, + participantSid: remoteParticipantData.sid, + trackSid: 'queued_track', + connectionState: ConnectionState.connected, + ); + expect(room.pendingTrackQueue.stats.totalEntries, 1); + + ws.onData(participantDisconnectResponse.writeToBuffer()); + await room.events.waitFor(duration: const Duration(seconds: 1)); + + expect(room.pendingTrackQueue.stats.totalEntries, 0); + }); + + test('engine restart clears pending queue entries', () async { + ws.onData(participantJoinResponse.writeToBuffer()); + await room.events.waitFor(duration: const Duration(seconds: 1)); + + room.pendingTrackQueue.enqueue( + track: _FakeMediaStreamTrack(id: 'queued_track', kind: 'audio'), + stream: _FakeMediaStream('${remoteParticipantData.sid}|queued_stream'), + receiver: null, + participantSid: remoteParticipantData.sid, + trackSid: 'queued_track', + connectionState: ConnectionState.connected, + ); + expect(room.pendingTrackQueue.stats.totalEntries, 1); + + container.engine.events.emit(const EngineFullRestartingEvent()); + await room.events.waitFor(duration: const Duration(seconds: 1)); + + expect(room.pendingTrackQueue.stats.totalEntries, 0); + }); }); }