diff --git a/README.md b/README.md index e8f10f1..c48dd3d 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ streamlined and repeatable process to monitor signs and signals along any roadwa * Example CSV templates are provided to help get started making the static roadway object input file for both static objects and traffic signals. * Video Synchronization Helper Tools: Options are provided to export the video frames and help to synchronize the video file. * Image Labeling and animated GIF image tools: Selectable options are included to label images or create an animated GIF from multiple images. +* Extracted images now embed GPS coordinates in their EXIF metadata for easier mapping. ## Requirements - Python 3.9 @@ -146,6 +147,7 @@ For Each GPX Point: * If no, go to next GPX point From the sight distance timestamp and synchronized video file, the frame is extracted that is closest to that time. +The resulting image file contains GPS EXIF information so coordinates can be viewed in external photo applications. ## Contributions Contributions are welcome to the SSOSS project! If you have an idea for a new feature or have found a bug, please open an issue or submit a pull request. diff --git a/requirements.in b/requirements.in index 005e62b..0057b4b 100644 --- a/requirements.in +++ b/requirements.in @@ -9,4 +9,5 @@ tqdm lxml pillow python-dateutil +piexif diff --git a/requirements.txt b/requirements.txt index 381db48..eeb72f1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,41 +2,43 @@ # This file is autogenerated by pip-compile with Python 3.11 # by the following command: # -# pip-compile +# pip-compile requirements.in # geographiclib==2.0 # via geopy -geopy==2.4.0 +geopy==2.4.1 # via -r requirements.in -gpxpy==1.5.0 +gpxpy==1.6.2 # via -r requirements.in -imageio==2.31.5 +imageio==2.37.0 # via -r requirements.in -lxml==4.9.3 +lxml==5.4.0 # via -r requirements.in -numpy==1.26.0 +numpy==2.3.0 # via # -r requirements.in # imageio # opencv-python # pandas -opencv-python==4.8.1.78 +opencv-python==4.11.0.86 # via -r requirements.in -pandas==2.1.1 +pandas==2.3.0 # via -r requirements.in -pillow==10.0.1 +piexif==1.1.3 + # via -r requirements.in +pillow==11.2.1 # via # -r requirements.in # imageio -python-dateutil==2.8.2 +python-dateutil==2.9.0.post0 # via # -r requirements.in # pandas -pytz==2023.3.post1 +pytz==2025.2 # via pandas -six==1.16.0 +six==1.17.0 # via python-dateutil -tqdm==4.66.1 +tqdm==4.67.1 # via -r requirements.in -tzdata==2023.3 +tzdata==2025.2 # via pandas diff --git a/src/ssoss/process_road_objects.py b/src/ssoss/process_road_objects.py index 5bdc8bb..0958e81 100644 --- a/src/ssoss/process_road_objects.py +++ b/src/ssoss/process_road_objects.py @@ -677,6 +677,32 @@ def get_speed_at_timestamp(self, ts): break return speed + def get_location_at_timestamp(self, ts): + """Interpolate latitude and longitude for a given timestamp.""" + point_list = self.gpx_listDF + last_point = len(point_list) - 1 + + if ts < point_list.loc[0][0].get_timestamp(): + return None + if ts > point_list.loc[last_point][0].get_timestamp(): + return None + + for i in range(len(point_list) - 1): + p0 = point_list.loc[i][0] + p1 = point_list.loc[i + 1][0] + t0 = p0.get_timestamp() + t1 = p1.get_timestamp() + if t0 <= ts <= t1: + if t1 == t0: + return p0.get_location().latitude, p0.get_location().longitude + ratio = (ts - t0) / (t1 - t0) + lat0, lon0 = p0.get_location().latitude, p0.get_location().longitude + lat1, lon1 = p1.get_location().latitude, p1.get_location().longitude + lat = lat0 + (lat1 - lat0) * ratio + lon = lon0 + (lon1 - lon0) * ratio + return lat, lon + return None + diff --git a/src/ssoss/process_video.py b/src/ssoss/process_video.py index 2b2c761..f456fd8 100644 --- a/src/ssoss/process_video.py +++ b/src/ssoss/process_video.py @@ -11,6 +11,32 @@ from tqdm import tqdm import cv2 import imageio +from PIL import Image +import piexif + + +def _deg_to_dms_rational(deg_float): + deg = int(abs(deg_float)) + min_float = (abs(deg_float) - deg) * 60 + minute = int(min_float) + sec = int(round((min_float - minute) * 60 * 10000)) + return ((deg, 1), (minute, 1), (sec, 10000)) + + +def add_gps_exif(path, lat, lon): + """Insert GPS EXIF tags into an image file.""" + if lat is None or lon is None: + return + try: + exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "thumbnail": None} + exif_dict["GPS"][piexif.GPSIFD.GPSLatitudeRef] = b"N" if lat >= 0 else b"S" + exif_dict["GPS"][piexif.GPSIFD.GPSLatitude] = _deg_to_dms_rational(lat) + exif_dict["GPS"][piexif.GPSIFD.GPSLongitudeRef] = b"E" if lon >= 0 else b"W" + exif_dict["GPS"][piexif.GPSIFD.GPSLongitude] = _deg_to_dms_rational(lon) + exif_bytes = piexif.dump(exif_dict) + piexif.insert(exif_bytes, str(path)) + except Exception as exc: + print(f"Error adding GPS EXIF to {path}: {exc}") class ProcessVideo: @@ -87,9 +113,10 @@ def sync(self, frame: int, ts): return None def create_pic_list_from_zip(self, i_desc_timestamps): - """returns sight distance description text and frame of video to extract as 2 lists""" + """Return lists of descriptions, frames, and timestamps for extraction.""" intersection_desc = [] frames = [] + timestamps = [] prev_frame = 0 filename_description, time_of_sd = zip(*i_desc_timestamps) @@ -98,13 +125,13 @@ def create_pic_list_from_zip(self, i_desc_timestamps): if time_of_picture > 0 and time_of_picture <= self.get_duration(): frame_of_video = time_of_picture * self.fps - # build up lists if not duplicate frame if int(frame_of_video) > int(prev_frame): intersection_desc.append(filename_description[sd_item]) frames.append(int(frame_of_video)) + timestamps.append(time_of_sd[sd_item]) prev_frame = frame_of_video - return intersection_desc, frames + return intersection_desc, frames, timestamps def save_frame_ffmpeg(self, frame_number: int, output_path: Path) -> None: """Save a specific frame quickly using ffmpeg.""" @@ -135,7 +162,8 @@ def extract_generic_so_sightings( project: instance of ProcessRoadObjects() class """ - generic_so_desc, extract_frames = self.create_pic_list_from_zip(desc_timestamps) + generic_so_desc, extract_frames, ts_list = self.create_pic_list_from_zip(desc_timestamps) + gps_list = [project.get_location_at_timestamp(ts) for ts in ts_list] image_path = Path( self.video_dir, "out", @@ -144,14 +172,16 @@ def extract_generic_so_sightings( ) image_path.mkdir(exist_ok=True, parents=True) - for desc, frame_num in tqdm( - list(zip(generic_so_desc, extract_frames)), + for desc, frame_num, gps in tqdm( + list(zip(generic_so_desc, extract_frames, gps_list)), desc="Frame Extraction", unit=" frame", ): frame_name = str(desc) + ".jpg" frame_filepath = image_path / frame_name self.save_frame_ffmpeg(frame_num, frame_filepath) + if gps: + add_gps_exif(frame_filepath, gps[0], gps[1]) print( f"PICTURE CAPTURED AT {frame_num}: {desc}, Saved {generic_so_desc.index(desc) + 1} picture(s) of {len(extract_frames)}" ) @@ -166,30 +196,33 @@ def extract_sightings( ): """Extract sighting images from a video.""" - intersection_desc, extract_frames = self.create_pic_list_from_zip( + intersection_desc, extract_frames, ts_list = self.create_pic_list_from_zip( desc_timestamps ) + gps_list = [project.get_location_at_timestamp(ts) for ts in ts_list] image_path = Path( self.video_dir, "out", self.video_filepath.stem, "signal_sightings/" ) image_path.mkdir(exist_ok=True, parents=True) - self._save_frames(intersection_desc, extract_frames, image_path) + self._save_frames(intersection_desc, extract_frames, image_path, gps_list) if label_img: self.img_overlay_info_box(self.video_filename, project) if gen_gif: self.generate_gif(desc_timestamps, project) - def _save_frames(self, descriptions, frames, image_path: Path) -> None: + def _save_frames(self, descriptions, frames, image_path: Path, gps_list=None) -> None: """Save frames described by ``descriptions`` and ``frames`` to disk.""" - for desc, frame_num in tqdm( + for idx, (desc, frame_num) in enumerate(tqdm( list(zip(descriptions, frames)), desc="Frame Extraction", unit=" frame" - ): + )): frame_name = str(desc) + ".jpg" frame_filepath = image_path / frame_name self.save_frame_ffmpeg(frame_num, frame_filepath) + if gps_list and idx < len(gps_list) and gps_list[idx]: + add_gps_exif(frame_filepath, gps_list[idx][0], gps_list[idx][1]) print( f"PICTURE CAPTURED AT {frame_num}: {desc}, Saved {descriptions.index(desc) + 1} picture(s) of {len(frames)}" ) @@ -452,6 +485,7 @@ def labels( descriptive_label, height_percent: tuple, ssoss_and_descriptive=True, + ro_info=None, ): alpha = 1 # Transparency factor. @@ -549,6 +583,11 @@ def labels( ) # save image cv2.imwrite(output_filename, ssoss_and_descriptive_label) + if ro_info is not None: + ts = float(Path(output_filename).stem.split("-")[-1]) + loc = ro_info.get_location_at_timestamp(ts) + if loc: + add_gps_exif(output_filename, loc[0], loc[1]) else: # no ssoss label, just descriptive label (not recommended) @@ -570,6 +609,11 @@ def labels( 2, ) cv2.imwrite(output_filename, img_new) + if ro_info is not None: + ts = float(Path(output_filename).stem.split("-")[-1]) + loc = ro_info.get_location_at_timestamp(ts) + if loc: + add_gps_exif(output_filename, loc[0], loc[1]) @staticmethod def generate_descriptive_label( @@ -626,7 +670,11 @@ def generic_so_img_overlay_info_box(self, vid_filename_dir, ro_info): ) self.labels( - img, label_img_name, descriptive_label, label_height_percents + img, + label_img_name, + descriptive_label, + label_height_percents, + ro_info=ro_info, ) def img_overlay_info_box(self, vid_filename_dir, ro_info): @@ -665,5 +713,9 @@ def img_overlay_info_box(self, vid_filename_dir, ro_info): ) self.labels( - img, label_img_name, descriptive_label, label_height_percents + img, + label_img_name, + descriptive_label, + label_height_percents, + ro_info=ro_info, )