diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index 6625f19e..86fbf9bd 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -40,6 +40,8 @@ import '../support/websocket.dart'; import '../track/local/local.dart'; import '../track/local/video.dart'; import '../types/internal.dart'; +import '../utils/data_packet_buffer.dart'; +import '../utils/ttl_map.dart'; import '../types/other.dart'; import 'signal_client.dart'; import 'transport.dart'; @@ -149,6 +151,15 @@ class Engine extends Disposable with EventsEmittable { List? get enabledPublishCodecs => _enabledPublishCodecs; + // E2E reliability for data channels + int _reliableDataSequence = 1; + final DataPacketBuffer _reliableMessageBuffer = DataPacketBuffer( + maxBufferSize: 64 * 1024 * 1024, // 64MB + maxPacketCount: 1000, // max 1000 packets + ); + final TTLMap _reliableReceivedState = TTLMap(30000); + bool _isReconnecting = false; + void clearReconnectTimeout() { if (reconnectTimeout != null) { reconnectTimeout?.cancel(); @@ -182,6 +193,7 @@ class Engine extends Disposable with EventsEmittable { await cleanUp(); await events.dispose(); await _signalListener.dispose(); + _reliableReceivedState.dispose(); }); } @@ -260,6 +272,12 @@ class Engine extends Disposable with EventsEmittable { fullReconnectOnNext = false; attemptingReconnect = false; + // Reset reliability state + _reliableDataSequence = 1; + _reliableMessageBuffer.clear(); + _reliableReceivedState.clear(); + _isReconnecting = false; + clearPendingReconnect(); } @@ -327,48 +345,104 @@ class Engine extends Disposable with EventsEmittable { return completer.future; } + Future _resendReliableMessagesForResume(int lastMessageSeq) async { + logger.fine('Resending reliable messages from sequence $lastMessageSeq'); + + final channel = _publisherDataChannel(Reliability.reliable); + if (channel == null) { + logger.warning('Reliable data channel is null, cannot resend messages'); + return; + } + + // Remove acknowledged messages from buffer + _reliableMessageBuffer.popToSequence(lastMessageSeq); + + // Get remaining messages to resend + final messagesToResend = _reliableMessageBuffer.getAll(); + + if (messagesToResend.isEmpty) { + logger.fine('No reliable messages to resend'); + return; + } + + logger.fine('Resending ${messagesToResend.length} reliable messages'); + + for (final item in messagesToResend) { + try { + await channel.send(item.message); + logger.fine('Resent reliable message with sequence ${item.sequence}'); + } catch (e) { + logger + .warning('Failed to resend reliable message ${item.sequence}: $e'); + } + } + } + @internal Future sendDataPacket( lk_models.DataPacket packet, { - bool? reliability = true, + Reliability reliability = Reliability.lossy, }) async { + + // Add sequence number for reliable packets + if (reliability == Reliability.reliable) { + packet.sequence = _reliableDataSequence++; + } + // construct the data channel message final message = rtc.RTCDataChannelMessage.fromBinary(packet.writeToBuffer()); - final reliabilityType = - reliability == true ? Reliability.reliable : Reliability.lossy; - if (_subscriberPrimary) { // make sure publisher transport is connected - await _publisherEnsureConnected(); // wait for data channel to open (if not already) - if (_publisherDataChannelState(reliabilityType) != + if (_publisherDataChannelState(reliability) != rtc.RTCDataChannelState.RTCDataChannelOpen) { - logger.fine('Waiting for data channel ${reliabilityType} to open...'); + logger.fine('Waiting for data channel ${reliability} to open...'); await events.waitFor( - filter: (event) => event.type == reliabilityType, + filter: (event) => event.type == reliability, duration: connectOptions.timeouts.connection, ); } } // chose data channel - final rtc.RTCDataChannel? channel = _publisherDataChannel( - reliability == true ? Reliability.reliable : Reliability.lossy); + final rtc.RTCDataChannel? channel = _publisherDataChannel(reliability); if (channel == null) { throw UnexpectedStateException( 'Data channel for ${packet.kind.toSDKType()} is null'); } - logger.fine('sendDataPacket(label:${channel.label})'); + // Buffer reliable packets for potential resending + if (reliability == Reliability.reliable) { + _reliableMessageBuffer.push(BufferedDataPacket( + packet: packet, + message: message, + sequence: packet.sequence, + )); + } + + // Don't send during reconnection, but keep message buffered for resending + if (_isReconnecting) { + logger.fine('Deferring data packet send during reconnection (will resend when resumed)'); + return; + } + + logger.fine( + 'sendDataPacket(label:${channel.label}, sequence:${packet.sequence})'); await channel.send(message); - _dcBufferStatus[reliabilityType] = await channel.getBufferedAmount() <= + _dcBufferStatus[reliability] = await channel.getBufferedAmount() <= channel.bufferedAmountLowThreshold!; + + // Align buffer with WebRTC buffer for reliable packets + if (reliability == Reliability.reliable) { + _reliableMessageBuffer + .alignBufferedAmount(await channel.getBufferedAmount()); + } } Future _publisherEnsureConnected() async { @@ -645,6 +719,24 @@ class Engine extends Disposable with EventsEmittable { final dp = lk_models.DataPacket.fromBuffer(message.binary); + // Handle sequence numbers for reliable packets + if (dp.kind == lk_models.DataPacket_Kind.RELIABLE && dp.hasSequence()) { + final participantKey = dp.participantIdentity; + final sequence = dp.sequence; + + // Check for duplicates and out-of-order packets + final lastReceived = _reliableReceivedState.get(participantKey) ?? 0; + + if (sequence <= lastReceived) { + logger.fine('Ignoring duplicate or out-of-order packet: ' + 'sequence=$sequence, lastReceived=$lastReceived, participant=$participantKey'); + return; + } + + // Update received state + _reliableReceivedState.set(participantKey, sequence); + } + if (dp.whichValue() == lk_models.DataPacket_Value.speaker) { // Speaker packet events.emit(EngineActiveSpeakersUpdateEvent( @@ -725,6 +817,8 @@ class Engine extends Disposable with EventsEmittable { logger .info('onDisconnected state:${connectionState} reason:${reason.name}'); + _isReconnecting = true; + if (reconnectAttempts == 0) { reconnectStart = DateTime.timestamp(); } @@ -803,6 +897,7 @@ class Engine extends Disposable with EventsEmittable { } clearPendingReconnect(); attemptingReconnect = false; + _isReconnecting = false; } catch (e) { reconnectAttempts = reconnectAttempts! + 1; bool recoverable = true; @@ -878,6 +973,7 @@ class Engine extends Disposable with EventsEmittable { logger.fine('resumeConnection: primary connected'); } + _isReconnecting = false; events.emit(const EngineResumedEvent()); } @@ -945,12 +1041,26 @@ class Engine extends Disposable with EventsEmittable { }) async { final previousAnswer = (await subscriber?.pc.getLocalDescription())?.toPBType(); + + // Build data channel receive states for reliability + final dataChannelReceiveStates = []; + for (final participantId in _reliableReceivedState.keys) { + final lastSequence = _reliableReceivedState.get(participantId); + if (lastSequence != null) { + final receiveState = lk_rtc.DataChannelReceiveState(); + receiveState.publisherSid = participantId; + receiveState.lastSeq = lastSequence; + dataChannelReceiveStates.add(receiveState); + } + } + signalClient.sendSyncState( answer: previousAnswer, subscription: subscription, publishTracks: publishTracks, dataChannelInfo: dataChannelInfo(), trackSidsDisabled: trackSidsDisabled, + dataChannelReceiveStates: dataChannelReceiveStates, ); } @@ -1011,7 +1121,8 @@ class Engine extends Disposable with EventsEmittable { logger.fine('Handle ReconnectResponse: ' 'iceServers: ${event.response.iceServers}, ' - 'forceRelay: $event.response.clientConfiguration.forceRelay'); + 'forceRelay: $event.response.clientConfiguration.forceRelay, ' + 'lastMessageSeq: ${event.response.lastMessageSeq}'); final rtcConfiguration = await _buildRtcConfiguration( serverResponseForceRelay: @@ -1025,6 +1136,11 @@ class Engine extends Disposable with EventsEmittable { await negotiate(); } + // Handle reliable message resending + if (event.response.hasLastMessageSeq()) { + await _resendReliableMessagesForResume(event.response.lastMessageSeq); + } + events.emit(const SignalReconnectedEvent()); }) ..on((event) async { diff --git a/lib/src/core/signal_client.dart b/lib/src/core/signal_client.dart index c3fa208e..f25020eb 100644 --- a/lib/src/core/signal_client.dart +++ b/lib/src/core/signal_client.dart @@ -483,6 +483,7 @@ extension SignalClientRequests on SignalClient { required Iterable? publishTracks, required Iterable? dataChannelInfo, required List trackSidsDisabled, + List? dataChannelReceiveStates, }) => _sendRequest(lk_rtc.SignalRequest( syncState: lk_rtc.SyncState( @@ -491,6 +492,7 @@ extension SignalClientRequests on SignalClient { publishTracks: publishTracks, dataChannels: dataChannelInfo, trackSidsDisabled: trackSidsDisabled, + datachannelReceiveStates: dataChannelReceiveStates, ), )); diff --git a/lib/src/data_stream/stream_writer.dart b/lib/src/data_stream/stream_writer.dart index a182b58b..94d4c476 100644 --- a/lib/src/data_stream/stream_writer.dart +++ b/lib/src/data_stream/stream_writer.dart @@ -9,15 +9,13 @@ import '../types/data_stream.dart'; import '../types/other.dart'; import '../utils.dart'; - class BaseStreamWriter { final StreamWriter writableStream; Function()? onClose; final InfoType info; - BaseStreamWriter( - {required this.writableStream, required this.info, this.onClose}); + BaseStreamWriter({required this.writableStream, required this.info, this.onClose}); Future write(T chunk) async { return writableStream.write(chunk); @@ -30,10 +28,7 @@ class BaseStreamWriter { } class TextStreamWriter extends BaseStreamWriter { - TextStreamWriter( - {required super.writableStream, - required super.info, - required super.onClose}); + TextStreamWriter({required super.writableStream, required super.info, required super.onClose}); } class ByteStreamWriter extends BaseStreamWriter { @@ -49,7 +44,7 @@ class WritableStream implements StreamWriter { int chunkId = 0; List? destinationIdentities; Engine engine; - + WritableStream({ required this.streamId, required this.engine, @@ -62,10 +57,11 @@ class WritableStream implements StreamWriter { streamId: streamId, ); final trailerPacket = lk_models.DataPacket( + kind: lk_models.DataPacket_Kind.RELIABLE, destinationIdentities: destinationIdentities, streamTrailer: trailer, ); - await engine.sendDataPacket(trailerPacket, reliability: true); + await engine.sendDataPacket(trailerPacket, reliability: Reliability.reliable); } @override @@ -78,10 +74,11 @@ class WritableStream implements StreamWriter { chunkIndex: Int64(chunkId), ); final chunkPacket = lk_models.DataPacket( + kind: lk_models.DataPacket_Kind.RELIABLE, destinationIdentities: destinationIdentities, streamChunk: chunk, ); - await engine.sendDataPacket(chunkPacket, reliability: true); + await engine.sendDataPacket(chunkPacket, reliability: Reliability.reliable); chunkId += 1; } } diff --git a/lib/src/participant/local.dart b/lib/src/participant/local.dart index 75e982a6..3e4f420c 100644 --- a/lib/src/participant/local.dart +++ b/lib/src/participant/local.dart @@ -583,14 +583,12 @@ class LocalParticipant extends Participant { /// @param topic, the topic under which the message gets published. Future publishData( List data, { - bool? reliable, + bool isReliable = false, List? destinationIdentities, String? topic, }) async { final packet = lk_models.DataPacket( - kind: reliable == true - ? lk_models.DataPacket_Kind.RELIABLE - : lk_models.DataPacket_Kind.LOSSY, + kind: isReliable ? lk_models.DataPacket_Kind.RELIABLE : lk_models.DataPacket_Kind.LOSSY, user: lk_models.UserPacket( payload: data, participantIdentity: identity, @@ -599,7 +597,7 @@ class LocalParticipant extends Participant { ), ); - await room.engine.sendDataPacket(packet, reliability: reliable); + await room.engine.sendDataPacket(packet, reliability: isReliable ? Reliability.reliable : Reliability.lossy); } /// Sets and updates the metadata of the local participant. @@ -953,6 +951,7 @@ extension RPCMethods on LocalParticipant { } final packet = lk_models.DataPacket( + kind: lk_models.DataPacket_Kind.RELIABLE, rpcRequest: lk_models.RpcRequest( id: requestId, method: method, @@ -964,7 +963,7 @@ extension RPCMethods on LocalParticipant { destinationIdentities: [destinationIdentity], ); - await room.engine.sendDataPacket(packet, reliability: true); + await room.engine.sendDataPacket(packet, reliability: Reliability.reliable); } @internal @@ -975,6 +974,7 @@ extension RPCMethods on LocalParticipant { lk_models.RpcError? error, }) async { final packet = lk_models.DataPacket( + kind: lk_models.DataPacket_Kind.RELIABLE, rpcResponse: lk_models.RpcResponse( requestId: requestId, payload: error == null ? payload : null, @@ -984,7 +984,7 @@ extension RPCMethods on LocalParticipant { participantIdentity: identity, ); - await room.engine.sendDataPacket(packet, reliability: true); + await room.engine.sendDataPacket(packet, reliability: Reliability.reliable); } @internal @@ -993,6 +993,7 @@ extension RPCMethods on LocalParticipant { required String requestId, }) async { final packet = lk_models.DataPacket( + kind: lk_models.DataPacket_Kind.RELIABLE, rpcAck: lk_models.RpcAck( requestId: requestId, ), @@ -1000,7 +1001,7 @@ extension RPCMethods on LocalParticipant { participantIdentity: identity, ); - await room.engine.sendDataPacket(packet, reliability: true); + await room.engine.sendDataPacket(packet, reliability: Reliability.reliable); } void handleIncomingRpcAck(String requestId) { @@ -1248,11 +1249,11 @@ extension DataStreamParticipantMethods on LocalParticipant { final destinationIdentities = options?.destinationIdentities; final packet = lk_models.DataPacket( + kind: lk_models.DataPacket_Kind.RELIABLE, destinationIdentities: destinationIdentities, streamHeader: header, ); - - await room.engine.sendDataPacket(packet, reliability: true); + await room.engine.sendDataPacket(packet, reliability: Reliability.reliable); final writableStream = WritableStream( destinationIdentities: destinationIdentities!, @@ -1346,9 +1347,12 @@ extension DataStreamParticipantMethods on LocalParticipant { final destinationIdentities = options?.destinationIdentities; final packet = lk_models.DataPacket( - destinationIdentities: destinationIdentities, streamHeader: header); + kind: lk_models.DataPacket_Kind.RELIABLE, + destinationIdentities: destinationIdentities, + streamHeader: header, + ); - await room.engine.sendDataPacket(packet, reliability: true); + await room.engine.sendDataPacket(packet, reliability: Reliability.reliable); final writableStream = WritableStream( destinationIdentities: destinationIdentities, diff --git a/lib/src/types/data_stream.dart b/lib/src/types/data_stream.dart index 6263697a..583c212a 100644 --- a/lib/src/types/data_stream.dart +++ b/lib/src/types/data_stream.dart @@ -247,8 +247,6 @@ abstract class StreamWriter { Future write(T chunk); } -typedef ByteStreamHandler = void Function( - ByteStreamReader reader, String participantIdentity); +typedef ByteStreamHandler = void Function(ByteStreamReader reader, String participantIdentity); -typedef TextStreamHandler = Function( - TextStreamReader reader, String participantIdentity); +typedef TextStreamHandler = Function(TextStreamReader reader, String participantIdentity); diff --git a/lib/src/utils/data_packet_buffer.dart b/lib/src/utils/data_packet_buffer.dart new file mode 100644 index 00000000..f393683d --- /dev/null +++ b/lib/src/utils/data_packet_buffer.dart @@ -0,0 +1,135 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; + +import '../proto/livekit_models.pb.dart' as lk_models; +import '../logger.dart' show logger; + +class BufferedDataPacket { + final lk_models.DataPacket packet; + final rtc.RTCDataChannelMessage message; + final int sequence; + + BufferedDataPacket({ + required this.packet, + required this.message, + required this.sequence, + }); +} + +class DataPacketBuffer { + final List _buffer = []; + int _totalSize = 0; + + // Maximum buffer size in bytes (64MB by default) + final int maxBufferSize; + + // Maximum number of packets (1000 by default) + final int maxPacketCount; + + DataPacketBuffer({ + this.maxBufferSize = 64 * 1024 * 1024, // 64MB + this.maxPacketCount = 1000, + }); + + void push(BufferedDataPacket item) { + _buffer.add(item); + _totalSize += item.message.binary.length; + + // Enforce buffer limits + _enforceBufferLimits(); + } + + void _enforceBufferLimits() { + int removedCount = 0; + + // Remove oldest packets if we exceed count limit + while (_buffer.length > maxPacketCount && _buffer.isNotEmpty) { + final removed = pop(); + if (removed == null) break; + removedCount++; + } + + // Remove oldest packets if we exceed size limit, but keep at least one packet + while (_totalSize > maxBufferSize && _buffer.length > 1) { + final removed = pop(); + if (removed == null) break; + removedCount++; + } + + // Log buffer limit enforcement + if (removedCount > 0) { + logger.warning('DataPacketBuffer limit reached: removed $removedCount old packets. ' + 'Current: ${_buffer.length} packets, ${(_totalSize / 1024).round()}KB. ' + 'Limits: $maxPacketCount packets, ${(maxBufferSize / 1024).round()}KB'); + } + } + + BufferedDataPacket? pop() { + if (_buffer.isEmpty) return null; + final item = _buffer.removeAt(0); + _totalSize -= item.message.binary.length; + return item; + } + + void popToSequence(int sequence) { + while (_buffer.isNotEmpty && _buffer.first.sequence <= sequence) { + pop(); + } + } + + void alignBufferedAmount(int bufferedAmount) { + // If bufferedAmount is 0, remove all packets + if (bufferedAmount == 0) { + clear(); + return; + } + + while (_buffer.isNotEmpty) { + final first = _buffer.first; + final sizeAfterRemoving = _totalSize - first.message.binary.length; + + // If removing this packet would bring us <= bufferedAmount, stop + if (sizeAfterRemoving <= bufferedAmount) { + break; + } + + pop(); + } + } + + List getAll() { + return List.from(_buffer); + } + + void clear() { + _buffer.clear(); + _totalSize = 0; + } + + int get length => _buffer.length; + int get totalSize => _totalSize; + bool get isEmpty => _buffer.isEmpty; + bool get isNotEmpty => _buffer.isNotEmpty; + + // Buffer limit getters + bool get isOverSizeLimit => _totalSize > maxBufferSize; + bool get isOverCountLimit => _buffer.length > maxPacketCount; + bool get isOverLimits => isOverSizeLimit || isOverCountLimit; + + // Buffer utilization (0.0 to 1.0+) + double get sizeUtilization => _totalSize / maxBufferSize; + double get countUtilization => _buffer.length / maxPacketCount; +} diff --git a/lib/src/utils/ttl_map.dart b/lib/src/utils/ttl_map.dart new file mode 100644 index 00000000..965d526e --- /dev/null +++ b/lib/src/utils/ttl_map.dart @@ -0,0 +1,86 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; + +class _TTLEntry { + final V value; + final DateTime expiry; + + _TTLEntry(this.value, this.expiry); + + bool get isExpired => DateTime.timestamp().isAfter(expiry); +} + +class TTLMap { + final int ttlMs; + final Map> _map = {}; + Timer? _cleanupTimer; + + Iterable get keys => _map.keys; + int get size => _map.length; + + TTLMap(this.ttlMs) { + _startCleanupTimer(); + } + + void _startCleanupTimer() { + _cleanupTimer?.cancel(); + _cleanupTimer = Timer.periodic( + Duration(milliseconds: ttlMs ~/ 2), + (_) => _cleanup(), + ); + } + + void _cleanup() { + final now = DateTime.timestamp(); + _map.removeWhere((key, entry) => now.isAfter(entry.expiry)); + } + + V? get(K key) { + final entry = _map[key]; + if (entry == null || entry.isExpired) { + _map.remove(key); + return null; + } + return entry.value; + } + + void set(K key, V value) { + final expiry = DateTime.timestamp().add(Duration(milliseconds: ttlMs)); + _map[key] = _TTLEntry(value, expiry); + } + + bool has(K key) { + final entry = _map[key]; + if (entry == null || entry.isExpired) { + _map.remove(key); + return false; + } + return true; + } + + void delete(K key) { + _map.remove(key); + } + + void clear() { + _map.clear(); + } + + void dispose() { + _cleanupTimer?.cancel(); + _map.clear(); + } +} diff --git a/test/integration/data_stream_reliability_test.dart b/test/integration/data_stream_reliability_test.dart new file mode 100644 index 00000000..e5ac7917 --- /dev/null +++ b/test/integration/data_stream_reliability_test.dart @@ -0,0 +1,496 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +@Timeout(Duration(seconds: 15)) +library; + +import 'dart:async'; +import 'dart:io'; +import 'dart:math'; +import 'dart:typed_data'; +import 'package:collection/collection.dart'; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:livekit_client/livekit_client.dart'; + +import '../mock/e2e_container.dart'; + +void main() { + E2EContainer? container; + late Room room; + + // Shared list equality checker for deep comparison + const listEquality = ListEquality(); + + setUpAll(() async { + container = E2EContainer(); + room = container!.room; + await container!.connectRoom(); + }); + + tearDownAll(() async { + await container?.dispose(); + }); + + group('Data Stream Reliability Integration Tests', () { + test('Reliable Text Stream Message Ordering and Integrity', () async { + final messageCount = 25; + final receivedMessages = []; + final receivedCompleter = Completer(); + + // Test reliable text stream with message ordering + room.registerTextStreamHandler('reliability-test', (TextStreamReader reader, String participantIdentity) async { + final text = await reader.readAll(); + receivedMessages.add(text); + print('Received reliable text message ${receivedMessages.length}/${messageCount}: ${text}'); + + if (receivedMessages.length >= messageCount) { + receivedCompleter.complete(); + } + }); + + // Send messages rapidly to test reliability and ordering + print('Sending ${messageCount} rapid reliable text messages'); + final expectedMessages = []; + + for (int i = 0; i < messageCount; i++) { + final messageContent = 'ReliabilityTest_${i}_${DateTime.now().millisecondsSinceEpoch}'; + expectedMessages.add(messageContent); + + try { + final info = await room.localParticipant?.sendText(messageContent, + options: SendTextOptions( + topic: 'reliability-test', + onProgress: (progress) { + // Verify progress is within bounds (0.0-1.0) + expect(progress, greaterThanOrEqualTo(0.0)); + expect(progress, lessThanOrEqualTo(1.0)); + }, + )); + expect(info, isNotNull); + + // Small delay between messages to create realistic load + if (i % 5 == 0) { + await Future.delayed(Duration(milliseconds: 10)); + } + } catch (e) { + print('Warning: Failed to send message ${i}: $e'); + } + } + + // Wait for all messages to be received + await receivedCompleter.future.timeout(Duration(seconds: 12)); + + // Verify all messages received exactly once + expect(receivedMessages.length, equals(messageCount), + reason: 'All ${messageCount} messages should be received exactly once'); + + // Verify no duplicates + final uniqueMessages = receivedMessages.toSet(); + expect(uniqueMessages.length, equals(receivedMessages.length), + reason: 'No duplicate messages should be received'); + + // Verify each expected message was received + for (final expectedMessage in expectedMessages) { + expect(receivedMessages, contains(expectedMessage), + reason: 'Expected message should be received: $expectedMessage'); + } + + print('✅ Text stream reliability test passed: All ${messageCount} messages received correctly'); + }); + + test('Reliable Byte Stream With Large Data Chunks', () async { + final chunkCount = 10; + final chunkSize = 10000; // 10KB chunks + final receivedFiles = >[]; + final receivedCompleter = Completer(); + + room.registerByteStreamHandler('reliability-bytes', (ByteStreamReader reader, String participantIdentity) async { + final chunks = await reader.readAll(); + final fileData = chunks.expand((chunk) => chunk).toList(); + receivedFiles.add(fileData); + + // Print first 10 bytes for debugging + final firstBytes = fileData.take(10).toList(); + print( + 'Received reliable byte stream ${receivedFiles.length}/${chunkCount}: ${fileData.length} bytes from $participantIdentity'); + print(' First 10 bytes: $firstBytes'); + + if (receivedFiles.length >= chunkCount) { + receivedCompleter.complete(); + } + }); + + // Send multiple byte streams with random data + print('Sending ${chunkCount} reliable byte streams with random data'); + final expectedFiles = >[]; + final random = Random(); + + for (int i = 0; i < chunkCount; i++) { + // Create unique random data for each file + final fileData = List.generate(chunkSize, (index) => random.nextInt(256)); + expectedFiles.add(fileData); + + try { + // Print first 10 bytes of what we're sending + final firstBytes = fileData.take(10).toList(); + print('Sending file ${i}: ${fileData.length} bytes, first 10: $firstBytes'); + + final stream = await room.localParticipant?.streamBytes(StreamBytesOptions( + topic: 'reliability-bytes', + name: 'reliable-test-file-${i}.bin', + mimeType: 'application/octet-stream', + totalSize: chunkSize, + )); + + await stream?.write(Uint8List.fromList(fileData)); + await stream?.close(); + + // Brief delay between files + await Future.delayed(Duration(milliseconds: 50)); + } catch (e) { + print('Warning: Failed to send file ${i}: $e'); + } + } + + // Wait for all files to be received + await receivedCompleter.future.timeout(Duration(seconds: 12)); + + // Verify all files received + expect(receivedFiles.length, equals(chunkCount), reason: 'All ${chunkCount} byte streams should be received'); + + // Verify data integrity - all expected files should be received (order may vary) + expect(receivedFiles.length, equals(expectedFiles.length), + reason: 'Should receive exactly ${expectedFiles.length} files'); + + // Use deep equality comparison for lists + + // Verify each expected file is received exactly once + for (int i = 0; i < expectedFiles.length; i++) { + final expectedFile = expectedFiles[i]; + final matchingFiles = receivedFiles.where((received) => listEquality.equals(received, expectedFile)).toList(); + + expect(matchingFiles.length, equals(1), + reason: 'Expected file ${i} should be received exactly once, found ${matchingFiles.length} matches'); + } + + // Verify no unexpected files received + for (int i = 0; i < receivedFiles.length; i++) { + final receivedFile = receivedFiles[i]; + final matchingExpected = + expectedFiles.where((expected) => listEquality.equals(receivedFile, expected)).toList(); + + expect(matchingExpected.length, equals(1), reason: 'Received file ${i} should match exactly one expected file'); + } + + print('✅ Byte stream reliability test passed: All ${chunkCount} files received correctly'); + }); + + test('Sequence Number Integrity and No Duplicates', () async { + final messageCount = 30; + final receivedSequences = []; + final duplicateTracker = {}; + final receivedCompleter = Completer(); + + room.registerTextStreamHandler('sequence-test', (TextStreamReader reader, String participantIdentity) async { + final text = await reader.readAll(); + final parts = text.split('_'); + if (parts.length >= 2) { + final seqNum = int.tryParse(parts[1]); + if (seqNum != null) { + receivedSequences.add(seqNum); + duplicateTracker[seqNum] = (duplicateTracker[seqNum] ?? 0) + 1; + } + } + + if (receivedSequences.length >= messageCount) { + receivedCompleter.complete(); + } + }); + + // Send messages in rapid succession + print('Testing sequence integrity with ${messageCount} rapid messages'); + for (int i = 0; i < messageCount; i++) { + try { + await room.localParticipant?.sendText('sequence_${i}_test', options: SendTextOptions(topic: 'sequence-test')); + } catch (e) { + print('Failed to send sequence message ${i}: $e'); + } + } + + await receivedCompleter.future.timeout(Duration(seconds: 8)); + + // Verify no duplicates (each sequence should appear exactly once) + for (final entry in duplicateTracker.entries) { + expect(entry.value, equals(1), + reason: 'Sequence ${entry.key} should appear exactly once, but appeared ${entry.value} times'); + } + + // Verify correct count of unique sequences + expect(receivedSequences.length, equals(messageCount)); + final uniqueSequences = receivedSequences.toSet(); + expect(uniqueSequences.length, equals(messageCount), reason: 'All ${messageCount} sequences should be unique'); + + print('✅ Sequence integrity test passed: ${messageCount} unique sequences, no duplicates'); + }); + + test('Concurrent Reliable Streams Stress Test', () async { + final concurrentStreams = 5; + final messagesPerStream = 8; + final receivedMessages = >{}; + final completers = >{}; + + // Set up handlers for multiple concurrent topics + for (int streamId = 0; streamId < concurrentStreams; streamId++) { + final topic = 'concurrent-${streamId}'; + receivedMessages[topic] = []; + completers[topic] = Completer(); + + room.registerTextStreamHandler(topic, (TextStreamReader reader, String participantIdentity) async { + final text = await reader.readAll(); + receivedMessages[topic]!.add(text); + + if (receivedMessages[topic]!.length >= messagesPerStream) { + completers[topic]!.complete(); + } + }); + } + + // Send messages concurrently across multiple streams + print('Starting concurrent streams: ${concurrentStreams} streams x ${messagesPerStream} messages'); + + final sendFutures = []; + for (int streamId = 0; streamId < concurrentStreams; streamId++) { + final topic = 'concurrent-${streamId}'; + + sendFutures.add(() async { + for (int msgId = 0; msgId < messagesPerStream; msgId++) { + try { + await room.localParticipant + ?.sendText('Stream${streamId}_Message${msgId}', options: SendTextOptions(topic: topic)); + // Small randomized delay to create realistic concurrent load + await Future.delayed(Duration(milliseconds: Random().nextInt(30) + 10)); + } catch (e) { + print('Failed to send message ${msgId} on stream ${streamId}: $e'); + } + } + }()); + } + + // Wait for all send operations to complete + await Future.wait(sendFutures); + + // Wait for all messages to be received + await Future.wait(completers.values.map((c) => c.future)).timeout(Duration(seconds: 10)); + + // Verify all messages received correctly + for (int streamId = 0; streamId < concurrentStreams; streamId++) { + final topic = 'concurrent-${streamId}'; + expect(receivedMessages[topic]!.length, equals(messagesPerStream), + reason: 'Stream ${streamId} should receive all ${messagesPerStream} messages'); + + // Verify message content uniqueness within each stream + final uniqueInStream = receivedMessages[topic]!.toSet(); + expect(uniqueInStream.length, equals(messagesPerStream), + reason: 'Stream ${streamId} should have ${messagesPerStream} unique messages'); + + // Verify expected messages + for (int msgId = 0; msgId < messagesPerStream; msgId++) { + final expectedMessage = 'Stream${streamId}_Message${msgId}'; + expect(receivedMessages[topic], contains(expectedMessage), + reason: 'Stream ${streamId} should contain message ${msgId}'); + } + } + + print( + '✅ Concurrent streams test passed: ${concurrentStreams * messagesPerStream} total messages across ${concurrentStreams} streams'); + }); + + test('Mixed Data Types Reliability Test', () async { + final textMessages = 8; + final byteStreams = 4; + final receivedTexts = []; + final receivedBytes = >[]; + final textCompleter = Completer(); + final byteCompleter = Completer(); + + // Set up mixed handlers + room.registerTextStreamHandler('mixed-text', (reader, participantIdentity) async { + final text = await reader.readAll(); + receivedTexts.add(text); + if (receivedTexts.length >= textMessages) { + textCompleter.complete(); + } + }); + + room.registerByteStreamHandler('mixed-bytes', (reader, participantIdentity) async { + final chunks = await reader.readAll(); + final data = chunks.expand((chunk) => chunk).toList(); + receivedBytes.add(data); + + // Print first 10 bytes for debugging + final firstBytes = data.take(10).toList(); + print( + 'Received mixed byte stream ${receivedBytes.length}/${byteStreams}: ${data.length} bytes, first 10: $firstBytes'); + + if (receivedBytes.length >= byteStreams) { + byteCompleter.complete(); + } + }); + + // Send mixed data types concurrently + final futures = []; + + // Send text messages + for (int i = 0; i < textMessages; i++) { + futures.add(() async { + await room.localParticipant + ?.sendText('Mixed text message ${i}', options: SendTextOptions(topic: 'mixed-text')); + }()); + } + + // Send byte streams with random data + final mixedRandom = Random(); + final expectedMixedBytes = >[]; + + for (int i = 0; i < byteStreams; i++) { + futures.add(() async { + final data = List.generate(500, (index) => mixedRandom.nextInt(256)); + expectedMixedBytes.add(data); + final firstBytes = data.take(10).toList(); + print('Sending mixed byte stream ${i}: ${data.length} bytes, first 10: $firstBytes'); + + final stream = await room.localParticipant?.streamBytes(StreamBytesOptions( + topic: 'mixed-bytes', + name: 'mixed-file-${i}.dat', + totalSize: data.length, + )); + await stream?.write(Uint8List.fromList(data)); + await stream?.close(); + }()); + } + + await Future.wait(futures); + await Future.wait([textCompleter.future, byteCompleter.future]).timeout(Duration(seconds: 10)); + + // Verify mixed data integrity + expect(receivedTexts.length, equals(textMessages)); + expect(receivedBytes.length, equals(byteStreams)); + + // Verify text content + for (int i = 0; i < textMessages; i++) { + expect(receivedTexts, contains('Mixed text message ${i}')); + } + + // Verify random byte data using deep equality comparison + for (int i = 0; i < expectedMixedBytes.length; i++) { + final expectedData = expectedMixedBytes[i]; + final matchFound = receivedBytes.any((received) => listEquality.equals(received, expectedData)); + + expect(matchFound, isTrue, reason: 'Expected random byte data for file ${i} should be received'); + } + + print('✅ Mixed data types test passed: ${textMessages} texts + ${byteStreams} byte streams'); + }); + + test('Incremental Progress Tracking With File Attachments', () async { + final numFiles = 5; // Multiple files = incremental progress + final progressValues = []; + final receivedCompleter = Completer(); + final textCompleter = Completer(); + var receivedFileCount = 0; + + // Create test files in testfiles/ directory (same pattern as data_stream_test.dart) + final files = [ + 'testfiles/progress_test_1.bin', + 'testfiles/progress_test_2.bin', + 'testfiles/progress_test_3.bin', + 'testfiles/progress_test_4.bin', + 'testfiles/progress_test_5.bin' + ]; + + /// Create test files with random data + final tempFiles = []; + for (int i = 0; i < numFiles; i++) { + final file = File(files[i]); + final random = Random(); + final bytes = List.generate(1024, (index) => random.nextInt(256)); // 1KB random data + file.writeAsBytesSync(bytes); + tempFiles.add(file); + print('Created test file ${i}: ${file.path} (${bytes.length} bytes)'); + } + + room.registerTextStreamHandler('progress-test', (TextStreamReader reader, String participantIdentity) async { + final text = await reader.readAll(); + print('Received text message: ${text}'); + textCompleter.complete(); + }); + + room.registerByteStreamHandler('progress-test', (ByteStreamReader reader, String participantIdentity) async { + final chunks = await reader.readAll(); + final data = chunks.expand((chunk) => chunk).toList(); + receivedFileCount++; + print('Received file attachment ${receivedFileCount}/${numFiles}: ${reader.info?.name} (${data.length} bytes)'); + + if (receivedFileCount >= numFiles) { + receivedCompleter.complete(); + } + }); + + // Send text with multiple file attachments - this triggers incremental progress + print('Sending text with ${numFiles} file attachments to test incremental progress...'); + final info = await room.localParticipant?.sendText('Message with ${numFiles} attachments', + options: SendTextOptions( + topic: 'progress-test', + attachments: tempFiles, + onProgress: (progress) { + progressValues.add(progress); + print('Progress: ${(progress * 100).toStringAsFixed(1)}% (${progressValues.length} updates)'); + + // Verify progress bounds + expect(progress, greaterThanOrEqualTo(0.0)); + expect(progress, lessThanOrEqualTo(1.0)); + }, + )); + + expect(info, isNotNull); + await Future.wait([textCompleter.future, receivedCompleter.future]).timeout(Duration(seconds: 15)); + + // Clean up test files + for (final file in tempFiles) { + if (await file.exists()) { + await file.delete(); + } + } + + // Verify incremental progress + print('Progress tracking results:'); + for (int i = 0; i < progressValues.length; i++) { + print(' Update ${i + 1}: ${(progressValues[i] * 100).toStringAsFixed(1)}%'); + } + + expect(progressValues, isNotEmpty, reason: 'Progress callbacks should have been called'); + expect(progressValues.last, equals(1.0), reason: 'Final progress should be 1.0'); + expect(progressValues.length, greaterThan(1), reason: 'Should have multiple incremental progress updates'); + + // Verify progress is non-decreasing + for (int i = 1; i < progressValues.length; i++) { + expect(progressValues[i], greaterThanOrEqualTo(progressValues[i - 1]), + reason: 'Progress should be non-decreasing'); + } + + print('✅ Incremental progress test passed: ${progressValues.length} progress updates from 0% to 100%'); + }); + }); +} diff --git a/test/utils/data_packet_buffer_test.dart b/test/utils/data_packet_buffer_test.dart new file mode 100644 index 00000000..18330b61 --- /dev/null +++ b/test/utils/data_packet_buffer_test.dart @@ -0,0 +1,542 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:typed_data'; +import 'package:flutter_test/flutter_test.dart'; +import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; +import 'package:livekit_client/src/utils/data_packet_buffer.dart'; +import 'package:livekit_client/src/proto/livekit_models.pb.dart' as lk_models; + +void main() { + group('DataPacketBuffer', () { + late DataPacketBuffer buffer; + + setUp(() { + buffer = DataPacketBuffer( + maxBufferSize: 1024, // 1KB for testing + maxPacketCount: 5, // 5 packets for testing + ); + }); + + BufferedDataPacket createTestPacket(int sequence, String data) { + final userPacket = lk_models.UserPacket()..payload = data.codeUnits; + + final packet = lk_models.DataPacket() + ..sequence = sequence + ..kind = lk_models.DataPacket_Kind.RELIABLE + ..user = userPacket; + + final message = rtc.RTCDataChannelMessage.fromBinary( + Uint8List.fromList(data.codeUnits), + ); + + return BufferedDataPacket( + packet: packet, + message: message, + sequence: sequence, + ); + } + + group('Basic Operations', () { + test('should be empty initially', () { + expect(buffer.isEmpty, isTrue); + expect(buffer.isNotEmpty, isFalse); + expect(buffer.length, equals(0)); + expect(buffer.totalSize, equals(0)); + }); + + test('should push and track packets correctly', () { + final packet1 = createTestPacket(1, 'hello'); + final packet2 = createTestPacket(2, 'world'); + + buffer.push(packet1); + expect(buffer.length, equals(1)); + expect(buffer.totalSize, equals(5)); // 'hello'.length + expect(buffer.isEmpty, isFalse); + expect(buffer.isNotEmpty, isTrue); + + buffer.push(packet2); + expect(buffer.length, equals(2)); + expect(buffer.totalSize, equals(10)); // 'hello' + 'world' + }); + + test('should pop packets correctly', () { + final packet1 = createTestPacket(1, 'hello'); + final packet2 = createTestPacket(2, 'world'); + + buffer.push(packet1); + buffer.push(packet2); + + final popped = buffer.pop(); + expect(popped, isNotNull); + expect(popped!.sequence, equals(1)); + expect(buffer.length, equals(1)); + expect(buffer.totalSize, equals(5)); // only 'world' left + + final popped2 = buffer.pop(); + expect(popped2!.sequence, equals(2)); + expect(buffer.isEmpty, isTrue); + expect(buffer.totalSize, equals(0)); + + final popped3 = buffer.pop(); + expect(popped3, isNull); + }); + + test('should clear buffer correctly', () { + buffer.push(createTestPacket(1, 'hello')); + buffer.push(createTestPacket(2, 'world')); + + buffer.clear(); + expect(buffer.isEmpty, isTrue); + expect(buffer.totalSize, equals(0)); + expect(buffer.length, equals(0)); + }); + + test('should get all packets correctly', () { + final packet1 = createTestPacket(1, 'hello'); + final packet2 = createTestPacket(2, 'world'); + + buffer.push(packet1); + buffer.push(packet2); + + final all = buffer.getAll(); + expect(all.length, equals(2)); + expect(all[0].sequence, equals(1)); + expect(all[1].sequence, equals(2)); + + // Should be a copy, not the original + all.clear(); + expect(buffer.length, equals(2)); + }); + }); + + group('Sequence Management', () { + test('should popToSequence correctly (removes acknowledged messages)', () { + buffer.push(createTestPacket(1, 'a')); + buffer.push(createTestPacket(2, 'b')); + buffer.push(createTestPacket(3, 'c')); + buffer.push(createTestPacket(4, 'd')); + + // This simulates server acknowledging up to sequence 2 + // Should remove packets 1 and 2, keep 3 and 4 + buffer.popToSequence(2); + + final remaining = buffer.getAll(); + expect(remaining.length, equals(2)); + expect(remaining[0].sequence, equals(3)); + expect(remaining[1].sequence, equals(4)); + expect(buffer.totalSize, equals(2)); // 'c' + 'd' + }); + + test('should handle popToSequence with empty buffer', () { + buffer.popToSequence(5); + expect(buffer.isEmpty, isTrue); + }); + + test('should handle popToSequence beyond all packets', () { + buffer.push(createTestPacket(1, 'a')); + buffer.push(createTestPacket(2, 'b')); + + buffer.popToSequence(10); + expect(buffer.isEmpty, isTrue); + expect(buffer.totalSize, equals(0)); + }); + }); + + group('Buffer Alignment', () { + test('should alignBufferedAmount correctly', () { + buffer.push(createTestPacket(1, 'hello')); // 5 bytes + buffer.push(createTestPacket(2, 'world')); // 5 bytes + buffer.push(createTestPacket(3, 'test')); // 4 bytes + // Total: 14 bytes + + // If bufferedAmount is 6, we should remove packets until + // totalSize - nextPacketSize <= bufferedAmount + // Current: 14 bytes, bufferedAmount: 6 + // Check packet 1 (5 bytes): 14 - 5 = 9 > 6, so remove it + // Check packet 2 (5 bytes): 9 - 5 = 4 <= 6, so keep it + buffer.alignBufferedAmount(6); + + final remaining = buffer.getAll(); + expect(remaining.length, equals(2)); // packets 2 and 3 remain + expect(remaining[0].sequence, equals(2)); + expect(remaining[1].sequence, equals(3)); + expect(buffer.totalSize, equals(9)); // 'world' + 'test' + }); + + test('should not remove packets when buffer is already aligned', () { + buffer.push(createTestPacket(1, 'hi')); // 2 bytes + buffer.push(createTestPacket(2, 'yo')); // 2 bytes + // Total: 4 bytes + + buffer.alignBufferedAmount(10); + + expect(buffer.length, equals(2)); + expect(buffer.totalSize, equals(4)); + }); + + test('should handle alignBufferedAmount with empty buffer', () { + buffer.alignBufferedAmount(10); + expect(buffer.isEmpty, isTrue); + }); + + test('should remove all packets if bufferedAmount is 0', () { + buffer.push(createTestPacket(1, 'hello')); + buffer.push(createTestPacket(2, 'world')); + + buffer.alignBufferedAmount(0); + expect(buffer.isEmpty, isTrue); + expect(buffer.totalSize, equals(0)); + }); + }); + + group('Buffer Limits', () { + test('should enforce packet count limit', () { + // Buffer limit is 5 packets + for (int i = 1; i <= 7; i++) { + buffer.push(createTestPacket(i, 'x')); + } + + // Should only keep latest 5 packets + expect(buffer.length, equals(5)); + final remaining = buffer.getAll(); + expect(remaining[0].sequence, equals(3)); // oldest kept + expect(remaining[4].sequence, equals(7)); // newest + }); + + test('should enforce size limit', () { + // Buffer limit is 1024 bytes + // Create packets with 300 bytes each + final largeData = 'x' * 300; + + for (int i = 1; i <= 5; i++) { + buffer.push(createTestPacket(i, largeData)); + } + + // 5 * 300 = 1500 bytes > 1024, so should remove old packets + expect(buffer.totalSize, lessThanOrEqualTo(1024)); + expect(buffer.length, lessThan(5)); + }); + + test('should report limit status correctly', () { + expect(buffer.isOverSizeLimit, isFalse); + expect(buffer.isOverCountLimit, isFalse); + expect(buffer.isOverLimits, isFalse); + + // Add packets to exceed count limit + for (int i = 1; i <= 6; i++) { + buffer.push(createTestPacket(i, 'x')); + } + + // Limits should be enforced automatically, so should not be over limits + expect(buffer.isOverCountLimit, isFalse); + expect(buffer.isOverLimits, isFalse); + }); + + test('should report utilization correctly', () { + expect(buffer.sizeUtilization, equals(0.0)); + expect(buffer.countUtilization, equals(0.0)); + + buffer.push(createTestPacket(1, 'x' * 512)); // Half of 1024 byte limit + expect(buffer.sizeUtilization, closeTo(0.5, 0.01)); + expect(buffer.countUtilization, equals(0.2)); // 1 of 5 packets + + buffer.push(createTestPacket(2, 'x' * 256)); + expect(buffer.sizeUtilization, closeTo(0.75, 0.01)); + expect(buffer.countUtilization, equals(0.4)); // 2 of 5 packets + }); + }); + + group('Edge Cases', () { + test('should handle single packet larger than buffer limit', () { + final largeData = 'x' * 2000; // Larger than 1KB limit + buffer.push(createTestPacket(1, largeData)); + + // Should still keep the packet even though it exceeds the limit + expect(buffer.length, equals(1)); + expect(buffer.totalSize, equals(2000)); + }); + + test('should handle multiple operations on empty buffer', () { + buffer.pop(); + buffer.popToSequence(10); + buffer.alignBufferedAmount(100); + buffer.clear(); + + expect(buffer.isEmpty, isTrue); + expect(buffer.totalSize, equals(0)); + }); + + test('should maintain consistency after limit enforcement', () { + // Add many packets to trigger limit enforcement + for (int i = 1; i <= 10; i++) { + buffer.push(createTestPacket(i, 'data$i')); + } + + // Verify buffer is consistent + final totalSizeCalculated = + buffer.getAll().map((p) => p.message.binary.length).fold(0, (sum, size) => sum + size); + + expect(buffer.totalSize, equals(totalSizeCalculated)); + expect(buffer.length, equals(buffer.getAll().length)); + }); + }); + + group('Complex Sequence Scenarios', () { + test('should handle gaps in sequence numbers', () { + buffer.push(createTestPacket(1, 'a')); + buffer.push(createTestPacket(5, 'b')); // Gap + buffer.push(createTestPacket(3, 'c')); + buffer.push(createTestPacket(7, 'd')); + + // Should maintain insertion order, not sequence order + final all = buffer.getAll(); + expect(all.length, equals(4)); + expect(all[0].sequence, equals(1)); + expect(all[1].sequence, equals(5)); + expect(all[2].sequence, equals(3)); + expect(all[3].sequence, equals(7)); + }); + + test('should handle duplicate sequences in popToSequence', () { + buffer.push(createTestPacket(1, 'a')); + buffer.push(createTestPacket(2, 'b')); + buffer.push(createTestPacket(2, 'c')); // Duplicate sequence + buffer.push(createTestPacket(3, 'd')); + + buffer.popToSequence(2); + + // Should remove all packets with sequence <= 2 + final remaining = buffer.getAll(); + expect(remaining.length, equals(1)); + expect(remaining[0].sequence, equals(3)); + }); + + test('should handle zero and very small sequences', () { + buffer.push(createTestPacket(0, 'zero')); + buffer.push(createTestPacket(1, 'one')); + buffer.push(createTestPacket(2, 'two')); + + buffer.popToSequence(0); + + final remaining = buffer.getAll(); + expect(remaining.length, equals(2)); + expect(remaining[0].sequence, equals(1)); + expect(remaining[1].sequence, equals(2)); + }); + + test('should handle very large sequence numbers', () { + const largeSeq = 1000000; // Large but reasonable number + buffer.push(createTestPacket(largeSeq - 1, 'smaller')); + buffer.push(createTestPacket(largeSeq, 'large')); + + buffer.popToSequence(largeSeq - 1); + + final remaining = buffer.getAll(); + expect(remaining.length, equals(1)); + expect(remaining[0].sequence, equals(largeSeq)); + }); + }); + + group('Buffer State Management', () { + test('should maintain correct state after mixed operations', () { + // Complex sequence of operations + buffer.push(createTestPacket(1, 'a')); + buffer.push(createTestPacket(2, 'bb')); + buffer.push(createTestPacket(3, 'ccc')); + + expect(buffer.totalSize, equals(6)); // 1+2+3 + + final popped = buffer.pop(); + expect(popped?.sequence, equals(1)); + expect(buffer.totalSize, equals(5)); // 2+3 + + buffer.push(createTestPacket(4, 'dddd')); + expect(buffer.totalSize, equals(9)); // 2+3+4 + + buffer.popToSequence(2); + expect(buffer.totalSize, equals(7)); // 3+4 + + buffer.alignBufferedAmount(6); + // After alignment, should have packets 3 and 4 (total 7 bytes) + // since 7 - 4 = 3 <= 6 (stop condition) + expect(buffer.totalSize, equals(7)); // 'ccc' + 'dddd' + expect(buffer.length, equals(2)); + }); + + test('should handle empty buffer edge cases thoroughly', () { + // All operations on empty buffer + expect(buffer.pop(), isNull); + expect(buffer.getAll(), isEmpty); + expect(buffer.totalSize, equals(0)); + expect(buffer.length, equals(0)); + + buffer.popToSequence(100); + expect(buffer.isEmpty, isTrue); + + buffer.alignBufferedAmount(50); + expect(buffer.isEmpty, isTrue); + + buffer.clear(); + expect(buffer.isEmpty, isTrue); + + // Should handle repeated operations + for (int i = 0; i < 10; i++) { + buffer.clear(); + expect(buffer.isEmpty, isTrue); + } + }); + + test('should handle rapid push/pop cycles', () { + // Simulate rapid message traffic + for (int cycle = 0; cycle < 10; cycle++) { + // Push messages + for (int i = 1; i <= 10; i++) { + buffer.push(createTestPacket(cycle * 10 + i, 'msg$i')); + } + + expect(buffer.length, lessThanOrEqualTo(5)); // Limited by maxPacketCount + expect(buffer.totalSize, lessThanOrEqualTo(1024)); // Limited by maxBufferSize + + // Pop half + for (int i = 0; i < 5; i++) { + final popped = buffer.pop(); + expect(popped, isNotNull); + } + + expect(buffer.length, lessThanOrEqualTo(5)); + expect(buffer.totalSize, lessThanOrEqualTo(1024)); + + // Clear remaining + buffer.clear(); + expect(buffer.isEmpty, isTrue); + } + }); + }); + + group('Performance and Stress Tests', () { + test('should handle large numbers of packets efficiently', () { + const packetCount = 5000; + final stopwatch = Stopwatch()..start(); + + // Add many packets + for (int i = 1; i <= packetCount; i++) { + buffer.push(createTestPacket(i, 'data$i')); + } + + // Should complete quickly (less than 1 second) + expect(stopwatch.elapsedMilliseconds, lessThan(1000)); + expect(buffer.length, equals(5)); // Limited by maxPacketCount + + stopwatch.reset(); + + // Sequential pop operations + while (buffer.isNotEmpty) { + buffer.pop(); + } + + expect(stopwatch.elapsedMilliseconds, lessThan(100)); + }); + + test('should handle memory efficiently with large packets', () { + // Create packets with varying sizes + final sizes = [100, 1000, 10000, 50000, 100000]; + + for (int i = 0; i < sizes.length; i++) { + final data = 'x' * sizes[i]; + buffer.push(createTestPacket(i + 1, data)); + } + + // Buffer limits should have been enforced - keeps at least 1 packet + expect(buffer.length, greaterThan(0)); // At least one packet + expect(buffer.length, lessThanOrEqualTo(5)); + // The largest packet (100KB) might be kept even if it exceeds buffer size + expect(buffer.totalSize, greaterThan(0)); + }); + }); + + group('Error Recovery and Robustness', () { + test('should recover gracefully from inconsistent states', () { + // Add packets normally + buffer.push(createTestPacket(1, 'a')); + buffer.push(createTestPacket(2, 'bb')); + + expect(buffer.length, equals(2)); + expect(buffer.totalSize, equals(3)); + + // Simulate various operations that might cause issues + buffer.popToSequence(10); // Beyond all sequences + expect(buffer.isEmpty, isTrue); + expect(buffer.totalSize, equals(0)); + + // Should still work normally after + buffer.push(createTestPacket(5, 'hello')); + expect(buffer.length, equals(1)); + expect(buffer.totalSize, equals(5)); + }); + + test('should handle repeated limit enforcement', () { + // Continuously add packets that exceed limits + for (int round = 0; round < 10; round++) { + for (int i = 1; i <= 20; i++) { + buffer.push(createTestPacket(round * 20 + i, 'x' * 100)); + } + + // Limits should always be enforced + expect(buffer.length, lessThanOrEqualTo(5)); + expect(buffer.totalSize, lessThanOrEqualTo(1024)); + } + + // Buffer should still be functional + final remaining = buffer.getAll(); + expect(remaining.length, greaterThan(0)); + expect(remaining.length, lessThanOrEqualTo(5)); + }); + + test('should maintain consistency across all operations', () { + final operations = 1000; + + for (int i = 0; i < operations; i++) { + final op = i % 5; + + switch (op) { + case 0: + buffer.push(createTestPacket(i, 'data$i')); + break; + case 1: + buffer.pop(); + break; + case 2: + buffer.popToSequence(i ~/ 2); + break; + case 3: + buffer.alignBufferedAmount((i % 100) * 10); + break; + case 4: + if (i % 50 == 0) buffer.clear(); + break; + } + + // Verify consistency after each operation + final all = buffer.getAll(); + final calculatedSize = all.fold(0, (sum, packet) => sum + packet.message.binary.length); + + expect(buffer.totalSize, equals(calculatedSize)); + expect(buffer.length, equals(all.length)); + expect(buffer.isEmpty, equals(all.isEmpty)); + } + }); + }); + }); +} diff --git a/test/utils/ttl_map_test.dart b/test/utils/ttl_map_test.dart new file mode 100644 index 00000000..4f8f747d --- /dev/null +++ b/test/utils/ttl_map_test.dart @@ -0,0 +1,306 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'package:flutter_test/flutter_test.dart'; +import 'package:livekit_client/src/utils/ttl_map.dart'; + +void main() { + group('TTLMap', () { + test('should store and retrieve values', () { + final map = TTLMap(5000); + + map.set('key1', 100); + map.set('key2', 200); + + expect(map.get('key1'), equals(100)); + expect(map.get('key2'), equals(200)); + expect(map.has('key1'), isTrue); + expect(map.has('key2'), isTrue); + + map.dispose(); + }); + + test('should return null for non-existent keys', () { + final map = TTLMap(5000); + + expect(map.get('nonexistent'), isNull); + expect(map.has('nonexistent'), isFalse); + + map.dispose(); + }); + + test('should delete values', () { + final map = TTLMap(5000); + + map.set('key1', 100); + expect(map.has('key1'), isTrue); + + map.delete('key1'); + expect(map.has('key1'), isFalse); + expect(map.get('key1'), isNull); + + map.dispose(); + }); + + test('should clear all values', () { + final map = TTLMap(5000); + + map.set('key1', 100); + map.set('key2', 200); + expect(map.has('key1'), isTrue); + expect(map.has('key2'), isTrue); + + map.clear(); + expect(map.has('key1'), isFalse); + expect(map.has('key2'), isFalse); + + map.dispose(); + }); + + test('should return keys', () { + final map = TTLMap(5000); + + map.set('key1', 100); + map.set('key2', 200); + map.set('key3', 300); + + final keys = map.keys.toSet(); + expect(keys, equals({'key1', 'key2', 'key3'})); + + map.dispose(); + }); + + test('should expire values after TTL', () async { + final map = TTLMap(100); // 100ms TTL + + map.set('key1', 100); + expect(map.has('key1'), isTrue); + + // Wait for expiry + await Future.delayed(const Duration(milliseconds: 150)); + + expect(map.has('key1'), isFalse); + expect(map.get('key1'), isNull); + + map.dispose(); + }); + + test('should not expire values before TTL', () async { + final map = TTLMap(200); // 200ms TTL + + map.set('key1', 100); + expect(map.has('key1'), isTrue); + + // Wait less than TTL + await Future.delayed(const Duration(milliseconds: 50)); + + expect(map.has('key1'), isTrue); + expect(map.get('key1'), equals(100)); + + map.dispose(); + }); + + test('should handle concurrent access', () { + final map = TTLMap(5000); + + // Simulate concurrent access + for (int i = 0; i < 100; i++) { + map.set('key$i', i); + } + + for (int i = 0; i < 100; i++) { + expect(map.get('key$i'), equals(i)); + } + + map.dispose(); + }); + + test('should cleanup expired entries automatically', () async { + final map = TTLMap(50); // Very short TTL + + // Add multiple entries + for (int i = 0; i < 10; i++) { + map.set('key$i', i); + } + + // All should be present initially + for (int i = 0; i < 10; i++) { + expect(map.has('key$i'), isTrue); + } + + // Wait for cleanup cycle (TTL/2 + some buffer) + await Future.delayed(const Duration(milliseconds: 100)); + + // All should be expired and cleaned up + for (int i = 0; i < 10; i++) { + expect(map.has('key$i'), isFalse); + } + + map.dispose(); + }); + + test('should update existing keys', () { + final map = TTLMap(5000); + + map.set('key1', 100); + expect(map.get('key1'), equals(100)); + + map.set('key1', 200); + expect(map.get('key1'), equals(200)); + + map.dispose(); + }); + + test('should handle different value types', () { + final stringMap = TTLMap(5000); + final listMap = TTLMap>(5000); + + stringMap.set('text', 'hello world'); + listMap.set('numbers', [1, 2, 3, 4, 5]); + + expect(stringMap.get('text'), equals('hello world')); + expect(listMap.get('numbers'), equals([1, 2, 3, 4, 5])); + + stringMap.dispose(); + listMap.dispose(); + }); + + test('should dispose cleanly', () { + final map = TTLMap(5000); + + map.set('key1', 100); + expect(map.has('key1'), isTrue); + + map.dispose(); + + // After dispose, map should be empty + expect(map.has('key1'), isFalse); + expect(map.get('key1'), isNull); + }); + + test('should handle keys property correctly', () { + final map = TTLMap(5000); + + expect(map.keys.isEmpty, isTrue); + + map.set('key1', 100); + expect(map.keys.length, equals(1)); + expect(map.keys, contains('key1')); + + map.set('key2', 200); + expect(map.keys.length, equals(2)); + expect(map.keys, containsAll(['key1', 'key2'])); + + map.delete('key1'); + expect(map.keys.length, equals(1)); + expect(map.keys, contains('key2')); + expect(map.keys, isNot(contains('key1'))); + + map.clear(); + expect(map.keys.isEmpty, isTrue); + + map.dispose(); + }); + + test('should handle size property correctly', () { + final map = TTLMap(5000); + + expect(map.size, equals(0)); + + map.set('key1', 100); + expect(map.size, equals(1)); + + map.set('key2', 200); + expect(map.size, equals(2)); + + map.set('key3', 300); + expect(map.size, equals(3)); + + map.delete('key1'); + expect(map.size, equals(2)); + + map.clear(); + expect(map.size, equals(0)); + + map.dispose(); + }); + + test('should handle key iteration', () { + final map = TTLMap(5000); + + map.set('key1', 100); + map.set('key2', 200); + map.set('key3', 300); + + final collectedKeys = []; + for (final key in map.keys) { + collectedKeys.add(key); + } + + expect(collectedKeys.length, equals(3)); + expect(collectedKeys, containsAll(['key1', 'key2', 'key3'])); + + map.dispose(); + }); + + test('should handle null values correctly', () { + final map = TTLMap(5000); + + map.set('nullKey', null); + expect(map.has('nullKey'), isTrue); + expect(map.get('nullKey'), isNull); + + // Different from non-existent key + expect(map.has('nonExistent'), isFalse); + + map.dispose(); + }); + + test('should handle stress test with rapid operations', () { + final map = TTLMap(10000); + + // Rapid set/get operations + for (int i = 0; i < 1000; i++) { + map.set('stress$i', i); + expect(map.get('stress$i'), equals(i)); + } + + expect(map.size, equals(1000)); + + map.dispose(); + }); + + test('should update size correctly during TTL expiration', () async { + final map = TTLMap(100); // 100ms TTL + + // Start with empty map + expect(map.size, equals(0)); + + // Add some entries + map.set('key1', 100); + map.set('key2', 200); + map.set('key3', 300); + expect(map.size, equals(3)); + + // Wait for expiry + await Future.delayed(const Duration(milliseconds: 150)); + + // Size should be updated after accessing expired entries + expect(map.has('key1'), isFalse); // This triggers cleanup + expect(map.size, equals(0)); + + map.dispose(); + }); + }); +}