From 53c1cc6aaf25cf7a4c3bec5f14fc5224421f8021 Mon Sep 17 00:00:00 2001 From: Gold87 <91761103+Gold872@users.noreply.github.com> Date: Fri, 7 Feb 2025 09:52:13 -0500 Subject: [PATCH 01/11] Add timestamped version of onMessage() --- burt_network/lib/src/utils.dart | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/burt_network/lib/src/utils.dart b/burt_network/lib/src/utils.dart index d76a6d9b..860764da 100644 --- a/burt_network/lib/src/utils.dart +++ b/burt_network/lib/src/utils.dart @@ -31,6 +31,28 @@ extension WrappedMessageStream on Stream { where((wrapper) => wrapper.name == name) .map((wrapper) => constructor(wrapper.data)) .listen(callback); + + /// Allows callers to listen only for specific messages, with a specific timestamp. + /// + /// To use this, pass the name of the message, a function to create the message + /// from binary data, and a callback to handle the message and its time. For example, + /// ```dart + /// collection.server.messages.onMessageTimestamped( + /// name: VideoData().messageName, // equals "VideoData" + /// constructor: VideoData.fromBuffer, + /// callback: (data, time) => print("${data.name}\t$time"); + /// ) + /// ``` + /// + /// This function returns a [StreamSubscription] that you can use to stop listening. + StreamSubscription onMessageTimestamped({ + required String name, + required T Function(List) constructor, + required void Function(T message, Timestamp timestamp) callback, + }) => + where((wrapper) => wrapper.name == name).listen( + (wrapper) => callback(constructor(wrapper.data), wrapper.timestamp), + ); } /// Helpful methods on streams of nullable values. From a82816be8c869609961e41761b8e115d7661c102 Mon Sep 17 00:00:00 2001 From: Gold87 <91761103+Gold872@users.noreply.github.com> Date: Fri, 21 Mar 2025 16:12:55 -0400 Subject: [PATCH 02/11] Added time synchronization protocol --- burt_network/lib/protobuf.dart | 2 +- burt_network/lib/src/generated/core.pb.dart | 67 +++++++++++++++++++ .../lib/src/generated/core.pbjson.dart | 14 ++++ burt_network/lib/src/generated/drive.pb.dart | 18 ++--- burt_network/lib/src/generated/logs.pb.dart | 10 +-- .../lib/src/generated/wrapper.pb.dart | 12 ++-- burt_network/lib/src/udp/burt_socket.dart | 65 +++++++++++++++++- burt_network/lib/src/udp/rover_socket.dart | 5 +- burt_network/lib/src/udp/rover_timesync.dart | 38 +++++++++++ burt_network/lib/src/udp/timesync_server.dart | 37 ++++++++++ burt_network/lib/udp.dart | 1 + 11 files changed, 245 insertions(+), 24 deletions(-) create mode 100644 burt_network/lib/src/udp/rover_timesync.dart create mode 100644 burt_network/lib/src/udp/timesync_server.dart diff --git a/burt_network/lib/protobuf.dart b/burt_network/lib/protobuf.dart index eeba137d..bf9ca6ad 100644 --- a/burt_network/lib/protobuf.dart +++ b/burt_network/lib/protobuf.dart @@ -44,7 +44,7 @@ extension MessageUtils on Message { WrappedMessage wrap([DateTime? timestamp]) => WrappedMessage( data: writeToBuffer(), name: messageName, - timestamp: Timestamp.fromDateTime(timestamp ?? DateTime.now()), + timestamp: Timestamp.fromDateTime(timestamp ?? DateTime.timestamp()), ); } diff --git a/burt_network/lib/src/generated/core.pb.dart b/burt_network/lib/src/generated/core.pb.dart index 1cf1a85c..e871fa23 100644 --- a/burt_network/lib/src/generated/core.pb.dart +++ b/burt_network/lib/src/generated/core.pb.dart @@ -14,6 +14,7 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; import 'core.pbenum.dart'; +import 'google/protobuf/timestamp.pb.dart' as $5; export 'core.pbenum.dart'; @@ -82,6 +83,72 @@ class Connect extends $pb.GeneratedMessage { void clearReceiver() => clearField(2); } +class Timesync extends $pb.GeneratedMessage { + factory Timesync({ + Device? sender, + $5.Timestamp? sendTime, + }) { + final $result = create(); + if (sender != null) { + $result.sender = sender; + } + if (sendTime != null) { + $result.sendTime = sendTime; + } + return $result; + } + Timesync._() : super(); + factory Timesync.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory Timesync.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + + static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'Timesync', createEmptyInstance: create) + ..e(1, _omitFieldNames ? '' : 'sender', $pb.PbFieldType.OE, defaultOrMaker: Device.DEVICE_UNDEFINED, valueOf: Device.valueOf, enumValues: Device.values) + ..aOM<$5.Timestamp>(2, _omitFieldNames ? '' : 'sendTime', protoName: 'sendTime', subBuilder: $5.Timestamp.create) + ..hasRequiredFields = false + ; + + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + Timesync clone() => Timesync()..mergeFromMessage(this); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + Timesync copyWith(void Function(Timesync) updates) => super.copyWith((message) => updates(message as Timesync)) as Timesync; + + $pb.BuilderInfo get info_ => _i; + + @$core.pragma('dart2js:noInline') + static Timesync create() => Timesync._(); + Timesync createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static Timesync getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static Timesync? _defaultInstance; + + @$pb.TagNumber(1) + Device get sender => $_getN(0); + @$pb.TagNumber(1) + set sender(Device v) { setField(1, v); } + @$pb.TagNumber(1) + $core.bool hasSender() => $_has(0); + @$pb.TagNumber(1) + void clearSender() => clearField(1); + + @$pb.TagNumber(2) + $5.Timestamp get sendTime => $_getN(1); + @$pb.TagNumber(2) + set sendTime($5.Timestamp v) { setField(2, v); } + @$pb.TagNumber(2) + $core.bool hasSendTime() => $_has(1); + @$pb.TagNumber(2) + void clearSendTime() => clearField(2); + @$pb.TagNumber(2) + $5.Timestamp ensureSendTime() => $_ensure(1); +} + /// Notifies the recipient that the sender will no longer be connected. class Disconnect extends $pb.GeneratedMessage { factory Disconnect({ diff --git a/burt_network/lib/src/generated/core.pbjson.dart b/burt_network/lib/src/generated/core.pbjson.dart index f04e5f18..65f9c588 100644 --- a/burt_network/lib/src/generated/core.pbjson.dart +++ b/burt_network/lib/src/generated/core.pbjson.dart @@ -56,6 +56,20 @@ final $typed_data.Uint8List connectDescriptor = $convert.base64Decode( 'CgdDb25uZWN0Eh8KBnNlbmRlchgBIAEoDjIHLkRldmljZVIGc2VuZGVyEiMKCHJlY2VpdmVyGA' 'IgASgOMgcuRGV2aWNlUghyZWNlaXZlcg=='); +@$core.Deprecated('Use timesyncDescriptor instead') +const Timesync$json = { + '1': 'Timesync', + '2': [ + {'1': 'sender', '3': 1, '4': 1, '5': 14, '6': '.Device', '10': 'sender'}, + {'1': 'sendTime', '3': 2, '4': 1, '5': 11, '6': '.google.protobuf.Timestamp', '10': 'sendTime'}, + ], +}; + +/// Descriptor for `Timesync`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List timesyncDescriptor = $convert.base64Decode( + 'CghUaW1lc3luYxIfCgZzZW5kZXIYASABKA4yBy5EZXZpY2VSBnNlbmRlchI2CghzZW5kVGltZR' + 'gCIAEoCzIaLmdvb2dsZS5wcm90b2J1Zi5UaW1lc3RhbXBSCHNlbmRUaW1l'); + @$core.Deprecated('Use disconnectDescriptor instead') const Disconnect$json = { '1': 'Disconnect', diff --git a/burt_network/lib/src/generated/drive.pb.dart b/burt_network/lib/src/generated/drive.pb.dart index 052c97f1..c5fd3ddd 100644 --- a/burt_network/lib/src/generated/drive.pb.dart +++ b/burt_network/lib/src/generated/drive.pb.dart @@ -14,7 +14,7 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; import 'drive.pbenum.dart'; -import 'status.pbenum.dart' as $5; +import 'status.pbenum.dart' as $6; import 'utils.pbenum.dart' as $0; import 'version.pb.dart' as $3; @@ -32,7 +32,7 @@ class DriveCommand extends $pb.GeneratedMessage { $core.double? frontTilt, $core.double? rearSwivel, $core.double? rearTilt, - $5.RoverStatus? status, + $6.RoverStatus? status, $3.Version? version, ProtoColor? color, $0.BoolState? blink, @@ -97,7 +97,7 @@ class DriveCommand extends $pb.GeneratedMessage { ..a<$core.double>(8, _omitFieldNames ? '' : 'frontTilt', $pb.PbFieldType.OF) ..a<$core.double>(9, _omitFieldNames ? '' : 'rearSwivel', $pb.PbFieldType.OF) ..a<$core.double>(10, _omitFieldNames ? '' : 'rearTilt', $pb.PbFieldType.OF) - ..e<$5.RoverStatus>(11, _omitFieldNames ? '' : 'status', $pb.PbFieldType.OE, defaultOrMaker: $5.RoverStatus.DISCONNECTED, valueOf: $5.RoverStatus.valueOf, enumValues: $5.RoverStatus.values) + ..e<$6.RoverStatus>(11, _omitFieldNames ? '' : 'status', $pb.PbFieldType.OE, defaultOrMaker: $6.RoverStatus.DISCONNECTED, valueOf: $6.RoverStatus.valueOf, enumValues: $6.RoverStatus.values) ..aOM<$3.Version>(12, _omitFieldNames ? '' : 'version', subBuilder: $3.Version.create) ..e(13, _omitFieldNames ? '' : 'color', $pb.PbFieldType.OE, defaultOrMaker: ProtoColor.PROTO_COLOR_UNDEFINED, valueOf: ProtoColor.valueOf, enumValues: ProtoColor.values) ..e<$0.BoolState>(14, _omitFieldNames ? '' : 'blink', $pb.PbFieldType.OE, defaultOrMaker: $0.BoolState.BOOL_UNDEFINED, valueOf: $0.BoolState.valueOf, enumValues: $0.BoolState.values) @@ -222,9 +222,9 @@ class DriveCommand extends $pb.GeneratedMessage { void clearRearTilt() => clearField(10); @$pb.TagNumber(11) - $5.RoverStatus get status => $_getN(10); + $6.RoverStatus get status => $_getN(10); @$pb.TagNumber(11) - set status($5.RoverStatus v) { setField(11, v); } + set status($6.RoverStatus v) { setField(11, v); } @$pb.TagNumber(11) $core.bool hasStatus() => $_has(10); @$pb.TagNumber(11) @@ -283,7 +283,7 @@ class DriveData extends $pb.GeneratedMessage { $core.double? middleRight, $core.double? frontRight, ProtoColor? color, - $5.RoverStatus? status, + $6.RoverStatus? status, }) { final $result = create(); if (throttle != null) { @@ -380,7 +380,7 @@ class DriveData extends $pb.GeneratedMessage { ..a<$core.double>(19, _omitFieldNames ? '' : 'middleRight', $pb.PbFieldType.OF) ..a<$core.double>(20, _omitFieldNames ? '' : 'frontRight', $pb.PbFieldType.OF) ..e(21, _omitFieldNames ? '' : 'color', $pb.PbFieldType.OE, defaultOrMaker: ProtoColor.PROTO_COLOR_UNDEFINED, valueOf: ProtoColor.valueOf, enumValues: ProtoColor.values) - ..e<$5.RoverStatus>(22, _omitFieldNames ? '' : 'status', $pb.PbFieldType.OE, defaultOrMaker: $5.RoverStatus.DISCONNECTED, valueOf: $5.RoverStatus.valueOf, enumValues: $5.RoverStatus.values) + ..e<$6.RoverStatus>(22, _omitFieldNames ? '' : 'status', $pb.PbFieldType.OE, defaultOrMaker: $6.RoverStatus.DISCONNECTED, valueOf: $6.RoverStatus.valueOf, enumValues: $6.RoverStatus.values) ..hasRequiredFields = false ; @@ -605,9 +605,9 @@ class DriveData extends $pb.GeneratedMessage { void clearColor() => clearField(21); @$pb.TagNumber(22) - $5.RoverStatus get status => $_getN(21); + $6.RoverStatus get status => $_getN(21); @$pb.TagNumber(22) - set status($5.RoverStatus v) { setField(22, v); } + set status($6.RoverStatus v) { setField(22, v); } @$pb.TagNumber(22) $core.bool hasStatus() => $_has(21); @$pb.TagNumber(22) diff --git a/burt_network/lib/src/generated/logs.pb.dart b/burt_network/lib/src/generated/logs.pb.dart index 2c99466f..0f673c31 100644 --- a/burt_network/lib/src/generated/logs.pb.dart +++ b/burt_network/lib/src/generated/logs.pb.dart @@ -13,7 +13,7 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; -import 'core.pbenum.dart' as $6; +import 'core.pbenum.dart' as $7; import 'logs.pbenum.dart'; export 'logs.pbenum.dart'; @@ -23,7 +23,7 @@ class BurtLog extends $pb.GeneratedMessage { BurtLogLevel? level, $core.String? title, $core.String? body, - $6.Device? device, + $7.Device? device, }) { final $result = create(); if (level != null) { @@ -48,7 +48,7 @@ class BurtLog extends $pb.GeneratedMessage { ..e(1, _omitFieldNames ? '' : 'level', $pb.PbFieldType.OE, defaultOrMaker: BurtLogLevel.BURT_LOG_LEVEL_UNDEFINED, valueOf: BurtLogLevel.valueOf, enumValues: BurtLogLevel.values) ..aOS(2, _omitFieldNames ? '' : 'title') ..aOS(3, _omitFieldNames ? '' : 'body') - ..e<$6.Device>(4, _omitFieldNames ? '' : 'device', $pb.PbFieldType.OE, defaultOrMaker: $6.Device.DEVICE_UNDEFINED, valueOf: $6.Device.valueOf, enumValues: $6.Device.values) + ..e<$7.Device>(4, _omitFieldNames ? '' : 'device', $pb.PbFieldType.OE, defaultOrMaker: $7.Device.DEVICE_UNDEFINED, valueOf: $7.Device.valueOf, enumValues: $7.Device.values) ..hasRequiredFields = false ; @@ -101,9 +101,9 @@ class BurtLog extends $pb.GeneratedMessage { void clearBody() => clearField(3); @$pb.TagNumber(4) - $6.Device get device => $_getN(3); + $7.Device get device => $_getN(3); @$pb.TagNumber(4) - set device($6.Device v) { setField(4, v); } + set device($7.Device v) { setField(4, v); } @$pb.TagNumber(4) $core.bool hasDevice() => $_has(3); @$pb.TagNumber(4) diff --git a/burt_network/lib/src/generated/wrapper.pb.dart b/burt_network/lib/src/generated/wrapper.pb.dart index 0938c731..c000cd3f 100644 --- a/burt_network/lib/src/generated/wrapper.pb.dart +++ b/burt_network/lib/src/generated/wrapper.pb.dart @@ -13,13 +13,13 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; -import 'google/protobuf/timestamp.pb.dart' as $8; +import 'google/protobuf/timestamp.pb.dart' as $5; class WrappedMessage extends $pb.GeneratedMessage { factory WrappedMessage({ $core.List<$core.int>? data, $core.String? name, - $8.Timestamp? timestamp, + $5.Timestamp? timestamp, }) { final $result = create(); if (data != null) { @@ -40,7 +40,7 @@ class WrappedMessage extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'WrappedMessage', createEmptyInstance: create) ..a<$core.List<$core.int>>(1, _omitFieldNames ? '' : 'data', $pb.PbFieldType.OY) ..aOS(2, _omitFieldNames ? '' : 'name') - ..aOM<$8.Timestamp>(3, _omitFieldNames ? '' : 'timestamp', subBuilder: $8.Timestamp.create) + ..aOM<$5.Timestamp>(3, _omitFieldNames ? '' : 'timestamp', subBuilder: $5.Timestamp.create) ..hasRequiredFields = false ; @@ -84,15 +84,15 @@ class WrappedMessage extends $pb.GeneratedMessage { void clearName() => clearField(2); @$pb.TagNumber(3) - $8.Timestamp get timestamp => $_getN(2); + $5.Timestamp get timestamp => $_getN(2); @$pb.TagNumber(3) - set timestamp($8.Timestamp v) { setField(3, v); } + set timestamp($5.Timestamp v) { setField(3, v); } @$pb.TagNumber(3) $core.bool hasTimestamp() => $_has(2); @$pb.TagNumber(3) void clearTimestamp() => clearField(3); @$pb.TagNumber(3) - $8.Timestamp ensureTimestamp() => $_ensure(2); + $5.Timestamp ensureTimestamp() => $_ensure(2); } diff --git a/burt_network/lib/src/udp/burt_socket.dart b/burt_network/lib/src/udp/burt_socket.dart index 3520a008..36669537 100644 --- a/burt_network/lib/src/udp/burt_socket.dart +++ b/burt_network/lib/src/udp/burt_socket.dart @@ -4,8 +4,12 @@ import "dart:io"; import "package:burt_network/burt_network.dart"; import "package:meta/meta.dart"; -extension on Datagram { +/// Utility methods for parsing datagram messages as wrapped messages +extension DatagramUtil on Datagram { + /// Returns the wrapped message parsed from the data of the datagram WrappedMessage parseWrapper() => WrappedMessage.fromBuffer(data); + + /// The source that the datagram was sent from SocketInfo get source => SocketInfo(address: address, port: port); } @@ -40,7 +44,16 @@ abstract class BurtSocket extends UdpSocket { /// Used to properly respond to heartbeats and for thorough logging. final Device device; + /// The port to send and receive timesync messages from + /// If this socket is configured to send timesync messages, + /// the [Timesync] message will be sent to a socket with + /// the [destination] IP address, and timesync port. + /// + /// By default, the timesync port is 8020 + final int timesyncPort; + Timer? _heartbeatTimer; + Timer? _timesyncTimer; StreamSubscription? _subscription; @@ -53,6 +66,7 @@ abstract class BurtSocket extends UdpSocket { BurtSocket({ required super.port, required this.device, + this.timesyncPort = 8020, super.destination, super.quiet, super.keepDestination, @@ -68,6 +82,9 @@ abstract class BurtSocket extends UdpSocket { await super.init(); _subscription = stream.listen(_onPacket); _heartbeatTimer = Timer.periodic(heartbeatInterval, (_) => checkHeartbeats()); + if (shouldSendTimesync) { + _timesyncTimer = Timer.periodic(const Duration(seconds: 1), (_) => sendTimesync()); + } return true; } @@ -75,6 +92,7 @@ abstract class BurtSocket extends UdpSocket { Future dispose() async { await _subscription?.cancel(); _heartbeatTimer?.cancel(); + _timesyncTimer?.cancel(); await super.dispose(); } @@ -114,10 +132,14 @@ abstract class BurtSocket extends UdpSocket { } void _onPacket(Datagram packet) { + final receiveTime = DateTime.timestamp(); final wrapper = packet.parseWrapper(); if (wrapper.name == Connect().messageName) { final heartbeat = Connect.fromBuffer(wrapper.data); onHeartbeat(heartbeat, packet.source); + } else if (wrapper.name == Timesync().messageName) { + final timesync = Timesync.fromBuffer(wrapper.data); + onTimesync(timesync, wrapper.timestamp, receiveTime, packet.source); } else if (wrapper.name == UpdateSetting().messageName) { final settings = UpdateSetting.fromBuffer(wrapper.data); onSettings(settings); @@ -133,6 +155,18 @@ abstract class BurtSocket extends UdpSocket { /// the current [destination] to properly handle the heartbeat. void onHeartbeat(Heartbeat heartbeat, SocketInfo source); + /// Handle an incoming Timesync message from a given source. + /// + /// The message's send time, server receive time, and client receive + /// time will be used to calculate the estimated time offset between + /// the socket and timesync server + void onTimesync( + Timesync timesync, + Timestamp serverReceiveTime, + DateTime clientReceiveTime, + SocketInfo source, + ) {} + /// Handle an incoming request to change network settings. /// /// Be sure to echo the message back using [sendMessage], to confirm receipt. @@ -144,9 +178,34 @@ abstract class BurtSocket extends UdpSocket { /// Whether the device on the other end is connected. bool get isConnected; + /// Whether or not this socket should be sending timesync events to its destination + /// If true, it will periodically send timesync events to its destination to + bool get shouldSendTimesync; + + /// The current timestamp of the socket + /// This timestamp is used as the default timestamp when sending a message + DateTime get timestamp; + /// Sends or waits for heartbeats to or from the other device. void checkHeartbeats(); + /// Sends a timesync message to the destination IP address on port [timesyncPort] + void sendTimesync() { + if (destination == null) { + return; + } + sendMessage( + Timesync( + sender: device, + sendTime: Timestamp.fromDateTime(DateTime.timestamp()), + ), + destination: SocketInfo( + address: destination!.address, + port: timesyncPort, + ), + ); + } + /// Sets [destination] to the incoming [source]. /// /// Override this function to run custom code when a device connects to this socket. @@ -178,4 +237,8 @@ abstract class BurtSocket extends UdpSocket { _connectionCompleter = Completer(); return _connectionCompleter!.future; } + + @override + void sendMessage(Message message, {SocketInfo? destination}) => + sendWrapper(message.wrap(timestamp), destination: destination); } diff --git a/burt_network/lib/src/udp/rover_socket.dart b/burt_network/lib/src/udp/rover_socket.dart index de35ed29..4f0f3f53 100644 --- a/burt_network/lib/src/udp/rover_socket.dart +++ b/burt_network/lib/src/udp/rover_socket.dart @@ -1,7 +1,8 @@ import "burt_socket.dart"; import "rover_heartbeats.dart"; +import "rover_timesync.dart"; import "rover_logger.dart"; import "rover_settings.dart"; -/// A UDP socket fit for use on the rover, with heartbeats, logging, and settings included. -class RoverSocket = BurtSocket with RoverHeartbeats, RoverLogger, RoverSettings; +/// A UDP socket fit for use on the rover, with heartbeats, timesync, logging, and settings included. +class RoverSocket = BurtSocket with RoverHeartbeats, RoverTimesync, RoverLogger, RoverSettings; diff --git a/burt_network/lib/src/udp/rover_timesync.dart b/burt_network/lib/src/udp/rover_timesync.dart new file mode 100644 index 00000000..80c60512 --- /dev/null +++ b/burt_network/lib/src/udp/rover_timesync.dart @@ -0,0 +1,38 @@ +import "package:burt_network/burt_network.dart"; + +/// A mixin to automatically handle time synchronization +mixin RoverTimesync on BurtSocket { + Duration _timeOffset = Duration.zero; + + @override + bool get shouldSendTimesync => true; + + @override + DateTime get timestamp => DateTime.timestamp().subtract(_timeOffset); + + @override + void onTimesync(Timesync timesync, Timestamp serverReceiveTime, DateTime clientReceiveTime, SocketInfo source) { + if (timesync.sender != device) { + if (!quiet) { + logger.warning( + "Device ${device.name} received a timesync event for ${timesync.sender.name}", + );} + return; + } + if (source.port != timesyncPort) { + if (!quiet) { + logger.warning( + "Socket on port ${destination?.port} expected to receive timesync message from port $timesyncPort, but received from ${source.port} instead", + ); + } + return; + } + final pongLocalTime = clientReceiveTime.microsecondsSinceEpoch; + final pingClientTime = timesync.sendTime.toDateTime().microsecondsSinceEpoch; + final pongServerTime = serverReceiveTime.toDateTime().microsecondsSinceEpoch; + + final rtt = pongLocalTime - pingClientTime; + final serverOffsetMicros = pongServerTime - rtt ~/ 2 - pingClientTime; + _timeOffset = Duration(microseconds: serverOffsetMicros); + } +} diff --git a/burt_network/lib/src/udp/timesync_server.dart b/burt_network/lib/src/udp/timesync_server.dart new file mode 100644 index 00000000..82771e8c --- /dev/null +++ b/burt_network/lib/src/udp/timesync_server.dart @@ -0,0 +1,37 @@ +import "dart:async"; +import "dart:io"; + +import "package:burt_network/burt_network.dart"; + +/// A UDP socket to act as the "time server" for time synchronization events +/// +/// The time synchronization is received and handled from an independent UDP +/// socket, to reduce congestion with other data messages, which lowers latency +/// and improves time accuracy. +class TimesyncServer extends UdpSocket { + StreamSubscription? _subscription; + + /// Default constructor for TimesyncServer + TimesyncServer({required super.port}); + + @override + Future init() async { + await super.init(); + _subscription = stream.listen(_onPacket); + return true; + } + + @override + Future dispose() async { + await _subscription?.cancel(); + await super.dispose(); + } + + void _onPacket(Datagram packet) { + final rxTime = DateTime.timestamp(); + final wrapper = packet.parseWrapper(); + wrapper.timestamp = Timestamp.fromDateTime(rxTime); + + sendWrapper(wrapper, destination: packet.source); + } +} diff --git a/burt_network/lib/udp.dart b/burt_network/lib/udp.dart index 75c9161e..64bbc9ab 100644 --- a/burt_network/lib/udp.dart +++ b/burt_network/lib/udp.dart @@ -30,3 +30,4 @@ export "src/udp/rover_settings.dart"; export "src/udp/rover_heartbeats.dart"; export "src/udp/socket_info.dart"; export "src/udp/udp_socket.dart"; +export "src/udp/timesync_server.dart"; From d345c328445f3a649f8f41bb030eb6e21d609a3d Mon Sep 17 00:00:00 2001 From: Gold87 <91761103+Gold872@users.noreply.github.com> Date: Fri, 21 Mar 2025 16:20:55 -0400 Subject: [PATCH 03/11] Add default timestamp getter --- burt_network/lib/src/udp/burt_socket.dart | 2 +- burt_network/test/udp_sockets.dart | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/burt_network/lib/src/udp/burt_socket.dart b/burt_network/lib/src/udp/burt_socket.dart index 36669537..66df6006 100644 --- a/burt_network/lib/src/udp/burt_socket.dart +++ b/burt_network/lib/src/udp/burt_socket.dart @@ -184,7 +184,7 @@ abstract class BurtSocket extends UdpSocket { /// The current timestamp of the socket /// This timestamp is used as the default timestamp when sending a message - DateTime get timestamp; + DateTime get timestamp => DateTime.timestamp(); /// Sends or waits for heartbeats to or from the other device. void checkHeartbeats(); diff --git a/burt_network/test/udp_sockets.dart b/burt_network/test/udp_sockets.dart index a6cf6374..7a618239 100644 --- a/burt_network/test/udp_sockets.dart +++ b/burt_network/test/udp_sockets.dart @@ -104,6 +104,9 @@ class TestClient extends BurtSocket { @override void onSettings(UpdateSetting settings) { } + + @override + bool get shouldSendTimesync => false; } class RestartTrackingService extends Service { From d38e2a7d17df8261ae7792de316d9d3f579dfbd6 Mon Sep 17 00:00:00 2001 From: Gold87 <91761103+Gold872@users.noreply.github.com> Date: Mon, 24 Mar 2025 11:00:20 -0400 Subject: [PATCH 04/11] Default shouldSendTimesync to false --- burt_network/lib/src/udp/burt_socket.dart | 2 +- burt_network/lib/src/udp/rover_timesync.dart | 22 +++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/burt_network/lib/src/udp/burt_socket.dart b/burt_network/lib/src/udp/burt_socket.dart index 66df6006..148e3b2c 100644 --- a/burt_network/lib/src/udp/burt_socket.dart +++ b/burt_network/lib/src/udp/burt_socket.dart @@ -180,7 +180,7 @@ abstract class BurtSocket extends UdpSocket { /// Whether or not this socket should be sending timesync events to its destination /// If true, it will periodically send timesync events to its destination to - bool get shouldSendTimesync; + bool get shouldSendTimesync => false; /// The current timestamp of the socket /// This timestamp is used as the default timestamp when sending a message diff --git a/burt_network/lib/src/udp/rover_timesync.dart b/burt_network/lib/src/udp/rover_timesync.dart index 80c60512..597ec14b 100644 --- a/burt_network/lib/src/udp/rover_timesync.dart +++ b/burt_network/lib/src/udp/rover_timesync.dart @@ -8,15 +8,21 @@ mixin RoverTimesync on BurtSocket { bool get shouldSendTimesync => true; @override - DateTime get timestamp => DateTime.timestamp().subtract(_timeOffset); + DateTime get timestamp => DateTime.timestamp().add(_timeOffset); @override - void onTimesync(Timesync timesync, Timestamp serverReceiveTime, DateTime clientReceiveTime, SocketInfo source) { + void onTimesync( + Timesync timesync, + Timestamp serverReceiveTime, + DateTime clientReceiveTime, + SocketInfo source, + ) { if (timesync.sender != device) { if (!quiet) { - logger.warning( - "Device ${device.name} received a timesync event for ${timesync.sender.name}", - );} + logger.warning( + "Device ${device.name} received a timesync event for ${timesync.sender.name}", + ); + } return; } if (source.port != timesyncPort) { @@ -28,8 +34,10 @@ mixin RoverTimesync on BurtSocket { return; } final pongLocalTime = clientReceiveTime.microsecondsSinceEpoch; - final pingClientTime = timesync.sendTime.toDateTime().microsecondsSinceEpoch; - final pongServerTime = serverReceiveTime.toDateTime().microsecondsSinceEpoch; + final pingClientTime = + timesync.sendTime.toDateTime().microsecondsSinceEpoch; + final pongServerTime = + serverReceiveTime.toDateTime().microsecondsSinceEpoch; final rtt = pongLocalTime - pingClientTime; final serverOffsetMicros = pongServerTime - rtt ~/ 2 - pingClientTime; From fedfa14b6945ed0b88351325f8d3925221cbf841 Mon Sep 17 00:00:00 2001 From: Gold87 <91761103+Gold872@users.noreply.github.com> Date: Mon, 24 Mar 2025 11:12:48 -0400 Subject: [PATCH 05/11] Updated docs --- burt_network/lib/src/udp/burt_socket.dart | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/burt_network/lib/src/udp/burt_socket.dart b/burt_network/lib/src/udp/burt_socket.dart index 148e3b2c..5afd9b05 100644 --- a/burt_network/lib/src/udp/burt_socket.dart +++ b/burt_network/lib/src/udp/burt_socket.dart @@ -28,6 +28,7 @@ typedef NetworkSettings = UpdateSetting; /// This class also filters incoming messages to handle special types of messages: /// - Override [onHeartbeat] to handle [Heartbeat] messages /// - Override [onSettings] to handle [NetworkSettings] messages +/// - Override [onTimesync] to handle [Timesync] messages /// /// To ensure connectedness, even over UDP, we send [Heartbeat] messages periodically. Override /// [checkHeartbeats] and [heartbeatInterval] to send or wait for heartbeats, and set [isConnected] @@ -179,11 +180,16 @@ abstract class BurtSocket extends UdpSocket { bool get isConnected; /// Whether or not this socket should be sending timesync events to its destination - /// If true, it will periodically send timesync events to its destination to + /// If true, it will periodically send timesync events to a destination timesync server. + /// + /// This serves a separate function than heartbeats. Heartbeats are used solely to + /// keep up the connection, wheras timesync is used to ensure that the dashboard and + /// rover are on the same time scale, allowing message timestamps to be consistent + /// regardless of where they are sent to or from. bool get shouldSendTimesync => false; - /// The current timestamp of the socket - /// This timestamp is used as the default timestamp when sending a message + /// The current time of the socket. + /// This timestamp is used as the default timestamp when sending a message. DateTime get timestamp => DateTime.timestamp(); /// Sends or waits for heartbeats to or from the other device. From e9b5e0230232c46509ad46663f39273a14337f15 Mon Sep 17 00:00:00 2001 From: Gold87 <91761103+Gold872@users.noreply.github.com> Date: Mon, 24 Mar 2025 11:15:11 -0400 Subject: [PATCH 06/11] Allow timesync server to be quiet --- burt_network/lib/src/udp/timesync_server.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/burt_network/lib/src/udp/timesync_server.dart b/burt_network/lib/src/udp/timesync_server.dart index 82771e8c..b1ca43b8 100644 --- a/burt_network/lib/src/udp/timesync_server.dart +++ b/burt_network/lib/src/udp/timesync_server.dart @@ -12,7 +12,7 @@ class TimesyncServer extends UdpSocket { StreamSubscription? _subscription; /// Default constructor for TimesyncServer - TimesyncServer({required super.port}); + TimesyncServer({required super.port, super.quiet}); @override Future init() async { From a59038e0cb9838e643ce536c53e83a1a026a9f0e Mon Sep 17 00:00:00 2001 From: Gold87 <91761103+Gold872@users.noreply.github.com> Date: Mon, 24 Mar 2025 13:23:27 -0400 Subject: [PATCH 07/11] Fix server offset math --- burt_network/lib/src/udp/rover_timesync.dart | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/burt_network/lib/src/udp/rover_timesync.dart b/burt_network/lib/src/udp/rover_timesync.dart index 597ec14b..bd43168d 100644 --- a/burt_network/lib/src/udp/rover_timesync.dart +++ b/burt_network/lib/src/udp/rover_timesync.dart @@ -33,14 +33,20 @@ mixin RoverTimesync on BurtSocket { } return; } + if (source.address.isLoopback) { + _timeOffset = Duration.zero; + return; + } + final pongLocalTime = clientReceiveTime.microsecondsSinceEpoch; final pingClientTime = timesync.sendTime.toDateTime().microsecondsSinceEpoch; final pongServerTime = serverReceiveTime.toDateTime().microsecondsSinceEpoch; - final rtt = pongLocalTime - pingClientTime; - final serverOffsetMicros = pongServerTime - rtt ~/ 2 - pingClientTime; + final rtt2 = pongLocalTime - pingClientTime; + final serverTimeAtRx = pongServerTime + rtt2 ~/ 2; + final serverOffsetMicros = serverTimeAtRx - pongLocalTime; _timeOffset = Duration(microseconds: serverOffsetMicros); } } From aa38b326e1fe896b4422362d024e9856e7b177c5 Mon Sep 17 00:00:00 2001 From: Gold87 <91761103+Gold872@users.noreply.github.com> Date: Mon, 24 Mar 2025 18:51:12 -0400 Subject: [PATCH 08/11] Use socketinfo for destination --- burt_network/lib/src/udp/burt_socket.dart | 42 ++++++++++++-------- burt_network/lib/src/udp/rover_timesync.dart | 6 ++- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/burt_network/lib/src/udp/burt_socket.dart b/burt_network/lib/src/udp/burt_socket.dart index 5afd9b05..a05d0042 100644 --- a/burt_network/lib/src/udp/burt_socket.dart +++ b/burt_network/lib/src/udp/burt_socket.dart @@ -45,13 +45,25 @@ abstract class BurtSocket extends UdpSocket { /// Used to properly respond to heartbeats and for thorough logging. final Device device; - /// The port to send and receive timesync messages from - /// If this socket is configured to send timesync messages, - /// the [Timesync] message will be sent to a socket with - /// the [destination] IP address, and timesync port. - /// - /// By default, the timesync port is 8020 - final int timesyncPort; + /// The address and port of the timesync server. + /// + /// If this socket is configured to send timesync messages, the [Timesync] message + /// will be sent to a socket with the specified IP address and port. + /// + /// If the Socket Info's IP address is [InternetAddress.anyIPv4], it will be sent to + /// the [destination] address + /// + /// By default, the address is the destination address, on port 8020 + SocketInfo get timesyncDestination { + final address = + _timesyncDestination.address != InternetAddress.anyIPv4 + ? _timesyncDestination.address + : destination?.address ?? InternetAddress.loopbackIPv4; + + return SocketInfo(address: address, port: _timesyncDestination.port); + } + + late final SocketInfo _timesyncDestination; Timer? _heartbeatTimer; Timer? _timesyncTimer; @@ -67,12 +79,14 @@ abstract class BurtSocket extends UdpSocket { BurtSocket({ required super.port, required this.device, - this.timesyncPort = 8020, super.destination, super.quiet, super.keepDestination, this.collection, - }); + SocketInfo? timesyncAddress, + }) : _timesyncDestination = + timesyncAddress ?? + SocketInfo(address: InternetAddress.anyIPv4, port: 8020); /// A stream of [WrappedMessage]s as they arrive in the UDP socket. @override @@ -195,20 +209,14 @@ abstract class BurtSocket extends UdpSocket { /// Sends or waits for heartbeats to or from the other device. void checkHeartbeats(); - /// Sends a timesync message to the destination IP address on port [timesyncPort] + /// Sends a timesync message to the [timesyncDestination] void sendTimesync() { - if (destination == null) { - return; - } sendMessage( Timesync( sender: device, sendTime: Timestamp.fromDateTime(DateTime.timestamp()), ), - destination: SocketInfo( - address: destination!.address, - port: timesyncPort, - ), + destination: timesyncDestination, ); } diff --git a/burt_network/lib/src/udp/rover_timesync.dart b/burt_network/lib/src/udp/rover_timesync.dart index bd43168d..c8ccfcd9 100644 --- a/burt_network/lib/src/udp/rover_timesync.dart +++ b/burt_network/lib/src/udp/rover_timesync.dart @@ -25,14 +25,16 @@ mixin RoverTimesync on BurtSocket { } return; } - if (source.port != timesyncPort) { + + if (source != timesyncDestination) { if (!quiet) { logger.warning( - "Socket on port ${destination?.port} expected to receive timesync message from port $timesyncPort, but received from ${source.port} instead", + "Socket on port $port expected to receive timesync message from $timesyncDestination, but received from $source instead", ); } return; } + if (source.address.isLoopback) { _timeOffset = Duration.zero; return; From 522e3c62c8751d181d25e750bca8e3216429339f Mon Sep 17 00:00:00 2001 From: Gold87 <91761103+Gold872@users.noreply.github.com> Date: Mon, 24 Mar 2025 19:26:59 -0400 Subject: [PATCH 09/11] Add timesync file --- burt_network/bin/timesync_server.dart | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 burt_network/bin/timesync_server.dart diff --git a/burt_network/bin/timesync_server.dart b/burt_network/bin/timesync_server.dart new file mode 100644 index 00000000..4f7ea628 --- /dev/null +++ b/burt_network/bin/timesync_server.dart @@ -0,0 +1,6 @@ +import "package:burt_network/src/udp/timesync_server.dart"; + +void main() async { + final timesync = TimesyncServer(port: 8020); + await timesync.init(); +} From f5a994eef354455305c90d7b6e9d8b4ffa18bf3d Mon Sep 17 00:00:00 2001 From: Gold87 <91761103+Gold872@users.noreply.github.com> Date: Mon, 24 Mar 2025 19:41:50 -0400 Subject: [PATCH 10/11] Make timesync destination mutable --- burt_network/lib/src/udp/burt_socket.dart | 6 +++++- burt_network/lib/udp.dart | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/burt_network/lib/src/udp/burt_socket.dart b/burt_network/lib/src/udp/burt_socket.dart index a05d0042..b5070e66 100644 --- a/burt_network/lib/src/udp/burt_socket.dart +++ b/burt_network/lib/src/udp/burt_socket.dart @@ -63,7 +63,11 @@ abstract class BurtSocket extends UdpSocket { return SocketInfo(address: address, port: _timesyncDestination.port); } - late final SocketInfo _timesyncDestination; + set timesyncDestination(SocketInfo destination) { + _timesyncDestination = destination; + } + + late SocketInfo _timesyncDestination; Timer? _heartbeatTimer; Timer? _timesyncTimer; diff --git a/burt_network/lib/udp.dart b/burt_network/lib/udp.dart index 64bbc9ab..85c9dc36 100644 --- a/burt_network/lib/udp.dart +++ b/burt_network/lib/udp.dart @@ -28,6 +28,7 @@ export "src/udp/burt_socket.dart"; export "src/udp/rover_socket.dart"; export "src/udp/rover_settings.dart"; export "src/udp/rover_heartbeats.dart"; +export "src/udp/rover_timesync.dart"; export "src/udp/socket_info.dart"; export "src/udp/udp_socket.dart"; export "src/udp/timesync_server.dart"; From 834814151ef9c5eaf986de831bbdce53df18b3e7 Mon Sep 17 00:00:00 2001 From: Gold87 <91761103+Gold872@users.noreply.github.com> Date: Mon, 24 Mar 2025 19:52:11 -0400 Subject: [PATCH 11/11] Default to localhost for timesync address --- burt_network/lib/src/udp/burt_socket.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/burt_network/lib/src/udp/burt_socket.dart b/burt_network/lib/src/udp/burt_socket.dart index b5070e66..b0245702 100644 --- a/burt_network/lib/src/udp/burt_socket.dart +++ b/burt_network/lib/src/udp/burt_socket.dart @@ -90,7 +90,7 @@ abstract class BurtSocket extends UdpSocket { SocketInfo? timesyncAddress, }) : _timesyncDestination = timesyncAddress ?? - SocketInfo(address: InternetAddress.anyIPv4, port: 8020); + SocketInfo(address: InternetAddress.loopbackIPv4, port: 8020); /// A stream of [WrappedMessage]s as they arrive in the UDP socket. @override