Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
5 changes: 0 additions & 5 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,3 @@ geolocation:
camera_orientation_yaw: 0.0
camera_orientation_pitch: -1.57079632679
camera_orientation_roll: 0.0

cluster_estimation:
min_activation_threshold: 25
min_new_points_to_run: 5
random_state: 0
87 changes: 50 additions & 37 deletions main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
# Used in type annotation of flight interface output
# pylint: disable-next=unused-import
from modules import odometry_and_time
from modules.communications import communications_worker
from modules.detect_target import detect_target_factory
from modules.detect_target import detect_target_worker
from modules.flight_interface import flight_interface_worker
from modules.video_input import video_input_worker
from modules.data_merge import data_merge_worker
from modules.geolocation import geolocation_worker
from modules.geolocation import camera_properties
from modules.cluster_estimation import cluster_estimation_worker
from modules.common.modules.logger import logger
from modules.common.modules.logger import logger_main_setup
from modules.common.modules.read_yaml import read_yaml
Expand Down Expand Up @@ -86,8 +86,8 @@ def main() -> int:
VIDEO_INPUT_SAVE_PREFIX = str(pathlib.Path(logging_path, VIDEO_INPUT_SAVE_NAME_PREFIX))

DETECT_TARGET_WORKER_COUNT = config["detect_target"]["worker_count"]
detect_target_option_int = config["detect_target"]["option"]
DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(detect_target_option_int)
DETECT_TARGET_OPTION_INT = config["detect_target"]["option"]
DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(DETECT_TARGET_OPTION_INT)
DETECT_TARGET_DEVICE = "cpu" if args.cpu else config["detect_target"]["device"]
DETECT_TARGET_MODEL_PATH = config["detect_target"]["model_path"]
DETECT_TARGET_OVERRIDE_FULL_PRECISION = args.full
Expand All @@ -112,11 +112,6 @@ def main() -> int:
GEOLOCATION_CAMERA_ORIENTATION_YAW = config["geolocation"]["camera_orientation_yaw"]
GEOLOCATION_CAMERA_ORIENTATION_PITCH = config["geolocation"]["camera_orientation_pitch"]
GEOLOCATION_CAMERA_ORIENTATION_ROLL = config["geolocation"]["camera_orientation_roll"]

MIN_ACTIVATION_THRESHOLD = config["cluster_estimation"]["min_activation_threshold"]
MIN_NEW_POINTS_TO_RUN = config["cluster_estimation"]["min_new_points_to_run"]
RANDOM_STATE = config["cluster_estimation"]["random_state"]

# pylint: enable=invalid-name
except KeyError as exception:
main_logger.error(f"Config key(s) not found: {exception}", True)
Expand All @@ -141,19 +136,23 @@ def main() -> int:
mp_manager,
QUEUE_MAX_SIZE,
)
flight_interface_to_communications_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
)
data_merge_to_geolocation_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
)
geolocation_to_cluster_estimation_queue = queue_proxy_wrapper.QueueProxyWrapper(
geolocation_to_communications_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
)
flight_interface_decision_queue = queue_proxy_wrapper.QueueProxyWrapper(
communications_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
)
cluster_estimation_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper(
flight_interface_decision_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
)
Expand Down Expand Up @@ -238,7 +237,10 @@ def main() -> int:
FLIGHT_INTERFACE_WORKER_PERIOD,
),
input_queues=[flight_interface_decision_queue],
output_queues=[flight_interface_to_data_merge_queue],
output_queues=[
flight_interface_to_data_merge_queue,
flight_interface_to_communications_queue,
],
controller=controller,
local_logger=main_logger,
)
Expand Down Expand Up @@ -276,7 +278,7 @@ def main() -> int:
camera_extrinsics,
),
input_queues=[data_merge_to_geolocation_queue],
output_queues=[geolocation_to_cluster_estimation_queue],
output_queues=[geolocation_to_communications_queue],
controller=controller,
local_logger=main_logger,
)
Expand All @@ -287,21 +289,23 @@ def main() -> int:
# Get Pylance to stop complaining
assert geolocation_worker_properties is not None

result, cluster_estimation_worker_properties = worker_manager.WorkerProperties.create(
result, communications_worker_properties = worker_manager.WorkerProperties.create(
count=1,
target=cluster_estimation_worker.cluster_estimation_worker,
work_arguments=(MIN_ACTIVATION_THRESHOLD, MIN_NEW_POINTS_TO_RUN, RANDOM_STATE),
input_queues=[geolocation_to_cluster_estimation_queue],
output_queues=[cluster_estimation_to_main_queue],
target=communications_worker.communications_worker,
work_arguments=(),
input_queues=[
flight_interface_to_communications_queue,
geolocation_to_communications_queue,
],
output_queues=[communications_to_main_queue],
controller=controller,
local_logger=main_logger,
)
if not result:
main_logger.error("Failed to create arguments for Cluster Estimation", True)
main_logger.error("Failed to create arguments for Video Input", True)
return -1

# Get Pylance to stop complaining
assert cluster_estimation_worker_properties is not None
assert communications_worker_properties is not None

# Create managers
worker_managers = []
Expand Down Expand Up @@ -371,18 +375,18 @@ def main() -> int:

worker_managers.append(geolocation_manager)

result, cluster_estimation_manager = worker_manager.WorkerManager.create(
worker_properties=cluster_estimation_worker_properties,
result, communications_manager = worker_manager.WorkerManager.create(
worker_properties=communications_worker_properties,
local_logger=main_logger,
)
if not result:
main_logger.error("Failed to create manager for Cluster Estimation", True)
main_logger.error("Failed to create manager for Communications", True)
return -1

# Get Pylance to stop complaining
assert cluster_estimation_manager is not None
assert communications_manager is not None

worker_managers.append(cluster_estimation_manager)
worker_managers.append(communications_manager)

# Run
for manager in worker_managers:
Expand All @@ -396,16 +400,24 @@ def main() -> int:
return -1

try:
cluster_estimations = cluster_estimation_to_main_queue.queue.get_nowait()
geolocation_data = communications_to_main_queue.queue.get_nowait()
except queue.Empty:
cluster_estimations = None

if cluster_estimations is not None:
for cluster in cluster_estimations:
main_logger.debug("Cluster in world: " + True)
main_logger.debug("Cluster location x: " + str(cluster.location_x))
main_logger.debug("Cluster location y: " + str(cluster.location_y))
main_logger.debug("Cluster spherical variance: " + str(cluster.spherical_variance))
geolocation_data = None

if geolocation_data is not None:
for detection_world in geolocation_data:
main_logger.debug("Detection in world:", True)
main_logger.debug(
"geolocation vertices: " + str(detection_world.vertices.tolist()), True
)
main_logger.debug(
"geolocation centre: " + str(detection_world.centre.tolist()), True
)
main_logger.debug("geolocation label: " + str(detection_world.label), True)
main_logger.debug(
"geolocation confidence: " + str(detection_world.confidence), True
)

if cv2.waitKey(1) == ord("q"): # type: ignore
main_logger.info("Exiting main loop", True)
break
Expand All @@ -416,10 +428,11 @@ def main() -> int:
video_input_to_detect_target_queue.fill_and_drain_queue()
detect_target_to_data_merge_queue.fill_and_drain_queue()
flight_interface_to_data_merge_queue.fill_and_drain_queue()
flight_interface_to_communications_queue.fill_and_drain_queue()
data_merge_to_geolocation_queue.fill_and_drain_queue()
geolocation_to_cluster_estimation_queue.fill_and_drain_queue()
geolocation_to_communications_queue.fill_and_drain_queue()
communications_to_main_queue.fill_and_drain_queue()
flight_interface_decision_queue.fill_and_drain_queue()
cluster_estimation_to_main_queue.fill_and_drain_queue()

for manager in worker_managers:
manager.join_workers()
Expand Down
92 changes: 92 additions & 0 deletions modules/communications/communications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""
Logs data and forwards it.
"""

import time

from .. import detection_in_world
from ..common.modules.logger import logger
from ..common.modules import position_global
from ..common.modules import position_local
from ..common.modules.mavlink import local_global_conversion


class Communications:
"""
Currently logs data only.
"""

__create_key = object()

@classmethod
def create(
cls,
home_position: position_global.PositionGlobal,
local_logger: logger.Logger,
) -> "tuple[bool, Communications | None]":
"""
Logs data and forwards it.

home_location: Take-off location of drone.

Returns: Success, class object.
"""

return True, Communications(cls.__create_key, home_position, local_logger)

def __init__(
self,
class_private_create_key: object,
home_position: position_global.PositionGlobal,
local_logger: logger.Logger,
) -> None:
"""
Private constructor, use create() method.
"""
assert class_private_create_key is Communications.__create_key, "Use create() method"

self.__home_location = home_position
self.__logger = local_logger

def run(
self, detections_in_world: list[detection_in_world.DetectionInWorld]
) -> tuple[bool, list[detection_in_world.DetectionInWorld] | None]:

detections_in_world_global = []
for detection_in_world in detections_in_world:
# TODO: Change this when the conversion interface is changed
north = detection_in_world.centre[0]
east = detection_in_world.centre[1]
down = 0

result, drone_position_local = position_local.PositionLocal.create(
north,
east,
down,
)

if not result:
self.__logger.warning(
f"Could not convert DetectionInWorld to PositionLocal:\ndetection in world: {detection_in_world}"
)
return False, None

result, detection_in_world_global = (
local_global_conversion.position_global_from_position_local(
self.__home_location, drone_position_local
)
)

if not result:
# Log nothing if at least one of the conversions failed
self.__logger.warning(
f"drone_position_global_from_local conversion failed:\nhome_location: {self.__home_location}\ndrone_position_local: {drone_position_local}"
)
return False, None

detections_in_world_global.append(detection_in_world_global)

timestamp = time.time()
self.__logger.info(f"{timestamp}: {detections_in_world_global}")

return True, detections_in_world
56 changes: 56 additions & 0 deletions modules/communications/communications_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
Logs data and forwards it.
"""

import os
import pathlib

from . import communications
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from ..common.modules.logger import logger


def communications_worker(
home_position_queue: queue_proxy_wrapper.QueueProxyWrapper,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Worker process.

home_position: get home_position for init
"""

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
if not result:
print("ERROR: Worker failed to create logger")
return

# Get Pylance to stop complaining
assert local_logger is not None

local_logger.info("Logger initialized", True)

# Get home location
home_position = home_position_queue.queue.get()

result, comm = communications.Communications.create(home_position, local_logger)
if not result:
local_logger.error("Worker failed to create class object", True)
return

# Get Pylance to stop complaining
assert comm is not None

while not controller.is_exit_requested():
controller.check_pause()

result, value = comm.run(input_queue.queue.get())
if not result:
continue

output_queue.queue.put(value)
Loading