Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import time
import os
from datetime import datetime, timedelta, timezone
import traceback
from flask import Flask
from flask_cors import CORS
import threading
import logging

from blueprints.edst_bp import edst_blueprint
from blueprints.navdata_bp import navdata_blueprint
Expand All @@ -9,6 +14,14 @@
from blueprints.route_analysis_bp import route_analysis_blueprint
import mongo_client

from libs.wind_lib import get_rap_grib2_stream, interpolate_uv_temp_at_flight_levels, get_latest_rap_forecast
from libs.weather_data_lock import WindGridState, weather_data, data_lock

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

PREFIX = '/api'


Expand All @@ -35,7 +48,47 @@ def _close_mongo_clients(response):
mongo_client.close_reader_mongo_client()
return response

def run_updater():
"""Run once at startup and then every hour at HH:05 UTC."""
while True:
try:
# Always run once immediately at startup

date_info = get_latest_rap_forecast()
grib_stream = get_rap_grib2_stream(date_info.date_str, date_info.cycle_hour, date_info.forecast_hour)
logging.info(f"Downloaded GRIB2 stream for {date_info.date_str} {date_info.cycle_hour}z f{date_info.forecast_hour}")
if grib_stream is None:
logging.error("[Updater] Error, failed to download GRIB2 stream")
time.sleep(300) # retry in 5 min if failed
continue
interpolate_uv_temp_at_flight_levels(grib_stream)
with data_lock:
weather_data.wx_state = WindGridState(
process_date=date_info.date_str,
process_cycle=date_info.cycle_hour,
forecast_hour=date_info.forecast_hour,
processed_at_utc=datetime.now(timezone.utc).isoformat()
)
logging.info(f"[Updater] Weather data updated {date_info.date_str} {date_info.cycle_hour}z f{date_info.forecast_hour}")

# schedule next run at the next HH:05 UTC
now = datetime.now(timezone.utc)
next_run = now.replace(minute=5, second=0, microsecond=0)
if now >= next_run:
next_run = next_run + timedelta(hours=1)
sleep_seconds = (next_run - now).total_seconds()
logging.info(f"[Updater] Next run scheduled at {next_run.isoformat()} (in {int(sleep_seconds)}s)")
time.sleep(sleep_seconds)

except Exception as e:
logging.error(f"Updater error: {e}")
traceback.print_exc()
time.sleep(300) # retry in 5 min if failed


if __name__ == '__main__':
updater_thread = threading.Thread(target=run_updater, daemon=True)
updater_thread.start()

app = create_app()
app.run(use_reloader=True)
app.run(use_reloader=True)
71 changes: 67 additions & 4 deletions blueprints/weather_bp.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
import re
from typing import Optional

import numpy as np
import requests
from lxml import etree
from flask import Blueprint, jsonify
from flask import Blueprint, jsonify, request
from libs.weather_data_lock import weather_data, data_lock
from dataclasses import dataclass
from typing import List

weather_blueprint = Blueprint('weather', __name__)

Expand Down Expand Up @@ -36,6 +37,68 @@ def _metar(airport):
metar_text = response.content.decode('utf-8')
return jsonify([metar_text])

@weather_blueprint.route('/winds')
def winds():
# query parameters, ignore type warnings due to manual check below
top_lat = float(request.args.get("toplat")) # type: ignore
top_lon = float(request.args.get("toplong")) # type: ignore
bottom_lat = float(request.args.get("bottomlat")) # type: ignore
bottom_lon = float(request.args.get("bottomlong")) # type: ignore
fl = int(request.args.get("fl")) # type: ignore

if None in [top_lat, top_lon, bottom_lat, bottom_lon, fl]:
return jsonify({"error": "Missing required parameters: toplat, toplong, bottomlat, bottomlong, fl"}), 400

with data_lock:
lats = weather_data.lats
lons = weather_data.lons
levels = weather_data.levels
metadata = weather_data.wx_state

if lats is None or fl not in levels:
return jsonify({"error": "No data loaded"}), 503

lat_min, lat_max = sorted([bottom_lat, top_lat])
lon_min, lon_max = sorted([bottom_lon, top_lon])

mask = ((lats >= lat_min) & (lats <= lat_max) &
(lons >= lon_min) & (lons <= lon_max))

# find indices inside mask
indices = np.where(mask)
if indices[0].size == 0:
return jsonify({"points": []})

# Limit to at most 15x15 samples regardless of zoom
MAX_DIM = 15
unique_rows = np.unique(indices[0])
unique_cols = np.unique(indices[1])

keep_rows = unique_rows[np.linspace(0, len(unique_rows) - 1, min(len(unique_rows), MAX_DIM), dtype=int)]
keep_cols = unique_cols[np.linspace(0, len(unique_cols) - 1, min(len(unique_cols), MAX_DIM), dtype=int)]

@dataclass
class WindPoint:
latitude: float
longitude: float
wind_speed_kt: int
wind_direction_deg_true: int
temperature_c: int

point: List[WindPoint] = []
for i in keep_rows:
for j in keep_cols:
if not mask[i, j]:
continue
point.append(WindPoint(
latitude=float(round(lats[i, j], 2)),
longitude=float(round(lons[i, j], 2)),
wind_speed_kt=int(levels[fl]["spd"][i, j]),
wind_direction_deg_true=int(levels[fl]["dir"][i, j]),
temperature_c=int(levels[fl]["temp"][i, j])
))

return jsonify({"points": point, "metadata": metadata})

@weather_blueprint.route('/sigmets')
def _get_sigmets():
Expand Down
23 changes: 23 additions & 0 deletions libs/weather_data_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import threading
from dataclasses import dataclass, field
from typing import Optional, Dict, Any

# thread-safe lock shared by callers
data_lock = threading.Lock()

@dataclass
class WindGridState:
process_date: str
process_cycle: str
forecast_hour: str
processed_at_utc: str

@dataclass
class WindGridData:
lats: Optional[Any] = None # expected: 2D numpy array or None
lons: Optional[Any] = None # expected: 2D numpy array or None
levels: Dict[int, Dict[str, Any]] = field(default_factory=dict) # { FL: {"temp": 2Darray, "dir": 2Darray, "spd": 2Darray} }
wx_state: Optional[WindGridState] = None

# global instance used throughout the app
weather_data = WindGridData()
209 changes: 209 additions & 0 deletions libs/wind_lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
from dataclasses import dataclass
from io import BufferedReader, BytesIO
import pygrib
import requests
import numpy as np
from libs.weather_data_lock import weather_data, data_lock, WindGridData
from datetime import datetime, timezone, timedelta
from html.parser import HTMLParser
import re

RAP_BASE_URL = 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/rap/prod'
KELVIN_TO_CELSIUS = -273.15
METER_TO_FEET = 3.281
FEET_PER_FLIGHT_LEVEL = 100.0
METERS_PER_SEC_TO_KNOTS = 1.94384449
RAP_GRIB_FILE_IDENTIFIER = 'awp130pgrbf'

def get_rap_grib2_stream(date_str: str, cycle_hour: str, forecast_hour: str) -> BufferedReader | None:
"""Acquire a stream of RAP GRIB2 data from NOAA

Args:
date_str (str): Date string in 'YYYYMMDD' format
cycle_hour (str): Cycle hour ('00' or '12')
forecast_hour (str): Forecast hour (e.g., '00', '01', ...)

Returns:
BufferedReader: Stream of GRIB2 data, or None if download fails
"""
file_name = f"rap.t{cycle_hour}z.awp130pgrbf{forecast_hour.zfill(2)}.grib2"
url = f"{RAP_BASE_URL}/rap.{date_str}/{file_name}"

response = requests.get(url, stream=True)
if response.status_code == 200:
return(BufferedReader(BytesIO(response.content)))
else:
print(f"Failed to download file. HTTP {response.status_code}: {url}")
return None

def interpolate_uv_temp_at_flight_levels(grib_buffer: BufferedReader):
"""
Interpolate U and V wind components and temperature to flight levels

:param grib_buffer: BufferedReader stream of GRIB2 data from requests.raw
"""
grbs = pygrib.open(grib_buffer)

geopotential_height = grbs.select(name='Geopotential height', typeOfLevel='isobaricInhPa')
u_component = grbs.select(name='U component of wind', typeOfLevel='isobaricInhPa')
v_component = grbs.select(name='V component of wind', typeOfLevel='isobaricInhPa')
temp_component = grbs.select(name='Temperature', typeOfLevel='isobaricInhPa')

geopotential_height_3d = np.array([g.values for g in geopotential_height])
u_component_3d = np.array([g.values for g in u_component])
v_component_3d = np.array([g.values for g in v_component])
temp_component_3d = np.array([g.values for g in temp_component]) + KELVIN_TO_CELSIUS

lats, lons = geopotential_height[0].latlons()

geopotential_height_3d_feet = geopotential_height_3d * METER_TO_FEET
flight_level_3d = geopotential_height_3d_feet / FEET_PER_FLIGHT_LEVEL

fl_target = np.arange(0, 510, 10)
level_dict = {}

for fl in fl_target:
temp_2d = np.full(lats.shape, np.nan)
speed_2d = np.full(lats.shape, np.nan)
dir_2d = np.full(lats.shape, np.nan)

for i in range(lats.shape[0]):
for j in range(lats.shape[1]):
profile_fl = flight_level_3d[:, i, j]
profile_u = u_component_3d[:, i, j]
profile_v = v_component_3d[:, i, j]
profile_t = temp_component_3d[:, i, j]

if (np.any(np.isnan(profile_fl)) or np.any(np.isnan(profile_u)) or
np.any(np.isnan(profile_v)) or np.any(np.isnan(profile_t))):
continue

u_val = np.interp(fl, profile_fl[::-1], profile_u[::-1])
v_val = np.interp(fl, profile_fl[::-1], profile_v[::-1])
t_val = np.interp(fl, profile_fl[::-1], profile_t[::-1])

speed = np.sqrt(u_val**2 + v_val**2)
direction = (270 - np.degrees(np.arctan2(v_val, u_val))) % 360

# convert speed from m/s to knots (1 m/s = 1.94384449 kt)
speed_kt = speed * METERS_PER_SEC_TO_KNOTS

temp_2d[i, j] = int(round(t_val))
speed_2d[i, j] = int(round(speed_kt))
dir_2d[i, j] = int(round(direction))

level_dict[int(fl)] = {"temp": temp_2d, "spd": speed_2d, "dir": dir_2d}

with data_lock:
weather_data.lats = lats
weather_data.lons = lons
weather_data.levels = level_dict

grbs.close()

def different_hour(new_forecast: WindGridData) -> bool:
"""Checks if the provided new forecast is newer than the existing in-memory forecast

Args:
new_forecast (WeatherData): the new forecast data to compare

Returns:
bool: True if the new forecast is different from the previous forecast hour, False otherwise
"""
# read previous forecast from in-memory store
with data_lock:
prev_state = weather_data
if prev_state.forecast_hour is None:
# no previous state -> treat as new
return True
return new_forecast != prev_state.forecast_hour

@dataclass
class RAPDateInfo:
date_str: str
cycle_hour: str
forecast_hour: str


class RAP_GribDirectoryFileParser(HTMLParser):
"""HTML parser to extract GRIB2 file names from RAP directory listing"""
def __init__(self):
super().__init__()
self.grib_files = []

def handle_starttag(self, tag, attrs):
if tag == 'a':
for attr, value in attrs:
if attr == 'href' and value and RAP_GRIB_FILE_IDENTIFIER in value and value.endswith('.grib2'):
self.grib_files.append(value)


def _search_for_latest_rap(date_str: str) -> RAPDateInfo | None:
"""Query the RAP directory to find the latest available forecast file

Args:
date_str (str): Date string in 'YYYYMMDD' format

Returns:
RAPDateInfo: Latest available forecast info, or None if no files found
"""
url = f"{RAP_BASE_URL}/rap.{date_str}/"

try:
response = requests.get(url, timeout=10)
if response.status_code != 200:
return None

# Parse HTML to extract GRIB2 file names
parser = RAP_GribDirectoryFileParser()
parser.feed(response.text)

if not parser.grib_files:
return None

# Sort files to get the latest one
# Files are in format: rap.t{cycle_hour}z.awp130pgrbf{forecast_hour}.grib2
latest_file = sorted(parser.grib_files)[-1]

# Extract cycle_hour and forecast_hour from filename
# Example: rap.t23z.awp130pgrbf21.grib2
match = re.match(r'rap\.t(\d{2})z\.awp130pgrbf(\d{2})\.grib2', latest_file)
if match:
cycle_hour = match.group(1)
forecast_hour = match.group(2)
return RAPDateInfo(date_str, cycle_hour, forecast_hour)

return None

except Exception as e:
print(f"Error querying RAP directory: {e}")
return None


def get_latest_rap_forecast() -> RAPDateInfo:
"""Gets the latest available RAP forecast, falling back to previous day if needed

Queries the RAP server to find the most recent forecast file available,
making the approach robust to delays in forecast publication.

Returns:
RAPDateInfo: Latest available forecast date information
"""
now = datetime.now(timezone.utc)
date_str = now.strftime('%Y%m%d')

# Try to find latest forecast for today
latest = _search_for_latest_rap(date_str)
if latest:
return latest

# If not found today, try yesterday
yesterday = now - timedelta(days=1)
date_str_yesterday = yesterday.strftime('%Y%m%d')
latest = _search_for_latest_rap(date_str_yesterday)
if latest:
return latest

# No forecast files found
raise Exception("No RAP forecast files found for today or yesterday")

Loading