diff --git a/.github/workflows/lint_pytest_deploy.yml b/.github/workflows/lint_pytest_deploy.yml new file mode 100644 index 00000000..5ffbb76a --- /dev/null +++ b/.github/workflows/lint_pytest_deploy.yml @@ -0,0 +1,60 @@ +# This workflow will install Python dependencies, run tests and lint with a variety of Python versions +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python + +name: Testing + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + lint: + name: Ruff (Lint & Style) + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python 3.11 + uses: actions/setup-python@v4 + with: + python-version: "3.11" + + - name: Install Ruff + run: | + python -m pip install --upgrade pip + python -m pip install ruff + + # “ruff check .” performs linting; + # “ruff format --check .” verifies Black-style formatting. + - name: Run Ruff + run: | + ruff check . + ruff format --check . + + tests: + name: Pytest + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python 3.11 + uses: actions/setup-python@v4 + with: + python-version: "3.11" + + - name: Install project & dependencies + run: | + python -m pip install --upgrade pip + python -m pip install --editable . + python -m pip install pytest + + - name: Set AEIC_DATA_DIR + run: echo "AEIC_DATA_DIR=$(pwd)/data" >> "$GITHUB_ENV" + + - name: Run Pytest + run: python -m pytest + diff --git a/docs/conf.py b/docs/conf.py index 31a95c46..d547489f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -9,20 +9,27 @@ import os import sys -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'src'))) +sys.path.insert( + 0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'src')) +) project = 'AEIC' -copyright = '2025, Wyatt Giroux, Prashanth Prakash, Prateek Ranjan, Aditeya Shukla, Raymond Speth' -author = 'Wyatt Giroux, Prashanth Prakash, Prateek Ranjan, Aditeya Shukla, Raymond Speth' +copyright = ( + '2025, Wyatt Giroux, Prashanth Prakash, Prateek Ranjan, ' + 'Aditeya Shukla, Raymond Speth' +) +author = 'Wyatt Giroux, Prashanth Prakash, Prateek Ranjan, Aditeya Shukla,Raymond Speth' release = '1.0.0a1' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration -extensions = ['sphinx.ext.autodoc', - 'sphinx.ext.autosummary', - 'sphinx.ext.coverage', - 'sphinx.ext.napoleon'] +extensions = [ + 'sphinx.ext.autodoc', + 'sphinx.ext.autosummary', + 'sphinx.ext.coverage', + 'sphinx.ext.napoleon', +] autosummary_generate = True templates_path = ['_templates'] diff --git a/pyproject.toml b/pyproject.toml index 627a6067..d9e98a94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,4 +32,23 @@ dev = [ [build-system] requires = ["hatchling"] -build-backend = "hatchling.build" \ No newline at end of file +build-backend = "hatchling.build" + +[tool.ruff] +# Use Black-compatible defaults +line-length = 88 + +# Optionally, choose which rule sets to enable; this is a typical starting point. +# Remove or extend as you like. +lint.select = ["E", "F", "W", "I", "UP"] # pycodestyle, pyflakes, warnings, isort, pyupgrade +lint.ignore = [] # specify ignores here + +# If you want Ruff to auto-format with Black style when you run +# ruff format . +# you don't need any extra options—the default formatter already follows Black’s 88-char width. + +[tool.black] +line-length = 88 + +[tool.ruff.format] +quote-style = "preserve" # keep whatever the file already uses, dont change single quotes to double \ No newline at end of file diff --git a/src/AEIC/performance_model.py b/src/AEIC/performance_model.py index d79902a8..cb5dc081 100644 --- a/src/AEIC/performance_model.py +++ b/src/AEIC/performance_model.py @@ -1,33 +1,39 @@ +import os + import numpy as np import tomllib -import json -import os -from parsers.PTF_reader import parse_PTF -from parsers.OPF_reader import parse_OPF -from parsers.LTO_reader import parseLTO + from BADA.aircraft_parameters import Bada3AircraftParameters from BADA.model import Bada3JetEngineModel +from parsers.LTO_reader import parseLTO +from parsers.OPF_reader import parse_OPF + # from src.missions.OAG_filter import filter_OAG_schedule from utils import file_location + class PerformanceModel: '''Performance model for an aircraft. Contains - fuel flow, airspeed, ROC/ROD, LTO emissions, - and OAG schedule''' + fuel flow, airspeed, ROC/ROD, LTO emissions, + and OAG schedule''' def __init__(self, config_file="IO/default_config.toml"): - ''' Initializes the performance model by reading the configuration, + '''Initializes the performance model by reading the configuration, loading mission data, and setting up performance and engine models.''' config_file_loc = file_location(config_file) self.config = {} with open(config_file_loc, 'rb') as f: config_data = tomllib.load(f) - self.config = {k: v for subdict in config_data.values() for k, v in subdict.items()} + self.config = { + k: v for subdict in config_data.values() for k, v in subdict.items() + } # Get mission data # self.filter_OAG_schedule = filter_OAG_schedule mission_file = file_location( - os.path.join(self.config['missions_folder'], self.config['missions_in_file']) + os.path.join( + self.config['missions_folder'], self.config['missions_in_file'] + ) ) with open(mission_file, 'rb') as f: all_missions = tomllib.load(f) @@ -38,9 +44,9 @@ def __init__(self, config_file="IO/default_config.toml"): self.initialize_performance() def initialize_performance(self): - '''Initializes aircraft performance characteristics from TOML sourcee. + '''Initializes aircraft performance characteristics from TOML sourcee. Also loads LTO/EDB data and sets up the engine model using BADA3 parameters.''' - + self.ac_params = Bada3AircraftParameters() # If OPF data input if self.config["performance_model_input"] == "OPF": @@ -68,13 +74,15 @@ def initialize_performance(self): if self.config["LTO_input_mode"] == "input_file": # Load LTO data self.LTO_data = parseLTO(self.config['LTO_input_file']) - + def read_performance_data(self): - '''Parses the TOML input file containing flight and LTO performance data. + '''Parses the TOML input file containing flight and LTO performance data. Separates model metadata and prepares the data for table generation.''' - - # Read and load TOML data - with open(file_location(self.config["performance_model_input_file"]), "rb") as f: + + # Read and load TOML data + with open( + file_location(self.config["performance_model_input_file"]), "rb" + ) as f: data = tomllib.load(f) self.LTO_data = data['LTO_performance'] @@ -98,7 +106,8 @@ def create_performance_table(self, data_dict): cols = data_dict["cols"] data = data_dict["data"] - # Identify output column (we assume it's the first column or explicitly labeled as fuel flow) + # Identify output column (we assume it's the first column or + # explicitly labeled as fuel flow) try: output_col_idx = cols.index("FUEL_FLOW") # Output is fuel flow except ValueError: @@ -107,8 +116,14 @@ def create_performance_table(self, data_dict): input_col_names = [c for i, c in enumerate(cols) if i != output_col_idx] # Extract and sort unique values for each input dimension - input_values = {col: sorted(set(row[cols.index(col)] for row in data)) for col in input_col_names} - input_indices = {col: {val: idx for idx, val in enumerate(input_values[col])} for col in input_col_names} + input_values = { + col: sorted(set(row[cols.index(col)] for row in data)) + for col in input_col_names + } + input_indices = { + col: {val: idx for idx, val in enumerate(input_values[col])} + for col in input_col_names + } # Prepare multidimensional shape and index arrays shape = tuple(len(input_values[col]) for col in input_col_names) @@ -131,5 +146,3 @@ def create_performance_table(self, data_dict): self.performance_table = fuel_flow_array self.performance_table_cols = [input_values[col] for col in input_col_names] self.performance_table_colnames = input_col_names # Save for external reference - - diff --git a/src/AEIC/trajectories/ads-b_trajectory.py b/src/AEIC/trajectories/ads-b_trajectory.py index 8ecb43e0..304fee11 100644 --- a/src/AEIC/trajectories/ads-b_trajectory.py +++ b/src/AEIC/trajectories/ads-b_trajectory.py @@ -1,22 +1,20 @@ -import numpy as np from AEIC.performance_model import PerformanceModel from AEIC.trajectories.trajectory import Trajectory + class ADSBTrajectory(Trajectory): '''Model for determining flight trajectories using ADS-B flight data. Can be optimized using methods defined by Marek Travnik. ''' - def __init__(self, ac_performance:PerformanceModel): + + def __init__(self, ac_performance: PerformanceModel): super().__init__(ac_performance) - - + def climb(self): pass - - + def cruise(self): pass - - + def descent(self): - pass \ No newline at end of file + pass diff --git a/src/AEIC/trajectories/dymos_trajectory.py b/src/AEIC/trajectories/dymos_trajectory.py index 51df1677..9a0abd2c 100644 --- a/src/AEIC/trajectories/dymos_trajectory.py +++ b/src/AEIC/trajectories/dymos_trajectory.py @@ -1,22 +1,20 @@ -import numpy as np from AEIC.performance_model import PerformanceModel from AEIC.trajectories.trajectory import Trajectory + class DymosTrajectory(Trajectory): '''Model for determining flight trajectories using ADS-B flight data. Can be optimized using methods defined by Marek Travnik. ''' - def __init__(self, ac_performance:PerformanceModel): + + def __init__(self, ac_performance: PerformanceModel): super().__init__(ac_performance) - - + def climb(self): pass - - + def cruise(self): pass - - + def descent(self): - pass \ No newline at end of file + pass diff --git a/src/AEIC/trajectories/legacy_trajectory.py b/src/AEIC/trajectories/legacy_trajectory.py index 4e21b5d2..bd50e213 100644 --- a/src/AEIC/trajectories/legacy_trajectory.py +++ b/src/AEIC/trajectories/legacy_trajectory.py @@ -1,172 +1,225 @@ import numpy as np + from AEIC.performance_model import PerformanceModel from AEIC.trajectories.trajectory import Trajectory -from utils.helpers import feet_to_meters, meters_to_feet, nautmiles_to_meters, filter_order_duplicates +from utils.helpers import ( + feet_to_meters, + filter_order_duplicates, + meters_to_feet, + nautmiles_to_meters, +) from utils.weather_utils import compute_ground_speed -import pandas as pd + class LegacyTrajectory(Trajectory): '''Model for determining flight trajectories using the legacy method from AEIC v2. ''' - def __init__(self, ac_performance:PerformanceModel, mission, optimize_traj:bool, iterate_mass:bool, - startMass:float=-1, pctStepClm=0.01, pctStepCrz=0.01, pctStepDes=0.01, fuel_LHV=43.8e6): - super().__init__(ac_performance, mission, optimize_traj, iterate_mass, startMass=startMass) - - # Define discretization of each phase in steps as a percent of the overall distance/altitude change + + def __init__( + self, + ac_performance: PerformanceModel, + mission, + optimize_traj: bool, + iterate_mass: bool, + startMass: float = -1, + pctStepClm=0.01, + pctStepCrz=0.01, + pctStepDes=0.01, + fuel_LHV=43.8e6, + ): + super().__init__( + ac_performance, mission, optimize_traj, iterate_mass, startMass=startMass + ) + + # Define discretization of each phase in steps as a percent of + # the overall distance/altitude change self.pctStepClm = pctStepClm self.pctStepCrz = pctStepCrz self.pctStepDes = pctStepDes - + self.NClm = int(1 / self.pctStepClm + 1) self.NCrz = int(1 / self.pctStepCrz + 1) self.NDes = int(1 / self.pctStepDes + 1) self.Ntot = self.NClm + self.NCrz + self.NDes self.fuel_LHV = fuel_LHV - + # Climb defined as starting 3000' above airport self.clm_start_altitude = self.dep_lon_lat_alt[-1] + feet_to_meters(3000.0) - + # Max alt should be changed to meters - max_alt = feet_to_meters(ac_performance.model_info['General_Information']['max_alt_ft']) - - # Check if starting altitude is above operating ceiling; if true, set start altitude to + max_alt = feet_to_meters( + ac_performance.model_info['General_Information']['max_alt_ft'] + ) + + # Check if starting altitude is above operating ceiling; + # if true, set start altitude to # departure airport altitude if self.clm_start_altitude >= max_alt: self.clm_start_altitude = self.dep_lon_lat_alt[-1] - + # Cruise altitude is the operating ceiling - 7000 feet - self.crz_start_altitude = max_alt - feet_to_meters(7000.) - + self.crz_start_altitude = max_alt - feet_to_meters(7000.0) + # Ensure cruise altitude is above the starting altitude if self.crz_start_altitude < self.clm_start_altitude: self.crz_start_altitude = self.clm_start_altitude - - # Prevent flying above A/C ceiling (NOTE: this will only trigger due to random + + # Prevent flying above A/C ceiling (NOTE: this will only trigger due to random # variables not currently implemented) if self.crz_start_altitude > max_alt: self.crz_start_altitude = max_alt - + # In legacy trajectory, descent start altitude is equal to cruise altitude self.des_start_altitude = self.crz_start_altitude - - # Set descent altitude based on 3000' above arrival airport altitude; clamp to A/C operating + + # Set descent altitude based on 3000' above arrival airport altitude; + # clamp to A/C operating # ceiling if needed self.des_end_altitude = self.arr_lon_lat_alt[-1] + feet_to_meters(3000.0) if self.des_end_altitude >= max_alt: self.des_end_altitude = max_alt - + # Save relevant flight levels self.crz_FL = meters_to_feet(self.crz_start_altitude) / 100 self.clm_FL = meters_to_feet(self.clm_start_altitude) / 100 self.des_FL = meters_to_feet(self.des_start_altitude) / 100 - self.end_FL = meters_to_feet(self.des_end_altitude) / 100 - + self.end_FL = meters_to_feet(self.des_end_altitude) / 100 + # Get the relevant bounding flight levels for cruise based on performance data self.__calc_crz_FLs() - + # Get the indices for 0-ROC performance - self.__get_zero_roc_index() - - + self.__get_zero_roc_index() + def climb(self): - ''' Function called by `fly_flight_iteration()` to simulate climb ''' + '''Function called by `fly_flight_iteration()` to simulate climb''' dAlt = (self.crz_start_altitude - self.clm_start_altitude) / (self.NClm - 1) if dAlt < 0: - raise ValueError('Departure airport + 3000ft should not be higher than start of cruise point') - + raise ValueError( + "Departure airport + 3000ft should not be higher" + "than start of cruise point" + ) + alts = np.linspace(self.clm_start_altitude, self.crz_start_altitude, self.NClm) - self.traj_data['altitude'][0:self.NClm] = alts - + self.traj_data['altitude'][0 : self.NClm] = alts + self.__legacy_climb() - - + def cruise(self): - ''' Function called by `fly_flight_iteration()` to simulate cruise ''' - # Start cruise at end-of-climb position and mass (fuel flow, TAS will be replaced) + '''Function called by `fly_flight_iteration()` to simulate cruise''' + # Start cruise at end-of-climb position and mass + # (fuel flow, TAS will be replaced) for field in self.traj_data.dtype.names: - self.traj_data[field][self.NClm] = self.traj_data[field][self.NClm-1] - + self.traj_data[field][self.NClm] = self.traj_data[field][self.NClm - 1] + # Cruise at constant altitude - self.traj_data['altitude'][self.NClm:self.NClm+self.NCrz] = self.crz_start_altitude - + self.traj_data['altitude'][self.NClm : self.NClm + self.NCrz] = ( + self.crz_start_altitude + ) + descent_dist_approx = 18.23 * (self.des_start_altitude - self.des_end_altitude) - + if descent_dist_approx < 0: raise ValueError('Arrival airport should not be above cruise altitude') - - cruise_start_distance = self.traj_data['groundDist'][self.NClm-1] - cruise_dist_approx = nautmiles_to_meters(self.gc_distance) - cruise_start_distance - descent_dist_approx - + + cruise_start_distance = self.traj_data['groundDist'][self.NClm - 1] + cruise_dist_approx = ( + nautmiles_to_meters(self.gc_distance) + - cruise_start_distance + - descent_dist_approx + ) + # Cruise is discretized into ground distance steps cruise_end_distance = cruise_start_distance + cruise_dist_approx - cruise_distance_values = np.linspace(cruise_start_distance, cruise_end_distance, self.NCrz) - self.traj_data['groundDist'][self.NClm:self.NClm+self.NCrz] = cruise_distance_values - + cruise_distance_values = np.linspace( + cruise_start_distance, cruise_end_distance, self.NCrz + ) + self.traj_data['groundDist'][self.NClm : self.NClm + self.NCrz] = ( + cruise_distance_values + ) + # Get distance step size dGD = cruise_dist_approx / (self.NCrz - 1) - + self.__legacy_cruise(dGD) - - + def descent(self): - ''' Function called by `fly_flight_iteration()` to simulate descent ''' - # Start descent at end-of-cruise position and mass (fuel flow, TAS will be replaced) + '''Function called by `fly_flight_iteration()` to simulate descent''' + # Start descent at end-of-cruise position and mass (fuel flow, + # TAS will be replaced) for field in self.traj_data.dtype.names: - self.traj_data[field][self.NClm+self.NCrz] = self.traj_data[field][self.NClm+self.NCrz-1] - + self.traj_data[field][self.NClm + self.NCrz] = self.traj_data[field][ + self.NClm + self.NCrz - 1 + ] + dAlt = (self.des_end_altitude - self.des_start_altitude) / (self.NDes) if dAlt > 0: - raise ValueError('Arrival airport + 3000ft should not be higher than end of cruise point') - + raise ValueError( + "Arrival airport + 3000ft should not be higher thanend of cruise point" + ) + alts = np.linspace(self.des_start_altitude, self.des_end_altitude, self.NDes) startN = self.NClm + self.NCrz endN = startN + self.NDes self.traj_data['altitude'][startN:endN] = alts - + self.__legacy_descent() - - + def calc_starting_mass(self): - ''' Calculates the starting mass using AEIC v2 methods. Sets both starting mass and non-reserve/hold/divert fuel mass ''' + '''Calculates the starting mass using AEIC v2 methods. + Sets both starting mass and non-reserve/hold/divert fuel mass''' # Use the highest value of mass per AEIC v2 method - mass_ind = [len(self.ac_performance.performance_table_cols[-1])-1] - crz_mass = np.array(self.ac_performance.performance_table_cols[-1])[mass_ind] - + mass_ind = [len(self.ac_performance.performance_table_cols[-1]) - 1] + # crz_mass = np.array(self.ac_performance.performance_table_cols[-1])[mass_ind] + subset_performance = self.ac_performance.performance_table[ - np.ix_(self.crz_FL_inds, # axis 0: flight levels - np.arange(self.ac_performance.performance_table.shape[1]), # axis 1: all TAS's - np.where(self.zero_roc_mask)[0], # axis 2: ROC ≈ 0 - mass_ind, # axis 3: high mass value - ) + np.ix_( + self.crz_FL_inds, + # axis 0: flight levels + np.arange(self.ac_performance.performance_table.shape[1]), + # axis 1: all TAS's + np.where(self.zero_roc_mask)[0], + # axis 2: ROC ≈ 0 + mass_ind, + # axis 3: high mass value + ) ] - + non_zero_mask = np.any(subset_performance != 0, axis=(0, 2, 3)) non_zero_perf = subset_performance[:, non_zero_mask, :, :] crz_tas = np.array(self.ac_performance.performance_table_cols[1])[non_zero_mask] - - # At this point, we should have a (2, 2, 1, 1)-shape matrix of fuel flow in (FL, TAS, --, --) - # where there should only be two non-0 values in the FL and TAS dimensions. Isolate this matrix: - if np.shape(non_zero_perf) != (2,2,1,1): + + # At this point, we should have a (2, 2, 1, 1)-shape matrix of + # fuel flow in (FL, TAS, --, --) + # where there should only be two non-0 values in the FL and TAS dimensions. + # Isolate this matrix: + if np.shape(non_zero_perf) != (2, 2, 1, 1): raise ValueError('Performance is overdefined for legacy methods') - - twoByTwoPerf = non_zero_perf[:,:,0,0] + + twoByTwoPerf = non_zero_perf[:, :, 0, 0] ff_mat = twoByTwoPerf[twoByTwoPerf != 0.0] if np.shape(ff_mat) != (2,): - raise ValueError(f'Mass estimation fuel flow matrix does not have the required dimensions (Expected: (2,); Recieved: {np.shape(ff_mat)})') - + raise ValueError( + "Mass estimation fuel flow matrix does not have the" + "required dimensions (Expected: (2,); Recieved: " + f"{np.shape(ff_mat)})" + ) + # Now perform the necessary interpolations in TAS and fuel flow - FL_weighting = (self.crz_FL - self.crz_FLs[0]) / (self.crz_FLs[1] - self.crz_FLs[0]) + FL_weighting = (self.crz_FL - self.crz_FLs[0]) / ( + self.crz_FLs[1] - self.crz_FLs[0] + ) dfuelflow = ff_mat[1] - ff_mat[0] dTAS = crz_tas[1] - crz_tas[0] - + fuelflow = ff_mat[0] + dfuelflow * FL_weighting tas = crz_tas[0] + dTAS * FL_weighting - + # Figure out startingMass components per AEIC v2: # - # | empty weight + # | empty weight # | + payload weight # | + fuel weight # | + fuel reserves weight @@ -174,103 +227,119 @@ def calc_starting_mass(self): # | + fuel hold weight # | _______________________ # = Take-off weight - + # Empty mass per BADA-3 (low mass / 1.2) emptyMass = self.ac_performance.performance_table_cols[-1][0] / 1.2 - + # Payload - payloadMass = self.ac_performance.model_info['General_Information']['max_payload_kg'] * self.load_factor - + payloadMass = ( + self.ac_performance.model_info['General_Information']['max_payload_kg'] + * self.load_factor + ) + # Fuel Needed (distance / velocity * fuel flow rate) approxTime = nautmiles_to_meters(self.gc_distance) / tas fuelMass = approxTime * fuelflow - + # Reserve fuel (assumed 5%) reserveMass = fuelMass * 0.05 - + # Diversion fuel per AEIC v2 - if approxTime / 60 > 180: # > 180 minutes - divertMass = nautmiles_to_meters(200.) / tas * fuelflow - holdMass = 30 * 60 * tas # 30 min; using cruise ff here + if approxTime / 60 > 180: # > 180 minutes + divertMass = nautmiles_to_meters(200.0) / tas * fuelflow + holdMass = 30 * 60 * tas # 30 min; using cruise ff here else: - divertMass = nautmiles_to_meters(100.) / tas * fuelflow - holdMass = 45 * 60 * tas # 30 min; using cruise ff here - - self.starting_mass = emptyMass + payloadMass + fuelMass + reserveMass + divertMass + holdMass - + divertMass = nautmiles_to_meters(100.0) / tas * fuelflow + holdMass = 45 * 60 * tas # 30 min; using cruise ff here + + self.starting_mass = ( + emptyMass + payloadMass + fuelMass + reserveMass + divertMass + holdMass + ) + # Limit to MTOM if overweight if self.starting_mass > self.ac_performance.performance_table_cols[-1][-1]: - self.starting_mass = self.ac_performance.performance_table_cols[-1][-1] - + self.starting_mass = self.ac_performance.performance_table_cols[-1][-1] + # Set fuel mass (for weight residual calculation) self.fuel_mass = fuelMass - - + ################### # PRIVATE METHODS # ################### def __calc_crz_FLs(self): - ''' Get the bounding cruise flight levels (for which data exists) ''' + '''Get the bounding cruise flight levels (for which data exists)''' # Get the two flight levels in data closest to the cruise FL self.crz_FL_inds = self.__search_flight_levels_ind(self.crz_FL) - self.crz_FLs = np.array(self.ac_performance.performance_table_cols[0])[self.crz_FL_inds] - - + self.crz_FLs = np.array(self.ac_performance.performance_table_cols[0])[ + self.crz_FL_inds + ] + def __search_flight_levels_ind(self, FL): - ''' Searches the valid flight levels in the performance model for the indices bounding a - known FL value. + '''Searches the valid flight levels in the performance model for + the indices bounding a known FL value. ''' FL_ind_high = np.searchsorted(self.ac_performance.performance_table_cols[0], FL) - + if FL_ind_high == 0: - raise ValueError(f'Aircraft is trying to fly below minimum cruise altitude (FL {FL:.2f})') + raise ValueError( + f"Aircraft is trying to fly below minimum cruise altitude(FL {FL:.2f})" + ) if FL_ind_high == len(self.ac_performance.performance_table_cols[0]): - raise ValueError(f'Aircraft is trying to fly above maximum cruise altitude (FL {FL:.2f})') - + raise ValueError( + f"Aircraft is trying to fly above maximum cruise altitude(FL {FL:.2f})" + ) + FL_ind_low = FL_ind_high - 1 FLs = [FL_ind_low, FL_ind_high] return FLs - - + def __search_TAS_ind(self, tas): - ''' Searches the valid tas in the performance model for the indices bounding a + '''Searches the valid tas in the performance model for the indices bounding a known tas value. ''' - tas_ind_high = np.searchsorted(self.ac_performance.performance_table_cols[1], tas) - + tas_ind_high = np.searchsorted( + self.ac_performance.performance_table_cols[1], tas + ) + if tas_ind_high == 0: - raise ValueError(f'Aircraft is trying to fly below minimum cruise altitude (FL {tas:.2f})') + raise ValueError( + f"Aircraft is trying to fly below minimum cruise altitude(FL {tas:.2f})" + ) if tas_ind_high == len(self.ac_performance.performance_table_cols[1]): - raise ValueError(f'Aircraft is trying to fly above maximum cruise altitude (FL {tas:.2f})') - + raise ValueError( + f"Aircraft is trying to fly above maximum cruise altitude(FL {tas:.2f})" + ) + tas_ind_low = tas_ind_high - 1 tass = [tas_ind_low, tas_ind_high] return tass - - + def __search_mass_ind(self, mass): - ''' Searches the valid mass values in the performance model for the indices bounding a - known mass value. + '''Searches the valid mass values in the performance model + for the indices bounding a known mass value. ''' - mass_ind_high = np.searchsorted(self.ac_performance.performance_table_cols[-1], mass) - + mass_ind_high = np.searchsorted( + self.ac_performance.performance_table_cols[-1], mass + ) + if mass_ind_high == 0: raise ValueError('Aircraft is trying to fly below minimum mass') if mass_ind_high == len(self.ac_performance.performance_table_cols[0]): raise ValueError('Aircraft is trying to fly above maximum mass') - + mass_ind_low = mass_ind_high - 1 masses = [mass_ind_low, mass_ind_high] return masses - - + def __get_zero_roc_index(self, roc_zero_tol=1e-6): - ''' Get the index along the ROC axis of performance where ROC == 0 ''' - self.zero_roc_mask = np.abs(np.array(self.ac_performance.performance_table_cols[2])) < roc_zero_tol - - + '''Get the index along the ROC axis of performance where ROC == 0''' + self.zero_roc_mask = ( + np.abs(np.array(self.ac_performance.performance_table_cols[2])) + < roc_zero_tol + ) + def __calc_FL_interp_vals(self, i, alt, roc_perf): - ''' Computes the state values that depend only on flight level. These include + '''Computes the state values that depend only on flight level. These include true airspeed (TAS) and fuel flow (ff). Rate of climb (roc) is also only dependent on FL in descent. ''' @@ -278,366 +347,428 @@ def __calc_FL_interp_vals(self, i, alt, roc_perf): self.traj_data['FLs'][i] = FL FL_inds = self.__search_flight_levels_ind(FL) bounding_fls = np.array(self.ac_performance.performance_table_cols[0])[FL_inds] - + # Construct interpolation weightings fl_weighting = (FL - bounding_fls[0]) / (bounding_fls[1] - bounding_fls[0]) self.traj_data['FL_weight'][i] = fl_weighting - + # Filter to bounding flight levels pos_roc_fl_reduced_perf = roc_perf[ - np.ix_( FL_inds, # axis 0: flight levels - np.arange(roc_perf.shape[1]), # axis 1: all TAS's - np.arange(roc_perf.shape[2]), # axis 2: all positive ROC - np.arange(roc_perf.shape[3]), # axis 3: mass value + np.ix_( + FL_inds, # axis 0: flight levels + np.arange(roc_perf.shape[1]), # axis 1: all TAS's + np.arange(roc_perf.shape[2]), # axis 2: all positive ROC + np.arange(roc_perf.shape[3]), # axis 3: mass value ) ] - - # The the collapsed indices and values of all non-0 fuel flow and TAS values in the filtered performance data + + # The the collapsed indices and values of all non-0 fuel flow + # and TAS values in the filtered performance data non_zero_ff_inds = np.nonzero(pos_roc_fl_reduced_perf) non_zero_ff_vals = pos_roc_fl_reduced_perf[non_zero_ff_inds] - + non_zero_tas_inds = non_zero_ff_inds[1] - non_zero_tas_vals = np.array(self.ac_performance.performance_table_cols[1])[non_zero_tas_inds] - + non_zero_tas_vals = np.array(self.ac_performance.performance_table_cols[1])[ + non_zero_tas_inds + ] + # ROC will only be valid in descent non_zero_roc_inds = non_zero_ff_inds[2] - non_zero_roc_vals = np.array(self.ac_performance.performance_table_cols[2])[non_zero_roc_inds] - - # Remove duplicate entries; we should have 2 entries in each corresponding to the two bounding flight levels + non_zero_roc_vals = np.array(self.ac_performance.performance_table_cols[2])[ + non_zero_roc_inds + ] + + # Remove duplicate entries; we should have 2 entries in each + # corresponding to the two bounding flight levels tas_vals = filter_order_duplicates(non_zero_tas_vals) ff_vals = filter_order_duplicates(non_zero_ff_vals) - + roc_vals = filter_order_duplicates(non_zero_roc_vals) - + # Interpolate to get TAS and fuel flow if len(tas_vals) == 1: tas_interp = tas_vals[0] else: tas_interp = tas_vals[0] + fl_weighting * (tas_vals[1] - tas_vals[0]) - + if len(ff_vals) == 1: ff_interp = ff_vals[0] else: - ff_interp = ff_vals[0] + fl_weighting * (ff_vals[1] - ff_vals[0]) - + ff_interp = ff_vals[0] + fl_weighting * (ff_vals[1] - ff_vals[0]) + if len(roc_vals) == 1: roc_interp = roc_vals[0] else: - roc_interp = roc_vals[0] + fl_weighting * (roc_vals[1] - roc_vals[0]) - + roc_interp = roc_vals[0] + fl_weighting * (roc_vals[1] - roc_vals[0]) + return tas_interp, ff_interp, roc_interp - - + def __calc_tas_crz(self, i, alt, roc_perf): - ''' Computes the TAS that depend only on flight level for cruise ''' + '''Computes the TAS that depend only on flight level for cruise''' FL = meters_to_feet(alt) / 100 self.traj_data['FLs'][i] = FL - + # Construct interpolation weightings fl_weighting = (FL - self.crz_FLs[0]) / (self.crz_FLs[1] - self.crz_FLs[0]) - - # The the collapsed indices and values of all non-0 fuel flow and TAS values in the filtered performance data + + # The the collapsed indices and values of all non-0 fuel flow + # and TAS values in the filtered performance data non_zero_ff_inds = np.nonzero(roc_perf) non_zero_tas_inds = non_zero_ff_inds[1] - non_zero_tas_vals = np.array(self.ac_performance.performance_table_cols[1])[non_zero_tas_inds] - - # Remove duplicate entries; we should have 2 entries in each corresponding to the two bounding flight levels + non_zero_tas_vals = np.array(self.ac_performance.performance_table_cols[1])[ + non_zero_tas_inds + ] + + # Remove duplicate entries; we should have 2 entries in each + # corresponding to the two bounding flight levels tas_vals = filter_order_duplicates(non_zero_tas_vals) - + # Interpolate to get TAS if len(tas_vals) == 1: tas_interp = tas_vals[0] else: tas_interp = tas_vals[0] + fl_weighting * (tas_vals[1] - tas_vals[0]) - + return tas_interp, fl_weighting - - + def __calc_roc_climb(self, i, FL, seg_start_mass, roc_perf, rocs): - ''' Calculates rate of climb (roc) given flight level and mass given a - subset of overall performance data (limited to roc > 0 or roc < 0 in + '''Calculates rate of climb (roc) given flight level and mass given a + subset of overall performance data (limited to roc > 0 or roc < 0 in `roc_perf`) ''' # Get bounding flight levels FL_inds = self.__search_flight_levels_ind(FL) bounding_fls = np.array(self.ac_performance.performance_table_cols[0])[FL_inds] - + # Get bounding mass values mass_inds = self.__search_mass_ind(seg_start_mass) - bounding_mass = np.array(self.ac_performance.performance_table_cols[3])[mass_inds] - + bounding_mass = np.array(self.ac_performance.performance_table_cols[3])[ + mass_inds + ] + # Filter to bounding values pos_roc_reduced_perf = roc_perf[ - np.ix_( FL_inds, # axis 0: flight levels - np.arange(roc_perf.shape[1]), # axis 1: all TAS's - np.arange(roc_perf.shape[2]), # axis 2: all zero ROC - mass_inds, # axis 3: mass value + np.ix_( + FL_inds, # axis 0: flight levels + np.arange(roc_perf.shape[1]), # axis 1: all TAS's + np.arange(roc_perf.shape[2]), # axis 2: all zero ROC + mass_inds, # axis 3: mass value ) ] - + non_zero_ff_inds = np.nonzero(pos_roc_reduced_perf) rocs = rocs[non_zero_ff_inds[2]] fls = bounding_fls[non_zero_ff_inds[0]] masses = bounding_mass[non_zero_ff_inds[3]] - + # Prepare ROC matrix roc_mat = np.full((len(bounding_fls), len(bounding_mass)), np.nan) - + # Fill ROC matrix for kk in range(len(rocs)): ii = np.where(bounding_fls == fls[kk])[0][0] jj = np.where(bounding_mass == masses[kk])[0][0] roc_mat[ii, jj] = rocs[kk] - + fl_weight = self.traj_data['FL_weight'][i] - mass_weight = (seg_start_mass - bounding_mass[0]) / (bounding_mass[1] - bounding_mass[0]) - - roc_1 = roc_mat[0, 0] + (roc_mat[1,0] - roc_mat[0,0]) * fl_weight - roc_2 = roc_mat[0, 1] + (roc_mat[1,1] - roc_mat[0,1]) * fl_weight - + mass_weight = (seg_start_mass - bounding_mass[0]) / ( + bounding_mass[1] - bounding_mass[0] + ) + + roc_1 = roc_mat[0, 0] + (roc_mat[1, 0] - roc_mat[0, 0]) * fl_weight + roc_2 = roc_mat[0, 1] + (roc_mat[1, 1] - roc_mat[0, 1]) * fl_weight + roc = roc_1 + (roc_2 - roc_1) * mass_weight return roc - - + def __calc_ff_cruise(self, i, seg_start_mass, crz_perf): - ''' Calculates rate of climb (roc) given flight level and mass given a - subset of overall performance data (limited to roc > 0 or roc < 0 in + '''Calculates rate of climb (roc) given flight level and mass given a + subset of overall performance data (limited to roc > 0 or roc < 0 in `roc_perf`) - ''' + ''' # Get bounding mass values mass_inds = self.__search_mass_ind(seg_start_mass) - bounding_mass = np.array(self.ac_performance.performance_table_cols[3])[mass_inds] - + bounding_mass = np.array(self.ac_performance.performance_table_cols[3])[ + mass_inds + ] + # Filter to bounding values crz_perf_reduced = crz_perf[ - np.ix_( np.arange(crz_perf.shape[0]), # axis 0: flight levels - np.arange(crz_perf.shape[1]), # axis 1: all TAS's - np.arange(crz_perf.shape[2]), # axis 2: all zero ROC - mass_inds, # axis 3: mass value + np.ix_( + np.arange(crz_perf.shape[0]), # axis 0: flight levels + np.arange(crz_perf.shape[1]), # axis 1: all TAS's + np.arange(crz_perf.shape[2]), # axis 2: all zero ROC + mass_inds, # axis 3: mass value ) ] - + non_zero_ff_inds = np.nonzero(crz_perf_reduced) ffs = crz_perf_reduced[non_zero_ff_inds] fls = self.crz_FLs[non_zero_ff_inds[0]] masses = bounding_mass[non_zero_ff_inds[3]] - + # Prepare FF matrix ff_mat = np.full((len(self.crz_FLs), len(bounding_mass)), np.nan) - + # Fill FF matrix for kk in range(len(ffs)): ii = np.where(self.crz_FLs == fls[kk])[0][0] jj = np.where(bounding_mass == masses[kk])[0][0] ff_mat[ii, jj] = ffs[kk] - + fl_weight = self.traj_data['FL_weight'][i] - mass_weight = (seg_start_mass - bounding_mass[0]) / (bounding_mass[1] - bounding_mass[0]) - - ff_1 = ff_mat[0, 0] + (ff_mat[1,0] - ff_mat[0,0]) * fl_weight - ff_2 = ff_mat[0, 1] + (ff_mat[1,1] - ff_mat[0,1]) * fl_weight - + mass_weight = (seg_start_mass - bounding_mass[0]) / ( + bounding_mass[1] - bounding_mass[0] + ) + + ff_1 = ff_mat[0, 0] + (ff_mat[1, 0] - ff_mat[0, 0]) * fl_weight + ff_2 = ff_mat[0, 1] + (ff_mat[1, 1] - ff_mat[0, 1]) * fl_weight + ff = ff_1 + (ff_2 - ff_1) * mass_weight return ff - - + def __legacy_climb(self): - ''' Computes state over the climb segment using AEIC v2 methods based on BADA-3 formulas ''' + '''Computes state over the climb segment using AEIC v2 methods + based on BADA-3 formulas''' # Create a mask for ROC limiting to only positive values (climb) pos_roc_mask = np.array(self.ac_performance.performance_table_cols[2]) > 0 - + # Convert ROC mask to the indices of positive ROC roc_inds = np.where(pos_roc_mask)[0] pos_rocs = np.array(self.ac_performance.performance_table_cols[2])[roc_inds] - + # Filter performance data to positive ROC pos_roc_perf = self.ac_performance.performance_table[ - np.ix_( np.arange(self.ac_performance.performance_table.shape[0]), # axis 0: flight levels - np.arange(self.ac_performance.performance_table.shape[1]), # axis 1: all TAS's - np.where(pos_roc_mask)[0], # axis 2: all positive ROC - np.arange(self.ac_performance.performance_table.shape[3]), # axis 3: mass value + np.ix_( + np.arange(self.ac_performance.performance_table.shape[0]), + # axis 0: flight levels + np.arange(self.ac_performance.performance_table.shape[1]), + # axis 1: all TAS's + np.where(pos_roc_mask)[0], + # axis 2: all positive ROC + np.arange(self.ac_performance.performance_table.shape[3]), + # axis 3: mass value ) ] - - # We first compute the instantaneous data at each flight level to avoid repeat calculations. + + # We first compute the instantaneous data at each flight level + # to avoid repeat calculations. # In AEIC v2 fuel flow and TAS are only dependent on flight level in climb. for i in range(0, self.NClm): alt = self.traj_data['altitude'][i] tas_interp, ff_interp, _ = self.__calc_FL_interp_vals(i, alt, pos_roc_perf) - + self.traj_data['fuelFlow'][i] = ff_interp self.traj_data['tas'][i] = tas_interp - + # Now we get rate of climb by running the flight - for i in range(0, self.NClm-1): + for i in range(0, self.NClm - 1): FL = self.traj_data['FLs'][i] tas = self.traj_data['tas'][i] ff = self.traj_data['fuelFlow'][i] seg_start_mass = self.traj_data['acMass'][i] - + # Calculate rate of climb roc = self.__calc_roc_climb(i, FL, seg_start_mass, pos_roc_perf, pos_rocs) self.traj_data['rocs'][i] = roc - + # Calculate the forward true airspeed (will be used for ground speed) fwd_tas = np.sqrt(tas**2 - roc**2) - + # Get time to complete alititude change segment and total fuel burned - segment_time = (self.traj_data['altitude'][i+1] - self.traj_data['altitude'][i]) / roc + segment_time = ( + self.traj_data['altitude'][i + 1] - self.traj_data['altitude'][i] + ) / roc segment_fuel = ff * segment_time - - self.traj_data['groundSpeed'][i], self.traj_data['heading'][i], u, v = compute_ground_speed( - lon=self.traj_data['latitude'][i], - lat=self.traj_data['latitude'][i], - lon_next=self.traj_data['latitude'][i+1], - lat_next=self.traj_data['latitude'][i+1], - alt_ft=FL*100, - tas_kts=fwd_tas + + self.traj_data['groundSpeed'][i], self.traj_data['heading'][i], u, v = ( + compute_ground_speed( + lon=self.traj_data['latitude'][i], + lat=self.traj_data['latitude'][i], + lon_next=self.traj_data['latitude'][i + 1], + lat_next=self.traj_data['latitude'][i + 1], + alt_ft=FL * 100, + tas_kts=fwd_tas, + ) ) - # Calculate distance along route travelled dist = self.traj_data['groundSpeed'][i] * segment_time - - # Account for acceleration/deceleration over the segment using end-of-segment tas - tas_end = self.traj_data['tas'][i+1] - kinetic_energy_chg = 1/2 * seg_start_mass * (tas_end**2 - tas**2) - + + # Account for acceleration/deceleration over + # the segment using end-of-segment tas + tas_end = self.traj_data['tas'][i + 1] + kinetic_energy_chg = 1 / 2 * seg_start_mass * (tas_end**2 - tas**2) + # Calculate fuel required for acceleration # NOTE: I have no idea where AEIC v2 got the efficiency of 0.15 from accel_fuel = kinetic_energy_chg / (self.fuel_LHV) / 0.15 - + segment_fuel += accel_fuel - + # Update the state vector - self.traj_data['fuelMass'][i+1] = self.traj_data['fuelMass'][i] - segment_fuel - self.traj_data['acMass'][i+1] = self.traj_data['acMass'][i] - segment_fuel - self.traj_data['groundDist'][i+1] = self.traj_data['groundDist'][i] + dist - self.traj_data['flightTime'][i+1] = self.traj_data['flightTime'][i] + segment_time - - + self.traj_data['fuelMass'][i + 1] = ( + self.traj_data['fuelMass'][i] - segment_fuel + ) + self.traj_data['acMass'][i + 1] = self.traj_data['acMass'][i] - segment_fuel + self.traj_data['groundDist'][i + 1] = self.traj_data['groundDist'][i] + dist + self.traj_data['flightTime'][i + 1] = ( + self.traj_data['flightTime'][i] + segment_time + ) + def __legacy_cruise(self, dGD): - ''' Computes state over cruise segment using AEIC v2 methods based on BADA-3 formulas ''' + '''Computes state over cruise segment using AEIC v2 methods + based on BADA-3 formulas''' subset_performance = self.ac_performance.performance_table[ - np.ix_(self.crz_FL_inds, # axis 0: flight levels - np.arange(self.ac_performance.performance_table.shape[1]), # axis 1: all TAS's - np.where(self.zero_roc_mask)[0], # axis 2: ROC ≈ 0 - np.arange(self.ac_performance.performance_table.shape[3]), # axis 3: all mass values - ) + np.ix_( + self.crz_FL_inds, + # axis 0: flight levels + np.arange(self.ac_performance.performance_table.shape[1]), + # axis 1: all TAS's + np.where(self.zero_roc_mask)[0], + # axis 2: ROC ≈ 0 + np.arange(self.ac_performance.performance_table.shape[3]), + # axis 3: all mass values + ) ] - + # TAS in cruise is only dependent on flight level - tas_interp, fl_weight = self.__calc_tas_crz(self.NClm, self.crz_start_altitude, subset_performance) - self.traj_data['tas'][self.NClm:self.NClm+self.NCrz] = tas_interp - self.traj_data['rocs'][self.NClm:self.NClm+self.NCrz] = 0 - self.traj_data['FL_weight'][self.NClm:self.NClm+self.NCrz] = fl_weight - + tas_interp, fl_weight = self.__calc_tas_crz( + self.NClm, self.crz_start_altitude, subset_performance + ) + self.traj_data['tas'][self.NClm : self.NClm + self.NCrz] = tas_interp + self.traj_data['rocs'][self.NClm : self.NClm + self.NCrz] = 0 + self.traj_data['FL_weight'][self.NClm : self.NClm + self.NCrz] = fl_weight + # Get fuel flow, ground speed, etc. for cruise segments - for i in range(self.NClm, self.NClm+self.NCrz-1): - - self.traj_data['groundSpeed'][i], self.traj_data['heading'][i], _, _ = compute_ground_speed( - lon=self.traj_data['latitude'][i], - lat=self.traj_data['latitude'][i], - lon_next=self.traj_data['latitude'][i+1], - lat_next=self.traj_data['latitude'][i+1], - alt_ft=self.crz_FL*100, - tas_kts=self.traj_data['tas'][i] + for i in range(self.NClm, self.NClm + self.NCrz - 1): + self.traj_data['groundSpeed'][i], self.traj_data['heading'][i], _, _ = ( + compute_ground_speed( + lon=self.traj_data['latitude'][i], + lat=self.traj_data['latitude'][i], + lon_next=self.traj_data['latitude'][i + 1], + lat_next=self.traj_data['latitude'][i + 1], + alt_ft=self.crz_FL * 100, + tas_kts=self.traj_data['tas'][i], + ) ) # Calculate time required to fly the segment segment_time = dGD / self.traj_data['groundSpeed'][i] - + # Get fuel flow rate based on FL and mass interpolation - ff = self.__calc_ff_cruise(i, self.traj_data['acMass'][i], subset_performance) - + ff = self.__calc_ff_cruise( + i, self.traj_data['acMass'][i], subset_performance + ) + # Calculate fuel burn in [kg] over the segment segment_fuel = ff * segment_time - + # Set aircraft state values - self.traj_data['fuelFlow'][i+1] = ff - self.traj_data['fuelMass'][i+1] = self.traj_data['fuelMass'][i] - segment_fuel - self.traj_data['acMass'][i+1] = self.traj_data['acMass'][i] - segment_fuel - - self.traj_data['flightTime'][i+1] = self.traj_data['flightTime'][i] + segment_time - - + self.traj_data['fuelFlow'][i + 1] = ff + self.traj_data['fuelMass'][i + 1] = ( + self.traj_data['fuelMass'][i] - segment_fuel + ) + self.traj_data['acMass'][i + 1] = self.traj_data['acMass'][i] - segment_fuel + + self.traj_data['flightTime'][i + 1] = ( + self.traj_data['flightTime'][i] + segment_time + ) + def __legacy_descent(self): - ''' Computes state over the descent segment using AEIC v2 methods based on BADA-3 formulas ''' + '''Computes state over the descent segment using AEIC v2 + methods based on BADA-3 formulas''' startN = self.NClm + self.NCrz endN = startN + self.NDes - + # Create a mask for ROC limiting to only positive values (climb) neg_roc_mask = np.array(self.ac_performance.performance_table_cols[2]) < 0 - + # Convert ROC mask to the indices of positive ROC - roc_inds = np.where(neg_roc_mask)[0] - neg_rocs = np.array(self.ac_performance.performance_table_cols[2])[roc_inds] - + # roc_inds = np.where(neg_roc_mask)[0] + # neg_rocs = np.array(self.ac_performance.performance_table_cols[2])[roc_inds] + # Filter performance data to positive ROC neg_roc_perf = self.ac_performance.performance_table[ - np.ix_( np.arange(self.ac_performance.performance_table.shape[0]), # axis 0: flight levels - np.arange(self.ac_performance.performance_table.shape[1]), # axis 1: all TAS's - np.where(neg_roc_mask)[0], # axis 2: all positive ROC - np.arange(self.ac_performance.performance_table.shape[3]), # axis 3: mass value + np.ix_( + np.arange(self.ac_performance.performance_table.shape[0]), + # axis 0: flight levels + np.arange(self.ac_performance.performance_table.shape[1]), + # axis 1: all TAS's + np.where(neg_roc_mask)[0], + # axis 2: all positive ROC + np.arange(self.ac_performance.performance_table.shape[3]), + # axis 3: mass value ) ] - - # We first compute the instantaneous data at each flight level to avoid repeat calculations. + + # We first compute the instantaneous data at each flight level + # to avoid repeat calculations. # In AEIC v2 fuel flow and TAS are only dependent on flight level. for i in range(startN, endN): alt = self.traj_data['altitude'][i] - tas_interp, ff_interp, roc_interp = self.__calc_FL_interp_vals(i, alt, neg_roc_perf) - + tas_interp, ff_interp, roc_interp = self.__calc_FL_interp_vals( + i, alt, neg_roc_perf + ) + self.traj_data['fuelFlow'][i] = ff_interp self.traj_data['tas'][i] = tas_interp self.traj_data['rocs'][i] = roc_interp - + # Now we calculate segment level info by running the flight - for i in range(startN, endN-1): + for i in range(startN, endN - 1): tas = self.traj_data['tas'][i] ff = self.traj_data['fuelFlow'][i] roc = self.traj_data['rocs'][i] seg_start_mass = self.traj_data['acMass'][i] - + # Calculate the forward true airspeed (will be used for ground speed) fwd_tas = np.sqrt(tas**2 - roc**2) - + # Get time to complete alititude change segment and total fuel burned - segment_time = (self.traj_data['altitude'][i+1] - self.traj_data['altitude'][i]) / roc + segment_time = ( + self.traj_data['altitude'][i + 1] - self.traj_data['altitude'][i] + ) / roc segment_fuel = ff * segment_time - - self.traj_data['groundSpeed'][i], self.traj_data['heading'][i], _, _ = compute_ground_speed( - lon=self.traj_data['latitude'][i], - lat=self.traj_data['latitude'][i], - lon_next=self.traj_data['latitude'][i+1], - lat_next=self.traj_data['latitude'][i+1], - alt_ft=meters_to_feet(self.traj_data['altitude'][i]), - tas_kts=fwd_tas + + self.traj_data['groundSpeed'][i], self.traj_data['heading'][i], _, _ = ( + compute_ground_speed( + lon=self.traj_data['latitude'][i], + lat=self.traj_data['latitude'][i], + lon_next=self.traj_data['latitude'][i + 1], + lat_next=self.traj_data['latitude'][i + 1], + alt_ft=meters_to_feet(self.traj_data['altitude'][i]), + tas_kts=fwd_tas, + ) ) - + # Calculate distance along route travelled dist = self.traj_data['groundSpeed'][i] * segment_time - - # Account for acceleration/deceleration over the segment using end-of-segment tas - tas_end = self.traj_data['tas'][i+1] - kinetic_energy_chg = 1/2 * seg_start_mass * (tas_end**2 - tas**2) - + + # Account for acceleration/deceleration over the segment + # using end-of-segment tas + tas_end = self.traj_data['tas'][i + 1] + kinetic_energy_chg = 1 / 2 * seg_start_mass * (tas_end**2 - tas**2) + # Calculate fuel required for acceleration # NOTE: I have no idea where AEIC v2 got the efficiency of 0.15 from accel_fuel = kinetic_energy_chg / (self.fuel_LHV) / 0.15 - + segment_fuel += accel_fuel - + # We cannot gain fuel by decelerating in a conventional fuel A/C if segment_fuel < 0: segment_fuel = 0 - + # Update the state vector - self.traj_data['fuelMass'][i+1] = self.traj_data['fuelMass'][i] - segment_fuel - self.traj_data['acMass'][i+1] = self.traj_data['acMass'][i] - segment_fuel - self.traj_data['groundDist'][i+1] = self.traj_data['groundDist'][i] + dist - self.traj_data['flightTime'][i+1] = self.traj_data['flightTime'][i] + segment_time - \ No newline at end of file + self.traj_data['fuelMass'][i + 1] = ( + self.traj_data['fuelMass'][i] - segment_fuel + ) + self.traj_data['acMass'][i + 1] = self.traj_data['acMass'][i] - segment_fuel + self.traj_data['groundDist'][i + 1] = self.traj_data['groundDist'][i] + dist + self.traj_data['flightTime'][i + 1] = ( + self.traj_data['flightTime'][i] + segment_time + ) diff --git a/src/AEIC/trajectories/tasopt_trajectory.py b/src/AEIC/trajectories/tasopt_trajectory.py index 514c18c3..24a07c5d 100644 --- a/src/AEIC/trajectories/tasopt_trajectory.py +++ b/src/AEIC/trajectories/tasopt_trajectory.py @@ -1,22 +1,20 @@ -import numpy as np from AEIC.performance_model import PerformanceModel from AEIC.trajectories.trajectory import Trajectory + class TASOPTTrajectory(Trajectory): '''Model for determining flight trajectories using the legacy method from AEIC v2. ''' - def __init__(self, ac_performance:PerformanceModel): + + def __init__(self, ac_performance: PerformanceModel): super().__init__(ac_performance) - - + def climb(self): pass - - + def cruise(self): pass - - + def descent(self): - pass \ No newline at end of file + pass diff --git a/src/AEIC/trajectories/trajectory.py b/src/AEIC/trajectories/trajectory.py index 9c2e9cf2..dae7883e 100644 --- a/src/AEIC/trajectories/trajectory.py +++ b/src/AEIC/trajectories/trajectory.py @@ -1,167 +1,179 @@ import numpy as np -from AEIC.performance_model import PerformanceModel -from utils.helpers import nautmiles_to_meters from pyproj import Geod + +from AEIC.performance_model import PerformanceModel + + class Trajectory: - '''Model for determining flight trajectories. - ''' - - def __init__(self, ac_performance:PerformanceModel, mission, optimize_traj:bool, iterate_mass:bool, startMass:float=-1, - max_mass_iters=5, mass_iter_reltol=1e-2): + '''Model for determining flight trajectories.''' + + def __init__( + self, + ac_performance: PerformanceModel, + mission, + optimize_traj: bool, + iterate_mass: bool, + startMass: float = -1, + max_mass_iters=5, + mass_iter_reltol=1e-2, + ): # Save A/C performance model and the mission to be flown - # NOTE: Currently assume that `mission` comes in as a dictionary with the format of a single flight - # in `src/missions/sample_missions_10.json`. We also assume that Load Factor for the flight will be + # NOTE: Currently assume that `mission` comes in as a dictionary with + # the format of a single flight + # in `src/missions/sample_missions_10.json`. We also assume that + # Load Factor for the flight will be # included in the mission object. - self.name = f'{mission["dep_airport"]}_{mission["arr_airport"]}_{mission["ac_code"]}' + self.name = ( + f'{mission["dep_airport"]}_{mission["arr_airport"]}_{mission["ac_code"]}' + ) self.ac_performance = ac_performance - + # Save airport locations and dep/arr times; lat/long in degrees self.dep_lon_lat_alt = mission['dep_location'] self.arr_lon_lat_alt = mission['arr_location'] - + self.start_time = mission["dep_datetime"] self.end_time = mission["arr_datetime"] - + # Convert gc distance to meters - self.gc_distance = mission["distance_nm"] # FIXME: check to make sure this is changed to meters - + self.gc_distance = mission["distance_nm"] + # FIXME: check to make sure this is changed to meters + # Get load factor from mission object self.load_factor = mission["load_factor"] - + # Controls whether or not route optimization is performed # NOTE: This currently does nothing self.optimize_traj = optimize_traj - + # Control whether or not starting mass is iterated on self.iter_mass = iterate_mass self.max_mass_iters = max_mass_iters self.mass_iter_reltol = mass_iter_reltol self.mass_converged = None - + # Allow user to specify starting mass if desired self.starting_mass = startMass - - # Initialize a non-reserve, non-divert/hold fuel mass for mass residual calculation + + # Initialize a non-reserve, non-divert/hold fuel mass for mass residual + # calculation self.fuel_mass = None - - # Initialize values for number of simulated points per segment (to be defined in child classes) + + # Initialize values for number of simulated points per segment + # (to be defined in child classes) self.NClm = None self.NCrz = None self.NDes = None self.Ntot = None - + # Initialize important altitudes (clm = climb, crz = cruise, des = descent) self.clm_start_altitude = None self.crz_start_altitude = None self.des_start_altitude = None - self.des_end_altitude = None - - + self.des_end_altitude = None + def fly_flight(self, **kwargs): # Initialize data array as numpy structured array traj_dtype = [ - ('fuelFlow', np.float64, self.Ntot), - ('acMass', np.float64, self.Ntot), - ('fuelMass', np.float64, self.Ntot), + ('fuelFlow', np.float64, self.Ntot), + ('acMass', np.float64, self.Ntot), + ('fuelMass', np.float64, self.Ntot), ('groundDist', np.float64, self.Ntot), - ('altitude', np.float64, self.Ntot), - ('FLs', np.float64, self.Ntot), - ('rocs', np.float64, self.Ntot), + ('altitude', np.float64, self.Ntot), + ('FLs', np.float64, self.Ntot), + ('rocs', np.float64, self.Ntot), ('flightTime', np.float64, self.Ntot), - ('latitude', np.float64, self.Ntot), - ('longitude', np.float64, self.Ntot), - ('heading', np.float64, self.Ntot), - ('tas', np.float64, self.Ntot), + ('latitude', np.float64, self.Ntot), + ('longitude', np.float64, self.Ntot), + ('heading', np.float64, self.Ntot), + ('tas', np.float64, self.Ntot), ('groundSpeed', np.float64, self.Ntot), - ('FL_weight', np.float64, self.Ntot), + ('FL_weight', np.float64, self.Ntot), ] self.traj_data = np.empty((), dtype=traj_dtype) if self.starting_mass < 0: self.calc_starting_mass(**kwargs) - + # Trajectory optimization if self.optimize_traj: # Will be implemented in a future version pass - + if self.iter_mass: self.mass_converged = False - + for m_iter in range(self.max_mass_iters): mass_res = self.fly_flight_iteration(**kwargs) - + # Keep the calculated trajectory if the mass is sufficiently small if abs(mass_res) < self.mass_iter_reltol: self.mass_converged = True break - + # Perform a `dumb` correction of the starting mass self.starting_mass = self.starting_mass - (mass_res * self.fuel_mass) - + if not self.mass_converged: - print(f'Mass iteration failed to converge; final residual {mass_res * 100}% > {self.mass_iter_reltol * 100}%') - + print( + "Mass iteration failed to converge; final residual" + f"{mass_res * 100}% > {self.mass_iter_reltol * 100}%" + ) + else: self.fly_flight_iteration(**kwargs) - - - + def fly_flight_iteration(self, **kwargs): - ''' Function for running a single flight iteration. In non-weight-iterating mode, - only runs once. `kwargs` used to pass in relevent optimization variables in - applicable cases. - + '''Function for running a single flight iteration. In non-weight-iterating mode + only runs once. `kwargs` used to pass in relevent optimization variables in + applicable cases. Returns: - - `mass_residual`: Difference in fuel burned and calculated required fuel mass + - `mass_residual`: Difference in fuel burned and calculated required + fuel mass ''' self.current_mass = self.starting_mass - + for field in self.traj_data.dtype.names: self.traj_data[field][:] = np.nan - + # Set initial values self.traj_data['flightTime'][0] = 0 self.traj_data['acMass'][0] = self.starting_mass self.traj_data['fuelMass'][0] = self.fuel_mass self.traj_data['groundDist'][0] = 0 self.traj_data['altitude'][0] = self.clm_start_altitude - + # Calculate lat, lon, heading of initial point - # Get great circle trajectory in lat,lon points + # Get great circle trajectory in lat,lon points geod = Geod(ellps="WGS84") lon_dep, lat_dep, _ = self.dep_lon_lat_alt lon_arr, lat_arr, _ = self.arr_lon_lat_alt lat_lon_trajectory = geod.npts(lon_dep, lat_dep, lon_arr, lat_arr, self.Ntot) - self.traj_data['latitude'] = np.array(lat_lon_trajectory)[:,1] - self.traj_data['longitude'] = np.array(lat_lon_trajectory)[:,0] - + self.traj_data['latitude'] = np.array(lat_lon_trajectory)[:, 1] + self.traj_data['longitude'] = np.array(lat_lon_trajectory)[:, 0] + # Fly the climb, cruise, descent segments in order self.climb(**kwargs) self.cruise(**kwargs) self.descent(**kwargs) - + # Calculate weight residual normalized by fuel_mass fuelBurned = self.starting_mass - self.traj_data['acMass'][-1] mass_residual = (self.fuel_mass - fuelBurned) / self.fuel_mass - + return mass_residual - - + ############################################################ # UNIVERSAL TRAJECTORY FUNCTIONS - TO BE DEFINED PER MODEL # ############################################################ def climb(self): pass - - + def cruise(self): pass - - + def descent(self): pass - - + def calc_starting_mass(self): - pass \ No newline at end of file + pass diff --git a/src/BADA/aircraft_parameters.py b/src/BADA/aircraft_parameters.py index e9782c2d..8e577d3b 100644 --- a/src/BADA/aircraft_parameters.py +++ b/src/BADA/aircraft_parameters.py @@ -4,12 +4,11 @@ - uses the BaseAircraftParameter class from fuel_burn_base.py """ -from .fuel_burn_base import BaseAircraftParameters - from dataclasses import dataclass - from typing import Optional +from .fuel_burn_base import BaseAircraftParameters + @dataclass class Bada3AircraftParameters(BaseAircraftParameters): @@ -52,7 +51,7 @@ class Bada3AircraftParameters(BaseAircraftParameters): cas_cruise_lo: Optional[float] = None cas_cruise_hi: Optional[float] = None cas_cruise_mach: Optional[float] = None - + def assign_parameters_fromdict(self, parameters: dict): """ Assigns the parameters from a dictionary. diff --git a/src/BADA/fuel_burn_base.py b/src/BADA/fuel_burn_base.py index e7e96ad3..43e56315 100644 --- a/src/BADA/fuel_burn_base.py +++ b/src/BADA/fuel_burn_base.py @@ -2,10 +2,10 @@ # Standard library imports from abc import ABC, abstractmethod -from typing import List, Optional, Tuple, Union from dataclasses import dataclass -from scipy.integrate import cumulative_trapezoid + import numpy as np +from scipy.integrate import cumulative_trapezoid @dataclass @@ -48,7 +48,8 @@ def update_mass_vector_backward( specific_ground_range_corrected = np.where( specific_ground_range < 1, np.inf, specific_ground_range ) - # backwards means last element stays and the rest get adjusted by addition instead of subtraction + # backwards means last element stays and the rest get adjusted by + # addition instead of subtraction cumulative_integral = cumulative_trapezoid( 1 / specific_ground_range_corrected[::-1], dx=segment_distance )[::-1] diff --git a/src/BADA/helper_functions.py b/src/BADA/helper_functions.py index e5d2ca53..f5317b94 100644 --- a/src/BADA/helper_functions.py +++ b/src/BADA/helper_functions.py @@ -1,11 +1,15 @@ import os +from typing import dict + from .aircraft_parameters import Bada3AircraftParameters from .model import Bada3FuelBurnModel -from typing import Dict -def read_synonym_file_to_dict(folder_path) -> Dict[str, str]: + +def read_synonym_file_to_dict(folder_path) -> dict[str, str]: """ - Returns a dictionary of synonyms, where the key is the aircraft type and the value is the corresponding aircraft type that has a direct OPF file available. + Returns a dictionary of synonyms, where the key is the aircraft type + and the value is the corresponding aircraft type that has a direct + OPF file available. Parameters: ----------- @@ -15,12 +19,14 @@ def read_synonym_file_to_dict(folder_path) -> Dict[str, str]: Returns: -------- dict - Dictionary of synonyms, where the key is the aircraft type and the value is the corresponding aircraft type that has a direct OPF file available. + Dictionary of synonyms, where the key is the aircraft type and + the value is the corresponding aircraft type that has a direct + OPF file available. """ file_path = f"{folder_path}/SYNONYM.NEW" - with open(file_path, "r", encoding="ISO-8859-1") as f: + with open(file_path, encoding="ISO-8859-1") as f: lines = f.readlines() # lines 17 to -2 define synonyms @@ -32,6 +38,7 @@ def read_synonym_file_to_dict(folder_path) -> Dict[str, str]: synonym_dict[line[2]] = line[-3].strip("_") return synonym_dict + def get_directly_available_aircraft_types(folder_path): """ Returns a list of all directly available aircraft types in the BADA3 database. @@ -70,7 +77,8 @@ def get_all_available_aircraft_types(folder_path): List of all available aircraft types in the BADA3 database. """ - # can either be directly available (the .opf file has the aircraft type in it) or available via synonym, synonyms are defined in SYNONYM.NEW + # can either be directly available (the .opf file has the aircraft type in it) + # or available via synonym, synonyms are defined in SYNONYM.NEW directly_available = get_directly_available_aircraft_types(folder_path) @@ -82,7 +90,7 @@ def get_all_available_aircraft_types(folder_path): def get_aircraft_params_for_all_aircraft_types( folder_path, -) -> Dict[str, Bada3AircraftParameters]: +) -> dict[str, Bada3AircraftParameters]: """ Returns a dictionary containing the Bada3AircraftParameters for all aircraft types. @@ -125,7 +133,7 @@ def get_aircraft_params_for_all_aircraft_types( def get_models_for_all_implemented_aircraft_types( folder_path, -) -> Dict[str, Bada3FuelBurnModel]: +) -> dict[str, Bada3FuelBurnModel]: """ Returns a dictionary containing the Bada3FuelBurnModel for all aircraft types. diff --git a/src/BADA/model.py b/src/BADA/model.py index 3418f0d2..91291927 100644 --- a/src/BADA/model.py +++ b/src/BADA/model.py @@ -1,19 +1,20 @@ -import numpy as np - # from scipy.integrate import cumulative_trapezoid from abc import ABC, abstractmethod -from utils.helpers import mps_to_knots, meters_to_feet -from .fuel_burn_base import BaseFuelBurnModel -from utils.custom_types import FloatOrNDArray -from .aircraft_parameters import Bada3AircraftParameters +import numpy as np + from utils.consts import g0 +from utils.custom_types import FloatOrNDArray +from utils.helpers import meters_to_feet, mps_to_knots from utils.standard_atmosphere import ( - temperature_at_altitude_isa_bada4, calculate_air_density, pressure_at_altitude_isa_bada4, + temperature_at_altitude_isa_bada4, ) +from .aircraft_parameters import Bada3AircraftParameters +from .fuel_burn_base import BaseFuelBurnModel + class Bada3EngineModel(ABC): def __init__(self, aircraft_parameters: Bada3AircraftParameters): @@ -70,7 +71,8 @@ def calculate_max_climb_thrust( return self.calculate_max_climb_thrust_isa(altitude, v_tas) * ( 1 - np.clip( - delta_temperature_eff * np.maximum(0, self.aircraft_parameters['c_tc5']), + delta_temperature_eff + * np.maximum(0, self.aircraft_parameters['c_tc5']), 0, 0.4, ) @@ -126,9 +128,9 @@ def calculate_descent_thrust_high( Union[float, NDArray] Descent thrust [N]. """ - return self.aircraft_parameters['c_tdes_high'] * self.calculate_max_climb_thrust( - altitude, v_tas, temperature - ) + return self.aircraft_parameters[ + 'c_tdes_high' + ] * self.calculate_max_climb_thrust(altitude, v_tas, temperature) def calculate_descent_thrust_low( self, @@ -210,7 +212,6 @@ def calculate_descent_thrust_land( class Bada3JetEngineModel(Bada3EngineModel): - def calculate_specific_fuel_consumption( self, v_tas: FloatOrNDArray ) -> FloatOrNDArray: @@ -301,7 +302,6 @@ def calculate_max_climb_thrust_isa( class Bada3TurbopropEngineModel(Bada3EngineModel): - def calculate_specific_fuel_consumption( self, v_tas: FloatOrNDArray ) -> FloatOrNDArray: @@ -392,7 +392,6 @@ def calculate_max_climb_thrust_isa( class Bada3PistonEngineModel(Bada3EngineModel): - def calculate_specific_fuel_consumption(self) -> None: """ Returns @@ -585,7 +584,8 @@ def calculate_thrust( in_cruise: FloatOrNDArray, ) -> FloatOrNDArray: """ - Calculate thrust from total energy but correct extremes using max and descent thrusts + Calculate thrust from total energy but correct extremes + using max and descent thrusts Parameters ---------- @@ -901,7 +901,8 @@ def iterate_flight_simulation_fuel_burn_dependent_initial_mass_rf_fraction( n_iter: int = 10, ) -> FloatOrNDArray: """ - Iterate flight simulation for fuel burn dependent initial mass where reserve fuel is defined by fraction of fuel burn + Iterate flight simulation for fuel burn dependent initial mass + where reserve fuel is defined by fraction of fuel burn Parameters ---------- @@ -1018,7 +1019,8 @@ def iterate_flight_simulation_fuel_burn_dependent_initial_mass_rf_value( n_iter: int = 10, ) -> FloatOrNDArray: """ - Iterate flight simulation for fuel burn dependent initial mass where reserve fuel is defined by value + Iterate flight simulation for fuel burn dependent initial mass + where reserve fuel is defined by value Parameters ---------- @@ -1110,7 +1112,6 @@ def iterate_flight_simulation_fuel_burn_dependent_initial_mass_rf_value( ) * 100 if final_mass_pct_change < 0.01: - return mass old_final_mass = mass[-1].copy() diff --git a/src/gridding/grid.py b/src/gridding/grid.py index 7da2111b..8ea33d1a 100644 --- a/src/gridding/grid.py +++ b/src/gridding/grid.py @@ -1,11 +1,18 @@ +import warnings from dataclasses import dataclass -from numpy.typing import NDArray, ArrayLike -from typing import Optional, Dict, Any, Tuple +from typing import Optional, tuple + import numpy as np -from utils.helpers import great_circle_distance, meters_to_feet, feet_to_meters, unix_to_datetime_utc,calculate_line_parameters,crosses_dateline +from numpy.typing import NDArray from shapely.geometry import Polygon -import itertools -import warnings + +from utils.helpers import ( + calculate_line_parameters, + crosses_dateline, + great_circle_distance, + meters_to_feet, + unix_to_datetime_utc, +) @dataclass @@ -45,7 +52,7 @@ def n_cells(self) -> int: return n_cells @property - def shape(self) -> Tuple[Optional[int], ...]: + def shape(self) -> tuple[Optional[int], ...]: return (self.n_latitudes, self.n_longitudes, self.n_altitudes, self.n_times) @property @@ -120,14 +127,12 @@ class Gridder(GeospatialGrid): """Main methods""" def grid_polygon(self, polygon_lats, polygon_lons): - polygon_lons_extended = np.concatenate((polygon_lons, [polygon_lons[0]])) dateline_crossing = crosses_dateline( polygon_lons_extended[:-1], polygon_lons_extended[1:] ) if np.any(np.abs(dateline_crossing) > 0): - print("crosses dateline") print(50 * "-") @@ -150,7 +155,6 @@ def grid_polygon(self, polygon_lats, polygon_lons): return touched_cells_left | touched_cells_right else: - return self._polygon_touched_cells(polygon_lats, polygon_lons) def grid_trajectory( @@ -159,13 +163,14 @@ def grid_trajectory( lons: NDArray, altitudes: Optional[NDArray] = None, times: Optional[NDArray] = None, - state_variables: Tuple[NDArray, ...] = (), - integrated_variables: Tuple[NDArray, ...] = (), - ) -> Tuple[ - NDArray, NDArray, NDArray, NDArray, Tuple[NDArray, ...], Tuple[NDArray, ...] + state_variables: tuple[NDArray, ...] = (), + integrated_variables: tuple[NDArray, ...] = (), + ) -> tuple[ + NDArray, NDArray, NDArray, NDArray, tuple[NDArray, ...], tuple[NDArray, ...] ]: """ - this is a refactored version of the cells_touched_by_trajectory_with_state_and_integrated_variables method + this is a refactored version of the + cells_touched_by_trajectory_with_state_and_integrated_variables method """ dateline_crossing = crosses_dateline(lons[:-1], lons[1:]) @@ -192,10 +197,10 @@ def _grid_trajectory_without_dateline_crossing( lons: NDArray, altitudes: Optional[NDArray], times: Optional[NDArray], - state_variables: Tuple[NDArray, ...], - integrated_variables: Tuple[NDArray, ...], - ) -> Tuple[ - NDArray, NDArray, NDArray, NDArray, Tuple[NDArray, ...], Tuple[NDArray, ...] + state_variables: tuple[NDArray, ...], + integrated_variables: tuple[NDArray, ...], + ) -> tuple[ + NDArray, NDArray, NDArray, NDArray, tuple[NDArray, ...], tuple[NDArray, ...] ]: ( touched_cells_lat_indices, @@ -204,7 +209,7 @@ def _grid_trajectory_without_dateline_crossing( touched_cells_time_indices, state_variable_values, integrated_variable_values, - ) = self._cell_idxs_touched_by_trajectory_with_state_and_integrated_variables( + ) = self._cell_idxs_touched_by_trajectory_with_state_and_integrated_vars( lats, lons, altitudes, times, state_variables, integrated_variables ) @@ -235,14 +240,15 @@ def _grid_trajectory_with_dateline_crossing( lons: NDArray, altitudes: Optional[NDArray] = None, times: Optional[NDArray] = None, - state_variables: Tuple[NDArray, ...] = (), - integrated_variables: Tuple[NDArray, ...] = (), - ) -> Tuple[ - NDArray, NDArray, NDArray, NDArray, Tuple[NDArray, ...], Tuple[NDArray, ...] + state_variables: tuple[NDArray, ...] = (), + integrated_variables: tuple[NDArray, ...] = (), + ) -> tuple[ + NDArray, NDArray, NDArray, NDArray, tuple[NDArray, ...], tuple[NDArray, ...] ]: if np.count_nonzero(dateline_crossing) > 1: warnings.warn( - "Trajectory crosses the dateline more than once, this isn't implemented, returning None for all outputs" + "Trajectory crosses the dateline more than once, " + "this isn't implemented, returning None for all outputs" ) return ( np.array([]), @@ -524,7 +530,7 @@ def _cell_idxs_and_variables_for_dateline_split_trajectory( touched_cells_time_indices_first_part, subsegment_state_variable_values_first_part, subsegment_integrated_variable_values_first_part, - ) = self._cell_idxs_touched_by_trajectory_with_state_and_integrated_variables( + ) = self._cell_idxs_touched_by_trajectory_with_state_and_integrated_vars( lats_first_part, lons_first_part, altitudes_first_part, @@ -557,7 +563,7 @@ def _cell_idxs_and_variables_for_dateline_split_trajectory( touched_cells_time_indices_second_part, subsegment_state_variable_values_second_part, subsegment_integrated_variable_values_second_part, - ) = self._cell_idxs_touched_by_trajectory_with_state_and_integrated_variables( + ) = self._cell_idxs_touched_by_trajectory_with_state_and_integrated_vars( lats_second_part, lons_second_part, altitudes_second_part, @@ -628,9 +634,11 @@ def _cell_idxs_and_variables_for_dateline_split_trajectory( def _trajectory_intersection_points_and_cells_horizontal( self, lats: NDArray, lons: NDArray - ) -> Tuple[Tuple[NDArray, NDArray], Tuple[NDArray, NDArray]]: + ) -> tuple[tuple[NDArray, NDArray], tuple[NDArray, NDArray]]: """ - For each consecutive pair of points in the trajectory (a segment) returns the points where the segment intersects the grid and the indexes of the grid cells in which resulting subsegments fall + For each consecutive pair of points in the trajectory (a segment) + returns the points where the segment intersects the grid and the + indexes of the grid cells in which resulting subsegments fall Parameters ---------- @@ -643,16 +651,30 @@ def _trajectory_intersection_points_and_cells_horizontal( Returns ------- - Tuple[Tuple[NDArray, NDArray], Tuple[NDArray,NDArray]] - The latitudes and longitudes of the intersection points and the lat and lon indices of the grid cells in which the resulting subsegments fall. - - The shape of the latitudes and longitudes point arrays - the first output - is (#of trajectory points - 1, maximum segment index change + 2), - the first column are trajectory points and the last column are the following trajectory points (i.e. shifted by one). - The middle columns describe the points where the trajectory segments intersect the grid. if there are fewer intersections than the maximum index change (i.e. the maximum number of gridlines intersected by a single segment in the trajectory) then some columns will be nan. - The reason for the +2 in the number of columns is that the first and following trajectory points are also included in the output. - - The shape of the lat and lon indices arrays - the second output - is (#of trajectory points - 1, maximum segment index change + 1), - it contains the lat and lon indices of the grid cells that each segment of the trajectory intersects, including the ones that the segment starts and ends in. If the segment is fully inside a single gridcell, only the first column is not nan - if the segment crosses 1 gridline (index change = 1), it touches two cells. If it crosses 2 gridlines (index change = 2), it touches 3 cells, etc. Hence why the +1 in the number of columns. - if the segment has a smaller index change than the maximum index change, some columns will be nan. + tuple[tuple[NDArray, NDArray], tuple[NDArray,NDArray]] + The latitudes and longitudes of the intersection points and the lat and lon + indices of the grid cells in which the resulting subsegments fall. + - The shape of the latitudes and longitudes point arrays - the first output- + is (#of trajectory points - 1, maximum segment index change + 2), + the first column are trajectory points and the last column are the + following trajectory points (i.e. shifted by one). + The middle columns describe the points where the trajectory segments + intersect the grid. if there are fewer intersections than the maximum + index change (i.e. the maximum number of gridlines intersected by a single + segment in the trajectory) then some columns will be nan. + The reason for the +2 in the number of columns is that the first and + following trajectory points are also included in the output. + - The shape of the lat and lon indices arrays - the second output - is + (#of trajectory points - 1, maximum segment index change + 1), + it contains the lat and lon indices of the grid cells that each segment of + the trajectory intersects, including the ones that the segment starts and + ends in. If the segment is fully inside a single gridcell, only the first + column is not nan + if the segment crosses 1 gridline (index change = 1), it touches two cells. + If it crosses 2 gridlines (index change = 2), it touches 3 cells, etc. + Hence why the +1 in the number of columns. + if the segment has a smaller index change than the maximum index change, + some columns will be nan. """ # Number of segments is one less than the number of points @@ -690,7 +712,10 @@ def _trajectory_intersection_points_and_cells_horizontal( unique_lat_index_changes = np.unique(lat_index_changes) unique_lon_index_changes = np.unique(lon_index_changes) - # initialize empty arrays for lat and lon lines intersected by the trajectory segments, the lat array should have shape (n_segments, max_lat_index_change) and the lon array should have shape (n_segments, max_lon_index_change) + # initialize empty arrays for lat and lon lines intersected by the + # trajectory segments, the lat array should have shape (n_segments, + # max_lat_index_change) and the lon array should have + # shape (n_segments, max_lon_index_change) lat_lines_intersected = np.empty( (n_segments, max_lat_index_change), dtype=float ) @@ -710,10 +735,13 @@ def _trajectory_intersection_points_and_cells_horizontal( lats_for_lon_intersections[:] = np.nan # find the lat and lon lines intersected by the segment from the index ranges - # each row of lat index range contains the lat index of the cell in which the start and end point of the segment are located - # e.g. if the lat index range is [0,2] then the segment intersects the lat lines at index 1 and 2 + # each row of lat index range contains the lat index of the cell in which + # the start and end point of the segment are located + # e.g. if the lat index range is [0,2] then the segment intersects + # the lat lines at index 1 and 2 # the same applies to lon index range - # store the lat and lon lines intersected by the segment in the lat and lon lines intersected arrays + # store the lat and lon lines intersected by the segment in the + # lat and lon lines intersected arrays # do this in a vectorized way for _lat_index_change in unique_lat_index_changes: @@ -730,7 +758,10 @@ def _trajectory_intersection_points_and_cells_horizontal( else: _change_range += 1 - # now create _lat_indexes_intersected which has shape (sum(_mask, _lat_index_change) and contains the lat indexes of the lat lines intersected by the segment, keep in mind that _start_lat_index has shape (sum(_mask),) + # now create _lat_indexes_intersected which has shape + # (sum(_mask, _lat_index_change) and contains the lat + # indexes of the lat lines intersected by the segment, + # keep in mind that _start_lat_index has shape (sum(_mask),) _lat_indexes_intersected = ( _start_lat_index[:, np.newaxis] + _change_range[np.newaxis, :] ) @@ -741,7 +772,8 @@ def _trajectory_intersection_points_and_cells_horizontal( np.expand_dims(_slopes, axis=1), _lat_lines_intersected ) + np.expand_dims(_intercepts, axis=1) - # now store the lat lines intersected by the segment in the lat_lines_intersected array + # now store the lat lines intersected by the segment in the + # lat_lines_intersected array lat_lines_intersected[_mask, :_abs_lat_index_change] = ( _lat_lines_intersected ) @@ -766,7 +798,10 @@ def _trajectory_intersection_points_and_cells_horizontal( else: _change_range += 1 - # now create _lon_indexes_intersected which has shape (sum(_mask, _lon_index_change) and contains the lon indexes of the lon lines intersected by the segment, keep in mind that _start_lon_index has shape (sum(_mask),) + # now create _lon_indexes_intersected which has shape\ + # (sum(_mask, _lon_index_change) and contains the lon + # indexes of the lon lines intersected by the segment, + # keep in mind that _start_lon_index has shape (sum(_mask),) _lon_indexes_intersected = ( _start_lon_index[:, np.newaxis] + _change_range[np.newaxis, :] ) @@ -788,7 +823,8 @@ def _trajectory_intersection_points_and_cells_horizontal( _lats[_inf_mask], axis=1 ) - # now store the lon lines intersected by the segment in the lon_lines_intersected array + # now store the lon lines intersected by the segment in the + # lon_lines_intersected array lon_lines_intersected[_mask, :_abs_lon_index_change] = ( _lon_lines_intersected ) @@ -796,7 +832,8 @@ def _trajectory_intersection_points_and_cells_horizontal( _lats_for_lon_intersections ) - # now we have the lat and lon lines intersected by the trajectory segments, we combine with lats and lons for lon and lat intersections and sort + # now we have the lat and lon lines intersected by the trajectory segments, + # we combine with lats and lons for lon and lat intersections and sort intersection_point_lats = np.column_stack( (lat_lines_intersected, lats_for_lon_intersections) ) @@ -887,25 +924,28 @@ def _trajectory_segment_altitude_grid_indices(self, altitudes: NDArray) -> NDArr raise ValueError("No altitude grid") return (np.searchsorted(self.grid_altitudes, altitudes) - 1)[:-1] - def _cell_idxs_touched_by_trajectory_with_state_and_integrated_variables( + def _cell_idxs_touched_by_trajectory_with_state_and_integrated_vars( self, lats: NDArray, lons: NDArray, altitudes: Optional[NDArray] = None, times: Optional[NDArray] = None, - state_variables: Tuple[NDArray, ...] = (), - integrated_variables: Tuple[NDArray, ...] = (), - ) -> Tuple[ + state_variables: tuple[NDArray, ...] = (), + integrated_variables: tuple[NDArray, ...] = (), + ) -> tuple[ NDArray, NDArray, NDArray, NDArray, - Tuple[NDArray, ...], - Tuple[NDArray, ...], + tuple[NDArray, ...], + tuple[NDArray, ...], ]: - (all_subsegment_point_lats, all_subsegment_point_lons), ( - all_subsegment_lat_indices, - all_subsegment_lon_indices, + ( + (all_subsegment_point_lats, all_subsegment_point_lons), + ( + all_subsegment_lat_indices, + all_subsegment_lon_indices, + ), ) = self._trajectory_intersection_points_and_cells_horizontal(lats, lons) count_subsegments = np.count_nonzero( @@ -997,7 +1037,6 @@ def _cell_idxs_touched_by_trajectory_with_state_and_integrated_variables( ) def _polygon_touched_cells(self, polygon_lats, polygon_lons): - polygon = Polygon(zip(polygon_lons, polygon_lats)) min_lon, min_lat, max_lon, max_lat = polygon.bounds @@ -1042,24 +1081,25 @@ def cells_touched_by_trajectory_with_state_and_integrated_variables( lons: NDArray, altitudes: Optional[NDArray] = None, times: Optional[NDArray] = None, - state_variables: Tuple[NDArray, ...] = (), - integrated_variables: Tuple[NDArray, ...] = (), - ) -> Tuple[ + state_variables: tuple[NDArray, ...] = (), + integrated_variables: tuple[NDArray, ...] = (), + ) -> tuple[ Optional[NDArray], Optional[NDArray], Optional[NDArray], Optional[NDArray], - Tuple[Optional[NDArray], ...], - Tuple[Optional[NDArray], ...], + tuple[Optional[NDArray], ...], + tuple[Optional[NDArray], ...], ]: - # does the same as cells_touched_by_trajectory but for both state and integrated variables + # does the same as cells_touched_by_trajectory but for + # both state and integrated variables dateline_crossing = crosses_dateline(lons[:-1], lons[1:]) if np.any(dateline_crossing != 0): - if np.count_nonzero(dateline_crossing) > 1: warnings.warn( - "Trajectory crosses the dateline more than once, this isn't implemented, returning None for all outputs" + "Trajectory crosses the dateline more than once, " + "this isn't implemented, returning None for all outputs" ) return None, None, None, None, None, None @@ -1210,7 +1250,7 @@ def cells_touched_by_trajectory_with_state_and_integrated_variables( touched_cells_time_indices_first_part, subsegment_state_variable_values_first_part, subsegment_integrated_variable_values_first_part, - ) = self._cell_idxs_touched_by_trajectory_with_state_and_integrated_variables( + ) = self._cell_idxs_touched_by_trajectory_with_state_and_integrated_vars( lats_first_part, lons_first_part, altitudes_first_part, @@ -1243,7 +1283,7 @@ def cells_touched_by_trajectory_with_state_and_integrated_variables( touched_cells_time_indices_second_part, subsegment_state_variable_values_second_part, subsegment_integrated_variable_values_second_part, - ) = self._cell_idxs_touched_by_trajectory_with_state_and_integrated_variables( + ) = self._cell_idxs_touched_by_trajectory_with_state_and_integrated_vars( lats_second_part, lons_second_part, altitudes_second_part, @@ -1313,7 +1353,6 @@ def cells_touched_by_trajectory_with_state_and_integrated_variables( ) else: - ( touched_cells_lat_indices, touched_cells_lon_indices, @@ -1321,7 +1360,7 @@ def cells_touched_by_trajectory_with_state_and_integrated_variables( touched_cells_time_indices, state_variable_values, integrated_variable_values, - ) = self._cell_idxs_touched_by_trajectory_with_state_and_integrated_variables( + ) = self._cell_idxs_touched_by_trajectory_with_state_and_integrated_vars( lats, lons, altitudes, times, state_variables, integrated_variables ) diff --git a/src/parsers/LTO_reader.py b/src/parsers/LTO_reader.py index fa105a74..a70c3bc5 100644 --- a/src/parsers/LTO_reader.py +++ b/src/parsers/LTO_reader.py @@ -1,12 +1,12 @@ import re + def parseLTO(file_path): """ Reads an aircraft LTO characteristics file and returns a dictionary with: - "Foo": the numeric value from the line "Foo = [kN]" - "eng_data": a list of dictionaries corresponding to the comma-separated engine data after the marker line containing "AEIC ENG_EI output". - Each engine data dictionary has the following keys: "ENG_NAME", "MODE", "CO_EI", "HC_EI", "NOX_EI", "SOX_EI", "SMOKE_NUM", "FUEL_KG/S", "ICAO_UID", "MODE_SN" @@ -15,12 +15,12 @@ def parseLTO(file_path): eng_data = {} capture_eng = False - with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: + with open(file_path, encoding='utf-8', errors='ignore') as f: for line in f: line = line.strip() if not line: continue - + # Look for the Foo value (only once) if foo is None: m = re.search(r"Foo\s*=\s*([\d\.]+)", line) @@ -33,7 +33,8 @@ def parseLTO(file_path): capture_eng = True continue - # Once we have encountered the marker, process subsequent lines as engine data + # Once we have encountered the marker, process subsequent lines + # as engine data if capture_eng: # Split the line by commas and strip extra whitespace parts = [p.strip() for p in line.split(",")] @@ -47,7 +48,7 @@ def parseLTO(file_path): "SMOKE_NUM": parts[6], "FUEL_KG/S": parts[7], "ICAO_UID": parts[8], - "MODE_SN": parts[9] + "MODE_SN": parts[9], } eng_data[parts[1]] = entry return {"Foo": foo, "eng_LTO": eng_data} diff --git a/src/parsers/OPF_reader.py b/src/parsers/OPF_reader.py index d21d010a..aa5e91d8 100644 --- a/src/parsers/OPF_reader.py +++ b/src/parsers/OPF_reader.py @@ -5,22 +5,19 @@ def parse_OPF(file_path): - Engine Thrust - Fuel Consumption - Ground Movement - Relies on the "CC=====
=====" comment lines to find section boundaries. """ - + extracted_data = { # Aircraft Type "n_eng": None, "engine_type": None, "wake_cat": None, - # Mass "ref_mass": None, "min_mass": None, "max_mass": None, "max_payload": None, - # Flight Envelope "V_MO": None, "M_MO": None, @@ -28,7 +25,6 @@ def parse_OPF(file_path): "h_max": None, "G_w": None, "G_t": None, - # Aerodynamics "S_ref": None, "c_d0cr": None, @@ -42,7 +38,6 @@ def parse_OPF(file_path): "V_stall_i": None, "C_Lbo_M0": None, "K": None, - # Engine Thrust "c_tc1": None, "c_tc2": None, @@ -56,39 +51,37 @@ def parse_OPF(file_path): "c_tdes_ld": None, "V_des_ref": None, "cas_cruise_mach": None, - # Fuel Flow "c_f1": None, "c_f2": None, "c_f3": None, "c_f4": None, "c_fcr": None, - # Ground Movement "TOL": None, "LDL": None, "span": None, - "length": None + "length": None, } - + def to_float(s): return float(s.strip()) - + # 1) Read in lines that start with "CD" (ignore "CC"), store them in a list raw_lines = [] - with open(file_path, "r", encoding="utf-8") as f: + with open(file_path, encoding="utf-8") as f: for line in f: line = line.rstrip() if line.startswith("CD"): raw_lines.append(line) - + # 2) We also need to locate the "CC===== Section =====" lines # so let's read the entire file again but keep them for splitting: lines_with_sections = [] - with open(file_path, "r", encoding="utf-8") as f: + with open(file_path, encoding="utf-8") as f: for line in f: lines_with_sections.append(line.rstrip()) - + # Identify the index of each major section by searching lines_with_sections section_indices = {} for i, line in enumerate(lines_with_sections): @@ -100,11 +93,12 @@ def to_float(s): section_indices["fuel"] = i elif "Ground" in line: section_indices["ground"] = i - - # (A) Parse the first known lines (0..4 in raw_lines) for B744__, mass, envelope, wing: + + # (A) Parse the first known lines (0..4 in raw_lines) for B744__, mass, + # envelope, wing: if len(raw_lines) > 1: tokens = raw_lines[1].split() # "CD B744___ 4 engines Jet H" - extracted_data["n_eng"] = int(tokens[2]) # 4 + extracted_data["n_eng"] = int(tokens[2]) # 4 extracted_data["engine_type"] = tokens[4] # Jet wc = tokens[5].upper() # "H" if wc == "H": @@ -115,59 +109,59 @@ def to_float(s): extracted_data["wake_cat"] = "Light" else: extracted_data["wake_cat"] = wc - + if len(raw_lines) > 2: parts = raw_lines[2].split() - extracted_data["ref_mass"] = to_float(parts[1]) - extracted_data["min_mass"] = to_float(parts[2]) - extracted_data["max_mass"] = to_float(parts[3]) + extracted_data["ref_mass"] = to_float(parts[1]) + extracted_data["min_mass"] = to_float(parts[2]) + extracted_data["max_mass"] = to_float(parts[3]) extracted_data["max_payload"] = to_float(parts[4]) # If the file truly has 5 floats on this line, store the 5th in G_w if len(parts) > 5: extracted_data["G_w"] = to_float(parts[5]) - + if len(raw_lines) > 3: parts = raw_lines[3].split() - extracted_data["V_MO"] = to_float(parts[1]) - extracted_data["M_MO"] = to_float(parts[2]) - extracted_data["H_MO"] = to_float(parts[3]) + extracted_data["V_MO"] = to_float(parts[1]) + extracted_data["M_MO"] = to_float(parts[2]) + extracted_data["H_MO"] = to_float(parts[3]) extracted_data["h_max"] = to_float(parts[4]) - extracted_data["G_t"] = to_float(parts[5]) - + extracted_data["G_t"] = to_float(parts[5]) + if len(raw_lines) > 4: parts = raw_lines[4].split() - extracted_data["S_ref"] = to_float(parts[2]) + extracted_data["S_ref"] = to_float(parts[2]) extracted_data["C_Lbo_M0"] = to_float(parts[3]) - extracted_data["K"] = to_float(parts[4]) - extracted_data["C_M16"] = to_float(parts[5]) - + extracted_data["K"] = to_float(parts[4]) + extracted_data["C_M16"] = to_float(parts[5]) + # 3) Now we gather each block by looking at the lines_with_sections def get_section_lines(lines_list, start_key, end_key=None): """ - Return a list of lines (that start with "CD") between the section + Return a list of lines (that start with "CD") between the section named start_key and the section named end_key (exclusive). If end_key is None, we go to the end of the file. """ start_idx = section_indices.get(start_key, None) - end_idx = section_indices.get(end_key, None) - + end_idx = section_indices.get(end_key, None) + if start_idx is None: return [] if end_idx is None: end_idx = len(lines_list) - + collected = [] - for i in range(start_idx+1, end_idx): + for i in range(start_idx + 1, end_idx): line = lines_list[i] if line.startswith("CD"): collected.append(line) return collected - - aero_data = get_section_lines(lines_with_sections, "aero", "thrust") + + aero_data = get_section_lines(lines_with_sections, "aero", "thrust") thrust_data = get_section_lines(lines_with_sections, "thrust", "fuel") - fuel_data = get_section_lines(lines_with_sections, "fuel", "ground") + fuel_data = get_section_lines(lines_with_sections, "fuel", "ground") ground_data = get_section_lines(lines_with_sections, "ground", None) - + # # (1) Aerodynamics block # @@ -175,8 +169,8 @@ def get_section_lines(lines_list, start_key, end_key=None): parts = line.split() if "CR" in parts: extracted_data["V_stall_i"] = to_float(parts[4]) - extracted_data["c_d0cr"] = to_float(parts[5]) - extracted_data["c_d2cr"] = to_float(parts[6]) + extracted_data["c_d0cr"] = to_float(parts[5]) + extracted_data["c_d2cr"] = to_float(parts[6]) elif "AP" in parts: extracted_data["C_D0_AP"] = to_float(parts[5]) extracted_data["C_D2_AP"] = to_float(parts[6]) @@ -185,7 +179,7 @@ def get_section_lines(lines_list, start_key, end_key=None): extracted_data["C_D2_LD"] = to_float(parts[6]) elif "DOWN" in parts: extracted_data["C_D0_ALDG"] = to_float(parts[3]) - + # # (2) Engine Thrust block # @@ -198,11 +192,15 @@ def get_section_lines(lines_list, start_key, end_key=None): for line in thrust_data: parts = line.split() # "CD" + 5 floats => len=6 - + if len(parts) == 7: # confirm all floats except 'CD' try: - float(parts[1]); float(parts[2]); float(parts[3]); float(parts[4]); float(parts[5]) + float(parts[1]) + float(parts[2]) + float(parts[3]) + float(parts[4]) + float(parts[5]) # noqa: E702 thrust_5floats.append(parts) except ValueError: pass @@ -215,30 +213,30 @@ def get_section_lines(lines_list, start_key, end_key=None): extracted_data["c_tc3"] = to_float(p[3]) extracted_data["c_tc4"] = to_float(p[4]) extracted_data["c_tc5"] = to_float(p[5]) - + if len(thrust_5floats) >= 2: # parse c_Tdes_low..c_Tdes_ld p = thrust_5floats[1] - extracted_data["c_tdes_low"] = to_float(p[1]) + extracted_data["c_tdes_low"] = to_float(p[1]) extracted_data["c_tdes_high"] = to_float(p[2]) - extracted_data["h_p_des"] = to_float(p[3]) - extracted_data["c_tdes_app"] = to_float(p[4]) - extracted_data["c_tdes_ld"] = to_float(p[5]) - + extracted_data["h_p_des"] = to_float(p[3]) + extracted_data["c_tdes_app"] = to_float(p[4]) + extracted_data["c_tdes_ld"] = to_float(p[5]) + if len(thrust_5floats) >= 3: # parse v_des_ref..m_des_ref (the first 2 floats after 'CD') # the remaining 3 floats are unused in your model p = thrust_5floats[2] extracted_data["V_des_ref"] = to_float(p[1]) extracted_data["cas_cruise_mach"] = to_float(p[2]) - + # # (3) Fuel Flow block # idx_fuel_2floats_1 = None idx_fuel_2floats_2 = None - idx_fuel_5floats = None - + idx_fuel_5floats = None + for i, line in enumerate(fuel_data): parts = line.split() if len(parts) == 4: @@ -250,21 +248,21 @@ def get_section_lines(lines_list, start_key, end_key=None): elif len(parts) == 7: # "CD" + 5 floats => c_fcr line idx_fuel_5floats = i - + if idx_fuel_2floats_1 is not None: p = fuel_data[idx_fuel_2floats_1].split() extracted_data["c_f1"] = to_float(p[1]) extracted_data["c_f2"] = to_float(p[2]) - + if idx_fuel_2floats_2 is not None: p = fuel_data[idx_fuel_2floats_2].split() extracted_data["c_f3"] = to_float(p[1]) extracted_data["c_f4"] = to_float(p[2]) - + if idx_fuel_5floats is not None: p = fuel_data[idx_fuel_5floats].split() extracted_data["c_fcr"] = to_float(p[1]) - + # # (4) Ground block # @@ -272,22 +270,26 @@ def get_section_lines(lines_list, start_key, end_key=None): parts = line.split() if len(parts) == 7: try: - float(parts[1]); float(parts[2]); float(parts[3]); float(parts[4]); float(parts[5]) - extracted_data["TOL"] = to_float(parts[1]) - extracted_data["LDL"] = to_float(parts[2]) - extracted_data["span"] = to_float(parts[3]) + float(parts[1]) + float(parts[2]) + float(parts[3]) + float(parts[4]) + float(parts[5]) + extracted_data["TOL"] = to_float(parts[1]) + extracted_data["LDL"] = to_float(parts[2]) + extracted_data["span"] = to_float(parts[3]) extracted_data["length"] = to_float(parts[4]) except ValueError: pass - + return extracted_data if __name__ == "__main__": # Example usage: - file_path = "/Users/aditeyashukla/Dropbox/Mac (2)/Documents/LAE/AEIC/src/IO/B744__.OPF" # <-- Adjust to your actual file path + file_path = "IO/B744__.OPF" # <-- Adjust to your actual file path data = parse_OPF(file_path) - + # Print each key/value for k, v in data.items(): print(f"{k} = {v}") diff --git a/src/parsers/PTF_reader.py b/src/parsers/PTF_reader.py index faa5d873..4e7dd03d 100644 --- a/src/parsers/PTF_reader.py +++ b/src/parsers/PTF_reader.py @@ -1,5 +1,6 @@ import re + def parse_PTF(file_path): """ Reads the TASOPT performance file in a single pass and returns a dict: @@ -26,7 +27,7 @@ def parse_PTF(file_path): "high_mass_kg": ... } """ - + # Prepare data structures for phases with new keys for speeds climb_data = { "flight_levels_ft": [], @@ -61,7 +62,7 @@ def parse_PTF(file_path): "cas_hi": None, "mach": None, } - + # Prepare storage for top-level info max_alt = None max_payload = None @@ -71,15 +72,16 @@ def parse_PTF(file_path): capture = False - with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: + with open(file_path, encoding='utf-8', errors='ignore') as f: for line in f: - # 1) Parse top-level lines before/after the table for alt/payload/mass and speeds: + # 1) Parse top-level lines before/after the table for + # alt/payload/mass and speeds: if not capture: # Max altitude match_alt = re.search(r"Max Alt\.\s*\[ft\]:\s*([\d,]+)", line) if match_alt: max_alt = int(match_alt.group(1).replace(",", "")) - + # Max payload match_payload = re.search(r"Max Payload\s*\[kg\]:\s*([\d,]+)", line) if match_payload: @@ -102,14 +104,15 @@ def parse_PTF(file_path): if match: high_mass = int(match.group(1)) - # Parse speeds for each phase if the line starts with climb, cruise, or descent + # Parse speeds for each phase if the line starts with + # climb, cruise, or descent stripped = line.lstrip() if stripped.startswith("climb"): tokens = stripped.split() if len(tokens) >= 4: try: - cas = tokens[2] # Expecting a format like "250/300" - mach = tokens[3] # e.g., "0.80" + cas = tokens[2] # Expecting a format like "250/300" + mach = tokens[3] # e.g., "0.80" cas_lo, cas_hi = cas.split("/") climb_data["cas_lo"] = int(cas_lo) climb_data["cas_hi"] = int(cas_hi) @@ -152,7 +155,7 @@ def parse_PTF(file_path): # We expect 4 columns: FL, CRUISE, CLIMB, DESCENT if len(parts) < 4: continue - + # Flight level in parts[0] try: fl = int(parts[0].strip()) @@ -160,7 +163,7 @@ def parse_PTF(file_path): # If we can't parse a valid FL, skip continue - # ============ CRUISE ============ + # ============ CRUISE ============ cruise_str = parts[1] # Typically we expect [TAS, fuel_low, fuel_nom, fuel_high] c_vals = re.findall(r"\d+\.?\d*", cruise_str) @@ -174,7 +177,7 @@ def parse_PTF(file_path): # If line is blank for cruise, skip c_tas = None - # ============ CLIMB ============ + # ============ CLIMB ============ climb_str = parts[2] # Typically we expect [TAS, rocd_lo, rocd_nom, rocd_hi, fuel_nom] cl_vals = re.findall(r"\d+\.?\d*", climb_str) @@ -183,14 +186,14 @@ def parse_PTF(file_path): cl_tas = float(cl_vals[0]) # ROCD cl_rocd_lo = float(cl_vals[1]) - cl_rocd_hi = float(cl_vals[3]) # ignoring nominal + cl_rocd_hi = float(cl_vals[3]) # ignoring nominal # Fuel: replicate nominal in lo & hi cl_fuel_nom = float(cl_vals[4]) else: # If line is blank for climb, skip cl_tas = None - # ============ DESCENT ============ + # ============ DESCENT ============ descent_str = parts[3] # Typically we expect [TAS_nom, rocd_nom, fuel_nom] d_vals = re.findall(r"\d+\.?\d*", descent_str) @@ -234,16 +237,12 @@ def parse_PTF(file_path): # Build final data structure data = { - "phases": { - "climb": climb_data, - "cruise": cruise_data, - "descent": descent_data - }, + "phases": {"climb": climb_data, "cruise": cruise_data, "descent": descent_data}, # Top-level info "max_alt_ft": max_alt, "max_payload_kg": max_payload, "low_mass_kg": low_mass, "nominal_mass_kg": nominal_mass, - "high_mass_kg": high_mass + "high_mass_kg": high_mass, } return data diff --git a/src/utils/__init__.py b/src/utils/__init__.py index adc5355b..188acd13 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -13,4 +13,4 @@ def file_location(path: str) -> str: if os.path.exists(path): return path - raise FileNotFoundError(f"File {path} not found in local or data directory.") \ No newline at end of file + raise FileNotFoundError(f"File {path} not found in local or data directory.") diff --git a/src/utils/custom_types.py b/src/utils/custom_types.py index 406f117a..19609547 100644 --- a/src/utils/custom_types.py +++ b/src/utils/custom_types.py @@ -1,4 +1,5 @@ from typing import Union + import numpy as np # from numpy import ndarray as NDArray diff --git a/src/utils/helpers.py b/src/utils/helpers.py index 703bfcc3..3bc0d423 100644 --- a/src/utils/helpers.py +++ b/src/utils/helpers.py @@ -1,12 +1,12 @@ -from .custom_types import FloatOrNDArray -from .consts import R_E import numpy as np -from typing import Tuple -from numpy.typing import NDArray import pandas as pd +from numpy.typing import NDArray from pandas._libs.tslibs import Timestamp from pandas.core.indexes.datetimes import DatetimeIndex +from .consts import R_E +from .custom_types import FloatOrNDArray + def great_circle_distance( lat1: FloatOrNDArray, @@ -80,9 +80,7 @@ def feet_to_meters(ft: FloatOrNDArray) -> FloatOrNDArray: return ft * 0.3048 -def unix_to_datetime_utc( - unix_time: FloatOrNDArray -) -> Timestamp | DatetimeIndex: +def unix_to_datetime_utc(unix_time: FloatOrNDArray) -> Timestamp | DatetimeIndex: """Convert unix time to UTC Args: @@ -95,7 +93,7 @@ def unix_to_datetime_utc( return pd.to_datetime(unix_time, unit="s") -def calculate_line_parameters(x: NDArray, y: NDArray) -> Tuple[NDArray, NDArray]: +def calculate_line_parameters(x: NDArray, y: NDArray) -> tuple[NDArray, NDArray]: """ Calculates the slope and intercept of the lines defined by the points (x, y). @@ -108,7 +106,7 @@ def calculate_line_parameters(x: NDArray, y: NDArray) -> Tuple[NDArray, NDArray] Returns ------- - Tuple[NDArray, NDArray] + tuple[NDArray, NDArray] The slopes and intercepts of the lines defined by the points (x, y). """ @@ -139,7 +137,7 @@ def nautmiles_to_meters(nautmiles: FloatOrNDArray) -> FloatOrNDArray: def filter_order_duplicates(seq): - ''' Filters duplicate list entries while perserving order ''' + '''Filters duplicate list entries while perserving order''' seen = set() seen_add = seen.add return [x for x in seq if not (x in seen or seen_add(x))] diff --git a/src/utils/standard_atmosphere.py b/src/utils/standard_atmosphere.py index 83090be2..b75cc2e6 100644 --- a/src/utils/standard_atmosphere.py +++ b/src/utils/standard_atmosphere.py @@ -1,8 +1,10 @@ -import numpy as np from typing import Union + +import numpy as np +from numpy.typing import NDArray + +from .consts import T0, R_air, beta_tropo, g0, h_p_tropo, p0 from .custom_types import FloatOrNDArray -from numpy.typing import NDArray, ArrayLike -from .consts import *#h_p_tropo, beta_tropo, p0, T0, rho0, a0, R_air, g0, kappa def temperature_at_altitude_isa_bada4(altitude: FloatOrNDArray) -> NDArray: diff --git a/src/utils/weather_utils.py b/src/utils/weather_utils.py index 6907bd74..bec74b5e 100644 --- a/src/utils/weather_utils.py +++ b/src/utils/weather_utils.py @@ -1,14 +1,15 @@ -from pyproj import Geod import numpy as np -from scipy.interpolate import RegularGridInterpolator import xarray as xr +from pyproj import Geod +from scipy.interpolate import RegularGridInterpolator + def altitude_to_pressure_hpa(alt_ft): """Convert altitude in feet to pressure in hPa using ISA approximation.""" alt_m = np.array(alt_ft) * 0.3048 pressure = 1013.25 * (1 - (0.0065 * alt_m) / 288.15) ** 5.2561 return pressure - + def get_wind_at_points(mission_data, era5_path): """ @@ -30,15 +31,15 @@ def get_wind_at_points(mission_data, era5_path): wind_speed : np.ndarray Wind speed magnitude [m/s] """ - + # Load ERA4 GRIB file (single time slice) ds = xr.open_dataset(era5_path, engine="cfgrib") - + # Ensure proper dimension order lats_era = ds.latitude.values lons_era = ds.longitude.values levels_era = ds.isobaricInhPa.values - + # Sort lattitude in ascending order if lats_era[0] > lats_era[-1]: lats_era = lats_era[::-1] @@ -50,39 +51,44 @@ def get_wind_at_points(mission_data, era5_path): (levels_era, lats_era, lons_era), ds['u'].values, bounds_error=False, - fill_value=np.nan + fill_value=np.nan, ) - + v_interp = RegularGridInterpolator( (levels_era, lats_era, lons_era), ds['v'].values, bounds_error=False, - fill_value=np.nan + fill_value=np.nan, ) - + # Convert altitude (ft) to pressure (hPa) pressures = altitude_to_pressure_hpa(mission_data['H']) - + # Convert lon to [0, 360] for ERA5 grid compatibility lons_360 = [(lon + 360) if lon < 0 else lon for lon in mission_data['lons']] - + # Form (pressure, lat, lon) points for interpolation - points = np.array([ - (p, lat, lon) for p, lat, lon in zip(pressures, mission_data['lats'], lons_360) - ]) - + points = np.array( + [ + (p, lat, lon) + for p, lat, lon in zip(pressures, mission_data['lats'], lons_360) + ] + ) + # Interpolate u_vals = u_interp(points) v_vals = v_interp(points) - + wind_speed = np.sqrt(u_vals**2 + v_vals**2) - + return u_vals, v_vals, wind_speed + def get_tas(mission_data, era5_path): """ - Computes the true airspeed (TAS), heading, and drift angle along a flight path using geodesic - track angles and interpolated wind components from ERA5 reanalysis data. + Computes the true airspeed (TAS), heading, and drift angle along a flight path + using geodesic track angles and interpolated wind components from + ERA5 reanalysis data. Parameters ---------- @@ -98,13 +104,15 @@ def get_tas(mission_data, era5_path): Returns ------- track_angles : np.ndarray - Array of geodesic track angles (course) between consecutive lat-lon points [degrees]. + Array of geodesic track angles (course) between consecutive lat-lon points + [degrees]. headings : np.ndarray Array of aircraft headings (direction of motion through the air) [degrees]. drifts : np.ndarray - Array of drift angles (heading - track), indicating crosswind correction required [degrees]. + Array of drift angles (heading - track), indicating crosswind correction + required [degrees]. tas_vals : np.ndarray Array of computed true airspeeds (TAS) along the path [knots]. @@ -120,25 +128,26 @@ def get_tas(mission_data, era5_path): Notes ----- - - TAS is computed using the wind triangle from the vector difference between ground speed vector and wind vector. + - TAS is computed using the wind triangle from the vector difference between + ground speed vector and wind vector. - Drift angle is signed and bounded to the range [-180, 180] degrees. - Requires `pyproj.Geod` and NumPy. """ # Get wind components based on path u_vals, v_vals, wind_speed = get_wind_at_points(mission_data, era5_path) - + # Compute geodesic track angles between consecutive flight segments geod = Geod(ellps="WGS84") lons = mission_data["lons"] lats = mission_data["lats"] gs = mission_data["GS"] - + track_angles = [] headings = [] drifts = [] tas_vals = [] - + # Loop over all points along the arc for i in range(len(lons) - 1): # Compute track angle for the two lat-lon pairs @@ -147,32 +156,31 @@ def get_tas(mission_data, era5_path): az_fwd, _, _ = geod.inv(lon1, lat1, lon2, lat2) track = az_fwd % 360 track_angles.append(track) - - # Use wind triangle to compute heading and drift + + # Use wind triangle to compute heading and drift track_rad = np.deg2rad(track) vgx = gs[i] * np.cos(track_rad) # ground speed-x along track vgy = gs[i] * np.sin(track_rad) # ground speed-y along track - - + # Substract wind vector (souce ERA-5 dummy weather) - vax = vgx - u_vals[i] # TAS x-component - vay = vgy - v_vals[i] # TAS y-component - + vax = vgx - u_vals[i] # TAS x-component + vay = vgy - v_vals[i] # TAS y-component + tas_mps = np.sqrt(vax**2 + vay**2) tas_knots = tas_mps / 0.514444 # m/s -> knots - + tas_vals.append(tas_knots) - + heading_rad = np.arctan2(vay, vax) heading_deg = np.rad2deg(heading_rad) % 360 - + # Compute path drift drift = (heading_deg - track + 360) % 360 if drift > 180: drift -= 360 # Convert to signed drift headings.append(heading_deg) drifts.append(drift) - + return ( np.array(track_angles), np.array(headings), @@ -180,9 +188,9 @@ def get_tas(mission_data, era5_path): np.array(tas_vals), u_vals, v_vals, - wind_speed + wind_speed, ) - + def apply_drift_to_mission_points(mission_data, drift_angles): """ @@ -265,13 +273,19 @@ def build_era5_interpolators(era5_path): v_data = ds["v"].values # Interpolator over (level, lat, lon) (Filling with NaNs for safety) - u_interp = RegularGridInterpolator((levels, lats, lons), u_data, bounds_error=False, fill_value=4.0) - v_interp = RegularGridInterpolator((levels, lats, lons), v_data, bounds_error=False, fill_value=4.0) + u_interp = RegularGridInterpolator( + (levels, lats, lons), u_data, bounds_error=False, fill_value=4.0 + ) + v_interp = RegularGridInterpolator( + (levels, lats, lons), v_data, bounds_error=False, fill_value=4.0 + ) return u_interp, v_interp, {"levels": levels, "lats": lats, "lons": lons} -def compute_ground_speed(lon, lat, lon_next, lat_next, alt_ft, tas_kts, weather_data = None): - + +def compute_ground_speed( + lon, lat, lon_next, lat_next, alt_ft, tas_kts, weather_data=None +): """ Computes ground speed for a single point using TAS, heading, and interpolated winds. @@ -300,7 +314,7 @@ def compute_ground_speed(lon, lat, lon_next, lat_next, alt_ft, tas_kts, weather_ # Convert altitude and TAS to S.I units pressure_level = float(altitude_to_pressure_hpa(alt_ft)) # hPa - tas_ms = tas_kts * 0.514444 + tas_ms = tas_kts * 0.514444 # Compute heading in radians from current to next point geod = Geod(ellps="WGS84") azimuth_deg, _, _ = geod.inv(lon, lat, lon_next, lat_next) @@ -310,7 +324,7 @@ def compute_ground_speed(lon, lat, lon_next, lat_next, alt_ft, tas_kts, weather_ u_air = tas_ms * np.cos(heading_rad) v_air = tas_ms * np.sin(heading_rad) - if weather_data == None: + if weather_data is None: u = 0 v = 0 else: @@ -319,13 +333,12 @@ def compute_ground_speed(lon, lat, lon_next, lat_next, alt_ft, tas_kts, weather_ # Interpolate wind at this location u = u_interp([[pressure_level, lat, lon]])[0] v = v_interp([[pressure_level, lat, lon]])[0] - + # Ground speed = air vector + wind vector u_ground = u_air + u v_ground = v_air + v - + gs_ms = np.sqrt(u_ground**2 + v_ground**2) gs_kts = gs_ms / 0.514444 - + return gs_kts, heading_rad, u, v - \ No newline at end of file diff --git a/src/weather/draw.py b/src/weather/draw.py index f7e76563..834ded0a 100644 --- a/src/weather/draw.py +++ b/src/weather/draw.py @@ -1,18 +1,16 @@ -import matplotlib.pyplot as plt import cartopy.crs as ccrs import cartopy.feature as cfeature +import matplotlib.pyplot as plt from pyproj import Geod -import numpy as np def plot_flight_arc(mission): - # Set up map - - fig = plt.figure(figsize=(10, 7)) + + plt.figure(figsize=(10, 7)) ax = plt.axes(projection=ccrs.PlateCarree()) ax.set_extent([-130, -65, 22, 50], crs=ccrs.PlateCarree()) - + # Add map features ax.add_feature(cfeature.STATES, linewidth=0.5) ax.add_feature(cfeature.COASTLINE) @@ -20,30 +18,30 @@ def plot_flight_arc(mission): ax.add_feature(cfeature.LAND, facecolor='lightgray') ax.add_feature(cfeature.OCEAN, facecolor='lightblue') ax.gridlines(draw_labels=True, linewidth=0.5, color='gray', alpha=0.5) - + # Extract OD lat lon position lon_dep, lat_dep, _ = mission["dep_location"] lon_arr, lat_arr, _ = mission["arr_location"] - + # Use WGS84 ellipse definition geod = Geod(ellps="WGS84") - + # Get 100 equally spaced points along the arc points = geod.npts(lon_dep, lat_dep, lon_arr, lat_arr, 100) lons = [lon_dep] + [pt[0] for pt in points] + [lon_arr] lats = [lat_dep] + [pt[1] for pt in points] + [lat_arr] ax.plot(lons, lats, 'k-', linewidth=0.8, alpha=0.7) - + # Mark endpoints ax.plot(lon_dep, lat_dep, 'go', markersize=6, label="Departure") ax.plot(lon_arr, lat_arr, 'ro', markersize=6, label="Arrival") - + # Add legend and title ax.legend(loc='lower left') dep_code = mission['dep_airport'] arr_code = mission['arr_airport'] ax.set_title(f"Flight Arc: {dep_code} → {arr_code}") - + plt.tight_layout() - plt.savefig("sample.png", dpi=300) \ No newline at end of file + plt.savefig("sample.png", dpi=300) diff --git a/src/weather/main.py b/src/weather/main.py index 5f4fbb17..311fc7b7 100644 --- a/src/weather/main.py +++ b/src/weather/main.py @@ -1,16 +1,16 @@ import json -import draw as dr -import utils as util + import trajectory as traj +import utils as util mission_path = "../missions/sample_missions_10.json" weather_data = "ERA5/sample.grib" # Load JSON data -with open(mission_path, 'r') as file: +with open(mission_path) as file: missions = json.load(file) - + # Select a dummy mission for now first_mission = missions[0] @@ -49,13 +49,13 @@ u_interp=u_interp, v_interp=v_interp, ) - + # Append to respective lists for sanity check -> gs_list.append(gs) heading_list.append(heading) u_list.append(u) v_list.append(v) - + # Append to trajectory -> trajectory["GS"] = gs_list trajectory["heading_rad"] = heading_list @@ -65,28 +65,30 @@ # Print the first few points print("\n") -print("#--------------------------------SAMPLE TRAJECTORY (SNAPSHOT) -----------------------------#\n") -for lon, lat, tas, gs, alt in zip(trajectory["lons"][:5], trajectory["lats"][:5], trajectory["TAS"][:5], trajectory["GS"][:5], trajectory["H"][:5]): - print(f"Lon: {lon:.2f}, Lat: {lat:.2f}, TAS: {tas:.2f} kt, GS: {gs:.2f} kt, Alt: {alt:.2f} ft") -print("#------------------------------------------------------------------------------------------#\n") - - - -#track, heading, drift, tas, u, v, wind_mag = util.get_tas(trajectory, "era5.grib") - -#for i in range(5): -# print(f"Pt {i}: Track={track[i]:.1f}°, Heading={heading[i]:.1f}°, Drift={drift[i]:+.1f}°, " -# f"TAS={tas[i]:.1f} kt, U={u[i]:.1f}, V={v[i]:.1f}, Wind={wind_mag[i]:.1f} m/s") - - - -#dr.plot_flight_arc(first_mission) - -#util.get_flight_track(first_mission) - +print("#----------------SAMPLE TRAJECTORY (SNAPSHOT) -------------------------#\n") +for lon, lat, tas, gs, alt in zip( + trajectory["lons"][:5], + trajectory["lats"][:5], + trajectory["TAS"][:5], + trajectory["GS"][:5], + trajectory["H"][:5], +): + print( + f"Lon: {lon:.2f}, Lat: {lat:.2f}, TAS: {tas:.2f} kt,\ + GS: {gs:.2f} kt, Alt: {alt:.2f} ft" + ) +print("#-----------------------------------------------------------------------#\n") +# track, heading, drift, tas, u, v, wind_mag = util.get_tas(trajectory, "era5.grib") +# for i in range(5): +# print(f"Pt {i}: Track={track[i]:.1f}°, Heading={heading[i]:.1f}°,\ +# Drift={drift[i]:+.1f}°, " +# f"TAS={tas[i]:.1f} kt, U={u[i]:.1f}, V={v[i]:.1f}, \ +# Wind={wind_mag[i]:.1f} m/s") +# dr.plot_flight_arc(first_mission) +# util.get_flight_track(first_mission) diff --git a/src/weather/trajectory.py b/src/weather/trajectory.py index d3d4865e..1eb2c269 100644 --- a/src/weather/trajectory.py +++ b/src/weather/trajectory.py @@ -1,19 +1,22 @@ -from pyproj import Geod import numpy as np - +from pyproj import Geod def get_mission_points(mission): """ - Generates a discretized set of latitude and longitude points along a great-circle path - between departure and arrival locations, assuming constant cruise altitude and ground speed. + Generates a discretized set of latitude and longitude points along a great-circle + path between departure and arrival locations, assuming constant cruise altitude + and ground speed. Parameters ---------- mission : dict - Dictionary containing origin and destination coordinates with the following keys: - - 'dep_location': tuple of (longitude, latitude, altitude) for the departure point [degrees, degrees, feet] - - 'arr_location': tuple of (longitude, latitude, altitude) for the arrival point [degrees, degrees, feet] + Dictionary containing origin and destination coordinates + with the following keys: + - 'dep_location': tuple of (longitude, latitude, altitude) for the + departure point [degrees, degrees, feet] + - 'arr_location': tuple of (longitude, latitude, altitude) for the + arrival point [degrees, degrees, feet] Returns ------- @@ -26,39 +29,38 @@ def get_mission_points(mission): Notes ----- - - The path is discretized using 100 intermediate points (plus endpoints), resulting in 102 total waypoints. - - This function uses `pyproj.Geod` with the WGS84 ellipsoid to compute a geodesic path. - - All values for speed and altitude are placeholders; replace them with mission-specific data in actual use. + - The path is discretized using 100 intermediate points (plus endpoints), + resulting in 102 total waypoints. + - This function uses `pyproj.Geod` with the WGS84 ellipsoid to compute + a geodesic path. + - All values for speed and altitude are placeholders; replace them with + mission-specific data in actual use. """ - + # Instantiate WGS84 ellipsoid - geod = Geod(ellps ="WGS84") - - # Extract OD lat-lon + geod = Geod(ellps="WGS84") + + # Extract OD lat-lon lon_dep, lat_dep, _ = mission["dep_location"] lon_arr, lat_arr, _ = mission["arr_location"] - + # Assume 100 points for discretization (for demo only) # Note: This will change when flying actual missions - + points = geod.npts(lon_dep, lat_dep, lon_arr, lat_arr, 100) - + lons = [lon_dep] + [pt[0] for pt in points] + [lon_arr] lats = [lat_dep] + [pt[1] for pt in points] + [lat_arr] - + # Assign a dummy ground speed ground_speeds = [450] * len(lons) - + # Assign a dummy cruise altitude altitude_ft = [35000] * len(lons) - - return { - "lons": lons, - "lats": lats, - "GS": ground_speeds, - "H": altitude_ft - } - + + return {"lons": lons, "lats": lats, "GS": ground_speeds, "H": altitude_ft} + + def create_dummy_traj(mission): """ Generates a discretized trapezoidal trajectory profile (altitude and speed) @@ -68,8 +70,10 @@ def create_dummy_traj(mission): ---------- mission : dict Dictionary with: - - 'dep_location': (longitude, latitude, altitude) of departure [degrees, degrees, feet] - - 'arr_location': (longitude, latitude, altitude) of arrival [degrees, degrees, feet] + - 'dep_location': (longitude, latitude, altitude) of departure + [degrees, degrees, feet] + - 'arr_location': (longitude, latitude, altitude) of arrival + [degrees, degrees, feet] Returns ------- @@ -82,51 +86,42 @@ def create_dummy_traj(mission): """ # Instantiate GGS84 ellipsoid for geodesic calculation geod = Geod(ellps="WGS84") - + # Extract dept + Arrival lat-lon lon_dep, lat_dep, _ = mission["dep_location"] lon_arr, lat_arr, _ = mission["arr_location"] - + # Assume 100 points for trajectory discretization (demo only) n_total = 100 points = geod.npts(lon_dep, lat_dep, lon_arr, lat_arr, n_total) lons = [lon_dep] + [pt[0] for pt in points] + [lon_arr] lats = [lat_dep] + [pt[1] for pt in points] + [lat_arr] - + # Define climb and descent length as 25 % of total trajectory n_climb = n_descent = n_total // 4 n_cruise = len(lons) - n_climb - n_descent - - # Define end points for speed and altitude + + # Define end points for speed and altitude alt_start, alt_cruise = 1000, 35000 # feet - spd_start, spd_cruise = 140, 450 # knots - + spd_start, spd_cruise = 140, 450 # knots + # Define climb profile climb_alt = np.linspace(alt_start, alt_cruise, n_climb) climb_spd = np.linspace(spd_start, spd_cruise, n_climb) - + # Define cruise profile cruise_alt = np.full(n_cruise, alt_cruise) cruise_spd = np.full(n_cruise, spd_cruise) - + # Define descent profile descent_alt = np.linspace(alt_cruise, alt_start, n_descent) descent_spd = np.linspace(spd_cruise, spd_start, n_descent) - - # Collect height and TAS H = np.concatenate([climb_alt, cruise_alt, descent_alt]) TAS = np.concatenate([climb_spd, cruise_spd, descent_spd]) - - H = H[:len(lons)] - TAS = TAS[:len(lons)] - - return { - "lons": lons, - "lats": lats, - "TAS": TAS, - "H": H - } - - \ No newline at end of file + + H = H[: len(lons)] + TAS = TAS[: len(lons)] + + return {"lons": lons, "lats": lats, "TAS": TAS, "H": H} diff --git a/tests/test_performance_model.py b/tests/test_performance_model.py index 56ec302a..a5efdc9f 100644 --- a/tests/test_performance_model.py +++ b/tests/test_performance_model.py @@ -1,11 +1,8 @@ -import pytest -import tempfile -import tomllib import numpy as np -import os from AEIC.performance_model import PerformanceModel + def test_performance_model_initialization(): """Test initialization and basic structure of the PerformanceModel.""" @@ -32,7 +29,10 @@ def test_performance_model_initialization(): # Check performance table exists assert isinstance(model.performance_table, np.ndarray) - assert model.performance_table.shape == (26, 51, 105, 3) or model.performance_table.ndim == 4 + assert ( + model.performance_table.shape == (26, 51, 105, 3) + or model.performance_table.ndim == 4 + ) # Check input column names and values assert "FL" in model.performance_table_colnames