Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 128 additions & 36 deletions lib/simple_flutter_reverb.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import 'dart:convert';

import 'package:simple_flutter_reverb/simple_flutter_reverb_options.dart';
import 'package:http/http.dart' as http;
import 'package:logger/logger.dart';
Expand All @@ -8,13 +7,21 @@ import 'package:web_socket_channel/web_socket_channel.dart';

abstract class ReverbService {
Future<String?> _authenticate(String socketId, String channelName);
void _subscribe(String channelName, String? broadcastAuthToken, {bool isPrivate = false});
void listen(void Function(dynamic) onData, String channelName, {bool isPrivate = false});
void _subscribe(
String channelName,
String? broadcastAuthToken, {
bool isPrivate = false,
});
void listen(
void Function(dynamic) onData,
String channelName, {
bool isPrivate = false,
});
void close();
}

class SimpleFlutterReverb implements ReverbService {
late final WebSocketChannel _channel;
late WebSocketChannel _channel;
final SimpleFlutterReverbOptions options;
final Logger _logger = Logger();

Expand All @@ -28,16 +35,33 @@ class SimpleFlutterReverb implements ReverbService {
}
}

Future<void> _connect() async {
try {
final wsUrl = _constructWebSocketUrl();
_channel = WebSocketChannel.connect(Uri.parse(wsUrl));
} catch (e) {
_logger.e('Failed to connect to WebSocket: $e');
rethrow;
}
}
Comment on lines +38 to +46

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic inside this new _connect method is identical to the connection logic in the SimpleFlutterReverb constructor (lines 28-36). Duplicating code can lead to maintenance issues where a bug fix or change is applied in one place but not the other. To adhere to the Don't Repeat Yourself (DRY) principle, this logic should be consolidated into a single private method that is called by both the constructor and the reconnect logic.


String _constructWebSocketUrl() {
return '${options.scheme}://${options.host}:${options.port}/app/${options.appKey}';
}

@override
void _subscribe(String channelName, String? broadcastAuthToken, {bool isPrivate = false}) {
void _subscribe(
String channelName,
String? broadcastAuthToken, {
bool isPrivate = false,
}) {
try {
final subscription = {
"event": "pusher:subscribe",
"data": isPrivate ? {"channel": channelName, "auth": broadcastAuthToken} : {"channel": channelName},
"data":
isPrivate
? {"channel": channelName, "auth": broadcastAuthToken}
: {"channel": channelName},
};
_channel.sink.add(jsonEncode(subscription));
} catch (e) {
Expand All @@ -47,41 +71,109 @@ class SimpleFlutterReverb implements ReverbService {
}

@override
void listen(void Function(dynamic) onData, String channelName, {bool isPrivate = false}) {
void listen(
void Function(dynamic) onData,
String channelName, {
bool isPrivate = false,
}) {
try {
final channelPrefix = options.usePrefix ? options.privatePrefix : '';
final fullChannelName = isPrivate ? '$channelPrefix$channelName' : channelName;
_subscribe(channelName, null);
_channel.stream.listen(
(message) async {
try {
final Map<String, dynamic> jsonMessage = jsonDecode(message);
final response = WebsocketResponse.fromJson(jsonMessage);

if (response.event == 'pusher:connection_established') {
final socketId = response.data?['socket_id'];

if (socketId == null) {
throw Exception('Socket ID is missing');
final fullChannelName =
isPrivate ? '$channelPrefix$channelName' : channelName;

void attachListener() {
_channel.stream.listen(
(message) async {
try {
final Map<String, dynamic> jsonMessage = jsonDecode(message);
final response = WebsocketResponse.fromJson(jsonMessage);

if (response.event == 'pusher:connection_established') {
final socketId = response.data?['socket_id'];

if (socketId == null) {
throw Exception('Socket ID is missing');
}

if (isPrivate) {
final authToken = await _authenticate(
socketId,
fullChannelName,
);
if (authToken != null) {
_subscribe(
fullChannelName,
authToken,
isPrivate: isPrivate,
);
}
} else {
_subscribe(fullChannelName, null, isPrivate: isPrivate);
}
} else if (response.event == 'pusher:ping') {
_channel.sink.add(jsonEncode({'event': 'pusher:pong'}));
}
onData(response);
} catch (e) {
_logger.e('Error processing message: $e');
}
},
onError: (error, stackTrace) {
if (options.onError != null) {
options.onError?.call(error);
} else {
_logger.e('WebSocket error: $error');
}
},
onDone: () async {
_logger.i('Connection closed: $channelName');
try {
options.onClose?.call(fullChannelName);
} catch (e) {
_logger.e('onClose handler error: $e');
}

if (isPrivate) {
final authToken = await _authenticate(socketId, fullChannelName);
_subscribe(fullChannelName, authToken!, isPrivate: isPrivate);
} else {
_subscribe(fullChannelName, null, isPrivate: isPrivate);
if (options.reconnectOnClose) {
for (
var attempt = 1;
attempt <= options.maxReconnectAttempts;
attempt++
) {
final wait = Duration(
milliseconds:
options.reconnectInterval.inMilliseconds * attempt,
);
Comment on lines +142 to +145

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation uses a linear backoff for reconnection attempts (interval * attempt). While this is better than a fixed interval, an exponential backoff strategy is generally recommended for network reconnections. Exponential backoff helps to prevent a "thundering herd" problem where many clients try to reconnect simultaneously after a service disruption, and it spreads out connection attempts more effectively over time.

You can achieve exponential backoff using a bitwise left shift, which avoids needing to import dart:math.

Suggested change
final wait = Duration(
milliseconds:
options.reconnectInterval.inMilliseconds * attempt,
);
final wait = Duration(
milliseconds:
options.reconnectInterval.inMilliseconds * (1 << (attempt - 1)),
);

_logger.i(
'Attempting reconnect #$attempt in ${wait.inSeconds}s',
);
await Future.delayed(wait);
try {
await _connect();
attachListener();
_logger.i(
'Reconnected on attempt #$attempt to $fullChannelName',
);
break;
} catch (e) {
_logger.e('Reconnect attempt #$attempt failed: $e');
if (attempt == options.maxReconnectAttempts) {
_logger.e(
'Max reconnect attempts reached for $fullChannelName',
);
}
}
}
} else if (response.event == 'pusher:ping') {
_channel.sink.add(jsonEncode({'event': 'pusher:pong'}));
}
onData(response);
} catch (e) {
_logger.e('Error processing message: $e');
}
},
onError: (error) => _logger.e('WebSocket error: $error'),
onDone: () => _logger.i('Connection closed: $channelName'),
);
},
);
}

// initial subscribe attempt (will finalize on connection_established)
try {
_subscribe(channelName, null);
} catch (_) {}
Comment on lines +172 to +174

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Swallowing exceptions silently using an empty catch block is generally discouraged as it can hide important issues during development and in production. If an error occurs during the initial subscription attempt, it should at least be logged for debugging purposes.

Suggested change
try {
_subscribe(channelName, null);
} catch (_) {}
try {
_subscribe(channelName, null);
} catch (e) {
_logger.w('Initial subscribe attempt failed, will retry on connection: $e');
}


attachListener();
} catch (e) {
_logger.e('Failed to listen to WebSocket: $e');
rethrow;
Expand Down Expand Up @@ -145,4 +237,4 @@ class WebsocketResponse {
data: json['data'] != null ? jsonDecode(json['data']) : null,
);
}
}
}
13 changes: 11 additions & 2 deletions lib/simple_flutter_reverb_options.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

class SimpleFlutterReverbOptions {
final String scheme;
final String host;
Expand All @@ -8,6 +7,11 @@ class SimpleFlutterReverbOptions {
final String? authUrl;
final String privatePrefix;
final bool usePrefix;
final void Function(String channelName)? onClose;
final void Function(dynamic error)? onError;
final bool reconnectOnClose;
final int maxReconnectAttempts;
final Duration reconnectInterval;

SimpleFlutterReverbOptions({
required this.scheme,
Expand All @@ -18,5 +22,10 @@ class SimpleFlutterReverbOptions {
this.authUrl,
this.privatePrefix = 'private-',
this.usePrefix = true,
this.onClose,
this.onError,
this.reconnectOnClose = false,
this.maxReconnectAttempts = 5,
this.reconnectInterval = const Duration(seconds: 2),
});
}
}