diff --git a/app.py b/app.py index 6f89ed2..b7085ba 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,11 @@ +import time +import os +from datetime import datetime, timedelta, timezone +import traceback from flask import Flask from flask_cors import CORS import threading +import logging from blueprints.edst_bp import edst_blueprint from blueprints.navdata_bp import navdata_blueprint @@ -9,6 +14,14 @@ from blueprints.route_analysis_bp import route_analysis_blueprint import mongo_client +from libs.wind_lib import get_rap_grib2_stream, interpolate_uv_temp_at_flight_levels, get_latest_rap_forecast +from libs.weather_data_lock import WindGridState, weather_data, data_lock + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) + PREFIX = '/api' @@ -35,7 +48,47 @@ def _close_mongo_clients(response): mongo_client.close_reader_mongo_client() return response +def run_updater(): + """Run once at startup and then every hour at HH:05 UTC.""" + while True: + try: + # Always run once immediately at startup + + date_info = get_latest_rap_forecast() + grib_stream = get_rap_grib2_stream(date_info.date_str, date_info.cycle_hour, date_info.forecast_hour) + logging.info(f"Downloaded GRIB2 stream for {date_info.date_str} {date_info.cycle_hour}z f{date_info.forecast_hour}") + if grib_stream is None: + logging.error("[Updater] Error, failed to download GRIB2 stream") + time.sleep(300) # retry in 5 min if failed + continue + interpolate_uv_temp_at_flight_levels(grib_stream) + with data_lock: + weather_data.wx_state = WindGridState( + process_date=date_info.date_str, + process_cycle=date_info.cycle_hour, + forecast_hour=date_info.forecast_hour, + processed_at_utc=datetime.now(timezone.utc).isoformat() + ) + logging.info(f"[Updater] Weather data updated {date_info.date_str} {date_info.cycle_hour}z f{date_info.forecast_hour}") + + # schedule next run at the next HH:05 UTC + now = datetime.now(timezone.utc) + next_run = now.replace(minute=5, second=0, microsecond=0) + if now >= next_run: + next_run = next_run + timedelta(hours=1) + sleep_seconds = (next_run - now).total_seconds() + logging.info(f"[Updater] Next run scheduled at {next_run.isoformat()} (in {int(sleep_seconds)}s)") + time.sleep(sleep_seconds) + + except Exception as e: + logging.error(f"Updater error: {e}") + traceback.print_exc() + time.sleep(300) # retry in 5 min if failed + if __name__ == '__main__': + updater_thread = threading.Thread(target=run_updater, daemon=True) + updater_thread.start() + app = create_app() - app.run(use_reloader=True) + app.run(use_reloader=True) \ No newline at end of file diff --git a/blueprints/weather_bp.py b/blueprints/weather_bp.py index 3f0be11..5e649c9 100644 --- a/blueprints/weather_bp.py +++ b/blueprints/weather_bp.py @@ -1,10 +1,11 @@ -import logging import re -from typing import Optional - +import numpy as np import requests from lxml import etree -from flask import Blueprint, jsonify +from flask import Blueprint, jsonify, request +from libs.weather_data_lock import weather_data, data_lock +from dataclasses import dataclass +from typing import List weather_blueprint = Blueprint('weather', __name__) @@ -36,6 +37,68 @@ def _metar(airport): metar_text = response.content.decode('utf-8') return jsonify([metar_text]) +@weather_blueprint.route('/winds') +def winds(): + # query parameters, ignore type warnings due to manual check below + top_lat = float(request.args.get("toplat")) # type: ignore + top_lon = float(request.args.get("toplong")) # type: ignore + bottom_lat = float(request.args.get("bottomlat")) # type: ignore + bottom_lon = float(request.args.get("bottomlong")) # type: ignore + fl = int(request.args.get("fl")) # type: ignore + + if None in [top_lat, top_lon, bottom_lat, bottom_lon, fl]: + return jsonify({"error": "Missing required parameters: toplat, toplong, bottomlat, bottomlong, fl"}), 400 + + with data_lock: + lats = weather_data.lats + lons = weather_data.lons + levels = weather_data.levels + metadata = weather_data.wx_state + + if lats is None or fl not in levels: + return jsonify({"error": "No data loaded"}), 503 + + lat_min, lat_max = sorted([bottom_lat, top_lat]) + lon_min, lon_max = sorted([bottom_lon, top_lon]) + + mask = ((lats >= lat_min) & (lats <= lat_max) & + (lons >= lon_min) & (lons <= lon_max)) + + # find indices inside mask + indices = np.where(mask) + if indices[0].size == 0: + return jsonify({"points": []}) + + # Limit to at most 15x15 samples regardless of zoom + MAX_DIM = 15 + unique_rows = np.unique(indices[0]) + unique_cols = np.unique(indices[1]) + + keep_rows = unique_rows[np.linspace(0, len(unique_rows) - 1, min(len(unique_rows), MAX_DIM), dtype=int)] + keep_cols = unique_cols[np.linspace(0, len(unique_cols) - 1, min(len(unique_cols), MAX_DIM), dtype=int)] + + @dataclass + class WindPoint: + latitude: float + longitude: float + wind_speed_kt: int + wind_direction_deg_true: int + temperature_c: int + + point: List[WindPoint] = [] + for i in keep_rows: + for j in keep_cols: + if not mask[i, j]: + continue + point.append(WindPoint( + latitude=float(round(lats[i, j], 2)), + longitude=float(round(lons[i, j], 2)), + wind_speed_kt=int(levels[fl]["spd"][i, j]), + wind_direction_deg_true=int(levels[fl]["dir"][i, j]), + temperature_c=int(levels[fl]["temp"][i, j]) + )) + + return jsonify({"points": point, "metadata": metadata}) @weather_blueprint.route('/sigmets') def _get_sigmets(): diff --git a/libs/weather_data_lock.py b/libs/weather_data_lock.py new file mode 100644 index 0000000..5f0c98a --- /dev/null +++ b/libs/weather_data_lock.py @@ -0,0 +1,23 @@ +import threading +from dataclasses import dataclass, field +from typing import Optional, Dict, Any + +# thread-safe lock shared by callers +data_lock = threading.Lock() + +@dataclass +class WindGridState: + process_date: str + process_cycle: str + forecast_hour: str + processed_at_utc: str + +@dataclass +class WindGridData: + lats: Optional[Any] = None # expected: 2D numpy array or None + lons: Optional[Any] = None # expected: 2D numpy array or None + levels: Dict[int, Dict[str, Any]] = field(default_factory=dict) # { FL: {"temp": 2Darray, "dir": 2Darray, "spd": 2Darray} } + wx_state: Optional[WindGridState] = None + +# global instance used throughout the app +weather_data = WindGridData() \ No newline at end of file diff --git a/libs/wind_lib.py b/libs/wind_lib.py new file mode 100644 index 0000000..6c3918b --- /dev/null +++ b/libs/wind_lib.py @@ -0,0 +1,209 @@ +from dataclasses import dataclass +from io import BufferedReader, BytesIO +import pygrib +import requests +import numpy as np +from libs.weather_data_lock import weather_data, data_lock, WindGridData +from datetime import datetime, timezone, timedelta +from html.parser import HTMLParser +import re + +RAP_BASE_URL = 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/rap/prod' +KELVIN_TO_CELSIUS = -273.15 +METER_TO_FEET = 3.281 +FEET_PER_FLIGHT_LEVEL = 100.0 +METERS_PER_SEC_TO_KNOTS = 1.94384449 +RAP_GRIB_FILE_IDENTIFIER = 'awp130pgrbf' + +def get_rap_grib2_stream(date_str: str, cycle_hour: str, forecast_hour: str) -> BufferedReader | None: + """Acquire a stream of RAP GRIB2 data from NOAA + + Args: + date_str (str): Date string in 'YYYYMMDD' format + cycle_hour (str): Cycle hour ('00' or '12') + forecast_hour (str): Forecast hour (e.g., '00', '01', ...) + + Returns: + BufferedReader: Stream of GRIB2 data, or None if download fails + """ + file_name = f"rap.t{cycle_hour}z.awp130pgrbf{forecast_hour.zfill(2)}.grib2" + url = f"{RAP_BASE_URL}/rap.{date_str}/{file_name}" + + response = requests.get(url, stream=True) + if response.status_code == 200: + return(BufferedReader(BytesIO(response.content))) + else: + print(f"Failed to download file. HTTP {response.status_code}: {url}") + return None + +def interpolate_uv_temp_at_flight_levels(grib_buffer: BufferedReader): + """ + Interpolate U and V wind components and temperature to flight levels + + :param grib_buffer: BufferedReader stream of GRIB2 data from requests.raw + """ + grbs = pygrib.open(grib_buffer) + + geopotential_height = grbs.select(name='Geopotential height', typeOfLevel='isobaricInhPa') + u_component = grbs.select(name='U component of wind', typeOfLevel='isobaricInhPa') + v_component = grbs.select(name='V component of wind', typeOfLevel='isobaricInhPa') + temp_component = grbs.select(name='Temperature', typeOfLevel='isobaricInhPa') + + geopotential_height_3d = np.array([g.values for g in geopotential_height]) + u_component_3d = np.array([g.values for g in u_component]) + v_component_3d = np.array([g.values for g in v_component]) + temp_component_3d = np.array([g.values for g in temp_component]) + KELVIN_TO_CELSIUS + + lats, lons = geopotential_height[0].latlons() + + geopotential_height_3d_feet = geopotential_height_3d * METER_TO_FEET + flight_level_3d = geopotential_height_3d_feet / FEET_PER_FLIGHT_LEVEL + + fl_target = np.arange(0, 510, 10) + level_dict = {} + + for fl in fl_target: + temp_2d = np.full(lats.shape, np.nan) + speed_2d = np.full(lats.shape, np.nan) + dir_2d = np.full(lats.shape, np.nan) + + for i in range(lats.shape[0]): + for j in range(lats.shape[1]): + profile_fl = flight_level_3d[:, i, j] + profile_u = u_component_3d[:, i, j] + profile_v = v_component_3d[:, i, j] + profile_t = temp_component_3d[:, i, j] + + if (np.any(np.isnan(profile_fl)) or np.any(np.isnan(profile_u)) or + np.any(np.isnan(profile_v)) or np.any(np.isnan(profile_t))): + continue + + u_val = np.interp(fl, profile_fl[::-1], profile_u[::-1]) + v_val = np.interp(fl, profile_fl[::-1], profile_v[::-1]) + t_val = np.interp(fl, profile_fl[::-1], profile_t[::-1]) + + speed = np.sqrt(u_val**2 + v_val**2) + direction = (270 - np.degrees(np.arctan2(v_val, u_val))) % 360 + + # convert speed from m/s to knots (1 m/s = 1.94384449 kt) + speed_kt = speed * METERS_PER_SEC_TO_KNOTS + + temp_2d[i, j] = int(round(t_val)) + speed_2d[i, j] = int(round(speed_kt)) + dir_2d[i, j] = int(round(direction)) + + level_dict[int(fl)] = {"temp": temp_2d, "spd": speed_2d, "dir": dir_2d} + + with data_lock: + weather_data.lats = lats + weather_data.lons = lons + weather_data.levels = level_dict + + grbs.close() + +def different_hour(new_forecast: WindGridData) -> bool: + """Checks if the provided new forecast is newer than the existing in-memory forecast + + Args: + new_forecast (WeatherData): the new forecast data to compare + + Returns: + bool: True if the new forecast is different from the previous forecast hour, False otherwise + """ + # read previous forecast from in-memory store + with data_lock: + prev_state = weather_data + if prev_state.forecast_hour is None: + # no previous state -> treat as new + return True + return new_forecast != prev_state.forecast_hour + +@dataclass +class RAPDateInfo: + date_str: str + cycle_hour: str + forecast_hour: str + + +class RAP_GribDirectoryFileParser(HTMLParser): + """HTML parser to extract GRIB2 file names from RAP directory listing""" + def __init__(self): + super().__init__() + self.grib_files = [] + + def handle_starttag(self, tag, attrs): + if tag == 'a': + for attr, value in attrs: + if attr == 'href' and value and RAP_GRIB_FILE_IDENTIFIER in value and value.endswith('.grib2'): + self.grib_files.append(value) + + +def _search_for_latest_rap(date_str: str) -> RAPDateInfo | None: + """Query the RAP directory to find the latest available forecast file + + Args: + date_str (str): Date string in 'YYYYMMDD' format + + Returns: + RAPDateInfo: Latest available forecast info, or None if no files found + """ + url = f"{RAP_BASE_URL}/rap.{date_str}/" + + try: + response = requests.get(url, timeout=10) + if response.status_code != 200: + return None + + # Parse HTML to extract GRIB2 file names + parser = RAP_GribDirectoryFileParser() + parser.feed(response.text) + + if not parser.grib_files: + return None + + # Sort files to get the latest one + # Files are in format: rap.t{cycle_hour}z.awp130pgrbf{forecast_hour}.grib2 + latest_file = sorted(parser.grib_files)[-1] + + # Extract cycle_hour and forecast_hour from filename + # Example: rap.t23z.awp130pgrbf21.grib2 + match = re.match(r'rap\.t(\d{2})z\.awp130pgrbf(\d{2})\.grib2', latest_file) + if match: + cycle_hour = match.group(1) + forecast_hour = match.group(2) + return RAPDateInfo(date_str, cycle_hour, forecast_hour) + + return None + + except Exception as e: + print(f"Error querying RAP directory: {e}") + return None + + +def get_latest_rap_forecast() -> RAPDateInfo: + """Gets the latest available RAP forecast, falling back to previous day if needed + + Queries the RAP server to find the most recent forecast file available, + making the approach robust to delays in forecast publication. + + Returns: + RAPDateInfo: Latest available forecast date information + """ + now = datetime.now(timezone.utc) + date_str = now.strftime('%Y%m%d') + + # Try to find latest forecast for today + latest = _search_for_latest_rap(date_str) + if latest: + return latest + + # If not found today, try yesterday + yesterday = now - timedelta(days=1) + date_str_yesterday = yesterday.strftime('%Y%m%d') + latest = _search_for_latest_rap(date_str_yesterday) + if latest: + return latest + + # No forecast files found + raise Exception("No RAP forecast files found for today or yesterday") + diff --git a/requirements.txt b/requirements.txt index b2147a8..3e2b484 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,7 +14,11 @@ itsdangerous==2.2.0 Jinja2==3.1.6 lxml==5.3.2 MarkupSafe==3.0.2 +numpy==2.3.2 +packaging==25.0 +pygrib==2.1.6 pymongo==4.11.3 +pyproj==3.7.2 requests==2.32.3 urllib3==2.3.0 Werkzeug==3.1.3