Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.

Commit c21a30c

Browse files
authored
Communication worker unit tests (#252)
* Created test_communications.py * Modified the test to match the PR * wrote communications unit test and fix typo in communications * Fixed all the comments * style: auto-format code with linter * fix: resolve issues from code review
1 parent ebe3d51 commit c21a30c

File tree

2 files changed

+304
-1
lines changed

2 files changed

+304
-1
lines changed

modules/communications/communications.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def run(
5757
) -> tuple[True, bytes, list[bytes]] | tuple[False, None, None]:
5858

5959
objects_in_world_global = []
60+
6061
for object_in_world in objects_in_world:
6162
# We assume detected objects are on the ground
6263
north = object_in_world.location_x
@@ -91,7 +92,7 @@ def run(
9192
self.__logger.info(f"{time.time()}: {objects_in_world_global}")
9293

9394
encoded_position_global_objects = []
94-
for object in object_in_world_global:
95+
for object in objects_in_world_global:
9596

9697
result, message = message_encoding_decoding.encode_position_global(
9798
worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, object

tests/unit/test_communications.py

Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
"""
2+
Tests the communications class.
3+
"""
4+
5+
import pytest
6+
7+
from modules.communications import communications
8+
from modules.common.modules.logger import logger
9+
from modules import object_in_world
10+
from modules.common.modules import position_local
11+
from modules.common.modules.mavlink import local_global_conversion
12+
from modules.common.modules import position_global
13+
from modules.common.modules.data_encoding import metadata_encoding_decoding
14+
from modules.common.modules.data_encoding import message_encoding_decoding
15+
from modules.common.modules.data_encoding.worker_enum import WorkerEnum
16+
17+
# Test functions use test fixture signature names and access class privates
18+
# No enable
19+
# pylint: disable=protected-access,redefined-outer-name
20+
21+
LATITUDE_TOLERANCE = 0.000001
22+
LONGITUDE_TOLERANCE = 0.000001
23+
ALTITUDE_TOLERANCE = 7
24+
25+
26+
@pytest.fixture
27+
def home_position() -> position_global.PositionGlobal: # type: ignore
28+
"""
29+
Home position.
30+
"""
31+
# University of Waterloo WGS84 Coordinate
32+
result, position = position_global.PositionGlobal.create(43.472978, -80.540103, 336.0)
33+
assert result
34+
assert position is not None
35+
36+
yield position
37+
38+
39+
@pytest.fixture
40+
def communications_maker(
41+
home_position: position_global.PositionGlobal,
42+
) -> communications.Communications: # type: ignore
43+
"""
44+
Construct a Communications instance with the Home position
45+
"""
46+
result, test_logger = logger.Logger.create("test_logger", False)
47+
48+
assert result
49+
assert test_logger is not None
50+
51+
result, communications_instance = communications.Communications.create(
52+
home_position, test_logger
53+
)
54+
assert result
55+
assert communications_instance is not None
56+
57+
yield communications_instance # type: ignore
58+
59+
60+
def object_in_world_from_position_local(
61+
position_local: position_local.PositionLocal,
62+
) -> object_in_world.ObjectInWorld:
63+
"""
64+
Convert position local to object_in_world as defined in Communications.py
65+
"""
66+
result, obj = object_in_world.ObjectInWorld.create(
67+
position_local.north, position_local.east, 0.0
68+
)
69+
assert result
70+
assert obj is not None
71+
72+
return obj
73+
74+
75+
def assert_global_positions(
76+
expected: position_global.PositionGlobal, actual: position_global.PositionGlobal
77+
) -> None:
78+
"""
79+
Assert each values of the global positions using the Tolerances
80+
"""
81+
assert abs(expected.latitude - actual.latitude) < LATITUDE_TOLERANCE
82+
assert abs(expected.longitude - actual.longitude) < LONGITUDE_TOLERANCE
83+
assert abs(expected.altitude - actual.altitude) < ALTITUDE_TOLERANCE
84+
85+
86+
class TestCommunications:
87+
"""
88+
Tests for the Communications.run() method.
89+
"""
90+
91+
def test_run(
92+
self,
93+
home_position: position_global.PositionGlobal,
94+
communications_maker: communications.Communications,
95+
) -> None:
96+
"""
97+
Test if the Communications.run returns the correct instance
98+
"""
99+
# Setup
100+
result, position = position_global.PositionGlobal.create(43.472978, -80.540103, 336.0)
101+
assert result
102+
assert position is not None
103+
104+
result, actual = local_global_conversion.position_local_from_position_global(
105+
home_position, position
106+
)
107+
assert result
108+
assert actual is not None
109+
110+
objects_in_world = [object_in_world_from_position_local(actual)]
111+
112+
# Run
113+
result, metadata, generated_objects = communications_maker.run(objects_in_world)
114+
115+
# Test
116+
assert result
117+
assert isinstance(metadata, bytes)
118+
assert all(isinstance(obj, bytes) for obj in generated_objects)
119+
120+
def test_normal(
121+
self,
122+
home_position: position_global.PositionGlobal,
123+
communications_maker: communications.Communications,
124+
) -> None:
125+
"""
126+
Normal
127+
"""
128+
# Setup
129+
result, global_position_1 = position_global.PositionGlobal.create(
130+
43.472978, -80.540103, 336.0
131+
)
132+
assert result
133+
assert global_position_1 is not None
134+
135+
result, local_position_1 = local_global_conversion.position_local_from_position_global(
136+
home_position, global_position_1
137+
)
138+
assert result
139+
assert local_position_1 is not None
140+
141+
result, global_position_2 = position_global.PositionGlobal.create(
142+
43.472800, -80.539500, 330.0
143+
)
144+
assert result
145+
assert global_position_2 is not None
146+
147+
result, local_position_2 = local_global_conversion.position_local_from_position_global(
148+
home_position, global_position_2
149+
)
150+
assert result
151+
assert local_position_2 is not None
152+
153+
global_positions = [global_position_1, global_position_2]
154+
155+
objects_in_world = [
156+
object_in_world_from_position_local(local_position_1),
157+
object_in_world_from_position_local(local_position_2),
158+
]
159+
number_of_messages = len(objects_in_world)
160+
161+
# Run
162+
result, metadata, generated_objects = communications_maker.run(objects_in_world)
163+
assert result
164+
assert isinstance(metadata, bytes)
165+
assert all(isinstance(obj, bytes) for obj in generated_objects)
166+
167+
result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
168+
metadata
169+
)
170+
assert result
171+
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER
172+
173+
# Test
174+
assert actual_number_of_messages == number_of_messages
175+
176+
# Conversion
177+
for i, global_position in enumerate(global_positions):
178+
result, worker_id, actual = message_encoding_decoding.decode_bytes_to_position_global(
179+
generated_objects[i]
180+
)
181+
assert result
182+
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER
183+
184+
assert_global_positions(global_position, actual)
185+
186+
def test_empty_objects(
187+
self,
188+
communications_maker: communications.Communications,
189+
) -> None:
190+
"""
191+
When nothing is passed in
192+
"""
193+
objects_in_world = []
194+
195+
result, metadata, generated_objects = communications_maker.run(objects_in_world)
196+
assert result
197+
assert isinstance(metadata, bytes)
198+
assert all(isinstance(obj, bytes) for obj in generated_objects)
199+
200+
result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
201+
metadata
202+
)
203+
204+
assert result
205+
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER
206+
# Test
207+
assert actual_number_of_messages == 0
208+
assert len(generated_objects) == 0
209+
210+
def test_same_as_home(
211+
self,
212+
home_position: position_global.PositionGlobal,
213+
communications_maker: communications.Communications,
214+
) -> None:
215+
"""
216+
When the objects_in_world contains the home positions
217+
"""
218+
# Setup
219+
result, local_position = local_global_conversion.position_local_from_position_global(
220+
home_position, home_position
221+
)
222+
assert result
223+
assert local_position is not None
224+
225+
actual = object_in_world_from_position_local(local_position)
226+
objects_in_world = [actual]
227+
number_of_messages = len(objects_in_world)
228+
229+
# Run
230+
result, metadata, generated_objects = communications_maker.run(objects_in_world)
231+
assert result
232+
assert isinstance(metadata, bytes)
233+
assert all(isinstance(obj, bytes) for obj in generated_objects)
234+
# Conversion
235+
result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
236+
metadata
237+
)
238+
assert result
239+
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER
240+
241+
# Test
242+
assert actual_number_of_messages == number_of_messages
243+
244+
# Conversion
245+
result, worker_id, actual = message_encoding_decoding.decode_bytes_to_position_global(
246+
generated_objects[0]
247+
)
248+
assert result
249+
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER
250+
251+
# Test
252+
assert_global_positions(home_position, actual)
253+
254+
def test_duplicate_coordinates(
255+
self,
256+
home_position: position_global.PositionGlobal,
257+
communications_maker: communications.Communications,
258+
) -> None:
259+
"""
260+
When the objects_in_world contains duplicate positions
261+
"""
262+
# Setup
263+
result, global_position = position_global.PositionGlobal.create(
264+
43.472978, -80.540103, 336.0
265+
)
266+
assert result
267+
assert global_position is not None
268+
269+
result, local_position = local_global_conversion.position_local_from_position_global(
270+
home_position, global_position
271+
)
272+
assert result
273+
assert local_position is not None
274+
275+
position = object_in_world_from_position_local(local_position)
276+
277+
objects_in_world = [position, position, position]
278+
number_of_messages = len(objects_in_world)
279+
280+
# Run
281+
result, metadata, generated_objects = communications_maker.run(objects_in_world)
282+
assert result
283+
assert isinstance(metadata, bytes)
284+
assert all(isinstance(obj, bytes) for obj in generated_objects)
285+
286+
result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
287+
metadata
288+
)
289+
assert result
290+
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER
291+
# Test
292+
assert actual_number_of_messages == number_of_messages
293+
294+
for generated_object in generated_objects:
295+
# Conversion
296+
result, worker_id, actual = message_encoding_decoding.decode_bytes_to_position_global(
297+
generated_object
298+
)
299+
assert result
300+
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER
301+
# Test
302+
assert_global_positions(global_position, actual)

0 commit comments

Comments
 (0)