diff --git a/autonomy/lib/src/detector/network_detector.dart b/autonomy/lib/src/detector/network_detector.dart index 37e7796d..144b4dc0 100644 --- a/autonomy/lib/src/detector/network_detector.dart +++ b/autonomy/lib/src/detector/network_detector.dart @@ -24,7 +24,7 @@ class NetworkDetector extends DetectorInterface { @override Future init() async { - collection.server.messages.onMessage( + collection.server.messages.listenFor( name: AutonomyData().messageName, constructor: AutonomyData.fromBuffer, callback: _onDataReceived, diff --git a/autonomy/lib/src/detector/rover_detector.dart b/autonomy/lib/src/detector/rover_detector.dart index 48dedd4a..1511a419 100644 --- a/autonomy/lib/src/detector/rover_detector.dart +++ b/autonomy/lib/src/detector/rover_detector.dart @@ -115,7 +115,7 @@ class RoverDetector extends DetectorInterface { @override Future init() async { - _subscription = collection.server.messages.onMessage( + _subscription = collection.server.messages.listenFor( name: LidarPointCloud().messageName, constructor: LidarPointCloud.fromBuffer, callback: _handleLidarCloud, diff --git a/autonomy/lib/src/drive/drive_interface.dart b/autonomy/lib/src/drive/drive_interface.dart index c87ca3ba..4a128435 100644 --- a/autonomy/lib/src/drive/drive_interface.dart +++ b/autonomy/lib/src/drive/drive_interface.dart @@ -70,7 +70,7 @@ abstract class DriveInterface extends Service { faceOrientationState(direction.orientation); /// State to face the rover towards [orientation] - StateInterface faceOrientationState(Orientation orientation); + StateInterface faceOrientationState(Rotation3d orientation); /// State to execute actions relating to the turning of an [AutonomyAStarState] StateInterface turnStateState(AutonomyAStarState state) => @@ -83,7 +83,7 @@ abstract class DriveInterface extends Service { Future driveForward(GpsCoordinates position); /// Turn to face [orientation], returns whether or not it was able to turn - Future faceOrientation(Orientation orientation); + Future faceOrientation(Rotation3d orientation); /// Turn to face the orientation of [direction], returns whether or not it was able to turn Future faceDirection(CardinalDirection direction) => diff --git a/autonomy/lib/src/drive/rover_drive.dart b/autonomy/lib/src/drive/rover_drive.dart index 91064713..b111b5e9 100644 --- a/autonomy/lib/src/drive/rover_drive.dart +++ b/autonomy/lib/src/drive/rover_drive.dart @@ -84,7 +84,7 @@ class RoverDrive extends DriveInterface { Future approachAruco() => sensorDrive.approachAruco(); @override - Future faceOrientation(Orientation orientation) async { + Future faceOrientation(Rotation3d orientation) async { if (useImu) { return sensorDrive.faceOrientation(orientation); } else { @@ -135,7 +135,7 @@ class RoverDrive extends DriveInterface { } @override - StateInterface faceOrientationState(Orientation orientation) { + StateInterface faceOrientationState(Rotation3d orientation) { if (useImu) { return sensorDrive.faceOrientationState(orientation); } else { diff --git a/autonomy/lib/src/drive/sensor_drive.dart b/autonomy/lib/src/drive/sensor_drive.dart index 3b5302ff..9663de09 100644 --- a/autonomy/lib/src/drive/sensor_drive.dart +++ b/autonomy/lib/src/drive/sensor_drive.dart @@ -34,7 +34,7 @@ class SensorDrive extends DriveInterface with RoverDriveCommands { ); @override - StateInterface faceOrientationState(Orientation orientation) => + StateInterface faceOrientationState(Rotation3d orientation) => SensorTurnState( controller, collection: collection, @@ -115,7 +115,7 @@ class SensorDrive extends DriveInterface with RoverDriveCommands { } @override - Future faceOrientation(Orientation orientation) async { + Future faceOrientation(Rotation3d orientation) async { collection.logger.info("Turning to face $orientation..."); setThrottle(config.turnThrottle); final result = await runFeedback(() => _tryToFace(orientation)); @@ -123,7 +123,7 @@ class SensorDrive extends DriveInterface with RoverDriveCommands { return result; } - bool _tryToFace(Orientation orientation) { + bool _tryToFace(Rotation3d orientation) { final current = collection.imu.heading; final target = orientation.heading; final error = (target - current).clampHalfAngle(); diff --git a/autonomy/lib/src/drive/sim_drive.dart b/autonomy/lib/src/drive/sim_drive.dart index 3626b495..96ed9f7b 100644 --- a/autonomy/lib/src/drive/sim_drive.dart +++ b/autonomy/lib/src/drive/sim_drive.dart @@ -46,7 +46,7 @@ class DriveSimulator extends DriveInterface { ); @override - StateInterface faceOrientationState(Orientation orientation) => + StateInterface faceOrientationState(Rotation3d orientation) => SimulationDriveTurn( controller, collection: collection, @@ -68,7 +68,7 @@ class DriveSimulator extends DriveInterface { } @override - Future faceOrientation(Orientation orientation) async { + Future faceOrientation(Rotation3d orientation) async { if (shouldDelay) { await Future.delayed(const Duration(milliseconds: 500)); } diff --git a/autonomy/lib/src/drive/timed_drive.dart b/autonomy/lib/src/drive/timed_drive.dart index 131ff8ab..49e08625 100644 --- a/autonomy/lib/src/drive/timed_drive.dart +++ b/autonomy/lib/src/drive/timed_drive.dart @@ -46,7 +46,7 @@ class TimedDrive extends DriveInterface with RoverDriveCommands { TimedDrive({required super.collection, super.config}); @override - StateInterface faceOrientationState(Orientation orientation) { + StateInterface faceOrientationState(Rotation3d orientation) { throw UnsupportedError( "Cannot face any arbitrary direction using TimedDrive", ); @@ -206,7 +206,7 @@ class TimedDrive extends DriveInterface with RoverDriveCommands { } @override - Future faceOrientation(Orientation orientation) => + Future faceOrientation(Rotation3d orientation) => throw UnsupportedError( "Cannot face any arbitrary direction using TimedDrive", ); diff --git a/autonomy/lib/src/gps/rover_gps.dart b/autonomy/lib/src/gps/rover_gps.dart index 900062aa..a8909eee 100644 --- a/autonomy/lib/src/gps/rover_gps.dart +++ b/autonomy/lib/src/gps/rover_gps.dart @@ -8,7 +8,7 @@ class RoverGps extends GpsInterface { @override Future init() async { - collection.server.messages.onMessage( + collection.server.messages.listenFor( name: RoverPosition().messageName, constructor: RoverPosition.fromBuffer, callback: _internalUpdate, diff --git a/autonomy/lib/src/imu/cardinal_direction.dart b/autonomy/lib/src/imu/cardinal_direction.dart index ce30dc17..739385b7 100644 --- a/autonomy/lib/src/imu/cardinal_direction.dart +++ b/autonomy/lib/src/imu/cardinal_direction.dart @@ -13,14 +13,14 @@ enum CardinalDirection { final double angle; const CardinalDirection(this.angle); - Orientation get orientation => Orientation(z: angle); + Rotation3d get orientation => Rotation3d(yaw: angle); - static CardinalDirection nearest(Orientation orientation) { + static CardinalDirection nearest(Rotation3d orientation) { var smallestDiff = double.infinity; var closestOrientation = CardinalDirection.north; for (final value in values) { - final diff = (value.angle - orientation.z).clampHalfAngle(); + final diff = (value.angle - orientation.yaw).clampHalfAngle(); if (diff.abs() < smallestDiff) { smallestDiff = diff.abs(); closestOrientation = value; diff --git a/autonomy/lib/src/imu/imu_interface.dart b/autonomy/lib/src/imu/imu_interface.dart index 15bb4a25..a37188f4 100644 --- a/autonomy/lib/src/imu/imu_interface.dart +++ b/autonomy/lib/src/imu/imu_interface.dart @@ -5,17 +5,17 @@ abstract class ImuInterface extends Service with Receiver { final AutonomyInterface collection; ImuInterface({required this.collection}); - double get heading => raw.z; - Orientation get raw; + double get heading => raw.yaw; + Rotation3d get raw; CardinalDirection get nearest => CardinalDirection.nearest(raw); - void update(Orientation newValue); + void update(Rotation3d newValue); @visibleForTesting - void forceUpdate(Orientation newValue) {} + void forceUpdate(Rotation3d newValue) {} - bool isNear(Orientation orientation, [double? tolerance]) => + bool isNear(Rotation3d orientation, [double? tolerance]) => raw.isNear(orientation.heading, tolerance); @override diff --git a/autonomy/lib/src/imu/rover_imu.dart b/autonomy/lib/src/imu/rover_imu.dart index 83631107..e0c3eeb5 100644 --- a/autonomy/lib/src/imu/rover_imu.dart +++ b/autonomy/lib/src/imu/rover_imu.dart @@ -9,7 +9,7 @@ class RoverImu extends ImuInterface { @override Future init() async { - collection.server.messages.onMessage( + collection.server.messages.listenFor( name: RoverPosition().messageName, constructor: RoverPosition.fromBuffer, callback: _internalUpdate, @@ -23,32 +23,32 @@ class RoverImu extends ImuInterface { } @override - void update(Orientation newValue) { + void update(Rotation3d newValue) { // Do nothing, since this should only be internally updated } @override - void forceUpdate(Orientation newValue) => + void forceUpdate(Rotation3d newValue) => _internalUpdate(RoverPosition(orientation: newValue)); void _internalUpdate(RoverPosition newValue) { if (!newValue.hasOrientation()) return; // Angles are always between -180 and +180 - if (newValue.orientation.x.abs() > 180 || - newValue.orientation.y.abs() > 180 || - newValue.orientation.z.abs() > 180) { + if (newValue.orientation.pitch.abs() > 180 || + newValue.orientation.roll.abs() > 180 || + newValue.orientation.yaw.abs() > 180) { return; } - _xCorrector.addValue(newValue.orientation.x); - _yCorrector.addValue(newValue.orientation.y); - _zCorrector.addValue(newValue.orientation.z); + _xCorrector.addValue(newValue.orientation.pitch); + _yCorrector.addValue(newValue.orientation.roll); + _zCorrector.addValue(newValue.orientation.yaw); hasValue = true; } @override - Orientation get raw => Orientation( - x: _xCorrector.calibratedValue.clampHalfAngle(), - y: _yCorrector.calibratedValue.clampHalfAngle(), - z: _zCorrector.calibratedValue.clampHalfAngle(), + Rotation3d get raw => Rotation3d( + pitch: _xCorrector.calibratedValue.clampHalfAngle(), + roll: _yCorrector.calibratedValue.clampHalfAngle(), + yaw: _zCorrector.calibratedValue.clampHalfAngle(), ); } diff --git a/autonomy/lib/src/imu/sim_imu.dart b/autonomy/lib/src/imu/sim_imu.dart index 466b9ebf..213de9c4 100644 --- a/autonomy/lib/src/imu/sim_imu.dart +++ b/autonomy/lib/src/imu/sim_imu.dart @@ -9,17 +9,17 @@ class ImuSimulator extends ImuInterface with ValueReporter { @override RoverPosition getMessage() => RoverPosition(orientation: raw); - Orientation _orientation = Orientation(); + Rotation3d _orientation = Rotation3d(); @override - Orientation get raw => Orientation( - x: _orientation.x + _error.value, - y: _orientation.y + _error.value, - z: _orientation.z + _error.value, + Rotation3d get raw => Rotation3d( + pitch: _orientation.pitch + _error.value, + roll: _orientation.roll + _error.value, + yaw: _orientation.yaw + _error.value, ); @override - void update(Orientation newValue) => _orientation = newValue; + void update(Rotation3d newValue) => _orientation = newValue; @override Future init() async { @@ -29,7 +29,7 @@ class ImuSimulator extends ImuInterface with ValueReporter { @override Future dispose() async { - _orientation = Orientation(); + _orientation = Rotation3d(); await super.dispose(); } } diff --git a/autonomy/lib/src/orchestrator/orchestrator_interface.dart b/autonomy/lib/src/orchestrator/orchestrator_interface.dart index 1c2a2918..8a145ade 100644 --- a/autonomy/lib/src/orchestrator/orchestrator_interface.dart +++ b/autonomy/lib/src/orchestrator/orchestrator_interface.dart @@ -49,7 +49,7 @@ abstract class OrchestratorInterface extends Service { @override Future init() async { - collection.server.messages.onMessage( + collection.server.messages.listenFor( name: AutonomyCommand().messageName, constructor: AutonomyCommand.fromBuffer, callback: onCommand, diff --git a/autonomy/lib/src/orchestrator/rover_orchestrator.dart b/autonomy/lib/src/orchestrator/rover_orchestrator.dart index 8e05835d..bcd97100 100644 --- a/autonomy/lib/src/orchestrator/rover_orchestrator.dart +++ b/autonomy/lib/src/orchestrator/rover_orchestrator.dart @@ -152,7 +152,7 @@ class RoverOrchestrator extends OrchestratorInterface with ValueReporter { } // Re-align to desired start orientation if angle is too far if (state.instruction == DriveDirection.forward) { - Orientation targetOrientation; + Rotation3d targetOrientation; // if it has RTK, point towards the next coordinate if (collection.gps.coordinates.hasRTK) { final difference = @@ -160,7 +160,7 @@ class RoverOrchestrator extends OrchestratorInterface with ValueReporter { final angle = atan2(difference.y, difference.x) * 180 / pi; - targetOrientation = Orientation(z: angle); + targetOrientation = Rotation3d(yaw: angle); } else { targetOrientation = state.orientation.orientation; } @@ -309,8 +309,8 @@ class RoverOrchestrator extends OrchestratorInterface with ValueReporter { collection.logger.info("Found aruco"); currentState = AutonomyState.APPROACHING; - final arucoOrientation = Orientation( - z: collection.imu.heading - detectedAruco.yaw, + final arucoOrientation = Rotation3d( + yaw: collection.imu.heading - detectedAruco.yaw, ); await collection.drive.faceOrientation(arucoOrientation); detectedAruco = await collection.video.waitForAruco( @@ -409,7 +409,7 @@ class RoverOrchestrator extends OrchestratorInterface with ValueReporter { if (detectedAruco != null) { collection.logger.info("Rotating towards Aruco"); await collection.drive.faceOrientation( - Orientation(z: collection.imu.heading - detectedAruco!.yaw), + Rotation3d(yaw: collection.imu.heading - detectedAruco!.yaw), ); } else { collection.logger.warning("Could not find Aruco after following path"); diff --git a/autonomy/lib/src/state_machine/rover_states/navigation.dart b/autonomy/lib/src/state_machine/rover_states/navigation.dart index c20f81dc..a55119b7 100644 --- a/autonomy/lib/src/state_machine/rover_states/navigation.dart +++ b/autonomy/lib/src/state_machine/rover_states/navigation.dart @@ -65,7 +65,7 @@ class NavigationState extends RoverState { /// If the rover is not facing the proper direction, a new state will be pushed /// to re-correct the rover's orientation bool checkOrientation(AutonomyAStarState state) { - Orientation targetOrientation; + Rotation3d targetOrientation; // if it has RTK, point towards the next coordinate if (collection.gps.coordinates.hasRTK) { final difference = @@ -73,7 +73,7 @@ class NavigationState extends RoverState { final angle = atan2(difference.y, difference.x) * 180 / pi; - targetOrientation = Orientation(z: angle); + targetOrientation = Rotation3d(yaw: angle); } else { targetOrientation = state.orientation.orientation; } diff --git a/autonomy/lib/src/state_machine/rover_states/sensor_drive_turn.dart b/autonomy/lib/src/state_machine/rover_states/sensor_drive_turn.dart index e3bd8b53..ef2cd513 100644 --- a/autonomy/lib/src/state_machine/rover_states/sensor_drive_turn.dart +++ b/autonomy/lib/src/state_machine/rover_states/sensor_drive_turn.dart @@ -4,7 +4,7 @@ import "package:autonomy/src/drive/drive_config.dart"; class SensorTurnState extends RoverState { final AutonomyInterface collection; - final Orientation orientation; + final Rotation3d orientation; final RoverDriveCommands drive; diff --git a/autonomy/lib/src/state_machine/rover_states/simulation_drive_turn.dart b/autonomy/lib/src/state_machine/rover_states/simulation_drive_turn.dart index ebcdd240..7a46d56f 100644 --- a/autonomy/lib/src/state_machine/rover_states/simulation_drive_turn.dart +++ b/autonomy/lib/src/state_machine/rover_states/simulation_drive_turn.dart @@ -3,12 +3,12 @@ import "package:autonomy/autonomy.dart"; class SimulationDriveTurn extends RoverState { final SimulationMethod method; final AutonomyInterface collection; - final Orientation goalOrientation; + final Rotation3d goalOrientation; Duration get delay => collection.drive.config.turnDelay; DateTime _startTime = DateTime(0); - Orientation _startOrientation = Orientation(); + Rotation3d _startOrientation = Rotation3d(); SimulationDriveTurn( super.controller, { @@ -38,16 +38,16 @@ class SimulationDriveTurn extends RoverState { } if (method == SimulationMethod.intermediate) { - final deltaOrientation = (goalOrientation.z - _startOrientation.z) + final deltaOrientation = (goalOrientation.yaw - _startOrientation.yaw) .clampHalfAngle(); final deltaTime = DateTime.now().difference(_startTime); final timeFraction = 1.0 * deltaTime.inMicroseconds / delay.inMicroseconds; final intermediateRotation = - _startOrientation.z + deltaOrientation * timeFraction; + _startOrientation.yaw + deltaOrientation * timeFraction; - collection.imu.update(Orientation(z: intermediateRotation)); + collection.imu.update(Rotation3d(yaw: intermediateRotation)); } } } diff --git a/autonomy/lib/src/utils/imu_utils.dart b/autonomy/lib/src/utils/imu_utils.dart index 9b7a5391..86d871f1 100644 --- a/autonomy/lib/src/utils/imu_utils.dart +++ b/autonomy/lib/src/utils/imu_utils.dart @@ -1,25 +1,25 @@ import "package:autonomy/autonomy.dart"; -extension OrientationUtils on Orientation { +extension OrientationUtils on Rotation3d { /// North orientation - static final north = Orientation(z: CardinalDirection.north.angle); + static final north = Rotation3d(yaw: CardinalDirection.north.angle); /// East orientation - static final west = Orientation(z: CardinalDirection.west.angle); + static final west = Rotation3d(yaw: CardinalDirection.west.angle); /// South Orientation - static final south = Orientation(z: CardinalDirection.south.angle); + static final south = Rotation3d(yaw: CardinalDirection.south.angle); /// East orientation - static final east = Orientation(z: CardinalDirection.east.angle); + static final east = Rotation3d(yaw: CardinalDirection.east.angle); /// The heading of the orientation, or the compass direction we are facing - double get heading => z; + double get heading => yaw; /// Whether or not this orientation is within [epsilon] degrees of [value] bool isNear(double value, [double? epsilon]) { epsilon ??= Constants.turnEpsilon; - final error = (value - z).clampHalfAngle(); + final error = (value - yaw).clampHalfAngle(); return error.abs() <= epsilon; // if (value > 270 && z < 90) { diff --git a/autonomy/lib/src/video/rover_video.dart b/autonomy/lib/src/video/rover_video.dart index e3363e44..7ef7e666 100644 --- a/autonomy/lib/src/video/rover_video.dart +++ b/autonomy/lib/src/video/rover_video.dart @@ -45,7 +45,7 @@ class RoverVideo extends VideoInterface { late final StreamSubscription resultSubscription; - resultSubscription = collection.server.messages.onMessage( + resultSubscription = collection.server.messages.listenFor( name: VideoData().messageName, constructor: VideoData.fromBuffer, callback: (result) async { diff --git a/autonomy/lib/src/video/video_interface.dart b/autonomy/lib/src/video/video_interface.dart index 4344054d..eb09da35 100644 --- a/autonomy/lib/src/video/video_interface.dart +++ b/autonomy/lib/src/video/video_interface.dart @@ -12,7 +12,7 @@ abstract class VideoInterface extends Service with Receiver { @override Future init() async { - _dataSubscription = collection.server.messages.onMessage( + _dataSubscription = collection.server.messages.listenFor( name: VideoData().messageName, constructor: VideoData.fromBuffer, callback: updateFrame, diff --git a/autonomy/test/network_test.dart b/autonomy/test/network_test.dart index 78912f24..80b8df30 100644 --- a/autonomy/test/network_test.dart +++ b/autonomy/test/network_test.dart @@ -25,7 +25,7 @@ class MockSubsystems extends Service { @override Future init() async { await socket.init(); - socket.messages.onMessage( + socket.messages.listenFor( name: DriveCommand().messageName, constructor: DriveCommand.fromBuffer, callback: onDriveCommand, @@ -86,7 +86,7 @@ void main() => group("[Network]", tags: ["network"], () { test("Rover waits for all data to arrive", () async { final gps = GpsCoordinates(latitude: 1, longitude: 2); - final imu = Orientation(z: 60); + final imu = Rotation3d(yaw: 60); final posGps = RoverPosition(gps: gps); final posImu = RoverPosition(orientation: imu); final depth = VideoData(frame: [1, 2, 3, 4, 5]); diff --git a/autonomy/test/rover_test.dart b/autonomy/test/rover_test.dart index f59630d2..3fe5cbac 100644 --- a/autonomy/test/rover_test.dart +++ b/autonomy/test/rover_test.dart @@ -18,7 +18,7 @@ void main() => group("[Rover]", tags: ["rover"], () { test("Waits for sensor data", () async { final rover = RoverAutonomy(); - final orientation = Orientation(); + final orientation = Rotation3d(); final data = VideoData(); await rover.init(); diff --git a/burt_network/bin/client.dart b/burt_network/bin/client.dart index f6447791..3d48c125 100644 --- a/burt_network/bin/client.dart +++ b/burt_network/bin/client.dart @@ -11,11 +11,11 @@ final destination = SocketInfo( ); void main() async { - final client = UdpSocket(port: 8000, destination: destination); + final client = UdpSocket(port: 8000); await client.init(); final message = ScienceData(temperature: 1, co2: 2); while (true) { - client.sendMessage(message); + client.sendMessage(message, destination: destination); await Future.delayed(const Duration(seconds: 1)); } } diff --git a/burt_network/bin/server.dart b/burt_network/bin/server.dart index 01f54160..00511b8b 100644 --- a/burt_network/bin/server.dart +++ b/burt_network/bin/server.dart @@ -4,12 +4,15 @@ import "package:burt_network/burt_network.dart"; final logger = BurtLogger(); -void onData(ScienceData data) => - logger.info("Received ScienceData message at ${DateTime.now()}: ${data.toProto3Json()}"); +void onData(RoverPacket data) => + logger.info( + "Received ScienceData message from ${data.source} at ${data.timestamp.toDateTime().toIso8601String()}: ${data.message.toProto3Json()}", +); void main() async { final server = RoverSocket(port: 8001, device: Device.SUBSYSTEMS); - server.messages.onMessage( + // final server = RoverServer(port: 8001, device: Device.SUBSYSTEMS); + server.messages.onPacket( name: ScienceData().messageName, constructor: ScienceData.fromBuffer, callback: onData, diff --git a/burt_network/bin/timesync_server.dart b/burt_network/bin/timesync_server.dart new file mode 100644 index 00000000..4f7ea628 --- /dev/null +++ b/burt_network/bin/timesync_server.dart @@ -0,0 +1,6 @@ +import "package:burt_network/src/udp/timesync_server.dart"; + +void main() async { + final timesync = TimesyncServer(port: 8020); + await timesync.init(); +} diff --git a/burt_network/lib/protobuf.dart b/burt_network/lib/protobuf.dart index b7b480c1..0124cb64 100644 --- a/burt_network/lib/protobuf.dart +++ b/burt_network/lib/protobuf.dart @@ -45,7 +45,7 @@ extension MessageUtils on Message { WrappedMessage wrap([DateTime? timestamp]) => WrappedMessage( data: writeToBuffer(), name: messageName, - timestamp: Timestamp.fromDateTime(timestamp ?? DateTime.now()), + timestamp: Timestamp.fromDateTime(timestamp ?? DateTime.timestamp()), ); } diff --git a/burt_network/lib/src/generated/core.pb.dart b/burt_network/lib/src/generated/core.pb.dart index 947f88b9..126bee15 100644 --- a/burt_network/lib/src/generated/core.pb.dart +++ b/burt_network/lib/src/generated/core.pb.dart @@ -13,6 +13,8 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; +import 'package:protobuf/well_known_types/google/protobuf/timestamp.pb.dart' + as $0; import 'core.pbenum.dart'; @@ -86,6 +88,61 @@ class Connect extends $pb.GeneratedMessage { void clearReceiver() => $_clearField(2); } +class Timesync extends $pb.GeneratedMessage { + factory Timesync({ + $0.Timestamp? sendTime, + }) { + final result = create(); + if (sendTime != null) result.sendTime = sendTime; + return result; + } + + Timesync._(); + + factory Timesync.fromBuffer($core.List<$core.int> data, + [$pb.ExtensionRegistry registry = $pb.ExtensionRegistry.EMPTY]) => + create()..mergeFromBuffer(data, registry); + factory Timesync.fromJson($core.String json, + [$pb.ExtensionRegistry registry = $pb.ExtensionRegistry.EMPTY]) => + create()..mergeFromJson(json, registry); + + static final $pb.BuilderInfo _i = $pb.BuilderInfo( + _omitMessageNames ? '' : 'Timesync', + createEmptyInstance: create) + ..aOM<$0.Timestamp>(1, _omitFieldNames ? '' : 'sendTime', + protoName: 'sendTime', subBuilder: $0.Timestamp.create) + ..hasRequiredFields = false; + + @$core.Deprecated('See https://github.com/google/protobuf.dart/issues/998.') + Timesync clone() => deepCopy(); + @$core.Deprecated('See https://github.com/google/protobuf.dart/issues/998.') + Timesync copyWith(void Function(Timesync) updates) => + super.copyWith((message) => updates(message as Timesync)) as Timesync; + + @$core.override + $pb.BuilderInfo get info_ => _i; + + @$core.pragma('dart2js:noInline') + static Timesync create() => Timesync._(); + @$core.override + Timesync createEmptyInstance() => create(); + @$core.pragma('dart2js:noInline') + static Timesync getDefault() => + _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static Timesync? _defaultInstance; + + @$pb.TagNumber(1) + $0.Timestamp get sendTime => $_getN(0); + @$pb.TagNumber(1) + set sendTime($0.Timestamp value) => $_setField(1, value); + @$pb.TagNumber(1) + $core.bool hasSendTime() => $_has(0); + @$pb.TagNumber(1) + void clearSendTime() => $_clearField(1); + @$pb.TagNumber(1) + $0.Timestamp ensureSendTime() => $_ensure(0); +} + /// Notifies the recipient that the sender will no longer be connected. class Disconnect extends $pb.GeneratedMessage { factory Disconnect({ diff --git a/burt_network/lib/src/generated/core.pbjson.dart b/burt_network/lib/src/generated/core.pbjson.dart index 17d97443..942e824f 100644 --- a/burt_network/lib/src/generated/core.pbjson.dart +++ b/burt_network/lib/src/generated/core.pbjson.dart @@ -67,6 +67,26 @@ final $typed_data.Uint8List connectDescriptor = $convert.base64Decode( 'CgdDb25uZWN0Eh8KBnNlbmRlchgBIAEoDjIHLkRldmljZVIGc2VuZGVyEiMKCHJlY2VpdmVyGA' 'IgASgOMgcuRGV2aWNlUghyZWNlaXZlcg=='); +@$core.Deprecated('Use timesyncDescriptor instead') +const Timesync$json = { + '1': 'Timesync', + '2': [ + { + '1': 'sendTime', + '3': 1, + '4': 1, + '5': 11, + '6': '.google.protobuf.Timestamp', + '10': 'sendTime' + }, + ], +}; + +/// Descriptor for `Timesync`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List timesyncDescriptor = $convert.base64Decode( + 'CghUaW1lc3luYxI2CghzZW5kVGltZRgBIAEoCzIaLmdvb2dsZS5wcm90b2J1Zi5UaW1lc3RhbX' + 'BSCHNlbmRUaW1l'); + @$core.Deprecated('Use disconnectDescriptor instead') const Disconnect$json = { '1': 'Disconnect', diff --git a/burt_network/lib/src/generated/geometry.pb.dart b/burt_network/lib/src/generated/geometry.pb.dart index 32f4184a..5cd6a790 100644 --- a/burt_network/lib/src/generated/geometry.pb.dart +++ b/burt_network/lib/src/generated/geometry.pb.dart @@ -16,8 +16,8 @@ import 'package:protobuf/protobuf.dart' as $pb; export 'package:protobuf/protobuf.dart' show GeneratedMessageGenericExtensions; -class Coordinates extends $pb.GeneratedMessage { - factory Coordinates({ +class Translation3d extends $pb.GeneratedMessage { + factory Translation3d({ $core.double? x, $core.double? y, $core.double? z, @@ -29,17 +29,17 @@ class Coordinates extends $pb.GeneratedMessage { return result; } - Coordinates._(); + Translation3d._(); - factory Coordinates.fromBuffer($core.List<$core.int> data, + factory Translation3d.fromBuffer($core.List<$core.int> data, [$pb.ExtensionRegistry registry = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(data, registry); - factory Coordinates.fromJson($core.String json, + factory Translation3d.fromJson($core.String json, [$pb.ExtensionRegistry registry = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(json, registry); static final $pb.BuilderInfo _i = $pb.BuilderInfo( - _omitMessageNames ? '' : 'Coordinates', + _omitMessageNames ? '' : 'Translation3d', createEmptyInstance: create) ..aD(1, _omitFieldNames ? '' : 'x', fieldType: $pb.PbFieldType.OF) ..aD(2, _omitFieldNames ? '' : 'y', fieldType: $pb.PbFieldType.OF) @@ -47,23 +47,23 @@ class Coordinates extends $pb.GeneratedMessage { ..hasRequiredFields = false; @$core.Deprecated('See https://github.com/google/protobuf.dart/issues/998.') - Coordinates clone() => deepCopy(); + Translation3d clone() => deepCopy(); @$core.Deprecated('See https://github.com/google/protobuf.dart/issues/998.') - Coordinates copyWith(void Function(Coordinates) updates) => - super.copyWith((message) => updates(message as Coordinates)) - as Coordinates; + Translation3d copyWith(void Function(Translation3d) updates) => + super.copyWith((message) => updates(message as Translation3d)) + as Translation3d; @$core.override $pb.BuilderInfo get info_ => _i; @$core.pragma('dart2js:noInline') - static Coordinates create() => Coordinates._(); + static Translation3d create() => Translation3d._(); @$core.override - Coordinates createEmptyInstance() => create(); + Translation3d createEmptyInstance() => create(); @$core.pragma('dart2js:noInline') - static Coordinates getDefault() => _defaultInstance ??= - $pb.GeneratedMessage.$_defaultFor(create); - static Coordinates? _defaultInstance; + static Translation3d getDefault() => _defaultInstance ??= + $pb.GeneratedMessage.$_defaultFor(create); + static Translation3d? _defaultInstance; @$pb.TagNumber(1) $core.double get x => $_getN(0); @@ -93,87 +93,86 @@ class Coordinates extends $pb.GeneratedMessage { void clearZ() => $_clearField(3); } -class Orientation extends $pb.GeneratedMessage { - factory Orientation({ - $core.double? x, - $core.double? y, - $core.double? z, +class Rotation3d extends $pb.GeneratedMessage { + factory Rotation3d({ + $core.double? pitch, + $core.double? roll, + $core.double? yaw, }) { final result = create(); - if (x != null) result.x = x; - if (y != null) result.y = y; - if (z != null) result.z = z; + if (pitch != null) result.pitch = pitch; + if (roll != null) result.roll = roll; + if (yaw != null) result.yaw = yaw; return result; } - Orientation._(); + Rotation3d._(); - factory Orientation.fromBuffer($core.List<$core.int> data, + factory Rotation3d.fromBuffer($core.List<$core.int> data, [$pb.ExtensionRegistry registry = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(data, registry); - factory Orientation.fromJson($core.String json, + factory Rotation3d.fromJson($core.String json, [$pb.ExtensionRegistry registry = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(json, registry); static final $pb.BuilderInfo _i = $pb.BuilderInfo( - _omitMessageNames ? '' : 'Orientation', + _omitMessageNames ? '' : 'Rotation3d', createEmptyInstance: create) - ..aD(1, _omitFieldNames ? '' : 'x', fieldType: $pb.PbFieldType.OF) - ..aD(2, _omitFieldNames ? '' : 'y', fieldType: $pb.PbFieldType.OF) - ..aD(3, _omitFieldNames ? '' : 'z', fieldType: $pb.PbFieldType.OF) + ..aD(1, _omitFieldNames ? '' : 'pitch', fieldType: $pb.PbFieldType.OF) + ..aD(2, _omitFieldNames ? '' : 'roll', fieldType: $pb.PbFieldType.OF) + ..aD(3, _omitFieldNames ? '' : 'yaw', fieldType: $pb.PbFieldType.OF) ..hasRequiredFields = false; @$core.Deprecated('See https://github.com/google/protobuf.dart/issues/998.') - Orientation clone() => deepCopy(); + Rotation3d clone() => deepCopy(); @$core.Deprecated('See https://github.com/google/protobuf.dart/issues/998.') - Orientation copyWith(void Function(Orientation) updates) => - super.copyWith((message) => updates(message as Orientation)) - as Orientation; + Rotation3d copyWith(void Function(Rotation3d) updates) => + super.copyWith((message) => updates(message as Rotation3d)) as Rotation3d; @$core.override $pb.BuilderInfo get info_ => _i; @$core.pragma('dart2js:noInline') - static Orientation create() => Orientation._(); + static Rotation3d create() => Rotation3d._(); @$core.override - Orientation createEmptyInstance() => create(); + Rotation3d createEmptyInstance() => create(); @$core.pragma('dart2js:noInline') - static Orientation getDefault() => _defaultInstance ??= - $pb.GeneratedMessage.$_defaultFor(create); - static Orientation? _defaultInstance; + static Rotation3d getDefault() => _defaultInstance ??= + $pb.GeneratedMessage.$_defaultFor(create); + static Rotation3d? _defaultInstance; @$pb.TagNumber(1) - $core.double get x => $_getN(0); + $core.double get pitch => $_getN(0); @$pb.TagNumber(1) - set x($core.double value) => $_setFloat(0, value); + set pitch($core.double value) => $_setFloat(0, value); @$pb.TagNumber(1) - $core.bool hasX() => $_has(0); + $core.bool hasPitch() => $_has(0); @$pb.TagNumber(1) - void clearX() => $_clearField(1); + void clearPitch() => $_clearField(1); @$pb.TagNumber(2) - $core.double get y => $_getN(1); + $core.double get roll => $_getN(1); @$pb.TagNumber(2) - set y($core.double value) => $_setFloat(1, value); + set roll($core.double value) => $_setFloat(1, value); @$pb.TagNumber(2) - $core.bool hasY() => $_has(1); + $core.bool hasRoll() => $_has(1); @$pb.TagNumber(2) - void clearY() => $_clearField(2); + void clearRoll() => $_clearField(2); @$pb.TagNumber(3) - $core.double get z => $_getN(2); + $core.double get yaw => $_getN(2); @$pb.TagNumber(3) - set z($core.double value) => $_setFloat(2, value); + set yaw($core.double value) => $_setFloat(2, value); @$pb.TagNumber(3) - $core.bool hasZ() => $_has(2); + $core.bool hasYaw() => $_has(2); @$pb.TagNumber(3) - void clearZ() => $_clearField(3); + void clearYaw() => $_clearField(3); } class Pose3d extends $pb.GeneratedMessage { factory Pose3d({ - Coordinates? translation, - Orientation? rotation, + Translation3d? translation, + Rotation3d? rotation, }) { final result = create(); if (translation != null) result.translation = translation; @@ -193,10 +192,10 @@ class Pose3d extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo( _omitMessageNames ? '' : 'Pose3d', createEmptyInstance: create) - ..aOM(1, _omitFieldNames ? '' : 'translation', - subBuilder: Coordinates.create) - ..aOM(2, _omitFieldNames ? '' : 'rotation', - subBuilder: Orientation.create) + ..aOM(1, _omitFieldNames ? '' : 'translation', + subBuilder: Translation3d.create) + ..aOM(2, _omitFieldNames ? '' : 'rotation', + subBuilder: Rotation3d.create) ..hasRequiredFields = false; @$core.Deprecated('See https://github.com/google/protobuf.dart/issues/998.') @@ -218,26 +217,26 @@ class Pose3d extends $pb.GeneratedMessage { static Pose3d? _defaultInstance; @$pb.TagNumber(1) - Coordinates get translation => $_getN(0); + Translation3d get translation => $_getN(0); @$pb.TagNumber(1) - set translation(Coordinates value) => $_setField(1, value); + set translation(Translation3d value) => $_setField(1, value); @$pb.TagNumber(1) $core.bool hasTranslation() => $_has(0); @$pb.TagNumber(1) void clearTranslation() => $_clearField(1); @$pb.TagNumber(1) - Coordinates ensureTranslation() => $_ensure(0); + Translation3d ensureTranslation() => $_ensure(0); @$pb.TagNumber(2) - Orientation get rotation => $_getN(1); + Rotation3d get rotation => $_getN(1); @$pb.TagNumber(2) - set rotation(Orientation value) => $_setField(2, value); + set rotation(Rotation3d value) => $_setField(2, value); @$pb.TagNumber(2) $core.bool hasRotation() => $_has(1); @$pb.TagNumber(2) void clearRotation() => $_clearField(2); @$pb.TagNumber(2) - Orientation ensureRotation() => $_ensure(1); + Rotation3d ensureRotation() => $_ensure(1); } const $core.bool _omitFieldNames = diff --git a/burt_network/lib/src/generated/geometry.pbjson.dart b/burt_network/lib/src/generated/geometry.pbjson.dart index ef7ef33e..5dd15983 100644 --- a/burt_network/lib/src/generated/geometry.pbjson.dart +++ b/burt_network/lib/src/generated/geometry.pbjson.dart @@ -15,9 +15,9 @@ import 'dart:convert' as $convert; import 'dart:core' as $core; import 'dart:typed_data' as $typed_data; -@$core.Deprecated('Use coordinatesDescriptor instead') -const Coordinates$json = { - '1': 'Coordinates', +@$core.Deprecated('Use translation3dDescriptor instead') +const Translation3d$json = { + '1': 'Translation3d', '2': [ {'1': 'x', '3': 1, '4': 1, '5': 2, '10': 'x'}, {'1': 'y', '3': 2, '4': 1, '5': 2, '10': 'y'}, @@ -25,25 +25,25 @@ const Coordinates$json = { ], }; -/// Descriptor for `Coordinates`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List coordinatesDescriptor = $convert.base64Decode( - 'CgtDb29yZGluYXRlcxIMCgF4GAEgASgCUgF4EgwKAXkYAiABKAJSAXkSDAoBehgDIAEoAlIBeg' - '=='); +/// Descriptor for `Translation3d`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List translation3dDescriptor = $convert.base64Decode( + 'Cg1UcmFuc2xhdGlvbjNkEgwKAXgYASABKAJSAXgSDAoBeRgCIAEoAlIBeRIMCgF6GAMgASgCUg' + 'F6'); -@$core.Deprecated('Use orientationDescriptor instead') -const Orientation$json = { - '1': 'Orientation', +@$core.Deprecated('Use rotation3dDescriptor instead') +const Rotation3d$json = { + '1': 'Rotation3d', '2': [ - {'1': 'x', '3': 1, '4': 1, '5': 2, '10': 'x'}, - {'1': 'y', '3': 2, '4': 1, '5': 2, '10': 'y'}, - {'1': 'z', '3': 3, '4': 1, '5': 2, '10': 'z'}, + {'1': 'pitch', '3': 1, '4': 1, '5': 2, '10': 'pitch'}, + {'1': 'roll', '3': 2, '4': 1, '5': 2, '10': 'roll'}, + {'1': 'yaw', '3': 3, '4': 1, '5': 2, '10': 'yaw'}, ], }; -/// Descriptor for `Orientation`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List orientationDescriptor = $convert.base64Decode( - 'CgtPcmllbnRhdGlvbhIMCgF4GAEgASgCUgF4EgwKAXkYAiABKAJSAXkSDAoBehgDIAEoAlIBeg' - '=='); +/// Descriptor for `Rotation3d`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List rotation3dDescriptor = $convert.base64Decode( + 'CgpSb3RhdGlvbjNkEhQKBXBpdGNoGAEgASgCUgVwaXRjaBISCgRyb2xsGAIgASgCUgRyb2xsEh' + 'AKA3lhdxgDIAEoAlIDeWF3'); @$core.Deprecated('Use pose3dDescriptor instead') const Pose3d$json = { @@ -54,7 +54,7 @@ const Pose3d$json = { '3': 1, '4': 1, '5': 11, - '6': '.Coordinates', + '6': '.Translation3d', '10': 'translation' }, { @@ -62,7 +62,7 @@ const Pose3d$json = { '3': 2, '4': 1, '5': 11, - '6': '.Orientation', + '6': '.Rotation3d', '10': 'rotation' }, ], @@ -70,5 +70,5 @@ const Pose3d$json = { /// Descriptor for `Pose3d`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List pose3dDescriptor = $convert.base64Decode( - 'CgZQb3NlM2QSLgoLdHJhbnNsYXRpb24YASABKAsyDC5Db29yZGluYXRlc1ILdHJhbnNsYXRpb2' - '4SKAoIcm90YXRpb24YAiABKAsyDC5PcmllbnRhdGlvblIIcm90YXRpb24='); + 'CgZQb3NlM2QSMAoLdHJhbnNsYXRpb24YASABKAsyDi5UcmFuc2xhdGlvbjNkUgt0cmFuc2xhdG' + 'lvbhInCghyb3RhdGlvbhgCIAEoCzILLlJvdGF0aW9uM2RSCHJvdGF0aW9u'); diff --git a/burt_network/lib/src/generated/gps.pb.dart b/burt_network/lib/src/generated/gps.pb.dart index 0796a8f6..5aab69f8 100644 --- a/burt_network/lib/src/generated/gps.pb.dart +++ b/burt_network/lib/src/generated/gps.pb.dart @@ -115,7 +115,7 @@ class GpsCoordinates extends $pb.GeneratedMessage { class RoverPosition extends $pb.GeneratedMessage { factory RoverPosition({ GpsCoordinates? gps, - $0.Orientation? orientation, + $0.Rotation3d? orientation, $1.Version? version, $core.List<$core.int>? rtkMessage, }) { @@ -141,8 +141,8 @@ class RoverPosition extends $pb.GeneratedMessage { createEmptyInstance: create) ..aOM(1, _omitFieldNames ? '' : 'gps', subBuilder: GpsCoordinates.create) - ..aOM<$0.Orientation>(2, _omitFieldNames ? '' : 'orientation', - subBuilder: $0.Orientation.create) + ..aOM<$0.Rotation3d>(2, _omitFieldNames ? '' : 'orientation', + subBuilder: $0.Rotation3d.create) ..aOM<$1.Version>(3, _omitFieldNames ? '' : 'version', subBuilder: $1.Version.create) ..a<$core.List<$core.int>>( @@ -180,15 +180,15 @@ class RoverPosition extends $pb.GeneratedMessage { GpsCoordinates ensureGps() => $_ensure(0); @$pb.TagNumber(2) - $0.Orientation get orientation => $_getN(1); + $0.Rotation3d get orientation => $_getN(1); @$pb.TagNumber(2) - set orientation($0.Orientation value) => $_setField(2, value); + set orientation($0.Rotation3d value) => $_setField(2, value); @$pb.TagNumber(2) $core.bool hasOrientation() => $_has(1); @$pb.TagNumber(2) void clearOrientation() => $_clearField(2); @$pb.TagNumber(2) - $0.Orientation ensureOrientation() => $_ensure(1); + $0.Rotation3d ensureOrientation() => $_ensure(1); @$pb.TagNumber(3) $1.Version get version => $_getN(2); diff --git a/burt_network/lib/src/generated/gps.pbjson.dart b/burt_network/lib/src/generated/gps.pbjson.dart index ede4f588..e97bd977 100644 --- a/burt_network/lib/src/generated/gps.pbjson.dart +++ b/burt_network/lib/src/generated/gps.pbjson.dart @@ -63,7 +63,7 @@ const RoverPosition$json = { '3': 2, '4': 1, '5': 11, - '6': '.Orientation', + '6': '.Rotation3d', '10': 'orientation' }, {'1': 'version', '3': 3, '4': 1, '5': 11, '6': '.Version', '10': 'version'}, @@ -73,7 +73,7 @@ const RoverPosition$json = { /// Descriptor for `RoverPosition`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List roverPositionDescriptor = $convert.base64Decode( - 'Cg1Sb3ZlclBvc2l0aW9uEiEKA2dwcxgBIAEoCzIPLkdwc0Nvb3JkaW5hdGVzUgNncHMSLgoLb3' - 'JpZW50YXRpb24YAiABKAsyDC5PcmllbnRhdGlvblILb3JpZW50YXRpb24SIgoHdmVyc2lvbhgD' - 'IAEoCzIILlZlcnNpb25SB3ZlcnNpb24SHwoLcnRrX21lc3NhZ2UYBCABKAxSCnJ0a01lc3NhZ2' - 'U='); + 'Cg1Sb3ZlclBvc2l0aW9uEiEKA2dwcxgBIAEoCzIPLkdwc0Nvb3JkaW5hdGVzUgNncHMSLQoLb3' + 'JpZW50YXRpb24YAiABKAsyCy5Sb3RhdGlvbjNkUgtvcmllbnRhdGlvbhIiCgd2ZXJzaW9uGAMg' + 'ASgLMgguVmVyc2lvblIHdmVyc2lvbhIfCgtydGtfbWVzc2FnZRgEIAEoDFIKcnRrTWVzc2FnZQ' + '=='); diff --git a/burt_network/lib/src/udp/burt_socket.dart b/burt_network/lib/src/udp/burt_socket.dart index 4ca8cbda..d87eb711 100644 --- a/burt_network/lib/src/udp/burt_socket.dart +++ b/burt_network/lib/src/udp/burt_socket.dart @@ -4,8 +4,23 @@ import "dart:io"; import "package:burt_network/burt_network.dart"; import "package:meta/meta.dart"; -extension on Datagram { +/// Utility methods for parsing datagram messages as wrapped messages +extension DatagramUtil on Datagram { + /// Returns the wrapped message parsed from the data of the datagram WrappedMessage parseWrapper() => WrappedMessage.fromBuffer(data); + + /// Returns the wrapped message and its source + WrapperDatagram parseWrapperDatagram() { + final wrapper = WrappedMessage.fromBuffer(data); + + return WrapperDatagram( + message: wrapper, + timestamp: wrapper.timestamp, + source: source, + ); + } + + /// The source that the datagram was sent from SocketInfo get source => SocketInfo(address: address, port: port); } @@ -33,13 +48,39 @@ typedef NetworkSettings = UpdateSetting; /// example, the rover might override [checkHeartbeats] to ensure a heartbeat has been sent, while /// the Dashboard might use it to send a heartbeat and await a response. abstract class BurtSocket extends UdpSocket { - final _controller = StreamController.broadcast(); + final _controller = StreamController.broadcast(); /// The device this socket will be used on. /// /// Used to properly respond to heartbeats and for thorough logging. final Device device; + /// The destinations this socket will send to by default. + /// + /// All the `send` functions allow you to send to a specific [SocketInfo]. This field + /// is the default destinations it will send to if those parameters are omitted. + final Set destinations = {}; + + /// Whether or not the default destination should be kept when the socket is dispose. + /// + /// If this is true, [destinations] will not be cleared when [dispose] is called. + /// + /// This is intended to prevent scenarios where the socket automatically restarts due + /// to an allowed OS error (see [allowedErrors]), and the socket's destination can no + /// longer receive messages by this socket due to [destinations] being empty. + /// + /// It only makes sense to use this when communicating with a static IP. If the destination port + /// can change between resets, using this may mean the socket will try to communicate with a port + /// that no longer exists. Practically, that means only the Dashboard should set this to be true. + final bool keepDestination; + + /// Destinations that should not be removed, even if no + /// heartbeat is received from them + /// + /// This will be initialized on creation with the provided destinations. + /// See [keepDestination] + final Set staticDestinations = {}; + Timer? _heartbeatTimer; StreamSubscription? _subscription; @@ -53,15 +94,28 @@ abstract class BurtSocket extends UdpSocket { BurtSocket({ required super.port, required this.device, - super.destination, super.quiet, - super.keepDestination, + this.keepDestination = false, + List? destinations, + SocketInfo? destination, this.collection, - }); + }) : assert( + destinations == null || destination == null, + "Either destinations or destination must be null. Cannot initialize a singular and multiple destinations at the same time", + ) { + if (destinations != null) { + this.destinations.addAll(destinations); + } + if (destination != null) { + this.destinations.add(destination); + } + if (keepDestination) { + staticDestinations.addAll(this.destinations); + } + } - /// A stream of [WrappedMessage]s as they arrive in the UDP socket. - @override - Stream get messages => _controller.stream; + /// A stream of [WrapperDatagram]s as they arrive from the UDP socket + Stream get messages => _controller.stream; @override Future init() async { @@ -75,9 +129,31 @@ abstract class BurtSocket extends UdpSocket { Future dispose() async { await _subscription?.cancel(); _heartbeatTimer?.cancel(); + if (!keepDestination) { + destinations.clear(); + } await super.dispose(); } + @override + void send(List data, {SocketInfo? destination}) { + if (destination != null) { + return super.send(data, destination: destination); + } + for (final address in destinations) { + super.send(data, destination: address); + } + } + + @override + void sendWrapper(WrappedMessage wrapper, {SocketInfo? destination}) { + send(wrapper.writeToBuffer(), destination: destination); + } + + @override + void sendMessage(Message message, {SocketInfo? destination}) => + sendWrapper(message.wrap(timestamp), destination: destination); + /// A utility method to exchange a "handshake" to the destination /// /// This will immediately send the [message], and will complete once the @@ -96,7 +172,7 @@ abstract class BurtSocket extends UdpSocket { final completer = Completer(); late final StreamSubscription subscription; - subscription = messages.onMessage( + subscription = messages.listenFor( name: message.messageName, constructor: constructor, callback: (handshake) { @@ -114,12 +190,12 @@ abstract class BurtSocket extends UdpSocket { } void _onPacket(Datagram packet) { - final wrapper = packet.parseWrapper(); - if (wrapper.name == Connect().messageName) { - final heartbeat = Connect.fromBuffer(wrapper.data); + final wrapper = packet.parseWrapperDatagram(); + if (wrapper.message.name == Connect().messageName) { + final heartbeat = Connect.fromBuffer(wrapper.message.data); onHeartbeat(heartbeat, packet.source); - } else if (wrapper.name == UpdateSetting().messageName) { - final settings = UpdateSetting.fromBuffer(wrapper.data); + } else if (wrapper.message.name == UpdateSetting().messageName) { + final settings = UpdateSetting.fromBuffer(wrapper.message.data); onSettings(settings); _controller.add(wrapper); } else { @@ -130,7 +206,7 @@ abstract class BurtSocket extends UdpSocket { /// Handle an incoming heartbeat coming from a given source. /// /// Be sure to check [Connect.sender] and [Connect.receiver], and compare the [source] against - /// the current [destination] to properly handle the heartbeat. + /// the current [destinations] to properly handle the heartbeat. void onHeartbeat(Heartbeat heartbeat, SocketInfo source); /// Handle an incoming request to change network settings. @@ -144,31 +220,32 @@ abstract class BurtSocket extends UdpSocket { /// Whether the device on the other end is connected. bool get isConnected; + /// The current time of the socket. + /// This timestamp is used as the default timestamp when sending a message. + DateTime get timestamp => DateTime.timestamp(); + /// Sends or waits for heartbeats to or from the other device. void checkHeartbeats(); - /// Sets [destination] to the incoming [source]. + /// Adds [source] to the available [destinations]. /// /// Override this function to run custom code when a device connects to this socket. @mustCallSuper void onConnect(SocketInfo source) { - destination = source; + destinations.add(source); _connectionCompleter?.complete(); _connectionCompleter = null; // to avoid completing twice logger.info("Port $port is connected to $source"); } - /// Sends a [Disconnect] message to the dashboard and sets [destination] to `null`. + /// Sends a [Disconnect] message to the dashboard. /// /// Override this function to run custom code when the device on the other end disconnects. /// For example, put code to stop the rover from driving in here when connection is lost. @override Future onDisconnect() async { - logger.info("Port $port is disconnected from $destination"); - sendMessage(Disconnect(sender: device)); - if (!keepDestination) { - destination = null; - } + logger.info("Port $port is disconnected from all clients."); + destinations.removeWhere((e) => !staticDestinations.contains(e)); await collection?.onDisconnect(); await super.onDisconnect(); } diff --git a/burt_network/lib/src/udp/rover_heartbeats.dart b/burt_network/lib/src/udp/rover_heartbeats.dart index 46bb9bb4..d8246196 100644 --- a/burt_network/lib/src/udp/rover_heartbeats.dart +++ b/burt_network/lib/src/udp/rover_heartbeats.dart @@ -1,4 +1,3 @@ - import "package:burt_network/protobuf.dart"; import "burt_socket.dart"; @@ -6,11 +5,22 @@ import "socket_info.dart"; /// A mixin that automatically handles rover-side heartbeats. mixin RoverHeartbeats on BurtSocket { + /// The heartbeats received by the socket since the last call to [checkHeartbeats]. + final Set receivedHeartbeats = {}; + + /// The maximum number of clients that can be connected to this socket. + /// + /// Once the maximum number of clients have been connected, and incoming + /// connection attempts will be rejected, and will not have any data sent + /// to them. + late final int maxClients; + /// Whether this socket received a heartbeat since the last call to [checkHeartbeats]. - bool didReceivedHeartbeat = false; + bool get didReceivedHeartbeat => receivedHeartbeats.isNotEmpty; @override - bool get isConnected => destination != null && !keepDestination; + bool get isConnected => + destinations.difference(staticDestinations).isNotEmpty; @override Duration get heartbeatInterval => const Duration(seconds: 2); @@ -18,42 +28,69 @@ mixin RoverHeartbeats on BurtSocket { /// Handles incoming heartbeats. /// /// 1. If the heartbeat was meant for another device, log it and ignore it. - /// 2. If it came from our dashboard, respond to it with [sendHeartbeatResponse]. - /// 3. If it came from another dashboard, log it and ignore it. - /// 4. If we are not connected to any dashboard, call [onConnect] and respond to it. + /// 2. If it came from an existing client, respond to it with [sendHeartbeatResponse]. + /// 3. If the number of clients is less than [maxClients], call [onConnect] and respond with [sendHeartbeatResponse]. @override void onHeartbeat(Heartbeat heartbeat, SocketInfo source) { - if (heartbeat.receiver != device) { // (1) - logger.warning("Received a misaddressed heartbeat for ${heartbeat.receiver}"); - } else if (isConnected) { - if (destination == source) { // (2) - sendHeartbeatResponse(); - } else { // (3) - logger.warning("Port $port is connected to $destination but received a heartbeat from $source"); - } - } else { // (4) + // (1) + if (heartbeat.receiver != device) { + logger.warning( + "Received a misaddressed heartbeat for ${heartbeat.receiver}", + ); + return; + } + + // (2) + if (destinations.contains(source)) { + sendHeartbeatResponse(source); + } else if (destinations.length < maxClients) { + // (3) onConnect(source); - sendHeartbeatResponse(); + sendHeartbeatResponse(source); + } else { + logger.warning( + "Too many clients connected on port $port, ignoring connection from ${source.address.address}:${source.port}", + ); } } - /// Checks if a heartbeat has been received. If not, calls [onDisconnect]. + /// Checks if a heartbeat has been received from any destination. If not, + /// sends a [Disconnect] message to any destination who has not sent any heartbeats. + /// + /// If no heartbeats have been received, calls [onDisconnect]. /// /// This function runs every [heartbeatInterval]. @override Future checkHeartbeats() async { - if (didReceivedHeartbeat) { - didReceivedHeartbeat = false; - } else if (isConnected) { - logger.warning("Heartbeat not received. Assuming the dashboard has disconnected"); + final wasConnected = isConnected; + destinations.removeWhere((address) { + if (staticDestinations.contains(address)) { + return false; + } + if (!receivedHeartbeats.contains(address)) { + logger.warning( + "Heartbeat not received from ${address.address.address}:${address.port}, assuming client has disconnected", + ); + sendMessage(Disconnect(sender: device), destination: address); + return true; + } + return false; + }); + + if (receivedHeartbeats.isNotEmpty) { + receivedHeartbeats.clear(); + } else if (wasConnected) { + logger.warning( + "No heartbeats received. Assuming the dashboard has disconnected", + ); await onDisconnect(); } } - /// Responds to an incoming heartbeat. - void sendHeartbeatResponse() { + /// Responds to an incoming heartbeat from [source]. + void sendHeartbeatResponse(SocketInfo source) { final response = Connect(sender: device, receiver: Device.DASHBOARD); - sendMessage(response); - didReceivedHeartbeat = true; + sendMessage(response, destination: source); + receivedHeartbeats.add(source); } } diff --git a/burt_network/lib/src/udp/rover_socket.dart b/burt_network/lib/src/udp/rover_socket.dart index de35ed29..ebf5540c 100644 --- a/burt_network/lib/src/udp/rover_socket.dart +++ b/burt_network/lib/src/udp/rover_socket.dart @@ -1,7 +1,31 @@ import "burt_socket.dart"; import "rover_heartbeats.dart"; +import "rover_timesync.dart"; import "rover_logger.dart"; import "rover_settings.dart"; +import "socket_info.dart"; -/// A UDP socket fit for use on the rover, with heartbeats, logging, and settings included. -class RoverSocket = BurtSocket with RoverHeartbeats, RoverLogger, RoverSettings; +/// A UDP socket which acts as a server onboard the rover, handling heartbeats, +/// time synchronization, logging, and settings +/// +/// An instance of this class will act as a server that a client can connect and send data to +class RoverSocket extends BurtSocket + with RoverHeartbeats, RoverTimesync, RoverLogger, RoverSettings { + /// Default constructor for [RoverSocket], initializing all necessary fields + RoverSocket({ + required super.port, + required super.device, + super.quiet, + super.keepDestination, + super.destinations, + super.destination, + SocketInfo? timesyncAddress, + int maxClients = 5, + super.collection, + }) { + this.maxClients = maxClients; + if (timesyncAddress != null) { + timesyncDestination = timesyncAddress; + } + } +} diff --git a/burt_network/lib/src/udp/rover_timesync.dart b/burt_network/lib/src/udp/rover_timesync.dart new file mode 100644 index 00000000..eb2f21aa --- /dev/null +++ b/burt_network/lib/src/udp/rover_timesync.dart @@ -0,0 +1,26 @@ +import "package:burt_network/burt_network.dart"; + +/// A mixin to automatically handle time synchronization +mixin RoverTimesync on BurtSocket { + final TimesyncSocket _timesyncSocket = TimesyncSocket(); + + set timesyncDestination(SocketInfo destination) { + _timesyncSocket.timesyncDestination = destination; + } + + @override + DateTime get timestamp => _timesyncSocket.timestamp; + + @override + Future init() async { + await super.init(); + await _timesyncSocket.init(); + return true; + } + + @override + Future dispose() async { + await _timesyncSocket.dispose(); + await super.dispose(); + } +} diff --git a/burt_network/lib/src/udp/timesync_server.dart b/burt_network/lib/src/udp/timesync_server.dart new file mode 100644 index 00000000..056a2dbf --- /dev/null +++ b/burt_network/lib/src/udp/timesync_server.dart @@ -0,0 +1,37 @@ +import "dart:async"; +import "dart:io"; + +import "package:burt_network/burt_network.dart"; + +/// A UDP socket to act as the "time server" for time synchronization events +/// +/// The time synchronization is received and handled from an independent UDP +/// socket, to reduce congestion with other data messages, which lowers latency +/// and improves time accuracy. +class TimesyncServer extends UdpSocket { + StreamSubscription? _subscription; + + /// Default constructor for [TimesyncServer] + TimesyncServer({required super.port, super.quiet}); + + @override + Future init() async { + await super.init(); + _subscription = stream.listen(_onPacket); + return true; + } + + @override + Future dispose() async { + await _subscription?.cancel(); + await super.dispose(); + } + + void _onPacket(Datagram packet) { + final rxTime = DateTime.timestamp(); + final wrapper = packet.parseWrapper(); + wrapper.timestamp = Timestamp.fromDateTime(rxTime); + + sendWrapper(wrapper, destination: packet.source); + } +} diff --git a/burt_network/lib/src/udp/timesync_socket.dart b/burt_network/lib/src/udp/timesync_socket.dart new file mode 100644 index 00000000..d8e4eb4a --- /dev/null +++ b/burt_network/lib/src/udp/timesync_socket.dart @@ -0,0 +1,158 @@ +import "dart:async"; +import "dart:io"; + +import "package:burt_network/burt_network.dart"; + +/// A UDP socket that will only send timesync messages to a timesync server +/// +/// The time synchronization is received and handled from an independent UDP +/// socket, to reduce congestion with other data messages, which lowers latency +/// and improves time accuracy. +class TimesyncSocket extends UdpSocket { + Duration _timeOffset = Duration.zero; + Duration _latency = Duration.zero; + + bool _receivedTimesync = false; + + /// The Round-Trip-Time of the last timesync message received. + /// + /// This is equivalent to the amount of time it takes for a + /// message to be sent to the server. + Duration get latency => _latency; + + /// The current time of the socket, adjusted based on the time synchronization offset. + DateTime get timestamp => DateTime.timestamp().add(_timeOffset); + + /// Whether or not a timesync event has been received from the server + /// + /// If this is true, it means the time received from [timestamp] is + /// within a close range of the server. + /// + /// After the first timesync event is received, this will remain true + /// until the server address has changed. + bool get receivedTimesync => _receivedTimesync; + + /// The address and port of the timesync server. + /// + /// If this socket is configured to send timesync messages, the [Timesync] message + /// will be sent to a socket with the specified IP address and port. + /// + /// If the Socket Info's IP address is [InternetAddress.anyIPv4], it will be sent to + /// [InternetAddress.loopbackIPv4] + /// + /// By default, the address is [InternetAddress.loopbackIPv4], on port 8020 + SocketInfo get timesyncDestination { + final address = _timesyncDestination.address != InternetAddress.anyIPv4 + ? _timesyncDestination.address + : InternetAddress.loopbackIPv4; + + return SocketInfo(address: address, port: _timesyncDestination.port); + } + + set timesyncDestination(SocketInfo destination) { + if (_timesyncDestination != destination) { + _receivedTimesync = false; + _timeOffset = Duration.zero; + _latency = Duration.zero; + } + _timesyncDestination = destination; + } + + late SocketInfo _timesyncDestination; + + Timer? _timesyncTimer; + + StreamSubscription? _subscription; + + /// Default constructor for TimesyncSocket + TimesyncSocket({super.port, super.quiet = true, SocketInfo? timesyncAddress}) + : _timesyncDestination = + timesyncAddress ?? + SocketInfo(address: InternetAddress.loopbackIPv4, port: 8020); + + @override + Future init() async { + await super.init(); + _subscription = stream.listen(_onPacket); + _timesyncTimer = Timer.periodic(const Duration(seconds: 1), sendTimesync); + return true; + } + + @override + Future dispose() async { + await _subscription?.cancel(); + _subscription = null; + + _timesyncTimer?.cancel(); + _timesyncTimer = null; + + _receivedTimesync = false; + _timeOffset = Duration.zero; + _latency = Duration.zero; + + return super.dispose(); + } + + void _onPacket(Datagram packet) { + final clientRxTime = DateTime.timestamp(); + final wrapper = packet.parseWrapper(); + if (wrapper.name != Timesync().messageName) { + return; + } + + final timesyncMessage = Timesync.fromBuffer(wrapper.data); + final serverReceiveTime = wrapper.timestamp; + + onTimesync(timesyncMessage, serverReceiveTime, clientRxTime, packet.source); + } + + /// Sends a timesync message to the [timesyncDestination] + void sendTimesync([_]) => sendMessage( + Timesync(sendTime: Timestamp.fromDateTime(DateTime.timestamp())), + destination: timesyncDestination, + ); + + /// Handles an incoming [Timesync] message and updates the + /// time offsets accordingly + /// + /// The message's send time, server receive time, and client receive + /// time will be used to calculate the estimated time offset between + /// the socket and timesync server. + void onTimesync( + Timesync timesync, + Timestamp serverReceiveTime, + DateTime clientReceiveTime, + SocketInfo source, + ) { + if (source != timesyncDestination) { + if (!quiet) { + logger.warning( + "Socket on port $port expected to receive timesync message from $timesyncDestination, but received from $source instead", + ); + } + return; + } + + final pongLocalTime = clientReceiveTime.microsecondsSinceEpoch; + final pingClientTime = timesync.sendTime + .toDateTime() + .microsecondsSinceEpoch; + final pongServerTime = serverReceiveTime + .toDateTime() + .microsecondsSinceEpoch; + + final rtt2 = pongLocalTime - pingClientTime; + final serverTimeAtRx = pongServerTime + rtt2 ~/ 2; + final serverOffsetMicros = serverTimeAtRx - pongLocalTime; + + _receivedTimesync = true; + _latency = Duration(microseconds: rtt2 ~/ 2); + + if (source.address.isLoopback) { + _timeOffset = Duration.zero; + return; + } + + _timeOffset = Duration(microseconds: serverOffsetMicros); + } +} diff --git a/burt_network/lib/src/udp/udp_socket.dart b/burt_network/lib/src/udp/udp_socket.dart index f78d9afc..34db3353 100644 --- a/burt_network/lib/src/udp/udp_socket.dart +++ b/burt_network/lib/src/udp/udp_socket.dart @@ -40,31 +40,10 @@ class UdpSocket extends Service { /// the original value and will re-request the original port when [init] is called. final int? _port; - /// The destination port to send to. - /// - /// All the `send` functions allow you to send to a specific [SocketInfo]. This field - /// is the default destination if those parameters are omitted. - SocketInfo? destination; - - /// Whether or not the default destination should be kept when the socket is dispose. - /// - /// If this is true, [destination] will not be set to null when [dispose] is called. - /// - /// This is intended to prevent scenarios where the socket automatically restarts due - /// to an allowed OS error (see [allowedErrors]), and the socket's destination can no - /// longer receive messages by this socket due to [destination] being set null. - /// - /// It only makes sense to use this when communicating with a static IP. If the destination port - /// can change between resets, using this may mean the socket will try to communicate with a port - /// that no longer exists. Practically, that means only the Dashboard should set this to be true. - final bool keepDestination; - /// Opens a UDP socket on the given port that can send and receive data. UdpSocket({ required int? port, this.quiet = false, - this.destination, - this.keepDestination = false, }) : _port = port; /// The UDP socket backed by `dart:io`. @@ -120,9 +99,6 @@ class UdpSocket extends Service { if (!quiet) logger.info("Closing the socket on port $port"); await _subscription?.cancel(); _subscription = null; _socket?.close(); _socket = null; - if (!keepDestination) { - destination = null; - } _sendingQueue.clear(); } @@ -149,30 +125,24 @@ class UdpSocket extends Service { /// /// Being UDP, this function does not wait for a response or even confirmation of a /// successful send and is therefore very quick and non-blocking. - void send(List data, {SocketInfo? destination}) { - final target = destination ?? this.destination; - if (target == null) return; + void send(List data, {required SocketInfo destination}) { if (_socket == null) { throw StateError( "Cannot use a UdpSocket on port $_port after it's been disposed", ); } - final sent = _socket!.send(data, target.address, target.port); + final sent = _socket!.send(data, destination.address, destination.port); if (sent == 0) { - _sendingQueue.add((data: data, destination: target)); + _sendingQueue.add((data: data, destination: destination)); _socket!.writeEventsEnabled = true; } } /// Sends a [WrappedMessage] over the socket. - void sendWrapper(WrappedMessage wrapper, {SocketInfo? destination}) => + void sendWrapper(WrappedMessage wrapper, {required SocketInfo destination}) => send(wrapper.writeToBuffer(), destination: destination); /// Sends a [Message] over the socket (in a [WrappedMessage]). - void sendMessage(Message message, {SocketInfo? destination}) => + void sendMessage(Message message, {required SocketInfo destination}) => sendWrapper(message.wrap(), destination: destination); - - /// Wraps all incoming data in a [WrappedMessage]. - Stream get messages => stream - .map((packet) => WrappedMessage.fromBuffer(packet.data)); } diff --git a/burt_network/lib/src/utils.dart b/burt_network/lib/src/utils.dart index 3b5f4e00..89e1e329 100644 --- a/burt_network/lib/src/utils.dart +++ b/burt_network/lib/src/utils.dart @@ -1,13 +1,104 @@ import "dart:async"; +import "dart:io"; import "dart:math"; import "package:burt_network/protobuf.dart"; +import "package:burt_network/udp.dart"; import "package:coordinate_converter/coordinate_converter.dart"; +import "package:meta/meta.dart"; + +/// A message that has been decoded and contains the message, timestamp, and source. +@immutable +class RoverPacket { + /// The decoded message. + final T message; + + /// The timestamp of the message. + final Timestamp timestamp; + + /// The source of the message. + final SocketInfo source; + + /// Const constructor for [RoverPacket] + const RoverPacket({ + required this.message, + required this.timestamp, + required this.source, + }); +} /// JSON data as a map. typedef Json = Map; +/// Constructor for a Protobuf message +typedef ProtoConstructor = T Function(List data); + +/// A Datagram packet which contains a wrapped message +typedef WrapperDatagram = RoverPacket; + +/// Utilities for working with a [WrapperDatagram]. +extension WrapperDatagramUtil on WrapperDatagram { + /// The name of the message being held in the wrapper + String get name => message.name; + + /// Parses the internal [WrappedMessage] into a new [RoverPacket] with a specific message type. + /// + /// This is useful when you know the type of the message contained in the wrapper and want + /// to get a strongly-typed [RoverPacket] back. + RoverPacket parse(ProtoConstructor constructor) => + RoverPacket( + message: constructor(message.data), + timestamp: message.timestamp, + source: source, + ); +} + +/// Helpful methods on [Stream]s of [WrapperDatagram]s +extension WrappedDatagramMessageStream on Stream { + /// Allows callers to listen only for specific messages. + /// + /// To use this, pass the name of the message, a function to create the message + /// from binary data, and a callback to handle the message. For example, + /// ```dart + /// collection.server.messages.listenFor( + /// name: ScienceData().messageName, // equals "ScienceData" + /// constructor: ScienceData.fromBuffer, + /// callback: (data) => print(data.co2); + /// ) + /// ``` + /// + /// This function returns a [StreamSubscription] that you can use to stop listening. + StreamSubscription listenFor({ + required String name, + required T Function(List) constructor, + required void Function(T) callback, + }) => map( + (e) => e.message, + ).listenFor(name: name, constructor: constructor, callback: callback); + + /// Listens for a specific message type and unpacks it into a [RoverPacket]. + /// + /// This is a convenience method that filters the stream by message name, + /// parses the message, and passes the resulting [RoverPacket] to the callback. + /// + /// Example: + /// ```dart + /// stream.listenFor( + /// name: ScienceData().messageName, + /// constructor: ScienceData.fromBuffer, + /// callback: (packet) => print(packet.message.methane), + /// ); + /// ``` + StreamSubscription> onPacket({ + required String name, + required ProtoConstructor constructor, + required void Function(RoverPacket) callback, + }) => where( + (datagram) => datagram.message.name == name, + ).map((message) => message.parse(constructor)).listen(callback); +} + /// Helpful methods on [Stream]s of [WrappedMessage]s. extension WrappedMessageStream on Stream { /// Allows callers to listen only for specific messages. @@ -15,7 +106,7 @@ extension WrappedMessageStream on Stream { /// To use this, pass the name of the message, a function to create the message /// from binary data, and a callback to handle the message. For example, /// ```dart - /// collection.server.messages.onMessage( + /// collection.server.messages.listenFor( /// name: ScienceData().messageName, // equals "ScienceData" /// constructor: ScienceData.fromBuffer, /// callback: (data) => print(data.co2); @@ -23,14 +114,42 @@ extension WrappedMessageStream on Stream { /// ``` /// /// This function returns a [StreamSubscription] that you can use to stop listening. - StreamSubscription onMessage({ + StreamSubscription listenFor({ required String name, required T Function(List) constructor, required void Function(T) callback, - }) => - where((wrapper) => wrapper.name == name) - .map((wrapper) => constructor(wrapper.data)) - .listen(callback); + }) => where( + (wrapper) => wrapper.name == name, + ).map((wrapper) => constructor(wrapper.data)).listen(callback); + + /// Allows callers to listen only for specific messages with different callbacks for + /// different data passed in. + /// + /// To use this, pass the name of the message, a function to create the message + /// from binary data, and a callback to handle the message. For example, + /// ```dart + /// collection.server.messages.onPacket( + /// name: ScienceData().messageName, // equals "ScienceData" + /// constructor: ScienceData.fromBuffer, + /// onMessage: (data) => print(data.co2), + /// withTimestamp: (data, time) => print(time), + /// ) + /// ``` + /// + /// This function returns a [StreamSubscription] that you can use to stop listening. + StreamSubscription> onPacket({ + required String name, + required ProtoConstructor constructor, + required void Function(RoverPacket) callback, + }) => where((wrapper) => wrapper.name == name) + .map( + (wrapper) => RoverPacket( + message: constructor(wrapper.data), + timestamp: wrapper.timestamp, + source: SocketInfo(address: InternetAddress("0.0.0.0"), port: -1), + ), + ) + .listen(callback); } /// Helpful methods on streams of nullable values. diff --git a/burt_network/lib/udp.dart b/burt_network/lib/udp.dart index 75c9161e..572c8391 100644 --- a/burt_network/lib/udp.dart +++ b/burt_network/lib/udp.dart @@ -28,5 +28,8 @@ export "src/udp/burt_socket.dart"; export "src/udp/rover_socket.dart"; export "src/udp/rover_settings.dart"; export "src/udp/rover_heartbeats.dart"; +export "src/udp/rover_timesync.dart"; export "src/udp/socket_info.dart"; export "src/udp/udp_socket.dart"; +export "src/udp/timesync_socket.dart"; +export "src/udp/timesync_server.dart"; diff --git a/burt_network/test/udp_sockets.dart b/burt_network/test/udp_sockets.dart index a6cf6374..1e19b79e 100644 --- a/burt_network/test/udp_sockets.dart +++ b/burt_network/test/udp_sockets.dart @@ -37,7 +37,7 @@ class TestServer extends RoverSocket { @override Future init() async { await super.init(); - messages.onMessage( + messages.listenFor( name: ScienceData().messageName, constructor: ScienceData.fromBuffer, callback: (x) => data = x, @@ -59,9 +59,13 @@ class TestServer extends RoverSocket { } class EchoSocket extends RoverSocket { - EchoSocket({required super.port, required super.destination}) : super(device: Device.SUBSYSTEMS, keepDestination: true); + EchoSocket({ + required super.port, + required super.destination, + super.destinations, + }) : super(device: Device.SUBSYSTEMS, keepDestination: true); - StreamSubscription? _subscription; + StreamSubscription? _subscription; @override Future init() async { @@ -76,14 +80,15 @@ class EchoSocket extends RoverSocket { await super.dispose(); } - void echoBack(WrappedMessage wrapper) => sendWrapper(wrapper); + void echoBack(WrapperDatagram wrapper) => sendWrapper(wrapper.message); } class TestClient extends BurtSocket { @override Duration get heartbeatInterval => const Duration(milliseconds: 10); - TestClient({required super.port, super.destination}) : super(device: Device.DASHBOARD); + TestClient({required super.port, super.destination, super.destinations}) + : super(device: Device.DASHBOARD); @override bool isConnected = false; diff --git a/burt_network/test/udp_test.dart b/burt_network/test/udp_test.dart index 4e916099..72e1d771 100644 --- a/burt_network/test/udp_test.dart +++ b/burt_network/test/udp_test.dart @@ -69,7 +69,7 @@ void main() => group("ProtoSocket:", () { // Initialize both sockets await server.init(); await client.init(); - server.messages.onMessage( + server.messages.listenFor( name: ScienceData().messageName, constructor: ScienceData.fromBuffer, callback: (d) => data = d, @@ -95,24 +95,24 @@ void main() => group("ProtoSocket:", () { test("Multiple handlers can be registered", () async { var science1 = ScienceData(); var science2 = ScienceData(); - var orientation = Orientation(); + var orientation = Rotation3d(); final scienceTest = ScienceData(co2: 5); - final orientationTest = Orientation(x: 5); + final orientationTest = Rotation3d(pitch: 5); final server = TestServer(port: 8009); final client = TestClient(port: 8010, destination: withPort(8009)); - server.messages.onMessage( + server.messages.listenFor( name: ScienceData().messageName, constructor: ScienceData.fromBuffer, callback: (x) => science1 = x, ); - server.messages.onMessage( + server.messages.listenFor( name: ScienceData().messageName, constructor: ScienceData.fromBuffer, callback: (x) => science2 = x, ); - server.messages.onMessage( - name: Orientation().messageName, - constructor: Orientation.fromBuffer, + server.messages.listenFor( + name: Rotation3d().messageName, + constructor: Rotation3d.fromBuffer, callback: (x) => orientation = x, ); await server.init(); @@ -129,10 +129,11 @@ void main() => group("ProtoSocket:", () { }); test("Heartbeats are filtered out of the regular stream", () async { + final destination = withPort(8012); var receivedHeartbeat = false; final server = TestServer(port: 8011); - final client = UdpSocket(port: 8012, destination: withPort(8012)); - server.messages.onMessage( + final client = UdpSocket(port: 8012); + server.messages.listenFor( name: Heartbeat().messageName, constructor: Heartbeat.fromBuffer, callback: (_) => receivedHeartbeat = true, @@ -140,7 +141,10 @@ void main() => group("ProtoSocket:", () { await server.init(); await client.init(); await Future.delayed(heartbeatDelay); - client.sendMessage(Heartbeat(sender: Device.DASHBOARD, receiver: Device.SUBSYSTEMS)); + client.sendMessage( + Heartbeat(sender: Device.DASHBOARD, receiver: Device.SUBSYSTEMS), + destination: destination, + ); await Future.delayed(heartbeatDelay); expect(receivedHeartbeat, false); await server.dispose(); diff --git a/subsystems/bin/data.dart b/subsystems/bin/data.dart index 6f98b800..e648eb23 100644 --- a/subsystems/bin/data.dart +++ b/subsystems/bin/data.dart @@ -74,10 +74,10 @@ Future main() async { final data3 = ArmData(lift: MotorData(currentAngle: pi + -1 * 2 * motor.current), version: Version(major: 1)); server.sendMessage(data3); final data4 = RoverPosition( - orientation: Orientation( - x: roll.current, - y: pitch.current, - z: yaw.current, + orientation: Rotation3d( + roll: roll.current, + pitch: pitch.current, + yaw: yaw.current, ), gps: GpsCoordinates( latitude: gps.current, diff --git a/subsystems/bin/proto.dart b/subsystems/bin/proto.dart index a23f28b0..5a606238 100644 --- a/subsystems/bin/proto.dart +++ b/subsystems/bin/proto.dart @@ -19,11 +19,11 @@ extension on Iterable { RoverPosition fromBuffer(List buffer) => RoverPosition.fromBuffer(buffer); bool filter(RoverPosition data) => data.hasOrientation() - && data.orientation.hasX() && data.orientation.hasY() && data.orientation.hasZ() - && data.orientation.x.abs() < 400 - && data.orientation.y.abs() < 400 - && data.orientation.z.abs() < 400; -List expand(RoverPosition data) => [data.orientation.x, data.orientation.y, data.orientation.z]; + && data.orientation.hasRoll() && data.orientation.hasPitch() && data.orientation.hasYaw() + && data.orientation.roll.abs() < 400 + && data.orientation.pitch.abs() < 400 + && data.orientation.yaw.abs() < 400; +List expand(RoverPosition data) => [data.orientation.pitch, data.orientation.roll, data.orientation.yaw]; void main() async { logger.info("Reading log file..."); diff --git a/subsystems/bin/subsystems.dart b/subsystems/bin/subsystems.dart index 8ae6655a..cdeecd95 100644 --- a/subsystems/bin/subsystems.dart +++ b/subsystems/bin/subsystems.dart @@ -6,8 +6,8 @@ import "package:subsystems/subsystems.dart"; /// Extra service to forward science data and commands to/from the auxillary board class AuxillaryForwarder extends Service { - StreamSubscription? _commandSubscription; - StreamSubscription? _dataSubscription; + StreamSubscription? _commandSubscription; + StreamSubscription? _dataSubscription; @override Future init() async { @@ -19,7 +19,7 @@ class AuxillaryForwarder extends Service { ) .listen( (wrapper) => collection.server.sendWrapper( - wrapper, + wrapper.message, destination: SocketInfo( address: InternetAddress("192.168.1.60"), port: 8010, @@ -32,7 +32,7 @@ class AuxillaryForwarder extends Service { e.name == ArmData().messageName || e.name == ScienceData().messageName, ) - .listen(collection.server.sendWrapper); + .listen((wrapper) => collection.server.sendWrapper(wrapper.message)); return true; } diff --git a/subsystems/lib/src/devices/firmware.dart b/subsystems/lib/src/devices/firmware.dart index e1033f38..3136b86a 100644 --- a/subsystems/lib/src/devices/firmware.dart +++ b/subsystems/lib/src/devices/firmware.dart @@ -36,7 +36,7 @@ class FirmwareManager extends Service { @override Future init() async { devices = await getFirmwareDevices(); - collection.server.messages.listen(_sendToSerial); + collection.server.messages.map((e) => e.message).listen(_sendToSerial); var result = true; for (final device in devices) { logger.debug("Initializing device: ${device.port}"); diff --git a/subsystems/lib/src/devices/gps.dart b/subsystems/lib/src/devices/gps.dart index 3663dbd3..30264de2 100644 --- a/subsystems/lib/src/devices/gps.dart +++ b/subsystems/lib/src/devices/gps.dart @@ -24,7 +24,7 @@ final baseStationSocket = SocketInfo( ); /// The offset of the GPS antenna's position on the rover -final Coordinates antennaOffset = Coordinates( +final Translation3d antennaOffset = Translation3d( x: -0.18415, // 7.25 in y: -0.20955, // 8.25 in ); @@ -133,7 +133,7 @@ class GpsReader extends Service { return; } - final imuAngle = (collection.imu.lastValue.z + 90) * pi / 180; + final imuAngle = (collection.imu.lastValue.yaw + 90) * pi / 180; final xOffset = antennaOffset.x * cos(imuAngle) - antennaOffset.y * sin(imuAngle); final yOffset = @@ -184,7 +184,7 @@ class GpsReader extends Service { @override Future init() async { - _messageSubscription = collection.server.messages.onMessage( + _messageSubscription = collection.server.messages.listenFor( name: RoverPosition().messageName, constructor: RoverPosition.fromBuffer, callback: _handleIncomingMessage, diff --git a/subsystems/lib/src/devices/imu.dart b/subsystems/lib/src/devices/imu.dart index 7da69346..4d9d25de 100644 --- a/subsystems/lib/src/devices/imu.dart +++ b/subsystems/lib/src/devices/imu.dart @@ -31,7 +31,7 @@ class ImuReader extends Service { bool get isConnected => serial.isOpen; /// The last received reading from the IMU - Orientation lastValue = Orientation(); + Rotation3d lastValue = Rotation3d(); /// The subscription that will be notified when a new serial packet arrives. StreamSubscription>? subscription; @@ -65,10 +65,10 @@ class ImuReader extends Service { collection.server.sendMessage(SubsystemsCommand(zeroImu: true)); } if (message.address == "/euler") { - final orientation = Orientation( - x: message.arguments[0] as double, - y: message.arguments[1] as double, - z: message.arguments[2] as double, + final orientation = Rotation3d( + pitch: message.arguments[0] as double, + roll: message.arguments[1] as double, + yaw: message.arguments[2] as double, ); lastValue = orientation; final position = RoverPosition( @@ -89,7 +89,7 @@ class ImuReader extends Service { return false; } subscription = serial.stream.listen(handleSerial); - _commandSubscription = collection.server.messages.onMessage( + _commandSubscription = collection.server.messages.listenFor( name: SubsystemsCommand().messageName, constructor: SubsystemsCommand.fromBuffer, callback: handleCommand, @@ -111,6 +111,6 @@ class ImuReader extends Service { await subscription?.cancel(); await _commandSubscription?.cancel(); await serial.dispose(); - lastValue = Orientation(); + lastValue = Rotation3d(); } } diff --git a/video/bin/video.dart b/video/bin/video.dart index 4e61fded..4318a885 100644 --- a/video/bin/video.dart +++ b/video/bin/video.dart @@ -9,7 +9,7 @@ class CommandForwarder extends Service { @override Future init() async { - _commandSubscription = collection.videoServer.messages.onMessage( + _commandSubscription = collection.videoServer.messages.listenFor( name: VideoCommand().messageName, constructor: VideoCommand.fromBuffer, callback: (data) { diff --git a/video/lib/src/isolates/parent.dart b/video/lib/src/isolates/parent.dart index 356e92ce..5fbf8e7b 100644 --- a/video/lib/src/isolates/parent.dart +++ b/video/lib/src/isolates/parent.dart @@ -37,12 +37,12 @@ class CameraManager extends Service { List supportedCameras = CameraName.values, }) async { _supportedCameras ??= supportedCameras; - _commands = collection.videoServer.messages.onMessage( + _commands = collection.videoServer.messages.listenFor( name: VideoCommand().messageName, constructor: VideoCommand.fromBuffer, callback: _handleCommand, ); - _vision = collection.videoServer.messages.onMessage( + _vision = collection.videoServer.messages.listenFor( name: VideoData().messageName, constructor: VideoData.fromBuffer, callback: _handleVision, diff --git a/video/lib/src/targeting/aruco_detector.dart b/video/lib/src/targeting/aruco_detector.dart index add8c4e8..5d58d79c 100644 --- a/video/lib/src/targeting/aruco_detector.dart +++ b/video/lib/src/targeting/aruco_detector.dart @@ -143,15 +143,15 @@ class RoverArucoDetector { final bestRotation = pnpResult.rvecs[0].at(0, 0); bestCameraToTarget = Pose3d( - translation: Coordinates( + translation: Translation3d( x: bestTranslation.val1, y: -bestTranslation.val2, z: bestTranslation.val3, ), - rotation: Orientation( - x: bestRotation.val1 * (180 / pi), - y: bestRotation.val2 * (180 / pi), - z: bestRotation.val3 * (180 / pi), + rotation: Rotation3d( + pitch: bestRotation.val1 * (180 / pi), + roll: bestRotation.val2 * (180 / pi), + yaw: bestRotation.val3 * (180 / pi), ), ); bestReprojectionError = pnpResult.reprojectionError.at(0, 0);