From 04135a0a2baec7ca85df230af7144c8a656b8b5d Mon Sep 17 00:00:00 2001 From: swerty24 Date: Fri, 13 Jun 2025 18:04:46 -0700 Subject: [PATCH] updates and ensure using python 3.11 --- requirements.txt | 2 +- src/ssoss/process_road_objects.py | 25 +- src/ssoss/process_road_objects_archive.py | 760 ++++++++++++++++++++++ 3 files changed, 774 insertions(+), 13 deletions(-) create mode 100644 src/ssoss/process_road_objects_archive.py diff --git a/requirements.txt b/requirements.txt index 42dc5c1..c359b01 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,7 +14,7 @@ imageio==2.31.5 # via -r requirements.in lxml==4.9.3 # via -r requirements.in -numpy==1.26.0 +numpy>=1.26,<2.0 # via # -r requirements.in # imageio diff --git a/src/ssoss/process_road_objects.py b/src/ssoss/process_road_objects.py index ef6b454..b7c0146 100644 --- a/src/ssoss/process_road_objects.py +++ b/src/ssoss/process_road_objects.py @@ -8,7 +8,8 @@ from zoneinfo import ZoneInfo from pathlib import Path -import geopy + +from geopy import Point from geopy.distance import geodesic, Distance import gpxpy @@ -194,7 +195,7 @@ def load_generic_so_csv(self, generic_so_filename: str) -> pd.DataFrame: self.generic_so_load["generic_so_obj"].append(GenericStaticObject( id_num = int(row[0]), street_name = str(row[1]), - pt = geopy.Point(float(row[2]),float(row[3])), + pt = Point(float(row[2]),float(row[3])), bearing = row[4], description = str(row[5]), distance_ft = float(row[6]) @@ -235,7 +236,7 @@ def load_intersection_csv(self, intersection_filename: str) -> pd.DataFrame: Intersection( int(row[0]), tuple((str(row[1]), str(row[2]))), - geopy.Point(float(row[3]), float(row[4])), + Point(float(row[3]), float(row[4])), spd=tuple( ( int(row[5]), @@ -269,7 +270,7 @@ def load_intersection_csv(self, intersection_filename: str) -> pd.DataFrame: temp_i = Intersection( int(row[0]), tuple((str(row[1]), str(row[2]))), - geopy.Point(float(row[3]), float(row[4])), + Point(float(row[3]), float(row[4])), spd=tuple( ( int(row[5]), @@ -286,10 +287,10 @@ def load_intersection_csv(self, intersection_filename: str) -> pd.DataFrame: float(row[12]), ) ), - stop_bar_nb=tuple((geopy.Point(row[13], row[14]), geopy.Point(row[15], row[16]))), - stop_bar_eb=tuple((geopy.Point(row[17], row[18]), geopy.Point(row[19], row[20]))), - stop_bar_sb=tuple((geopy.Point(row[21], row[22]), geopy.Point(row[23], row[24]))), - stop_bar_wb=tuple((geopy.Point(row[25], row[26]), geopy.Point(row[27], row[28]))), + stop_bar_nb=tuple((Point(row[13], row[14]), Point(row[15], row[16]))), + stop_bar_eb=tuple((Point(row[17], row[18]), Point(row[19], row[20]))), + stop_bar_sb=tuple((Point(row[21], row[22]), Point(row[23], row[24]))), + stop_bar_wb=tuple((Point(row[25], row[26]), Point(row[27], row[28]))), ) temp_i.set_sb_pts_bools((nb_sb_pts,eb_sb_pts,sb_sb_pts,wb_sb_pts)) self.intersection_load["intersection_obj"].append(temp_i) @@ -335,7 +336,7 @@ def load_gpx_to_obj_df(self, gpx_filename: str, gpx_ver = "1.0", use_pickle=True gpx_load = {"gpx_pt": []} # initialize starting variables if GPX v1.1 needs speed calcs - pnt1 = geopy.Point() + pnt1 = Point() t1 = None if use_pickle and Path(self.pickle_file).is_file(): @@ -385,7 +386,7 @@ def load_gpx_to_obj_df(self, gpx_filename: str, gpx_ver = "1.0", use_pickle=True if point.speed is not None: point.speed = extra_data["speed"] - p = geopy.Point( + p = Point( latitude=point.latitude, longitude=point.longitude, ) @@ -715,7 +716,7 @@ def get_location_at_timestamp(self, ts): Returns ------- - geopy.Point or None + Point or None The interpolated location or ``None`` if ``ts`` is outside the range of the loaded GPX data. """ @@ -749,7 +750,7 @@ def get_location_at_timestamp(self, ts): lon = p0.get_location().longitude + ratio * ( p1.get_location().longitude - p0.get_location().longitude ) - return geopy.Point(lat, lon) + return Point(lat, lon) return None diff --git a/src/ssoss/process_road_objects_archive.py b/src/ssoss/process_road_objects_archive.py new file mode 100644 index 0000000..b6499ad --- /dev/null +++ b/src/ssoss/process_road_objects_archive.py @@ -0,0 +1,760 @@ +# !/usr/bin/env python +# coding: utf-8 + +import csv, math +import textwrap +import statistics +from datetime import datetime, timezone +from zoneinfo import ZoneInfo +from pathlib import Path + + +from geopy import Point +from geopy.distance import geodesic, Distance + +import gpxpy +import gpxpy.gpx + +import pandas as pd +import lxml +from lxml import etree +from tqdm import tqdm +from timezonefinder import TimezoneFinder + +from ssoss.static_road_object import Intersection, GenericStaticObject +from ssoss.motion_road_object import GPXPoint + + +class ProcessRoadObjects: + + def __init__(self, + gpx_filestring: str = "", + #signals_filestring: str = "", + generic_static_object_filestring: str = "", + use_pickle: bool = True + ): + """ Class to process Road Object files. Using January 1st 1970 as time epoc + + :gpx_filepath: as string, full directory and filename of gpx file + :signals_filepath: as string, full directory and filename of sign or signal CSV file + generic_static_object_filestring: as string, full directory and filename of generic SO CSV file + """ + + self.intersection_load = None + self.intersection_listDF = None + + self.generic_so_load = None #id, generic_SO object list + self.generic_so_listDF = None # data frame of generic_SO object list + + self.date_format = "%m-%d-%Y--%H-%M-%S.%f-%Z" + self.pretty_datetime_format = "%y-%m-%d %H:%M:%S" + self.in_gpx_dir_path = Path(gpx_filestring).parent + self.in_dir_path = self.in_gpx_dir_path + self.out_dir_path = self.in_dir_path / "out" + self.out_dir_path.mkdir(exist_ok=True, parents=True) + + # init variables + #if signals_filestring: + # self.intersection_filename = Path(signals_filestring) + if generic_static_object_filestring: + self.generic_so_filename = Path(generic_static_object_filestring) + self.gpx_filename = Path(gpx_filestring).stem + self.pickle_file = '' + self.gpx_file = '' + self.csv_file = None + self.gpxDF = '' + self.gpx_listDF = None + self.gDF_pickle_file = None + + self.sum_time_gap = 0.0 + self.sum_total_points = 0.0 + + self.intersection_approaches = 0 + self.generic_so_approaches = 0 + + # store whether to load/save pickled GPX data + self.use_pickle = use_pickle + + # scafold directory structure if not present + gpx_video_dir = self.in_gpx_dir_path + p = Path(str(gpx_video_dir)) + + self.static_object_type = "" + + """ + if signals_filestring: + all_intersections_df = self.load_intersection_csv(self.intersection_filename) + """ + if generic_static_object_filestring: + # if filestring file has 7 rows, load_generic_so_csv, else load_intersection_csv + with open(self.generic_so_filename, 'r') as f: + reader = csv.reader(f) + so_file_columns = len(next(reader)) + if so_file_columns == 7: + self.static_object_type = "generic static object" + all_generic_so_df = self.load_generic_so_csv(self.generic_so_filename) + elif so_file_columns == 13 or so_file_columns == 29: + self.static_object_type = "intersection" + all_intersections_df = self.load_intersection_csv(self.generic_so_filename) + else: + raise ValueError("generic static object .csv file must have 7, 13 or 29 columns. Check documentation.") + if self.gpx_filename: + gpx_df = self.load_gpx_to_obj_df(self.gpx_filename, use_pickle=self.use_pickle) + + + @staticmethod + def speed_calc(point1, point2, t1, t2) -> float: + """ + Calculates Meters Per Second speed between two point and time objects + """ + # dist = geo.distance(point1.latitude, point1.longitude, point2.latitude, point2.longitude) + dist = geodesic(point1, point2).meters + time = (t2 - t1).total_seconds() # timedelta converted to float + + if time > 0: + speed = dist / time + if speed < 50: # 50 meters/sec threshold for accurate speed + return speed + else: + return 0.0 + else: + return 0.0 + + def get_intersection_object_by_id(self, intersection_id): + return self.intersection_listDF.iloc[intersection_id-1, 1] + + def get_generic_so_object_by_id(self, id): + return self.generic_so_listDF.iloc[id-1, 1] + + def get_static_object_type(self): + return self.static_object_type + + def generic_so_description(self, sro_id, distance, ts, desc_type="filename"): + """ + create descriptive labels for image filenames and labels at bottom of image + creates descriptive labels for generic static objects + + """ + generic_so_id = sro_id + generic_so_obj = self.get_generic_so_object_by_id(sro_id) + generic_so_name = generic_so_obj.get_name() + generic_so_sd = int(generic_so_obj.get_sd()) + generic_so_dist = distance + ts_utc = round(ts, 3) + dt_temp = datetime.fromtimestamp(ts, tz=None) + date_time = dt_temp.strftime("%a, %b %e %Y at %I:%M %p") # Note: using %-[char] gives error for windowsOS + + if desc_type == "filename": + filename_desc = f'{generic_so_id}.{generic_so_sd}-{generic_so_name}-{generic_so_obj.get_description()}-{ts_utc}' + return filename_desc + elif desc_type == "label": + label = f'{generic_so_obj.get_bearing_str()} {generic_so_name} (#{sro_id}) {generic_so_obj.get_description()} at ~{generic_so_sd} ft on {date_time}' + return label + + + def intersection_frame_description(self, sro_id, b_index, distance, ts, desc_type="filename"): + i_obj = self.get_intersection_object_by_id(sro_id) + + i_id = sro_id + i_bearing = b_index + i_compass_bearing = i_obj.get_bearing_str(b_index) + i_name = i_obj.get_name().replace("+", "-") + i_name_one = i_obj.get_name(0) + i_name_two = i_obj.get_name(1) + i_sd = i_obj.get_sd(b_index) + i_dist = distance + ts_utc = round(ts, 3) + dt_temp = datetime.fromtimestamp(ts, tz=None) + date_time = dt_temp.strftime("%a, %b %e %Y at %I:%M %p") # %-[char] gives error for windowsOS + + # Note-to-self: don't use "/" when building filename string + if desc_type == "filename": + filename_desc = f'{i_id}.{i_bearing}-{i_name}-{i_sd}-{ts_utc}' + return filename_desc + elif desc_type == "label": + label = f'{i_compass_bearing} approach of {i_name_one} and {i_name_two} (#{sro_id}) at ~{i_sd} ft on {date_time}' + return label + + def load_generic_so_csv(self, generic_so_filename: str) -> pd.DataFrame: + """ Loads CSV file into Generic Static Object Class DataFrame + :param generic_so_filename: name of CSV file for loading (leave off .csv) + Format: #,Street Name,latitude,longitude,direction,object type, distance + :return: dataframe of generic static objects in each row + """ + + csv_generic_so_file = Path(self.in_dir_path, generic_so_filename) + self.generic_so_load = {"id": [], "generic_so_obj": []} + + with open(csv_generic_so_file, "r") as csv_file: + csv_reader = csv.reader(csv_file, delimiter=",") + next(csv_reader, None) # skip the header + line_count = 0 + for row in csv_reader: + try: + self.generic_so_load["id"].append(int(row[0])) + self.generic_so_load["generic_so_obj"].append(GenericStaticObject( + id_num = int(row[0]), + street_name = str(row[1]), + pt = Point(float(row[2]),float(row[3])), + bearing = row[4], + description = str(row[5]), + distance_ft = float(row[6]) + )) + except: + print("Check intersection input file formatting.\n" + "Input should be:\n" + "#,Street Name,Latitude, Longitude, Bearing (NB,EB,SB,WB), Description, Distance (in feet)\n" + ) + line_count += 1 + self.generic_so_listDF = pd.DataFrame(self.generic_so_load) + print( + f"Processed {line_count} lines of CSV file") + print( + f"for a total of {len(self.generic_so_listDF.index)} Generic Static Object(s)") + + + def load_intersection_csv(self, intersection_filename: str) -> pd.DataFrame: + """ Loads CSV file into Intersection Class DataFrame + + :param intersection_filename: name of CSV file for loading (leave off .csv) + CSV Format: #,name1(N/S),name2(E/W),latitude,longitude,spd_N,spd_E,spd_S,spd_W,bearing_N,bearing_E,bearing_S,bearing_W, + :return: dataframe of intersections objects in each row + """ + + csv_intersection_file = Path(self.in_dir_path, intersection_filename) + self.intersection_load = {"id": [], "intersection_obj": []} + with open(csv_intersection_file, "r") as csv_file: + csv_reader = csv.reader(csv_file, delimiter=",") + next(csv_reader, None) # skip the header + line_count = 0 + count_sb_i = 0 + for row in csv_reader: + columns_in_row = len(row) + if columns_in_row == 13: + self.intersection_load["id"].append(int(row[0])) + self.intersection_load["intersection_obj"].append( + Intersection( + int(row[0]), + tuple((str(row[1]), str(row[2]))), + geopy.Point(float(row[3]), float(row[4])), + spd=tuple( + ( + int(row[5]), + int(row[6]), + int(row[7]), + int(row[8]), + ) + ), + bearing=tuple( + ( + float(row[9]), + float(row[10]), + float(row[11]), + float(row[12]), + ) + ), + ) + ) + elif columns_in_row == 29: + nb_sb_pts = eb_sb_pts = sb_sb_pts = wb_sb_pts = False + if (row[13] and row[14] and + row[15] and row[16]) != "": nb_sb_pts = True + if (row[17] and row[18] and + row[19] and row[20]) != "": eb_sb_pts = True + if (row[21] and row[22] and + row[23] and row[24]) != "": sb_sb_pts = True + if (row[25] and row[26] and + row[27] and row[28]) != "": wb_sb_pts = True + + self.intersection_load["id"].append(int(row[0])) + temp_i = Intersection( + int(row[0]), + tuple((str(row[1]), str(row[2]))), + Point(float(row[3]), float(row[4])), + spd=tuple( + ( + int(row[5]), + int(row[6]), + int(row[7]), + int(row[8]), + ) + ), + bearing=tuple( + ( + float(row[9]), + float(row[10]), + float(row[11]), + float(row[12]), + ) + ), + stop_bar_nb=tuple((geopy.Point(row[13], row[14]), geopy.Point(row[15], row[16]))), + stop_bar_eb=tuple((geopy.Point(row[17], row[18]), geopy.Point(row[19], row[20]))), + stop_bar_sb=tuple((geopy.Point(row[21], row[22]), geopy.Point(row[23], row[24]))), + stop_bar_wb=tuple((geopy.Point(row[25], row[26]), geopy.Point(row[27], row[28]))), + ) + temp_i.set_sb_pts_bools((nb_sb_pts,eb_sb_pts,sb_sb_pts,wb_sb_pts)) + self.intersection_load["intersection_obj"].append(temp_i) + + if temp_i.all_sb_line_available(): count_sb_i += 1 + + + else: + print("Check intersection input file formatting.\n" + "Input should be:\n" + "#,Street Name1, Steet Name2, Latitude, Longitude, Speed Limit_NB, SL_EB, SL_SB, SL_WB, Bearing_NB, B_EB, B_SB, B_WB\n" + "Optional additional input for stop bar lines after bearing\n" + "NB_Left_StopBar_Latitude, NB_Left_StopBarLongitude, NB_Right_StopBar_Latitude, NB_Left_StopBar_Longitude\n" + "Same for EB Stop Bar, SB Stop Bar, and WB Stop Bar\n" + ) + line_count += 1 + self.intersection_listDF = pd.DataFrame(self.intersection_load) + print( + f"Processed {line_count} lines of CSV file for a total of {len(self.intersection_listDF.index)} intersections, \n \ + and of those {count_sb_i} with stop bar information." + ) + return self.intersection_listDF + + def set_gpx_ver(self): + gpx_string = etree.tostring(etree.parse(self.gpx_file), encoding=str) + gpx_ver_10 = gpx_string.find("http://www.topografix.com/GPX/1/0") + gpx_ver_11 = gpx_string.find("http://www.topografix.com/GPX/1/1") + if gpx_ver_10 < 0 and gpx_ver_11 > 0: + self.gpx_ver = "1.1" + return self.gpx_ver + else: + self.gpx_ver = "1.0" + return self.gpx_ver + + def load_gpx_to_obj_df(self, gpx_filename: str, gpx_ver = "1.0", use_pickle=True) -> pd.DataFrame: + """ Loads GPX file into point objects and returns a dataframe of all the points + """ + self.gpx_filename = gpx_filename + self.gpx_ver = gpx_ver + self.gpx_file = self.in_dir_path / self.in_gpx_dir_path / (gpx_filename + ".gpx") + self.pickle_file = self.out_dir_path / (gpx_filename + ".pkl") + self.csv_file = self.out_dir_path / (gpx_filename + ".csv") + gpx_load = {"gpx_pt": []} + + # initialize starting variables if GPX v1.1 needs speed calcs + pnt1 = geopy.Point() + t1 = None + + if use_pickle and Path(self.pickle_file).is_file(): + self.gpx_listDF = pd.read_pickle(self.pickle_file) + print( + f"Loaded Pickle file {self.pickle_file} into Dataframe with {self.gpx_listDF.last_valid_index()} rows" + ) + if self.intersection_listDF is not None: + self.update_gpx_points(so_type="intersection") + if self.generic_so_listDF is not None: + self.update_gpx_points(so_type="generic_so") + self.gpx_summary() + return self.gpx_listDF + else: + print( + f"Using GPX file: {self.gpx_file}" + ) + + self.gpx_ver = self.set_gpx_ver() + gpx_file_ref = open(self.gpx_file, "r") + + gpx = gpxpy.parse(gpx_file_ref, version=self.gpx_ver) + + # determine timezone from first point + tz_name = "UTC" + if gpx.tracks and gpx.tracks[0].segments and gpx.tracks[0].segments[0].points: + first = gpx.tracks[0].segments[0].points[0] + finder = TimezoneFinder() + tz_guess = finder.timezone_at(lng=first.longitude, lat=first.latitude) + if tz_guess: + tz_name = tz_guess + tzinfo = ZoneInfo(tz_name) + + pt_count = 0 + for track in gpx.tracks: + for segment in track.segments: + for point in segment.points: + if self.gpx_ver == "1.1": + extra_data = {} + if len(point.extensions) > 0: + extension_data = { + lxml.etree.QName(child).localname: float(child.text) + for child in point.extensions[0] + } + for k, v in extension_data.items(): + extra_data[k] = v + if point.speed is not None: + point.speed = extra_data["speed"] + + p = geopy.Point( + latitude=point.latitude, + longitude=point.longitude, + ) + + # convert timestamp to local timezone + local_time = point.time + if local_time.tzinfo is None: + local_time = local_time.replace(tzinfo=tzinfo) + else: + local_time = local_time.astimezone(tzinfo) + + if point.speed is not None: + pass # GPX v1.0 includes speed in track, v1.1 can include in extension data + elif point.speed is None: # GPX v1.1 speed calculation + if pt_count == 0: + point.speed = 0 + t1 = local_time + pnt1 = p + else: + pnt0 = pnt1 + pnt1 = p + t0 = t1 + t1 = local_time + + point.speed = self.speed_calc(pnt0, pnt1, t0, t1) + + gpx_load["gpx_pt"].append( + GPXPoint( + pt_count, + local_time.isoformat(), + tuple(p), # tuple of point, lat and lon + point.speed + ) + ) + pt_count += 1 + + self.gpx_listDF = pd.DataFrame(gpx_load) + if use_pickle: + self.gpx_listDF.to_pickle(self.pickle_file) + self.gpx_listDF.to_csv(self.csv_file) + print( + f"Processing {pt_count} points of GPX file." + ) + if self.intersection_listDF is not None: + self.update_gpx_points(so_type = "intersection") + if self.generic_so_listDF is not None: + self.update_gpx_points(so_type = "generic_so") + self.gpx_summary() + return self.gpx_listDF + + def update_gpx_points(self, so_type): + """ + updates gpx_prev_point and gpx_next_point as objects after the initial points + are loaded + """ + # simplify variable names + last_index = self.gpx_listDF.last_valid_index() + gpx_df = self.gpx_listDF + cuml_d = 0.0 + + # edge cases + gpx_df.iloc[0, 0].set_next_gpx_point(gpx_df.iloc[1, 0]) + + + for i in range(1, last_index): # populate prev_gpx_points + gpx_df.iloc[i, 0].set_prev_gpx_point(gpx_df.iloc[i-1, 0]) + for i in range(last_index-1, 0, -1): # reversed range to populate next_gpx_points + gpx_df.iloc[i, 0].set_next_gpx_point(gpx_df.iloc[i+1, 0]) + + # edge cases + gpx_df.iloc[0, 0].set_next_gpx_point(gpx_df.iloc[1, 0]) + gpx_df.iloc[last_index, 0].set_prev_gpx_point(gpx_df.iloc[last_index - 1, 0]) + + # repopulate prev_gpx_point objects so next_gpx_points are not NONE + for i in range(1, last_index+1): + current_gpx_pt = gpx_df.iloc[i, 0] + current_gpx_pt.set_prev_gpx_point(gpx_df.iloc[i-1, 0]) + self.sum_time_gap += current_gpx_pt.get_prev_timedelta() + cuml_d += current_gpx_pt.distance_to(current_gpx_pt.get_prev_gpx_point().get_location()) + current_gpx_pt.set_cumulative_distance(cuml_d) + + # edge cases again + gpx_df.iloc[0, 0].set_next_gpx_point(gpx_df.iloc[1, 0]) + + # display progress bar for calculating time-consuming/unoptimized backflow function + for i in tqdm(range(len(gpx_df.index))): + if so_type == "intersection": + gpx_df.iloc[i, 0].backflow(self.intersection_listDF, "intersection") + elif so_type == "generic_so": + gpx_df.iloc[i, 0].backflow(self.generic_so_listDF, "generic_so") + + def get_start_timestamp(self): + return self.gpx_listDF.iloc[0, 0].get_timestamp() + + def get_end_timestamp(self): + last_index = self.gpx_listDF.last_valid_index() + return self.gpx_listDF.iloc[last_index, 0].get_timestamp() + + def generic_so_checks(self): + """ + perform generic distance check on static road object + """ + gpx_df = self.gpx_listDF + all_generic_so = self.generic_so_listDF + generic_so_desc = [] + generic_so_ts = [] + generic_so_error = [] + bearing_buffer_angle = 50 + time_buffer = 3 + + for point in range(len(gpx_df.index)): + generic_so_info = gpx_df.iloc[point, 0].get_generic_so_approach_list() + + if generic_so_info: + p = gpx_df.iloc[point, 0] + generic_so_id, dist, approaching_bool = zip(*generic_so_info) + for item in range(len(list(generic_so_id))): + sro_id = int(list(generic_so_id)[item]) + d_current = list(dist)[item] + + approach_generic_so = all_generic_so.iloc[sro_id-1, 1] # shift by 1 for generic_so dataframe (starting at 0, not 1) + approach_generic_so_sight_distance = approach_generic_so.get_sd() + + if p.calc_bearing_diff(approach_generic_so.get_bearing()) < bearing_buffer_angle: + print( + f"Generic Object #{sro_id} at {approach_generic_so_sight_distance} ft at {p.get_timestamp()}" + ) + prev_current_b4_next = p.generic_so_prev_and_current_before_next(approach_generic_so) + next_less_than_current = p.generic_so_next_less_than_current(approach_generic_so) + neg_dist = p.distance_to(approach_generic_so.get_location()) - approach_generic_so_sight_distance + #if prev_current_b4_next and next_less_than_current: + filtered_sightings = p.generic_so_single_filter(approach_generic_so) + + if filtered_sightings[0] and (neg_dist < 0): + t_acc = p.t_to_generic_so_acc(approach_generic_so) + print( + f"Generic Object #{sro_id} at {approach_generic_so_sight_distance} ft acc shift by {t_acc} with error {filtered_sightings[1]} ft" + ) + + t_shift_acc = p.get_timestamp() + t_acc + if approach_generic_so.print_detail_info() in generic_so_desc: + index_item = generic_so_desc.index(approach_generic_so.print_detail_info()) + if t_shift_acc-time_buffer < generic_so_ts[index_item] < t_shift_acc + time_buffer: + if filtered_sightings[1] < generic_so_error[index_item]: + # remove + del generic_so_desc[index_item] + del generic_so_ts[index_item] + del generic_so_error[index_item] + # append + generic_so_desc.append(approach_generic_so.print_detail_info()) + generic_so_ts.append(t_shift_acc) + generic_so_error.append(filtered_sightings[1]) + else: + generic_so_desc.append(approach_generic_so.print_detail_info()) + #generic_so_sights.append(self.generic_so_description(sro_id, approach_generic_so_sight_distance, t_shift_acc)) + generic_so_ts.append(t_shift_acc) + generic_so_error.append(filtered_sightings[1]) + + + updated_desc = self.include_timestamp_to_description(generic_so_desc, generic_so_ts) + z = zip(updated_desc, generic_so_ts) # create tuples with generic Static Object descriptions,timestamps and errors + id_ts_error = list(z) # convert zip to list + time_sort = sorted(id_ts_error, key=lambda x: x[1]) # sort the list by timestamps + #id_sort = sorted(id_ts_error, key=lambda x: x[0]) # sort the list by id + #error_sort = sorted(id_ts_error, key=lambda x: x[2]) # sort the list by errors + + self.generic_so_approaches = len(time_sort) + return time_sort + + + @staticmethod + def include_timestamp_to_description(desc, ts): + """ + add timestamp to description in one string + """ + for i in range(len(desc)): + desc[i] = desc[i] + "-" + str(round(ts[i],3)) + return desc + + def intersection_checks(self): + """ + perform intersection sight distance checks. + find timestamp of intersection approach sight distance locations + check each GPX point + """ + gpx_df = self.gpx_listDF + all_intersections = self.intersection_listDF + intersection_sd = [] # store intersection id & index in list + intersection_ts = [] # store timestamps in list + + for point in range(len(gpx_df.index)): + intersections_info = gpx_df.iloc[point, 0].get_intersection_approach_list() + if intersections_info: + p = gpx_df.iloc[point, 0] + intersection_id, bearing_index, dist, approach = zip(*intersections_info) + for item in range(len(list(intersection_id))): + sro_id = int(list(intersection_id)[item]) + b_index = int(list(bearing_index)[item]) + d_current = list(dist)[item] + + approach_intersection = all_intersections.iloc[sro_id-1, 1] + approach_intersection_sight_distance = approach_intersection.get_sd(b_index) + + prev_current_b4_next = p.h_prev_and_current_before_next(approach_intersection, b_index) + next_less_than_current = p.h_next_less_than_current(approach_intersection, b_index) + simple_int_approach = p.simple_intersection_approach(approach_intersection, b_index) + + if (prev_current_b4_next and next_less_than_current): + #if simple_int_approach: + t_acc = p.t_to_approach_acc(approach_intersection, b_index) + print( + f"Signal #{sro_id}.{b_index} at {approach_intersection_sight_distance} ft acc shift by {t_acc}" + ) + + t_shift_acc = p.get_timestamp() + t_acc + intersection_sd.append(self.intersection_frame_description(sro_id, b_index, d_current, t_shift_acc)) + intersection_ts.append(t_shift_acc) + + z = zip(intersection_sd, intersection_ts) # create tuples with intersection descriptions and timestamps + id_ts = list(z) # convert zip to list + ret = sorted(id_ts, key=lambda x: x[1]) # sort the list by timestamps + self.intersection_approaches = len(ret) + return ret + + + @staticmethod + def hr_min_sec(sec): + if sec < 60: + return f'{sec} seconds' + elif sec < 3600: + minutes = int(sec / 60) + sec_remain = round(sec - minutes * 60, 2) + return f'{minutes:02}:{sec_remain:05.2f} (MM:SS.ss)' + elif sec >= 3600: + hr = int(sec / 3600) + minutes = int((sec - hr * 3600) / 60) + sec_remain = round(sec - (hr * 3600 + minutes * 60), 2) + return f'{hr:02}:{minutes:02}:{sec_remain:05.2f} (HH:MM:SS.ss)' + + @staticmethod + def simplify_distance(d_ft): + if d_ft < 5280: + return f'{round(Distance(feet=d_ft).ft, 2)} feet' + else: + return f'{round(Distance(feet=d_ft).miles, 2)} miles' + + def gpx_summary(self): + gpx_df = self.gpx_listDF + last_index = gpx_df.last_valid_index() + self.sum_total_points = last_index + 1 # pd dataframe starts at zero + avg_time_gap = round(self.sum_time_gap / self.sum_total_points, 2) + tot_sec = round(self.get_end_timestamp() - self.get_start_timestamp(), 2) + tot_distance = gpx_df.iloc[last_index, 0].get_cumulative_distance() + + if self.sum_total_points > 0: + conv = gpx_df.iloc[0, 0].FTPStoMPH + spd_vals = [gpx_df.iloc[i, 0].get_speed() for i in range(self.sum_total_points)] + spd_mph = [s * conv for s in spd_vals] + avg_speed = round((tot_distance / tot_sec) * conv, 2) if tot_sec > 0 else 0.0 + max_speed = round(max(spd_mph), 2) + min_speed = round(min(spd_mph), 2) + if self.sum_total_points > 1: + acc_vals = [gpx_df.iloc[i, 0].acceleration() for i in range(self.sum_total_points - 1)] + avg_acc = round(statistics.mean(acc_vals) * conv, 2) + else: + avg_acc = 0.0 + else: + avg_speed = max_speed = min_speed = avg_acc = 0.0 + + width = 70 + title = "GPX SUMMARY" + symbol = "-" + + summary = textwrap.dedent( + f""" + {symbol * width} + {title.center(width)} + {symbol * width} + GPX File: {self.gpx_file} + Using GPX version: {self.gpx_ver} + Start time: {datetime.fromtimestamp(self.get_start_timestamp(), tz=None)} + End time: {datetime.fromtimestamp(self.get_end_timestamp(), tz=None)} + Total duration: {self.hr_min_sec(tot_sec)} + Total distance: {self.simplify_distance(tot_distance)} + Number of data points: {self.sum_total_points} + Avg. Time Gap between data points: {avg_time_gap} Seconds + Avg. Speed: {avg_speed} MPH + Max Speed: {max_speed} MPH + Min Speed: {min_speed} MPH + Avg. Acceleration: {avg_acc} MPH/s + {symbol * width} + """ + ) + + print(summary) + + @staticmethod + def avg_speed(spd1, spd2): + return (spd1 + spd2) / 2 + + def get_speed_at_timestamp(self, ts): + point_list = self.gpx_listDF + last_point = len(point_list)-1 + speed = None + + # boundary conditions + # first point check + if ts < point_list.iloc[0, 0].get_timestamp(): + return None + # last point check + if ts > point_list.iloc[last_point, 0].get_timestamp(): + return None + + for i in range(len(point_list)-1): + if point_list.iloc[i, 0].get_timestamp() <= ts <= point_list.iloc[i + 1, 0].get_timestamp(): + speed = self.avg_speed(point_list.iloc[i, 0].get_speed(), point_list.iloc[i+1, 0].get_speed()) + break + return speed + + def get_location_at_timestamp(self, ts): + """Return a geopy ``Point`` interpolated for ``ts``. + + Parameters + ---------- + ts : float + Unix timestamp to interpolate the latitude and longitude for. + + Returns + ------- + geopy.Point or None + The interpolated location or ``None`` if ``ts`` is outside the + range of the loaded GPX data. + """ + + points = self.gpx_listDF + if points is None or len(points) == 0: + return None + + last_idx = len(points) - 1 + + # Boundary checks + first_ts = points.iloc[0, 0].get_timestamp() + last_ts = points.iloc[last_idx, 0].get_timestamp() + if ts < first_ts or ts > last_ts: + return None + + # Locate the two surrounding points + for i in range(last_idx): + p0 = points.iloc[i, 0] + p1 = points.iloc[i + 1, 0] + t0 = p0.get_timestamp() + t1 = p1.get_timestamp() + if t0 <= ts <= t1: + if t1 == t0: + return p0.get_location() + + ratio = (ts - t0) / (t1 - t0) + lat = p0.get_location().latitude + ratio * ( + p1.get_location().latitude - p0.get_location().latitude + ) + lon = p0.get_location().longitude + ratio * ( + p1.get_location().longitude - p0.get_location().longitude + ) + return geopy.Point(lat, lon) + + return None + + + + +