From 233beaeadb7384d84eda8a7cbbc28f4868fb532d Mon Sep 17 00:00:00 2001 From: Jonah Lefkoff Date: Sat, 16 Aug 2025 23:13:12 -0600 Subject: [PATCH 1/5] [weather] add wind grid processing --- app.py | 33 ++++++++++- blueprints/weather_bp.py | 39 +++++++++++- libs/weather_data_lock.py | 9 +++ libs/wind_lib.py | 121 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 200 insertions(+), 2 deletions(-) create mode 100644 libs/weather_data_lock.py create mode 100644 libs/wind_lib.py diff --git a/app.py b/app.py index 6f89ed2..81d21b0 100644 --- a/app.py +++ b/app.py @@ -1,3 +1,6 @@ +import time +import os +from datetime import datetime, timedelta from flask import Flask from flask_cors import CORS import threading @@ -9,6 +12,8 @@ from blueprints.route_analysis_bp import route_analysis_blueprint import mongo_client +from libs.wind_lib import download_rap_grib2, interpolate_uv_temp_at_flight_levels, get_date + PREFIX = '/api' @@ -35,7 +40,33 @@ def _close_mongo_clients(response): mongo_client.close_reader_mongo_client() return response +def run_updater(): + """Run once at startup and then every hour at HH:05 UTC.""" + while True: + try: + # Always run once immediately at startup + date_str, cycle_hour, forecast_hour, save_path = get_date() + download_rap_grib2(date_str, cycle_hour, forecast_hour, save_path) + interpolate_uv_temp_at_flight_levels(save_path) + print(f"[Updater] Weather data updated {date_str} {cycle_hour}z f{forecast_hour}") + + # Calculate next "5 minutes past the hour" in UTC + now = datetime.utcnow() + next_run = (now + timedelta(hours=1)).replace(minute=5, second=0, microsecond=0) + sleep_time = (next_run - datetime.utcnow()).total_seconds() + print(f"[Updater] Sleeping {int(sleep_time)}s until {next_run} UTC") + time.sleep(sleep_time) + + except Exception as e: + print("Updater error:", e) + time.sleep(300) # retry in 5 min if failed + if __name__ == '__main__': + # Only start updater in the *main process*, not the reloader + if os.environ.get("WERKZEUG_RUN_MAIN") == "true": + updater_thread = threading.Thread(target=run_updater, daemon=True) + updater_thread.start() + app = create_app() - app.run(use_reloader=True) + app.run(use_reloader=True) \ No newline at end of file diff --git a/blueprints/weather_bp.py b/blueprints/weather_bp.py index 3f0be11..3e40fd0 100644 --- a/blueprints/weather_bp.py +++ b/blueprints/weather_bp.py @@ -2,9 +2,11 @@ import re from typing import Optional +import numpy as np import requests from lxml import etree -from flask import Blueprint, jsonify +from flask import Blueprint, jsonify, request +from libs.weather_data_lock import weather_data, data_lock weather_blueprint = Blueprint('weather', __name__) @@ -36,6 +38,41 @@ def _metar(airport): metar_text = response.content.decode('utf-8') return jsonify([metar_text]) +@weather_blueprint.route('/winds') +def winds(): + # query params + top_lat = float(request.args.get("toplat")) + top_lon = float(request.args.get("toplong")) + bottom_lat = float(request.args.get("bottomlat")) + bottom_lon = float(request.args.get("bottomlong")) + fl = int(request.args.get("fl", 300)) # default FL300 + + with data_lock: + lats = weather_data["lats"] + lons = weather_data["lons"] + levels = weather_data["levels"] + + if lats is None or fl not in levels: + return jsonify({"error": "No data loaded"}), 503 + + lat_min, lat_max = sorted([bottom_lat, top_lat]) + lon_min, lon_max = sorted([bottom_lon, top_lon]) + + mask = ((lats >= lat_min) & (lats <= lat_max) & + (lons >= lon_min) & (lons <= lon_max)) + + points = [] + for i, j in zip(*np.where(mask)): + points.append({ + "latitude": float(round(lats[i, j], 2)), + "longitude": float(round(lons[i, j], 2)), + "wind_speed_kt": int(levels[fl]["spd"][i, j]), + "wind_direction_deg_true": int(levels[fl]["dir"][i, j]), + "temperature_c": int(levels[fl]["temp"][i, j]), + }) + + return jsonify({"points": points}) + @weather_blueprint.route('/sigmets') def _get_sigmets(): diff --git a/libs/weather_data_lock.py b/libs/weather_data_lock.py new file mode 100644 index 0000000..6123315 --- /dev/null +++ b/libs/weather_data_lock.py @@ -0,0 +1,9 @@ +import threading + +# thread-safe global storage +data_lock = threading.Lock() +weather_data = { + "lats": None, + "lons": None, + "levels": {} # { FL: {"temp": 2Darray, "dir": 2Darray, "spd": 2Darray} } +} diff --git a/libs/wind_lib.py b/libs/wind_lib.py new file mode 100644 index 0000000..1d77df9 --- /dev/null +++ b/libs/wind_lib.py @@ -0,0 +1,121 @@ +import pygrib +import requests +import numpy as np +from libs.weather_data_lock import weather_data, data_lock +import json +from datetime import datetime +import os +import csv + +# Function to download the latest RAP GRIB2 file based on the current date and cycle hour +def download_rap_grib2(date_str, cycle_hour, forecast_hour, save_path): + base_url = 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/rap/prod' + file_name = f"rap.t{cycle_hour}z.awp130pgrbf{forecast_hour.zfill(2)}.grib2" + url = f"{base_url}/rap.{date_str}/{file_name}" + + response = requests.get(url, stream=True) + if response.status_code == 200: + with open(save_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + print(f"Downloaded: {save_path}") + else: + print(f"Failed to download file. HTTP {response.status_code}: {url}") + +# Function to interpolate U, V wind components and temperature to flight levels and save to CSV files +def interpolate_uv_temp_at_flight_levels(grib_file): + grbs = pygrib.open(grib_file) + + hgt_grbs = grbs.select(name='Geopotential height', typeOfLevel='isobaricInhPa') + u_grbs = grbs.select(name='U component of wind', typeOfLevel='isobaricInhPa') + v_grbs = grbs.select(name='V component of wind', typeOfLevel='isobaricInhPa') + t_grbs = grbs.select(name='Temperature', typeOfLevel='isobaricInhPa') + + hgt_3d = np.array([g.values for g in hgt_grbs]) + u_3d = np.array([g.values for g in u_grbs]) + v_3d = np.array([g.values for g in v_grbs]) + t_3d = np.array([g.values for g in t_grbs]) - 273.15 + + lats, lons = hgt_grbs[0].latlons() + + hgt_ft = hgt_3d * 3.281 + fl_3d = hgt_ft / 100.0 + + fl_target = np.arange(0, 510, 10) + level_dict = {} + + for fl in fl_target: + temp_2d = np.full(lats.shape, np.nan) + speed_2d = np.full(lats.shape, np.nan) + dir_2d = np.full(lats.shape, np.nan) + + for i in range(lats.shape[0]): + for j in range(lats.shape[1]): + profile_fl = fl_3d[:, i, j] + profile_u = u_3d[:, i, j] + profile_v = v_3d[:, i, j] + profile_t = t_3d[:, i, j] + + if (np.any(np.isnan(profile_fl)) or np.any(np.isnan(profile_u)) or + np.any(np.isnan(profile_v)) or np.any(np.isnan(profile_t))): + continue + + u_val = np.interp(fl, profile_fl[::-1], profile_u[::-1]) + v_val = np.interp(fl, profile_fl[::-1], profile_v[::-1]) + t_val = np.interp(fl, profile_fl[::-1], profile_t[::-1]) + + speed = np.sqrt(u_val**2 + v_val**2) + direction = (270 - np.degrees(np.arctan2(v_val, u_val))) % 360 + + temp_2d[i, j] = int(round(t_val)) + speed_2d[i, j] = int(round(speed)) + dir_2d[i, j] = int(round(direction)) + + level_dict[int(fl)] = {"temp": temp_2d, "spd": speed_2d, "dir": dir_2d} + + with data_lock: + weather_data["lats"] = lats + weather_data["lons"] = lons + weather_data["levels"] = level_dict + + grbs.close() + +# Function to get the current RAP forecast date and time, and build file path info +def get_date(): + now = datetime.utcnow() + date_str = now.strftime('%Y%m%d') + cycle_hour = '12' if now.hour >= 12 else '00' + forecast_hour = '00' + save_path = 'rap_latest.grib2' + + # Store state in JSON format + output = {'String': date_str, 'Cycle': cycle_hour, 'Forecast Hour': forecast_hour} + with open('wx_state.json', 'w') as f: + json.dump(output, f, indent=4) + + return date_str, cycle_hour, forecast_hour, save_path + +# Function to check if new forecast hour differs from previously stored state +def check_state(new_forecast): + f = open('state.json') + data = json.load(f) + + if new_forecast != data['Forecast Hour']: + output = True + else: + output = False + return output + +# Wrapper function to execute the full workflow conditionally based on forecast state +def run_grid(): + date_str, cycle_hour, forecast_hour, save_path = get_date() + state_check = check_state(forecast_hour) + + if state_check == True: + download_rap_grib2(date_str, cycle_hour, forecast_hour, save_path) + interpolate_uv_temp_at_flight_levels(save_path) + output = 'Completed' + else: + output = 'Error, forecast is the same as previous forecast' + + return output From 80aa3337b6ff5fb5ef1514ee93029a6e017c391d Mon Sep 17 00:00:00 2001 From: Jonah Lefkoff Date: Sun, 17 Aug 2025 20:05:20 -0600 Subject: [PATCH 2/5] final wind grid libbing --- app.py | 29 +++++++++++++++++++++------- blueprints/weather_bp.py | 36 ++++++++++++++++++++++++++--------- libs/weather_data_lock.py | 5 ++++- libs/wind_lib.py | 40 ++++++++++++++++++++++----------------- 4 files changed, 76 insertions(+), 34 deletions(-) diff --git a/app.py b/app.py index 81d21b0..9cffa93 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,6 @@ import time import os -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from flask import Flask from flask_cors import CORS import threading @@ -13,6 +13,7 @@ import mongo_client from libs.wind_lib import download_rap_grib2, interpolate_uv_temp_at_flight_levels, get_date +from libs.weather_data_lock import weather_data, data_lock PREFIX = '/api' @@ -48,14 +49,28 @@ def run_updater(): date_str, cycle_hour, forecast_hour, save_path = get_date() download_rap_grib2(date_str, cycle_hour, forecast_hour, save_path) interpolate_uv_temp_at_flight_levels(save_path) + processed_at = datetime.now(timezone.utc).isoformat() + new_state = { + 'String': date_str, + 'Cycle': cycle_hour, + 'Forecast Hour': forecast_hour, + 'Processed At': processed_at + } + with data_lock: + weather_data['wx_state'] = new_state + weather_data['last_updated_utc'] = processed_at + weather_data['grib_path'] = save_path + print(f"[Updater] Weather data updated {date_str} {cycle_hour}z f{forecast_hour}") - # Calculate next "5 minutes past the hour" in UTC - now = datetime.utcnow() - next_run = (now + timedelta(hours=1)).replace(minute=5, second=0, microsecond=0) - sleep_time = (next_run - datetime.utcnow()).total_seconds() - print(f"[Updater] Sleeping {int(sleep_time)}s until {next_run} UTC") - time.sleep(sleep_time) + # schedule next run at the next HH:05 UTC + now = datetime.now(timezone.utc) + next_run = now.replace(minute=5, second=0, microsecond=0) + if now >= next_run: + next_run = next_run + timedelta(hours=1) + sleep_seconds = (next_run - now).total_seconds() + print(f"[Updater] Next run scheduled at {next_run.isoformat()} (in {int(sleep_seconds)}s)") + time.sleep(sleep_seconds) except Exception as e: print("Updater error:", e) diff --git a/blueprints/weather_bp.py b/blueprints/weather_bp.py index 3e40fd0..db451e7 100644 --- a/blueprints/weather_bp.py +++ b/blueprints/weather_bp.py @@ -61,17 +61,35 @@ def winds(): mask = ((lats >= lat_min) & (lats <= lat_max) & (lons >= lon_min) & (lons <= lon_max)) + # find indices inside mask + indices = np.where(mask) + if indices[0].size == 0: + return jsonify({"points": []}) + + # Limit to at most 15x15 samples regardless of zoom + MAX_DIM = 15 + unique_rows = np.unique(indices[0]) + unique_cols = np.unique(indices[1]) + + keep_rows = unique_rows[np.linspace(0, len(unique_rows) - 1, min(len(unique_rows), MAX_DIM), dtype=int)] + keep_cols = unique_cols[np.linspace(0, len(unique_cols) - 1, min(len(unique_cols), MAX_DIM), dtype=int)] + points = [] - for i, j in zip(*np.where(mask)): - points.append({ - "latitude": float(round(lats[i, j], 2)), - "longitude": float(round(lons[i, j], 2)), - "wind_speed_kt": int(levels[fl]["spd"][i, j]), - "wind_direction_deg_true": int(levels[fl]["dir"][i, j]), - "temperature_c": int(levels[fl]["temp"][i, j]), - }) + for i in keep_rows: + for j in keep_cols: + if not mask[i, j]: + continue + points.append({ + "latitude": float(round(lats[i, j], 2)), + "longitude": float(round(lons[i, j], 2)), + "wind_speed_kt": int(levels[fl]["spd"][i, j]), + "wind_direction_deg_true": int(levels[fl]["dir"][i, j]), + "temperature_c": int(levels[fl]["temp"][i, j]), + }) + with data_lock: + metadata = weather_data.get("wx_state") - return jsonify({"points": points}) + return jsonify({"points": points, "metadata": metadata}) @weather_blueprint.route('/sigmets') diff --git a/libs/weather_data_lock.py b/libs/weather_data_lock.py index 6123315..a94726a 100644 --- a/libs/weather_data_lock.py +++ b/libs/weather_data_lock.py @@ -5,5 +5,8 @@ weather_data = { "lats": None, "lons": None, - "levels": {} # { FL: {"temp": 2Darray, "dir": 2Darray, "spd": 2Darray} } + "levels": {}, # { FL: {"temp": 2Darray, "dir": 2Darray, "spd": 2Darray} } + "wx_state": None, # {'String': 'YYYYMMDD', 'Cycle': '00'|'12', 'Forecast Hour': '00'} + "last_updated_utc": None, # ISO timestamp string set when interpolate_uv_temp_at_flight_levels finishes + "grib_path": None, # optional: path to last downloaded grib file } diff --git a/libs/wind_lib.py b/libs/wind_lib.py index 1d77df9..1d6f3bb 100644 --- a/libs/wind_lib.py +++ b/libs/wind_lib.py @@ -3,7 +3,7 @@ import numpy as np from libs.weather_data_lock import weather_data, data_lock import json -from datetime import datetime +from datetime import datetime, timezone import os import csv @@ -67,8 +67,11 @@ def interpolate_uv_temp_at_flight_levels(grib_file): speed = np.sqrt(u_val**2 + v_val**2) direction = (270 - np.degrees(np.arctan2(v_val, u_val))) % 360 + # convert speed from m/s to knots (1 m/s = 1.94384449 kt) + speed_kt = speed * 1.94384449 + temp_2d[i, j] = int(round(t_val)) - speed_2d[i, j] = int(round(speed)) + speed_2d[i, j] = int(round(speed_kt)) dir_2d[i, j] = int(round(direction)) level_dict[int(fl)] = {"temp": temp_2d, "spd": speed_2d, "dir": dir_2d} @@ -82,29 +85,22 @@ def interpolate_uv_temp_at_flight_levels(grib_file): # Function to get the current RAP forecast date and time, and build file path info def get_date(): - now = datetime.utcnow() + now = datetime.now(timezone.utc) date_str = now.strftime('%Y%m%d') cycle_hour = '12' if now.hour >= 12 else '00' forecast_hour = '00' save_path = 'rap_latest.grib2' - - # Store state in JSON format - output = {'String': date_str, 'Cycle': cycle_hour, 'Forecast Hour': forecast_hour} - with open('wx_state.json', 'w') as f: - json.dump(output, f, indent=4) - return date_str, cycle_hour, forecast_hour, save_path # Function to check if new forecast hour differs from previously stored state def check_state(new_forecast): - f = open('state.json') - data = json.load(f) - - if new_forecast != data['Forecast Hour']: - output = True - else: - output = False - return output + # read previous forecast from in-memory store + with data_lock: + prev_state = weather_data.get('wx_state') + if not prev_state: + # no previous state -> treat as new + return True + return new_forecast != prev_state.get('Forecast Hour') # Wrapper function to execute the full workflow conditionally based on forecast state def run_grid(): @@ -114,6 +110,16 @@ def run_grid(): if state_check == True: download_rap_grib2(date_str, cycle_hour, forecast_hour, save_path) interpolate_uv_temp_at_flight_levels(save_path) + # persist new state in memory (add processed timestamp) + processed_at = datetime.now(timezone.utc).isoformat() + new_state = { + 'String': date_str, + 'Cycle': cycle_hour, + 'Forecast Hour': forecast_hour, + 'Processed At': processed_at + } + with data_lock: + weather_data['wx_state'] = new_state output = 'Completed' else: output = 'Error, forecast is the same as previous forecast' From 9181ec85d4645f209be848f609f946425032e363 Mon Sep 17 00:00:00 2001 From: Jonah Lefkoff Date: Sun, 17 Aug 2025 20:51:26 -0600 Subject: [PATCH 3/5] add pygrib to reqs --- requirements.txt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/requirements.txt b/requirements.txt index b2147a8..3e2b484 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,7 +14,11 @@ itsdangerous==2.2.0 Jinja2==3.1.6 lxml==5.3.2 MarkupSafe==3.0.2 +numpy==2.3.2 +packaging==25.0 +pygrib==2.1.6 pymongo==4.11.3 +pyproj==3.7.2 requests==2.32.3 urllib3==2.3.0 Werkzeug==3.1.3 From a73303bbe408b54dd4a47d17fe75489ed362a66e Mon Sep 17 00:00:00 2001 From: Jonah Lefkoff Date: Fri, 23 Jan 2026 18:01:51 -0500 Subject: [PATCH 4/5] address comments, add units to calculations --- app.py | 31 ++++---- blueprints/weather_bp.py | 112 +++++++++++++++------------- libs/weather_data_lock.py | 26 ++++--- libs/wind_lib.py | 149 ++++++++++++++++++++------------------ 4 files changed, 172 insertions(+), 146 deletions(-) diff --git a/app.py b/app.py index 9cffa93..2562c5d 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,7 @@ import time import os from datetime import datetime, timedelta, timezone +import traceback from flask import Flask from flask_cors import CORS import threading @@ -12,7 +13,7 @@ from blueprints.route_analysis_bp import route_analysis_blueprint import mongo_client -from libs.wind_lib import download_rap_grib2, interpolate_uv_temp_at_flight_levels, get_date +from libs.wind_lib import get_rap_grib2_stream, interpolate_uv_temp_at_flight_levels, get_date from libs.weather_data_lock import weather_data, data_lock PREFIX = '/api' @@ -46,22 +47,21 @@ def run_updater(): while True: try: # Always run once immediately at startup - date_str, cycle_hour, forecast_hour, save_path = get_date() - download_rap_grib2(date_str, cycle_hour, forecast_hour, save_path) - interpolate_uv_temp_at_flight_levels(save_path) - processed_at = datetime.now(timezone.utc).isoformat() - new_state = { - 'String': date_str, - 'Cycle': cycle_hour, - 'Forecast Hour': forecast_hour, - 'Processed At': processed_at - } + + date_info = get_date() + grib_stream = get_rap_grib2_stream(date_info.date_str, date_info.cycle_hour, date_info.forecast_hour) + if grib_stream is None: + print("[Updater] Error, failed to download GRIB2 stream") + time.sleep(300) # retry in 5 min if failed + continue + interpolate_uv_temp_at_flight_levels(grib_stream) with data_lock: - weather_data['wx_state'] = new_state - weather_data['last_updated_utc'] = processed_at - weather_data['grib_path'] = save_path + weather_data.process_date = date_info.date_str + weather_data.process_cycle = date_info.cycle_hour + weather_data.forecast_hour = date_info.forecast_hour + weather_data.processed_at_utc = datetime.now(timezone.utc).isoformat() - print(f"[Updater] Weather data updated {date_str} {cycle_hour}z f{forecast_hour}") + print(f"[Updater] Weather data updated {date_info.date_str} {date_info.cycle_hour}z f{date_info.forecast_hour}") # schedule next run at the next HH:05 UTC now = datetime.now(timezone.utc) @@ -74,6 +74,7 @@ def run_updater(): except Exception as e: print("Updater error:", e) + traceback.print_exc() time.sleep(300) # retry in 5 min if failed diff --git a/blueprints/weather_bp.py b/blueprints/weather_bp.py index db451e7..5e649c9 100644 --- a/blueprints/weather_bp.py +++ b/blueprints/weather_bp.py @@ -1,12 +1,11 @@ -import logging import re -from typing import Optional - import numpy as np import requests from lxml import etree from flask import Blueprint, jsonify, request from libs.weather_data_lock import weather_data, data_lock +from dataclasses import dataclass +from typing import List weather_blueprint = Blueprint('weather', __name__) @@ -40,57 +39,66 @@ def _metar(airport): @weather_blueprint.route('/winds') def winds(): - # query params - top_lat = float(request.args.get("toplat")) - top_lon = float(request.args.get("toplong")) - bottom_lat = float(request.args.get("bottomlat")) - bottom_lon = float(request.args.get("bottomlong")) - fl = int(request.args.get("fl", 300)) # default FL300 + # query parameters, ignore type warnings due to manual check below + top_lat = float(request.args.get("toplat")) # type: ignore + top_lon = float(request.args.get("toplong")) # type: ignore + bottom_lat = float(request.args.get("bottomlat")) # type: ignore + bottom_lon = float(request.args.get("bottomlong")) # type: ignore + fl = int(request.args.get("fl")) # type: ignore - with data_lock: - lats = weather_data["lats"] - lons = weather_data["lons"] - levels = weather_data["levels"] - - if lats is None or fl not in levels: - return jsonify({"error": "No data loaded"}), 503 - - lat_min, lat_max = sorted([bottom_lat, top_lat]) - lon_min, lon_max = sorted([bottom_lon, top_lon]) - - mask = ((lats >= lat_min) & (lats <= lat_max) & - (lons >= lon_min) & (lons <= lon_max)) - - # find indices inside mask - indices = np.where(mask) - if indices[0].size == 0: - return jsonify({"points": []}) - - # Limit to at most 15x15 samples regardless of zoom - MAX_DIM = 15 - unique_rows = np.unique(indices[0]) - unique_cols = np.unique(indices[1]) - - keep_rows = unique_rows[np.linspace(0, len(unique_rows) - 1, min(len(unique_rows), MAX_DIM), dtype=int)] - keep_cols = unique_cols[np.linspace(0, len(unique_cols) - 1, min(len(unique_cols), MAX_DIM), dtype=int)] - - points = [] - for i in keep_rows: - for j in keep_cols: - if not mask[i, j]: - continue - points.append({ - "latitude": float(round(lats[i, j], 2)), - "longitude": float(round(lons[i, j], 2)), - "wind_speed_kt": int(levels[fl]["spd"][i, j]), - "wind_direction_deg_true": int(levels[fl]["dir"][i, j]), - "temperature_c": int(levels[fl]["temp"][i, j]), - }) - with data_lock: - metadata = weather_data.get("wx_state") - - return jsonify({"points": points, "metadata": metadata}) + if None in [top_lat, top_lon, bottom_lat, bottom_lon, fl]: + return jsonify({"error": "Missing required parameters: toplat, toplong, bottomlat, bottomlong, fl"}), 400 + with data_lock: + lats = weather_data.lats + lons = weather_data.lons + levels = weather_data.levels + metadata = weather_data.wx_state + + if lats is None or fl not in levels: + return jsonify({"error": "No data loaded"}), 503 + + lat_min, lat_max = sorted([bottom_lat, top_lat]) + lon_min, lon_max = sorted([bottom_lon, top_lon]) + + mask = ((lats >= lat_min) & (lats <= lat_max) & + (lons >= lon_min) & (lons <= lon_max)) + + # find indices inside mask + indices = np.where(mask) + if indices[0].size == 0: + return jsonify({"points": []}) + + # Limit to at most 15x15 samples regardless of zoom + MAX_DIM = 15 + unique_rows = np.unique(indices[0]) + unique_cols = np.unique(indices[1]) + + keep_rows = unique_rows[np.linspace(0, len(unique_rows) - 1, min(len(unique_rows), MAX_DIM), dtype=int)] + keep_cols = unique_cols[np.linspace(0, len(unique_cols) - 1, min(len(unique_cols), MAX_DIM), dtype=int)] + + @dataclass + class WindPoint: + latitude: float + longitude: float + wind_speed_kt: int + wind_direction_deg_true: int + temperature_c: int + + point: List[WindPoint] = [] + for i in keep_rows: + for j in keep_cols: + if not mask[i, j]: + continue + point.append(WindPoint( + latitude=float(round(lats[i, j], 2)), + longitude=float(round(lons[i, j], 2)), + wind_speed_kt=int(levels[fl]["spd"][i, j]), + wind_direction_deg_true=int(levels[fl]["dir"][i, j]), + temperature_c=int(levels[fl]["temp"][i, j]) + )) + + return jsonify({"points": point, "metadata": metadata}) @weather_blueprint.route('/sigmets') def _get_sigmets(): diff --git a/libs/weather_data_lock.py b/libs/weather_data_lock.py index a94726a..127afb3 100644 --- a/libs/weather_data_lock.py +++ b/libs/weather_data_lock.py @@ -1,12 +1,20 @@ import threading +from dataclasses import dataclass, field +from typing import Optional, Dict, Any -# thread-safe global storage +# thread-safe lock shared by callers data_lock = threading.Lock() -weather_data = { - "lats": None, - "lons": None, - "levels": {}, # { FL: {"temp": 2Darray, "dir": 2Darray, "spd": 2Darray} } - "wx_state": None, # {'String': 'YYYYMMDD', 'Cycle': '00'|'12', 'Forecast Hour': '00'} - "last_updated_utc": None, # ISO timestamp string set when interpolate_uv_temp_at_flight_levels finishes - "grib_path": None, # optional: path to last downloaded grib file -} + +@dataclass +class WeatherData: + lats: Optional[Any] = None # expected: 2D numpy array or None + lons: Optional[Any] = None # expected: 2D numpy array or None + levels: Dict[int, Dict[str, Any]] = field(default_factory=dict) # { FL: {"temp": 2Darray, "dir": 2Darray, "spd": 2Darray} } + process_date: Optional[str] = None # 'YYYYMMDD' + process_cycle: Optional[str] = None # '00' or '12' + forecast_hour: Optional[str] = None # '00', '01', ..., '18' + processed_at_utc: Optional[str] = None # ISO format timestamp + wx_state: Optional[Dict[str, str]] = None # {'String': 'YYYYMMDD', 'Cycle': '00'|'12', 'Forecast Hour': '00'} + +# global instance used throughout the app +weather_data = WeatherData() \ No newline at end of file diff --git a/libs/wind_lib.py b/libs/wind_lib.py index 1d6f3bb..c84a4b0 100644 --- a/libs/wind_lib.py +++ b/libs/wind_lib.py @@ -1,45 +1,60 @@ +from dataclasses import dataclass +from io import BufferedReader, BytesIO import pygrib import requests import numpy as np -from libs.weather_data_lock import weather_data, data_lock -import json +from libs.weather_data_lock import weather_data, data_lock, WeatherData from datetime import datetime, timezone -import os -import csv -# Function to download the latest RAP GRIB2 file based on the current date and cycle hour -def download_rap_grib2(date_str, cycle_hour, forecast_hour, save_path): - base_url = 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/rap/prod' +RAP_BASE_URL = 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/rap/prod' +KELVIN_TO_CELSIUS = -273.15 +METER_TO_FEET = 3.281 +FEET_PER_FLIGHT_LEVEL = 100.0 +METERS_PER_SEC_TO_KNOTS = 1.94384449 + +def get_rap_grib2_stream(date_str: str, cycle_hour: str, forecast_hour: str) -> BufferedReader | None: + """Acquire a stream of RAP GRIB2 data from NOAA + + Args: + date_str (str): Date string in 'YYYYMMDD' format + cycle_hour (str): Cycle hour ('00' or '12') + forecast_hour (str): Forecast hour (e.g., '00', '01', ...) + + Returns: + BufferedReader: Stream of GRIB2 data, or None if download fails + """ file_name = f"rap.t{cycle_hour}z.awp130pgrbf{forecast_hour.zfill(2)}.grib2" - url = f"{base_url}/rap.{date_str}/{file_name}" + url = f"{RAP_BASE_URL}/rap.{date_str}/{file_name}" response = requests.get(url, stream=True) if response.status_code == 200: - with open(save_path, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - print(f"Downloaded: {save_path}") + return(BufferedReader(BytesIO(response.content))) else: print(f"Failed to download file. HTTP {response.status_code}: {url}") + return None -# Function to interpolate U, V wind components and temperature to flight levels and save to CSV files -def interpolate_uv_temp_at_flight_levels(grib_file): - grbs = pygrib.open(grib_file) +def interpolate_uv_temp_at_flight_levels(grib_buffer: BufferedReader): + """ + Interpolate U and V wind components and temperature to flight levels - hgt_grbs = grbs.select(name='Geopotential height', typeOfLevel='isobaricInhPa') - u_grbs = grbs.select(name='U component of wind', typeOfLevel='isobaricInhPa') - v_grbs = grbs.select(name='V component of wind', typeOfLevel='isobaricInhPa') - t_grbs = grbs.select(name='Temperature', typeOfLevel='isobaricInhPa') + :param grib_buffer: BufferedReader stream of GRIB2 data from requests.raw + """ + grbs = pygrib.open(grib_buffer) - hgt_3d = np.array([g.values for g in hgt_grbs]) - u_3d = np.array([g.values for g in u_grbs]) - v_3d = np.array([g.values for g in v_grbs]) - t_3d = np.array([g.values for g in t_grbs]) - 273.15 + geopotential_height = grbs.select(name='Geopotential height', typeOfLevel='isobaricInhPa') + u_component = grbs.select(name='U component of wind', typeOfLevel='isobaricInhPa') + v_component = grbs.select(name='V component of wind', typeOfLevel='isobaricInhPa') + temp_component = grbs.select(name='Temperature', typeOfLevel='isobaricInhPa') - lats, lons = hgt_grbs[0].latlons() + geopotential_height_3d = np.array([g.values for g in geopotential_height]) + u_component_3d = np.array([g.values for g in u_component]) + v_component_3d = np.array([g.values for g in v_component]) + temp_component_3d = np.array([g.values for g in temp_component]) + KELVIN_TO_CELSIUS - hgt_ft = hgt_3d * 3.281 - fl_3d = hgt_ft / 100.0 + lats, lons = geopotential_height[0].latlons() + + geopotential_height_3d_feet = geopotential_height_3d * METER_TO_FEET + flight_level_3d = geopotential_height_3d_feet / FEET_PER_FLIGHT_LEVEL fl_target = np.arange(0, 510, 10) level_dict = {} @@ -51,10 +66,10 @@ def interpolate_uv_temp_at_flight_levels(grib_file): for i in range(lats.shape[0]): for j in range(lats.shape[1]): - profile_fl = fl_3d[:, i, j] - profile_u = u_3d[:, i, j] - profile_v = v_3d[:, i, j] - profile_t = t_3d[:, i, j] + profile_fl = flight_level_3d[:, i, j] + profile_u = u_component_3d[:, i, j] + profile_v = v_component_3d[:, i, j] + profile_t = temp_component_3d[:, i, j] if (np.any(np.isnan(profile_fl)) or np.any(np.isnan(profile_u)) or np.any(np.isnan(profile_v)) or np.any(np.isnan(profile_t))): @@ -68,7 +83,7 @@ def interpolate_uv_temp_at_flight_levels(grib_file): direction = (270 - np.degrees(np.arctan2(v_val, u_val))) % 360 # convert speed from m/s to knots (1 m/s = 1.94384449 kt) - speed_kt = speed * 1.94384449 + speed_kt = speed * METERS_PER_SEC_TO_KNOTS temp_2d[i, j] = int(round(t_val)) speed_2d[i, j] = int(round(speed_kt)) @@ -77,51 +92,45 @@ def interpolate_uv_temp_at_flight_levels(grib_file): level_dict[int(fl)] = {"temp": temp_2d, "spd": speed_2d, "dir": dir_2d} with data_lock: - weather_data["lats"] = lats - weather_data["lons"] = lons - weather_data["levels"] = level_dict + weather_data.lats = lats + weather_data.lons = lons + weather_data.levels = level_dict grbs.close() -# Function to get the current RAP forecast date and time, and build file path info -def get_date(): - now = datetime.now(timezone.utc) - date_str = now.strftime('%Y%m%d') - cycle_hour = '12' if now.hour >= 12 else '00' - forecast_hour = '00' - save_path = 'rap_latest.grib2' - return date_str, cycle_hour, forecast_hour, save_path +def different_hour(new_forecast: WeatherData) -> bool: + """Checks if the provided new forecast is newer than the existing in-memory forecast -# Function to check if new forecast hour differs from previously stored state -def check_state(new_forecast): + Args: + new_forecast (WeatherData): the new forecast data to compare + + Returns: + bool: True if the new forecast is different from the previous forecast hour, False otherwise + """ # read previous forecast from in-memory store with data_lock: - prev_state = weather_data.get('wx_state') - if not prev_state: + prev_state = weather_data + if prev_state.forecast_hour is None: # no previous state -> treat as new return True - return new_forecast != prev_state.get('Forecast Hour') - -# Wrapper function to execute the full workflow conditionally based on forecast state -def run_grid(): - date_str, cycle_hour, forecast_hour, save_path = get_date() - state_check = check_state(forecast_hour) - - if state_check == True: - download_rap_grib2(date_str, cycle_hour, forecast_hour, save_path) - interpolate_uv_temp_at_flight_levels(save_path) - # persist new state in memory (add processed timestamp) - processed_at = datetime.now(timezone.utc).isoformat() - new_state = { - 'String': date_str, - 'Cycle': cycle_hour, - 'Forecast Hour': forecast_hour, - 'Processed At': processed_at - } - with data_lock: - weather_data['wx_state'] = new_state - output = 'Completed' - else: - output = 'Error, forecast is the same as previous forecast' + return new_forecast != prev_state.forecast_hour + +@dataclass +class RAPDateInfo: + date_str: str + cycle_hour: str + forecast_hour: str + +# Function to get the current RAP forecast date and time, and build file path info +def get_date() -> RAPDateInfo: + """Gets the current datetime and formulates the string for use in RAP model fetching + + Returns: + RAPDateInfo: Current forecast date information + """ + now = datetime.now(timezone.utc) + date_str = now.strftime('%Y%m%d') + cycle_hour = '12' if now.hour >= 12 else '00' + forecast_hour = '00' + return RAPDateInfo(date_str, cycle_hour, forecast_hour) - return output From 188b35886485a5a031836e7039502cda01f67fe5 Mon Sep 17 00:00:00 2001 From: Jonah Lefkoff Date: Sun, 25 Jan 2026 21:08:54 -0500 Subject: [PATCH 5/5] add better RAP fetching logic --- app.py | 38 +++++++++------- libs/weather_data_lock.py | 17 ++++--- libs/wind_lib.py | 93 ++++++++++++++++++++++++++++++++++----- 3 files changed, 115 insertions(+), 33 deletions(-) diff --git a/app.py b/app.py index 2562c5d..b7085ba 100644 --- a/app.py +++ b/app.py @@ -5,6 +5,7 @@ from flask import Flask from flask_cors import CORS import threading +import logging from blueprints.edst_bp import edst_blueprint from blueprints.navdata_bp import navdata_blueprint @@ -13,8 +14,13 @@ from blueprints.route_analysis_bp import route_analysis_blueprint import mongo_client -from libs.wind_lib import get_rap_grib2_stream, interpolate_uv_temp_at_flight_levels, get_date -from libs.weather_data_lock import weather_data, data_lock +from libs.wind_lib import get_rap_grib2_stream, interpolate_uv_temp_at_flight_levels, get_latest_rap_forecast +from libs.weather_data_lock import WindGridState, weather_data, data_lock + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) PREFIX = '/api' @@ -48,20 +54,22 @@ def run_updater(): try: # Always run once immediately at startup - date_info = get_date() + date_info = get_latest_rap_forecast() grib_stream = get_rap_grib2_stream(date_info.date_str, date_info.cycle_hour, date_info.forecast_hour) + logging.info(f"Downloaded GRIB2 stream for {date_info.date_str} {date_info.cycle_hour}z f{date_info.forecast_hour}") if grib_stream is None: - print("[Updater] Error, failed to download GRIB2 stream") + logging.error("[Updater] Error, failed to download GRIB2 stream") time.sleep(300) # retry in 5 min if failed continue interpolate_uv_temp_at_flight_levels(grib_stream) with data_lock: - weather_data.process_date = date_info.date_str - weather_data.process_cycle = date_info.cycle_hour - weather_data.forecast_hour = date_info.forecast_hour - weather_data.processed_at_utc = datetime.now(timezone.utc).isoformat() - - print(f"[Updater] Weather data updated {date_info.date_str} {date_info.cycle_hour}z f{date_info.forecast_hour}") + weather_data.wx_state = WindGridState( + process_date=date_info.date_str, + process_cycle=date_info.cycle_hour, + forecast_hour=date_info.forecast_hour, + processed_at_utc=datetime.now(timezone.utc).isoformat() + ) + logging.info(f"[Updater] Weather data updated {date_info.date_str} {date_info.cycle_hour}z f{date_info.forecast_hour}") # schedule next run at the next HH:05 UTC now = datetime.now(timezone.utc) @@ -69,20 +77,18 @@ def run_updater(): if now >= next_run: next_run = next_run + timedelta(hours=1) sleep_seconds = (next_run - now).total_seconds() - print(f"[Updater] Next run scheduled at {next_run.isoformat()} (in {int(sleep_seconds)}s)") + logging.info(f"[Updater] Next run scheduled at {next_run.isoformat()} (in {int(sleep_seconds)}s)") time.sleep(sleep_seconds) except Exception as e: - print("Updater error:", e) + logging.error(f"Updater error: {e}") traceback.print_exc() time.sleep(300) # retry in 5 min if failed if __name__ == '__main__': - # Only start updater in the *main process*, not the reloader - if os.environ.get("WERKZEUG_RUN_MAIN") == "true": - updater_thread = threading.Thread(target=run_updater, daemon=True) - updater_thread.start() + updater_thread = threading.Thread(target=run_updater, daemon=True) + updater_thread.start() app = create_app() app.run(use_reloader=True) \ No newline at end of file diff --git a/libs/weather_data_lock.py b/libs/weather_data_lock.py index 127afb3..5f0c98a 100644 --- a/libs/weather_data_lock.py +++ b/libs/weather_data_lock.py @@ -6,15 +6,18 @@ data_lock = threading.Lock() @dataclass -class WeatherData: +class WindGridState: + process_date: str + process_cycle: str + forecast_hour: str + processed_at_utc: str + +@dataclass +class WindGridData: lats: Optional[Any] = None # expected: 2D numpy array or None lons: Optional[Any] = None # expected: 2D numpy array or None levels: Dict[int, Dict[str, Any]] = field(default_factory=dict) # { FL: {"temp": 2Darray, "dir": 2Darray, "spd": 2Darray} } - process_date: Optional[str] = None # 'YYYYMMDD' - process_cycle: Optional[str] = None # '00' or '12' - forecast_hour: Optional[str] = None # '00', '01', ..., '18' - processed_at_utc: Optional[str] = None # ISO format timestamp - wx_state: Optional[Dict[str, str]] = None # {'String': 'YYYYMMDD', 'Cycle': '00'|'12', 'Forecast Hour': '00'} + wx_state: Optional[WindGridState] = None # global instance used throughout the app -weather_data = WeatherData() \ No newline at end of file +weather_data = WindGridData() \ No newline at end of file diff --git a/libs/wind_lib.py b/libs/wind_lib.py index c84a4b0..6c3918b 100644 --- a/libs/wind_lib.py +++ b/libs/wind_lib.py @@ -3,14 +3,17 @@ import pygrib import requests import numpy as np -from libs.weather_data_lock import weather_data, data_lock, WeatherData -from datetime import datetime, timezone +from libs.weather_data_lock import weather_data, data_lock, WindGridData +from datetime import datetime, timezone, timedelta +from html.parser import HTMLParser +import re RAP_BASE_URL = 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/rap/prod' KELVIN_TO_CELSIUS = -273.15 METER_TO_FEET = 3.281 FEET_PER_FLIGHT_LEVEL = 100.0 METERS_PER_SEC_TO_KNOTS = 1.94384449 +RAP_GRIB_FILE_IDENTIFIER = 'awp130pgrbf' def get_rap_grib2_stream(date_str: str, cycle_hour: str, forecast_hour: str) -> BufferedReader | None: """Acquire a stream of RAP GRIB2 data from NOAA @@ -98,7 +101,7 @@ def interpolate_uv_temp_at_flight_levels(grib_buffer: BufferedReader): grbs.close() -def different_hour(new_forecast: WeatherData) -> bool: +def different_hour(new_forecast: WindGridData) -> bool: """Checks if the provided new forecast is newer than the existing in-memory forecast Args: @@ -121,16 +124,86 @@ class RAPDateInfo: cycle_hour: str forecast_hour: str -# Function to get the current RAP forecast date and time, and build file path info -def get_date() -> RAPDateInfo: - """Gets the current datetime and formulates the string for use in RAP model fetching +class RAP_GribDirectoryFileParser(HTMLParser): + """HTML parser to extract GRIB2 file names from RAP directory listing""" + def __init__(self): + super().__init__() + self.grib_files = [] + + def handle_starttag(self, tag, attrs): + if tag == 'a': + for attr, value in attrs: + if attr == 'href' and value and RAP_GRIB_FILE_IDENTIFIER in value and value.endswith('.grib2'): + self.grib_files.append(value) + + +def _search_for_latest_rap(date_str: str) -> RAPDateInfo | None: + """Query the RAP directory to find the latest available forecast file + + Args: + date_str (str): Date string in 'YYYYMMDD' format + Returns: - RAPDateInfo: Current forecast date information + RAPDateInfo: Latest available forecast info, or None if no files found + """ + url = f"{RAP_BASE_URL}/rap.{date_str}/" + + try: + response = requests.get(url, timeout=10) + if response.status_code != 200: + return None + + # Parse HTML to extract GRIB2 file names + parser = RAP_GribDirectoryFileParser() + parser.feed(response.text) + + if not parser.grib_files: + return None + + # Sort files to get the latest one + # Files are in format: rap.t{cycle_hour}z.awp130pgrbf{forecast_hour}.grib2 + latest_file = sorted(parser.grib_files)[-1] + + # Extract cycle_hour and forecast_hour from filename + # Example: rap.t23z.awp130pgrbf21.grib2 + match = re.match(r'rap\.t(\d{2})z\.awp130pgrbf(\d{2})\.grib2', latest_file) + if match: + cycle_hour = match.group(1) + forecast_hour = match.group(2) + return RAPDateInfo(date_str, cycle_hour, forecast_hour) + + return None + + except Exception as e: + print(f"Error querying RAP directory: {e}") + return None + + +def get_latest_rap_forecast() -> RAPDateInfo: + """Gets the latest available RAP forecast, falling back to previous day if needed + + Queries the RAP server to find the most recent forecast file available, + making the approach robust to delays in forecast publication. + + Returns: + RAPDateInfo: Latest available forecast date information """ now = datetime.now(timezone.utc) date_str = now.strftime('%Y%m%d') - cycle_hour = '12' if now.hour >= 12 else '00' - forecast_hour = '00' - return RAPDateInfo(date_str, cycle_hour, forecast_hour) + + # Try to find latest forecast for today + latest = _search_for_latest_rap(date_str) + if latest: + return latest + + # If not found today, try yesterday + yesterday = now - timedelta(days=1) + date_str_yesterday = yesterday.strftime('%Y%m%d') + latest = _search_for_latest_rap(date_str_yesterday) + if latest: + return latest + + # No forecast files found + raise Exception("No RAP forecast files found for today or yesterday")