From df8d335bf11979b959823ac9f23c51f6f7ba5bec Mon Sep 17 00:00:00 2001 From: LydianJay Date: Tue, 13 Jan 2026 20:01:00 +0800 Subject: [PATCH 1/2] Added handling callbacks for closing and error --- lib/simple_flutter_reverb.dart | 165 +++++++++++++++++++------ lib/simple_flutter_reverb_options.dart | 13 +- 2 files changed, 140 insertions(+), 38 deletions(-) diff --git a/lib/simple_flutter_reverb.dart b/lib/simple_flutter_reverb.dart index 91d52cf..bed1d8a 100644 --- a/lib/simple_flutter_reverb.dart +++ b/lib/simple_flutter_reverb.dart @@ -1,5 +1,4 @@ import 'dart:convert'; - import 'package:simple_flutter_reverb/simple_flutter_reverb_options.dart'; import 'package:http/http.dart' as http; import 'package:logger/logger.dart'; @@ -8,13 +7,21 @@ import 'package:web_socket_channel/web_socket_channel.dart'; abstract class ReverbService { Future _authenticate(String socketId, String channelName); - void _subscribe(String channelName, String? broadcastAuthToken, {bool isPrivate = false}); - void listen(void Function(dynamic) onData, String channelName, {bool isPrivate = false}); + void _subscribe( + String channelName, + String? broadcastAuthToken, { + bool isPrivate = false, + }); + void listen( + void Function(dynamic) onData, + String channelName, { + bool isPrivate = false, + }); void close(); } class SimpleFlutterReverb implements ReverbService { - late final WebSocketChannel _channel; + late WebSocketChannel _channel; final SimpleFlutterReverbOptions options; final Logger _logger = Logger(); @@ -28,16 +35,33 @@ class SimpleFlutterReverb implements ReverbService { } } + Future _connect() async { + try { + final wsUrl = _constructWebSocketUrl(); + _channel = WebSocketChannel.connect(Uri.parse(wsUrl)); + } catch (e) { + _logger.e('Failed to connect to WebSocket: $e'); + rethrow; + } + } + String _constructWebSocketUrl() { return '${options.scheme}://${options.host}:${options.port}/app/${options.appKey}'; } @override - void _subscribe(String channelName, String? broadcastAuthToken, {bool isPrivate = false}) { + void _subscribe( + String channelName, + String? broadcastAuthToken, { + bool isPrivate = false, + }) { try { final subscription = { "event": "pusher:subscribe", - "data": isPrivate ? {"channel": channelName, "auth": broadcastAuthToken} : {"channel": channelName}, + "data": + isPrivate + ? {"channel": channelName, "auth": broadcastAuthToken} + : {"channel": channelName}, }; _channel.sink.add(jsonEncode(subscription)); } catch (e) { @@ -47,41 +71,110 @@ class SimpleFlutterReverb implements ReverbService { } @override - void listen(void Function(dynamic) onData, String channelName, {bool isPrivate = false}) { + void listen( + void Function(dynamic) onData, + String channelName, { + bool isPrivate = false, + }) { try { final channelPrefix = options.usePrefix ? options.privatePrefix : ''; - final fullChannelName = isPrivate ? '$channelPrefix$channelName' : channelName; - _subscribe(channelName, null); - _channel.stream.listen( - (message) async { - try { - final Map jsonMessage = jsonDecode(message); - final response = WebsocketResponse.fromJson(jsonMessage); - - if (response.event == 'pusher:connection_established') { - final socketId = response.data?['socket_id']; - - if (socketId == null) { - throw Exception('Socket ID is missing'); + final fullChannelName = + isPrivate ? '$channelPrefix$channelName' : channelName; + + void attachListener() { + _channel.stream.listen( + (message) async { + try { + final Map jsonMessage = jsonDecode(message); + final response = WebsocketResponse.fromJson(jsonMessage); + + if (response.event == 'pusher:connection_established') { + final socketId = response.data?['socket_id']; + + if (socketId == null) { + throw Exception('Socket ID is missing'); + } + + if (isPrivate) { + final authToken = await _authenticate( + socketId, + fullChannelName, + ); + if (authToken != null) { + _subscribe( + fullChannelName, + authToken, + isPrivate: isPrivate, + ); + } + } else { + _subscribe(fullChannelName, null, isPrivate: isPrivate); + } + } else if (response.event == 'pusher:ping') { + _channel.sink.add(jsonEncode({'event': 'pusher:pong'})); } + onData(response); + } catch (e) { + _logger.e('Error processing message: $e'); + } + }, + onError: + (error) => () { + if (options.onError != null) { + options.onError?.call(error); + } else { + _logger.e('WebSocket error: $error'); + } + }, + onDone: () async { + _logger.i('Connection closed: $channelName'); + try { + options.onClose?.call(fullChannelName); + } catch (e) { + _logger.e('onClose handler error: $e'); + } - if (isPrivate) { - final authToken = await _authenticate(socketId, fullChannelName); - _subscribe(fullChannelName, authToken!, isPrivate: isPrivate); - } else { - _subscribe(fullChannelName, null, isPrivate: isPrivate); + if (options.reconnectOnClose) { + for ( + var attempt = 1; + attempt <= options.maxReconnectAttempts; + attempt++ + ) { + final wait = Duration( + milliseconds: + options.reconnectInterval.inMilliseconds * attempt, + ); + _logger.i( + 'Attempting reconnect #$attempt in ${wait.inSeconds}s', + ); + await Future.delayed(wait); + try { + await _connect(); + attachListener(); + _logger.i( + 'Reconnected on attempt #$attempt to $fullChannelName', + ); + break; + } catch (e) { + _logger.e('Reconnect attempt #$attempt failed: $e'); + if (attempt == options.maxReconnectAttempts) { + _logger.e( + 'Max reconnect attempts reached for $fullChannelName', + ); + } + } } - } else if (response.event == 'pusher:ping') { - _channel.sink.add(jsonEncode({'event': 'pusher:pong'})); } - onData(response); - } catch (e) { - _logger.e('Error processing message: $e'); - } - }, - onError: (error) => _logger.e('WebSocket error: $error'), - onDone: () => _logger.i('Connection closed: $channelName'), - ); + }, + ); + } + + // initial subscribe attempt (will finalize on connection_established) + try { + _subscribe(channelName, null); + } catch (_) {} + + attachListener(); } catch (e) { _logger.e('Failed to listen to WebSocket: $e'); rethrow; @@ -145,4 +238,4 @@ class WebsocketResponse { data: json['data'] != null ? jsonDecode(json['data']) : null, ); } -} \ No newline at end of file +} diff --git a/lib/simple_flutter_reverb_options.dart b/lib/simple_flutter_reverb_options.dart index b52c2ab..b14debb 100644 --- a/lib/simple_flutter_reverb_options.dart +++ b/lib/simple_flutter_reverb_options.dart @@ -1,4 +1,3 @@ - class SimpleFlutterReverbOptions { final String scheme; final String host; @@ -8,6 +7,11 @@ class SimpleFlutterReverbOptions { final String? authUrl; final String privatePrefix; final bool usePrefix; + final void Function(String channelName)? onClose; + final void Function(dynamic error)? onError; + final bool reconnectOnClose; + final int maxReconnectAttempts; + final Duration reconnectInterval; SimpleFlutterReverbOptions({ required this.scheme, @@ -18,5 +22,10 @@ class SimpleFlutterReverbOptions { this.authUrl, this.privatePrefix = 'private-', this.usePrefix = true, + this.onClose, + this.onError, + this.reconnectOnClose = false, + this.maxReconnectAttempts = 5, + this.reconnectInterval = const Duration(seconds: 2), }); -} \ No newline at end of file +} From a37089186a87dee94cd8b57316183d347a8b8ced Mon Sep 17 00:00:00 2001 From: LydianJay Date: Tue, 13 Jan 2026 20:15:02 +0800 Subject: [PATCH 2/2] Fix critical issue with onError --- lib/simple_flutter_reverb.dart | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/simple_flutter_reverb.dart b/lib/simple_flutter_reverb.dart index bed1d8a..0abcd54 100644 --- a/lib/simple_flutter_reverb.dart +++ b/lib/simple_flutter_reverb.dart @@ -118,14 +118,13 @@ class SimpleFlutterReverb implements ReverbService { _logger.e('Error processing message: $e'); } }, - onError: - (error) => () { - if (options.onError != null) { - options.onError?.call(error); - } else { - _logger.e('WebSocket error: $error'); - } - }, + onError: (error, stackTrace) { + if (options.onError != null) { + options.onError?.call(error); + } else { + _logger.e('WebSocket error: $error'); + } + }, onDone: () async { _logger.i('Connection closed: $channelName'); try {