diff --git a/Project.toml b/Project.toml index e33fb9936..ab7f75323 100644 --- a/Project.toml +++ b/Project.toml @@ -58,9 +58,9 @@ Breeze = "0.4" CDSAPI = "2.2.1" CFTime = "0.1, 0.2" ClimaSeaIce = "0.4.4, 0.5" -CubedSphere = "0.3.4" CondaPkg = "0.2.33" CopernicusMarine = "0.1.1" +CubedSphere = "0.3.4" DataDeps = "0.7" Dates = "<0.0.1, 1" Distances = "0.10" diff --git a/examples/ERA5_bounded_pressure_level_data.jl b/examples/ERA5_bounded_pressure_level_data.jl new file mode 100644 index 000000000..767773d6c --- /dev/null +++ b/examples/ERA5_bounded_pressure_level_data.jl @@ -0,0 +1,120 @@ +# Download 3-D snapshots of the atmospheric state from ERA5, e.g., to be used with FieldTimeSeries +# +# ## Install dependencies +# +# ```julia +# using Pkg +# pkg"add NumericalEarth CDSAPI" +# ``` +# +# You also need CDS API credentials in `~/.cdsapirc`. +# See for setup instructions. +# +# See for +# more information about pressure-level data + +using NumericalEarth +using NumericalEarth.DataWrangling: Metadata, BoundingBox, download_dataset +using NumericalEarth.DataWrangling.ERA5 +using CDSAPI +using Dates +using Oceananigans + +using Statistics +using CairoMakie + +selected_levels = filter(≥(250hPa), ERA5_all_pressure_levels) # select all levels below 250 hPa +dataset = ERA5HourlyPressureLevels(selected_levels) + +dates = DateTime(2004, 12, 16):Hour(1):DateTime(2005, 01, 09) # van Zanten et al 2011 study period + +# Rauber et al 2007, Fig 1: focus region +region = BoundingBox(latitude=(17, 18.5), longitude=(-62.5, -61)) + +# OPTIONAL: download all variables at once (fewer CDS API requests) +variables = [:geopotential, # for calculating zlevels as the mean geopotential height + :eastward_velocity, + :northward_velocity, + :temperature, + :specific_humidity] +download_dataset(variables, dataset, dates; region) + +# ## Create time series + +T_meta = Metadata(:temperature; dataset, dates, region) +q_meta = Metadata(:specific_humidity; dataset, dates, region) +u_meta = Metadata(:eastward_velocity; dataset, dates, region) +v_meta = Metadata(:northward_velocity; dataset, dates, region) + +T_series = FieldTimeSeries(T_meta; time_indices_in_memory=2) +q_series = FieldTimeSeries(q_meta; time_indices_in_memory=2) +u_series = FieldTimeSeries(u_meta; time_indices_in_memory=2) +v_series = FieldTimeSeries(v_meta; time_indices_in_memory=2) + +# ## Vertical profiles (mean ± spread) + +# Height coordinate (m) from the grid +z = znodes(T_series[1]) +Nz = length(z) +Nt = length(T_series.times) + +# Pressure at each level (hPa), ordered bottom-to-top (k=1 ⇒ highest pressure) +p_levels = sort(selected_levels, rev=true) ./ hPa # Pa → hPa + +# Horizontal-mean profile for each snapshot → (Nz, Nt) arrays +function horizontal_mean_profiles(series) + profiles = zeros(Nz, Nt) + for n in 1:Nt + data = interior(series[n], :, :, :) + profiles[:, n] = mean(data, dims=(1, 2)) + end + return profiles +end + +T_profiles = horizontal_mean_profiles(T_series) +q_profiles = horizontal_mean_profiles(q_series) +u_profiles = horizontal_mean_profiles(u_series) +v_profiles = horizontal_mean_profiles(v_series) + +# Convert T → potential temperature: θ = T (p₀/p)^(R/cₚ) +Rₐ_over_cₚ = 0.286 +θ_profiles = T_profiles .* (1000 ./ p_levels) .^ Rₐ_over_cₚ + +# Convert specific humidity from kg/kg → g/kg +q_profiles .*= 1000 + +# ## Plot + +fig = Figure(size=(900, 500), fontsize=12) + +ax_θ = Axis(fig[1, 1], xlabel="θ [K]", ylabel="Height [m]") +ax_q = Axis(fig[1, 2], xlabel="qᵥ [g kg⁻¹]", ylabel="Height [m]") +ax_u = Axis(fig[1, 3], xlabel="u [m s⁻¹]", ylabel="Height [m]") +ax_v = Axis(fig[1, 4], xlabel="v [m s⁻¹]", ylabel="Height [m]") + +for (ax, profiles) in [(ax_θ, θ_profiles), + (ax_q, q_profiles), + (ax_u, u_profiles), + (ax_v, v_profiles)] + μ = vec(mean(profiles, dims=2)) + #lo = vec(minimum(profiles, dims=2)) + #hi = vec(maximum(profiles, dims=2)) + lo = [quantile(r, 0.25) for r in eachrow(profiles)] + hi = [quantile(r, 0.75) for r in eachrow(profiles)] + band!(ax, z, lo, hi; direction=:y, color=(:gray, 0.4)) + lines!(ax, μ, z; color=:black, linewidth=2) +end + +xlims!(ax_θ, 293, 317) +xlims!(ax_q, 0, 20.4) +xlims!(ax_u, -10, 0) +xlims!(ax_v, -10, 0) + +linkyaxes!(ax_θ, ax_q, ax_u, ax_v) +ylims!(ax_θ, 0, 4000) + +hideydecorations!(ax_q) +hideydecorations!(ax_u) +hideydecorations!(ax_v) + +save("ERA5_pressure_level_profiles.png", fig; px_per_unit=4) diff --git a/examples/ERA5_bounded_surface_data.jl b/examples/ERA5_bounded_surface_data.jl new file mode 100644 index 000000000..0eed0d70e --- /dev/null +++ b/examples/ERA5_bounded_surface_data.jl @@ -0,0 +1,92 @@ +# Download 3-D snapshots of the atmospheric state from ERA5, e.g., to be used with FieldTimeSeries +# +# ## Install dependencies +# +# ```julia +# using Pkg +# pkg"add NumericalEarth CDSAPI" +# ``` +# +# You also need CDS API credentials in `~/.cdsapirc`. +# See for setup instructions. +# +# See for +# more information about single-level data + +using NumericalEarth.DataWrangling: Metadata, BoundingBox, download_dataset +using NumericalEarth.DataWrangling.ERA5 +using CDSAPI +using Dates +using Oceananigans + +using Statistics: mean +using CairoMakie +#precip_cmap = :rain +precip_cmap = cgrad([:indigo, :darkblue, :blue, :deepskyblue, :cyan, + :palegreen, :green, :yellow, :orange, :red, :pink]) + + +dataset = ERA5HourlySingleLevel() + +#dates = DateTime(2004, 12, 16):Hour(1):DateTime(2005, 01, 09) # van Zanten et al 2011 study period +dates = DateTime(2004, 12, 16):Hour(1):DateTime(2004, 12, 23) # shorter time range for demo + +# Rauber et al 2007, Fig 1: precip map +region = BoundingBox(latitude=(-25, 35), longitude=(-110, 30)) + +# OPTIONAL: download all variables at once (fewer CDS API requests) +# variables = [:total_precipitation, +# :sea_surface_temperature, +# :temperature, # at 2 m +# :dewpoint_temperature, # at 2 m +# :surface_pressure] +# download_dataset(variables, dataset, dates; region) + +# ## Create time series + +precip_meta = Metadata(:total_precipitation; dataset, dates, region) +precip_series = FieldTimeSeries(precip_meta) + +# ## Plotting + +Nt = length(precip_series.times) +λ, φ, _ = nodes(precip_series[1]) + +# ERA5 total_precipitation is in metres per hour; convert to mm/day +to_mm_day = 1000 * 24 + +# ### Time-averaged precipitation map + +precip_avg = mean(interior(precip_series[n], :, :, 1) for n in 1:Nt) .* to_mm_day + +fig1 = Figure(size=(900, 400)) +ax1 = Axis(fig1[1, 1], + title = "Mean precipitation (2004-12-16 to 2005-01-08)", + xlabel = "Longitude (°)", + ylabel = "Latitude (°)") +ax1.xticks = -90:30:30 +hm = heatmap!(ax1, λ, φ, precip_avg; colormap=precip_cmap, colorrange=(0, 12)) +Colorbar(fig1[1, 2], hm, label="Precipitation (mm/day)") +save("ERA5_mean_precipitation.png", fig1) + +# ### Precipitation animation + +fig2 = Figure(size=(900, 400)) + +n = Observable(1) +precip_n = @lift interior(precip_series[$n], :, :, 1) .* to_mm_day +anim_title = @lift "Precipitation (mm/day), " * string(first(dates) + Second(round(Int, precip_series.times[$n]))) + +ax2 = Axis(fig2[1, 1], + title = anim_title, + xlabel = "Longitude (°)", + ylabel = "Latitude (°)") +ax2.xticks = -90:30:30 + +hm2 = heatmap!(ax2, λ, φ, precip_n; colormap=precip_cmap, colorrange=(0, 12)) +Colorbar(fig2[1, 2], hm2, label="Precipitation (mm/day)") + +record(fig2, "ERA5_precipitation.mp4", 1:Nt; framerate=12) do nn + n[] = nn +end + diff --git a/examples/ERA5_winds_and_stokes_drift.jl b/examples/ERA5_winds_and_stokes_drift.jl index 7a2a4eb97..2ccfedecc 100644 --- a/examples/ERA5_winds_and_stokes_drift.jl +++ b/examples/ERA5_winds_and_stokes_drift.jl @@ -16,7 +16,7 @@ using NumericalEarth using NumericalEarth.DataWrangling: Metadatum -using NumericalEarth.DataWrangling.ERA5: ERA5Hourly +using NumericalEarth.DataWrangling.ERA5: ERA5HourlySingleLevel using CDSAPI using Oceananigans @@ -29,7 +29,7 @@ using Dates # while ocean wave variables (Stokes drift) live on a 0.5° grid (720×361). # We define metadata for each variable at a single date. -dataset = ERA5Hourly() +dataset = ERA5HourlySingleLevel() date = DateTime(2020, 1, 15, 12) # January 15, 2020 at 12:00 UTC u_stokes_meta = Metadatum(:eastward_stokes_drift; dataset, date) @@ -42,7 +42,7 @@ v_wind_meta = Metadatum(:northward_velocity; dataset, date) # We build a single `LatitudeLongitudeGrid` and use `set!` to download # and interpolate all four variables onto it. -grid = LatitudeLongitudeGrid(size = (1440, 721, 1), +grid = LatitudeLongitudeGrid(size = (1440, 720, 1), longitude = (0, 360), latitude = (-90, 90), z = (0, 1)) diff --git a/ext/NumericalEarthCDSAPIExt.jl b/ext/NumericalEarthCDSAPIExt.jl index 8232e6c77..9b979a491 100644 --- a/ext/NumericalEarthCDSAPIExt.jl +++ b/ext/NumericalEarthCDSAPIExt.jl @@ -2,30 +2,92 @@ module NumericalEarthCDSAPIExt using NumericalEarth using CDSAPI +using NCDatasets using Oceananigans using Oceananigans.DistributedComputations: @root using Dates -using NumericalEarth.DataWrangling.ERA5: ERA5Metadata, ERA5Metadatum, ERA5_dataset_variable_names +using NumericalEarth.DataWrangling.ERA5: ERA5Dataset, ERA5Metadata, ERA5Metadatum, + ERA5_dataset_variable_names, ERA5_netcdf_variable_names +using NumericalEarth.DataWrangling.ERA5: ERA5PressureLevelsDataset, + ERA5PressureMetadata, ERA5PressureMetadatum, + ERA5PL_dataset_variable_names, ERA5PL_netcdf_variable_names import NumericalEarth.DataWrangling: download_dataset +##### +##### Dispatch helpers — encapsulate single-level vs pressure-level differences +##### + +cds_product(::ERA5Dataset) = "reanalysis-era5-single-levels" +cds_product(::ERA5PressureLevelsDataset) = "reanalysis-era5-pressure-levels" + +cds_varnames(::ERA5Dataset) = ERA5_dataset_variable_names +cds_varnames(::ERA5PressureLevelsDataset) = ERA5PL_dataset_variable_names + +nc_varnames(::ERA5Dataset) = ERA5_netcdf_variable_names +nc_varnames(::ERA5PressureLevelsDataset) = ERA5PL_netcdf_variable_names + +# Coordinate / dimension variables to propagate into each split file +const ERA5_COORD_VARS = Set(["longitude", "latitude", + "time", "valid_time", + "expver", "number"]) + +const ERA5PL_COORD_VARS = Set(["longitude", "latitude", + "pressure_level", "level", + "time", "valid_time", + "expver", "number"]) + +coord_vars(::ERA5Dataset) = ERA5_COORD_VARS +coord_vars(::ERA5PressureLevelsDataset) = ERA5PL_COORD_VARS + +extra_request_keys!(request, ::ERA5Dataset) = nothing +function extra_request_keys!(request, ds::ERA5PressureLevelsDataset) + p_hPa = [round(Int, p * 1e-2) for p in ds.pressure_levels] + request["pressure_level"] = [string(p) for p in p_hPa] +end + +##### +##### ZIP detection — CDS returns a ZIP when mixing step types (inst/accum/avg) +##### + +const ZIP_MAGIC = UInt8[0x50, 0x4b, 0x03, 0x04] + +function is_zip(path) + open(path, "r") do io + magic = read(io, 4) + return length(magic) >= 4 && magic == ZIP_MAGIC + end +end + """ - download_dataset(metadata::ERA5Metadata; kwargs...) + foreach_nc(f, download_path, cleanup_dir) -Download ERA5 data for each date in the metadata, returning paths to downloaded files. +If `download_path` is a ZIP archive (as CDS returns when mixing variable step types), +extract all NetCDF files and call `f(nc_path)` on each. Otherwise call `f` directly +on `download_path`. """ -function download_dataset(metadata::ERA5Metadata; kwargs...) - paths = Array{String}(undef, length(metadata)) - for (m, metadatum) in enumerate(metadata) - paths[m] = download_dataset(metadatum; kwargs...) +function foreach_nc(f, download_path, cleanup_dir) + if is_zip(download_path) + tmp_dir = mktempdir(cleanup_dir) + run(`unzip -qo $download_path -d $tmp_dir`) + nc_files = filter(p -> endswith(p, ".nc"), readdir(tmp_dir; join=true)) + for nc_file in nc_files + f(nc_file) + end + rm(tmp_dir; recursive=true, force=true) + else + f(download_path) end - return paths end +##### +##### Single-date download +##### + """ - download_dataset(meta::ERA5Metadatum; skip_existing=true, kwargs...) + download_dataset(meta::ERA5Metadatum; skip_existing=true) Download ERA5 data for a single date/time using the CDSAPI package. @@ -41,33 +103,169 @@ Before downloading, you must: See https://cds.climate.copernicus.eu/how-to-api for details. """ function download_dataset(meta::ERA5Metadatum; skip_existing=true) + output_path = NumericalEarth.DataWrangling.metadata_path(meta) + + # Skip download if file already exists + skip_existing && isfile(output_path) && return output_path + + mkpath(dirname(output_path)) + + date = meta.dates + request = Dict( + "product_type" => ["reanalysis"], + "variable" => [cds_varnames(meta.dataset)[meta.name]], + "year" => [string(Dates.year(date))], + "month" => [lpad(string(Dates.month(date)), 2, '0')], + "day" => [lpad(string(Dates.day(date)), 2, '0')], + "time" => [lpad(string(Dates.hour(date)), 2, '0') * ":00"], + "data_format" => "netcdf", + "download_format" => "unarchived", + ) + + extra_request_keys!(request, meta.dataset) + area = build_era5_area(meta.region) + isnothing(area) || (request["area"] = area) + + @root CDSAPI.retrieve(cds_product(meta.dataset), request, output_path) + + return output_path +end + +##### +##### Multi-date download — batches by calendar day +##### + +function download_dataset(metadata::ERA5Metadata; skip_existing=true, cleanup=true) + dates = metadata.dates isa AbstractVector ? metadata.dates : [metadata.dates] + grouped = _group_by_calendar_day(dates) + + for day in sort(collect(keys(grouped))) + download_era5_day(metadata.name, metadata.dataset, grouped[day]; + region = metadata.region, + dir = metadata.dir, + skip_existing, cleanup) + end +end + +""" + _group_by_calendar_day(datetimes) + +Group an iterable of `DateTime`s by calendar day. Returns a `Dict{Date, Vector}` +where each value is the subset of `datetimes` whose calendar day equals the key. +The `00:00` instant of a day belongs to that day (not the previous one). +""" +function _group_by_calendar_day(datetimes) + return Dict(d => filter(dt -> Dates.Date(dt) == d, datetimes) + for d in unique(Dates.Date.(datetimes))) +end + +function download_era5_day(name, dataset, day_dates; + region, dir, skip_existing, cleanup) + + MDatum = NumericalEarth.DataWrangling.Metadatum + meta_path = NumericalEarth.DataWrangling.metadata_path + + all_pairs = [(dt, meta_path(MDatum(name; dataset, date=dt, region, dir))) + for dt in day_dates] + + pending = skip_existing ? filter(((_, p),) -> !isfile(p), all_pairs) : all_pairs + isempty(pending) && return nothing + + sorted_dts = sort(unique([dt for (dt, _) in pending])) + hours_str = [lpad(string(Dates.hour(dt)), 2, '0') * ":00" for dt in sorted_dts] + dt_to_tidx = Dict(dt => i for (i, dt) in enumerate(sorted_dts)) + + dt = first(sorted_dts) + year = string(Dates.year(dt)) + month = lpad(string(Dates.month(dt)), 2, '0') + day = lpad(string(Dates.day(dt)), 2, '0') + + request = Dict( + "product_type" => ["reanalysis"], + "variable" => [cds_varnames(dataset)[name]], + "year" => [year], + "month" => [month], + "day" => [day], + "time" => hours_str, + "data_format" => "netcdf", + "download_format" => "unarchived", + ) - output_directory = meta.dir - output_filename = meta.filename - output_path = joinpath(output_directory, output_filename) + extra_request_keys!(request, dataset) + area = build_era5_area(region) + isnothing(area) || (request["area"] = area) - # Skip if file already exists - if skip_existing && isfile(output_path) - return output_path + mkpath(dir) + tmp_path = joinpath(dir, "_tmp_$(year)$(month)$(day).nc") + nc_varname = nc_varnames(dataset)[name] + nc_triples = [(nc_varname, dt_to_tidx[dt], path) for (dt, path) in pending] + + time_dimnames = Set(["time", "valid_time"]) + + @root begin + CDSAPI.retrieve(cds_product(dataset), request, tmp_path) + foreach_nc(tmp_path, dir) do nc_path + split_era5_nc_multistep(nc_path, nc_triples, coord_vars(dataset), time_dimnames) + end + cleanup && rm(tmp_path; force=true) end - # Ensure output directory exists - mkpath(output_directory) + return nothing +end - # Get the ERA5 variable name - variable_name = ERA5_dataset_variable_names[meta.name] +##### +##### Multi-variable ERA5 pressure-level download +##### - # Extract date information - date = meta.dates +""" + download_dataset(names::Vector{Symbol}, metadata::ERA5PressureMetadata; kwargs...) + +Download multiple ERA5 pressure-level variables for each date in `metadata`. +""" +function download_dataset(names::Vector{Symbol}, metadata::ERA5PressureMetadata; kwargs...) + for metadatum in metadata + download_dataset(names, metadatum; kwargs...) + end + return nothing +end + +""" + download_dataset(names::Vector{Symbol}, meta::ERA5PressureMetadatum; skip_existing=true) + +Download multiple ERA5 pressure-level variables for a single date in one CDS API request. +The multi-variable NetCDF is split into individual per-variable files. +""" +function download_dataset(names::Vector{Symbol}, meta::ERA5PressureMetadatum; skip_existing=true) + name_path_pairs = [] + for name in names + metadatum = NumericalEarth.DataWrangling.Metadatum(name; + dataset = meta.dataset, + region = meta.region, + date = meta.dates, + dir = meta.dir) + path = NumericalEarth.DataWrangling.metadata_path(metadatum) + push!(name_path_pairs, (name, path)) + end + + pending = if skip_existing + [(n, p) for (n, p) in name_path_pairs if !isfile(p)] + else + name_path_pairs + end + + isempty(pending) && return [path for (_, path) in name_path_pairs] + + cds_vars = unique([cds_varnames(meta.dataset)[name] for (name, _) in pending]) + + date = meta.dates year = string(Dates.year(date)) month = lpad(string(Dates.month(date)), 2, '0') - day = lpad(string(Dates.day(date)), 2, '0') - hour = lpad(string(Dates.hour(date)), 2, '0') * ":00" + day = lpad(string(Dates.day(date)), 2, '0') + hour = lpad(string(Dates.hour(date)), 2, '0') * ":00" - # Build request parameters request = Dict( "product_type" => ["reanalysis"], - "variable" => [variable_name], + "variable" => cds_vars, "year" => [year], "month" => [month], "day" => [day], @@ -76,18 +274,242 @@ function download_dataset(meta::ERA5Metadatum; skip_existing=true) "download_format" => "unarchived", ) - # Add area constraint from bounding box + extra_request_keys!(request, meta.dataset) area = build_era5_area(meta.region) - if !isnothing(area) - request["area"] = area + isnothing(area) || (request["area"] = area) + + mkpath(meta.dir) + tmp_path = joinpath(meta.dir, "_tmp_multi_$(year)$(month)$(day)T$(hour[1:2]).nc") + + nc_name_path_pairs = [(nc_varnames(meta.dataset)[name], path) for (name, path) in pending] + + @root begin + CDSAPI.retrieve(cds_product(meta.dataset), request, tmp_path) + foreach_nc(tmp_path, meta.dir) do nc_path + split_era5_nc(nc_path, nc_name_path_pairs, coord_vars(meta.dataset)) + end + rm(tmp_path; force=true) + end + + return [path for (_, path) in name_path_pairs] +end + +""" + download_dataset(names, dataset::ERA5Dataset, datetime; ...) + +Download one or more ERA5 variables at a single datetime. +""" +function download_dataset(names::Vector{Symbol}, dataset::ERA5Dataset, datetime; + region = nothing, + dir = NumericalEarth.DataWrangling.default_download_directory(dataset)) + meta = NumericalEarth.DataWrangling.Metadatum(first(names); dataset, date=datetime, region, dir) + return download_dataset(names, meta) +end + +function download_dataset(name::Symbol, dataset::ERA5Dataset, datetime; + region = nothing, + dir = NumericalEarth.DataWrangling.default_download_directory(dataset)) + return download_dataset([name], dataset, datetime; region, dir) +end + +""" + download_dataset(names, dataset::ERA5Dataset, datetimes::AbstractVector; ...) + +Download one or more ERA5 variables for multiple datetimes, batching by calendar day. +""" +function download_dataset(names::Vector{Symbol}, + dataset::ERA5Dataset, + datetimes::AbstractVector; + region = nothing, + dir = NumericalEarth.DataWrangling.default_download_directory(dataset), + skip_existing = true, + cleanup = true) + + grouped = _group_by_calendar_day(datetimes) + + for day in sort(collect(keys(grouped))) + download_era5_multivar_day(names, dataset, grouped[day]; region, dir, skip_existing, cleanup) end - # Perform the download using CDSAPI + return nothing +end + +function download_dataset(name::Symbol, + dataset::ERA5Dataset, + datetimes::AbstractVector; + region = nothing, + dir = NumericalEarth.DataWrangling.default_download_directory(dataset), + skip_existing = true, + cleanup = true) + return download_dataset([name], dataset, datetimes; region, dir, skip_existing, cleanup) +end + +function download_era5_multivar_day(names, dataset, day_dates; + region, dir, skip_existing, cleanup) + + MDatum = NumericalEarth.DataWrangling.Metadatum + meta_path = NumericalEarth.DataWrangling.metadata_path + + all_triples = [(name, dt, meta_path(MDatum(name; dataset, date=dt, region, dir))) + for name in names for dt in day_dates] + + pending = skip_existing ? filter(((_, _, p),) -> !isfile(p), all_triples) : all_triples + isempty(pending) && return nothing + + cds_vars = unique([cds_varnames(dataset)[name] for (name, _, _) in pending]) + sorted_dts = sort(unique([dt for (_, dt, _) in pending])) + hours_str = [lpad(string(Dates.hour(dt)), 2, '0') * ":00" for dt in sorted_dts] + dt_to_tidx = Dict(dt => i for (i, dt) in enumerate(sorted_dts)) + + dt = first(sorted_dts) + year = string(Dates.year(dt)) + month = lpad(string(Dates.month(dt)), 2, '0') + day = lpad(string(Dates.day(dt)), 2, '0') + + request = Dict( + "product_type" => ["reanalysis"], + "variable" => cds_vars, + "year" => [year], + "month" => [month], + "day" => [day], + "time" => hours_str, + "data_format" => "netcdf", + "download_format" => "unarchived", + ) + + extra_request_keys!(request, dataset) + area = build_era5_area(region) + isnothing(area) || (request["area"] = area) + + mkpath(dir) + tmp_path = joinpath(dir, "_tmp_multi_$(year)$(month)$(day).nc") + nc_triples = [(nc_varnames(dataset)[name], dt_to_tidx[dt], path) + for (name, dt, path) in pending] + + time_dimnames = Set(["time", "valid_time"]) + @root begin - CDSAPI.retrieve("reanalysis-era5-single-levels", request, output_path) + CDSAPI.retrieve(cds_product(dataset), request, tmp_path) + foreach_nc(tmp_path, dir) do nc_path + split_era5_nc_multistep(nc_path, nc_triples, coord_vars(dataset), time_dimnames) + end + cleanup && rm(tmp_path; force=true) end - return output_path + return nothing +end + +##### +##### NetCDF splitting utilities +##### + +""" + split_era5_nc(src_path, nc_name_path_pairs, coord_vars) + +Split a multi-variable NetCDF into individual per-variable files (single time step). +""" +function split_era5_nc(src_path, nc_name_path_pairs, coord_vars) + NCDatasets.Dataset(src_path, "r") do src + src_varnames = Set(keys(src)) + for (nc_varname, dst_path) in nc_name_path_pairs + nc_varname in src_varnames || continue + NCDatasets.Dataset(dst_path, "c") do dst + unlimited = NCDatasets.unlimited(src) + for (dname, dlen) in src.dim + NCDatasets.defDim(dst, dname, dname in unlimited ? Inf : dlen) + end + + for (k, v) in src.attrib + dst.attrib[k] = v + end + + for (vname, var) in src + (vname in coord_vars || vname == nc_varname) || continue + ncvar_copy!(dst, var, vname) + end + end + end + end +end + +""" + split_era5_nc_multistep(src_path, triples, coord_vars, time_dimnames) + +Split a multi-timestep NetCDF into individual per-variable, per-timestep files. +`triples` is a vector of `(nc_varname, time_index, dst_path)`. +""" +function split_era5_nc_multistep(src_path, nc_varname_tidx_path_triples, coord_vars, time_dimnames) + NCDatasets.Dataset(src_path, "r") do src + src_varnames = Set(keys(src)) + unlimited = NCDatasets.unlimited(src) + + for (nc_varname, tidx, dst_path) in nc_varname_tidx_path_triples + nc_varname in src_varnames || continue + NCDatasets.Dataset(dst_path, "c") do dst + for (dname, dlen) in src.dim + out_len = dname in time_dimnames ? 1 : + dname in unlimited ? Inf : dlen + NCDatasets.defDim(dst, dname, out_len) + end + + for (k, v) in src.attrib + dst.attrib[k] = v + end + + for (vname, var) in src + (vname in coord_vars || vname == nc_varname) || continue + ncvar_copy_tslice!(dst, var, vname, tidx, time_dimnames) + end + end + end + end +end + +function ncvar_copy!(dst, src_var, vname) + dims = NCDatasets.dimnames(src_var) + T = eltype(src_var.var) + attribs = src_var.attrib + fill_val = haskey(attribs, "_FillValue") ? attribs["_FillValue"] : nothing + + dst_var = isnothing(fill_val) ? + NCDatasets.defVar(dst, vname, T, dims) : + NCDatasets.defVar(dst, vname, T, dims; fillvalue=fill_val) + + for (k, v) in attribs + k == "_FillValue" && continue + dst_var.attrib[k] = v + end + + dst_var.var[:] = src_var.var[:] + return nothing +end + +function ncvar_copy_tslice!(dst, src_var, vname, tidx, time_dimnames) + dims = NCDatasets.dimnames(src_var) + T = eltype(src_var.var) + attribs = src_var.attrib + fill_val = haskey(attribs, "_FillValue") ? attribs["_FillValue"] : nothing + + dst_var = isnothing(fill_val) ? + NCDatasets.defVar(dst, vname, T, dims) : + NCDatasets.defVar(dst, vname, T, dims; fillvalue=fill_val) + + for (k, v) in attribs + k == "_FillValue" && continue + dst_var.attrib[k] = v + end + + has_time = any(d -> d in time_dimnames, dims) + if has_time + idx = ntuple(ndims(src_var.var)) do i + dims[i] in time_dimnames ? (tidx:tidx) : Colon() + end + dst_var.var[:] = src_var.var[idx...] + else + dst_var.var[:] = src_var.var[:] + end + + return nothing end ##### @@ -97,11 +519,11 @@ end build_era5_area(::Nothing) = nothing const BBOX = NumericalEarth.DataWrangling.BoundingBox +const COL = NumericalEarth.DataWrangling.Column +const LIN = NumericalEarth.DataWrangling.Linear +const NR = NumericalEarth.DataWrangling.Nearest function build_era5_area(bbox::BBOX) - # CDS API uses [north, west, south, east] ordering - # BoundingBox has longitude = (west, east), latitude = (south, north) - lon = bbox.longitude lat = bbox.latitude @@ -117,4 +539,19 @@ function build_era5_area(bbox::BBOX) return [north, west, south, east] end +# Column with Nearest interpolation: tight box; CDS returns the nearest cell. +function build_era5_area(col::COL{<:Any, <:Any, <:Any, <:NR}) + lon, lat = col.longitude, col.latitude + ε = 1e-3 + return [lat + ε, lon - ε, lat - ε, lon + ε] # [N, W, S, E] +end + +# Column with Linear interpolation: pad by slightly more than ERA5's native +# 0.25° spacing so the file contains the 2x2 stencil bilinear interp needs. +function build_era5_area(col::COL{<:Any, <:Any, <:Any, <:LIN}) + lon, lat = col.longitude, col.latitude + ε = 0.3 + return [lat + ε, lon - ε, lat - ε, lon + ε] +end + end # module NumericalEarthCDSAPIExt diff --git a/src/DataWrangling/DataWrangling.jl b/src/DataWrangling/DataWrangling.jl index 92fa94cd7..3741c6e8a 100644 --- a/src/DataWrangling/DataWrangling.jl +++ b/src/DataWrangling/DataWrangling.jl @@ -10,7 +10,7 @@ export WOAClimatology, WOAAnnual, WOAMonthly export metadata_time_step, metadata_epoch export LinearlyTaperedPolarMask export DatasetRestoring, SurfaceFluxRestoring -export ERA5Hourly, ERA5Monthly +export ERA5HourlySingleLevel, ERA5MonthlySingleLevel, ERA5HourlyPressureLevels, ERA5MonthlyPressureLevels export native_grid using Oceananigans @@ -242,4 +242,9 @@ using .WOA using .JRA55 using .OSPapa +# Fallback: if no download extension is loaded, check that all files already exist +function download_dataset(metadata::Metadata) + error("No download method for $metadata is available (is the backend package loaded?)") +end + end # module diff --git a/src/DataWrangling/ERA5/ERA5.jl b/src/DataWrangling/ERA5/ERA5.jl index 359a2d50e..610655d52 100644 --- a/src/DataWrangling/ERA5/ERA5.jl +++ b/src/DataWrangling/ERA5/ERA5.jl @@ -1,13 +1,20 @@ module ERA5 -export ERA5Hourly, ERA5Monthly +# 2-D data +export ERA5HourlySingleLevel, ERA5MonthlySingleLevel + +# 3-D data +export ERA5HourlyPressureLevels, ERA5MonthlyPressureLevels, ERA5_all_pressure_levels, pressure_field, hPa +export standard_atmosphere_z_interfaces, mean_geopotential_z_interfaces using NCDatasets using Printf using Scratch +using Statistics -using Oceananigans.Fields: Center -using NumericalEarth.DataWrangling: Metadata, Metadatum, metadata_path +using Oceananigans.Fields: Center, set! +using Oceananigans: Field, fill_halo_regions!, CPU +using NumericalEarth.DataWrangling: Metadata, Metadatum, metadata_path, native_grid, InverseGravity, download_dataset using Dates using Dates: DateTime, Day, Month, Hour @@ -16,6 +23,7 @@ import NumericalEarth.DataWrangling: dataset_variable_name, dataset_location, default_download_directory, + default_inpainting, longitude_interfaces, latitude_interfaces, z_interfaces, @@ -24,7 +32,10 @@ import NumericalEarth.DataWrangling: available_variables, retrieve_data, metadata_path, - reversed_latitude_axis + is_three_dimensional, + reversed_vertical_axis, + reversed_latitude_axis, + conversion_units import Base: eltype @@ -42,133 +53,35 @@ abstract type ERA5Dataset end default_download_directory(::ERA5Dataset) = download_ERA5_cache -struct ERA5Hourly <: ERA5Dataset end -struct ERA5Monthly <: ERA5Dataset end - -dataset_name(::ERA5Hourly) = "ERA5Hourly" -dataset_name(::ERA5Monthly) = "ERA5Monthly" - +# ERA5 stores latitude north-to-south (90 → -90); flip on read reversed_latitude_axis(::ERA5Dataset) = true -# Wave variables are on a 0.5° grid (720×361), atmospheric variables on 0.25° (1440×721) -const ERA5_wave_variables = Set([ - :eastward_stokes_drift, :northward_stokes_drift, - :significant_wave_height, :mean_wave_period, :mean_wave_direction, -]) +const ERA5Metadata{D} = Metadata{<:ERA5Dataset, D} +const ERA5Metadatum = Metadatum{<:ERA5Dataset} -function Base.size(::ERA5Dataset, variable) - if variable in ERA5_wave_variables - return (720, 361, 1) - else - return (1440, 721, 1) - end -end +##### +##### Grid interfaces +##### -# ERA5 reanalysis data available from 1940 to present (we use a practical range here) -all_dates(::ERA5Hourly, var) = range(DateTime("1940-01-01"), stop=DateTime("2024-12-31"), step=Hour(1)) -all_dates(::ERA5Monthly, var) = range(DateTime("1940-01-01"), stop=DateTime("2024-12-01"), step=Month(1)) +# ERA5 global coverage: 0-359.75 longitude, -90 to 90 latitude at 0.25 degree resolution +longitude_interfaces(::ERA5Metadata) = (-0.125, 359.875) +latitude_interfaces(::ERA5Metadata) = (-90, 90) -const ERA5Metadata{D} = Metadata{<:ERA5Dataset, D} -const ERA5Metadatum = Metadatum{<:ERA5Dataset} +# ERA5 single-levels (2-D) data product +z_interfaces(::ERA5Metadata) = (0, 1) -# ERA5 is a spatially 2D dataset (atmospheric surface variables) -is_three_dimensional(::ERA5Metadata) = false - -# Variable name mappings from NumericalEarth names to ERA5/CDS API variable names -ERA5_dataset_variable_names = Dict( - :temperature => "2m_temperature", - :dewpoint_temperature => "2m_dewpoint_temperature", - :eastward_velocity => "10m_u_component_of_wind", - :northward_velocity => "10m_v_component_of_wind", - :surface_pressure => "surface_pressure", - :mean_sea_level_pressure => "mean_sea_level_pressure", - :total_precipitation => "total_precipitation", - :sea_surface_temperature => "sea_surface_temperature", - :downwelling_shortwave_radiation => "surface_solar_radiation_downwards", - :downwelling_longwave_radiation => "surface_thermal_radiation_downwards", - :total_cloud_cover => "total_cloud_cover", - :evaporation => "evaporation", - :specific_humidity => "specific_humidity", - :eastward_stokes_drift => "u_component_stokes_drift", - :northward_stokes_drift => "v_component_stokes_drift", - :significant_wave_height => "significant_height_of_combined_wind_waves_and_swell", - :mean_wave_period => "mean_wave_period", - :mean_wave_direction => "mean_wave_direction", -) - -# Variables available for download -ERA5_variable_names = keys(ERA5_dataset_variable_names) - -available_variables(::ERA5Dataset) = ERA5_dataset_variable_names - -dataset_variable_name(metadata::ERA5Metadata) = ERA5_dataset_variable_names[metadata.name] - -# NetCDF short variable names (what's actually in the downloaded files) -# These differ from the CDS API variable names above -ERA5_netcdf_variable_names = Dict( - :temperature => "t2m", - :dewpoint_temperature => "d2m", - :eastward_velocity => "u10", - :northward_velocity => "v10", - :surface_pressure => "sp", - :mean_sea_level_pressure => "msl", - :total_precipitation => "tp", - :sea_surface_temperature => "sst", - :downwelling_shortwave_radiation => "ssrd", - :downwelling_longwave_radiation => "strd", - :total_cloud_cover => "tcc", - :evaporation => "e", - :specific_humidity => "q", - :eastward_stokes_drift => "ust", - :northward_stokes_drift => "vst", - :significant_wave_height => "swh", - :mean_wave_period => "mwp", - :mean_wave_direction => "mwd", -) - -netcdf_variable_name(metadata::ERA5Metadata) = ERA5_netcdf_variable_names[metadata.name] - -""" - retrieve_data(metadata::ERA5Metadatum) - -Retrieve ERA5 data from NetCDF file according to `metadata`. -ERA5 is 2D surface data, so we return a 2D array with an added singleton z-dimension. -""" -function retrieve_data(metadata::ERA5Metadatum) - path = metadata_path(metadata) - name = netcdf_variable_name(metadata) - - ds = NCDatasets.Dataset(path) - - # ERA5 is 2D + time, we take the first time step - # Data shape is typically (lon, lat) or (lon, lat, time) - raw_data = ds[name] - ndim = ndims(raw_data) - - if ndim == 2 - data_2d = raw_data[:, :] - elseif ndim == 3 - data_2d = raw_data[:, :, 1] - else - error("Unexpected ERA5 data dimensions: $ndim") - end - - close(ds) - - # Add singleton z-dimension for 3D field compatibility - # Return as (Nx, Ny, 1) - return reshape(data_2d, size(data_2d, 1), size(data_2d, 2), 1) -end +# ERA5 data is stored as Float32 +eltype(::ERA5Metadata) = Float32 ##### -##### Metadata filename construction +##### Shared filename utilities ##### function date_str(date::DateTime) y = Dates.year(date) m = lpad(Dates.month(date), 2, '0') - d = lpad(Dates.day(date), 2, '0') - h = lpad(Dates.hour(date), 2, '0') + d = lpad(Dates.day(date), 2, '0') + h = lpad(Dates.hour(date), 2, '0') return "$(y)-$(m)-$(d)T$(h)" end @@ -192,9 +105,7 @@ function bbox_strs(c) return first, second end -function region_suffix(::Nothing) - return "" -end +region_suffix(::Nothing) = "" function region_suffix(region) w, e = bbox_strs(region.longitude) @@ -203,13 +114,18 @@ function region_suffix(region) end function metadata_prefix(dataset::ERA5Dataset, name, date, region) - var = ERA5_dataset_variable_names[name] + var = available_variables(dataset)[name] ds = dataset_name(dataset) start_date = start_date_str(date) end_date = end_date_str(date) suffix = region_suffix(region) - prefix = string(var, "_", ds, "_", start_date, "_", end_date, suffix) + + if start_date == end_date + prefix = string(var, "_", ds, "_", start_date, suffix) + else + prefix = string(var, "_", ds, "_", start_date, "_", end_date, suffix) + end prefix = colon2dash(prefix) prefix = underscore_spaces(prefix) return prefix @@ -228,21 +144,10 @@ end inpainted_metadata_path(metadata::ERA5Metadatum) = joinpath(metadata.dir, inpainted_metadata_filename(metadata)) ##### -##### Grid interfaces +##### Single-level and pressure-level specifics ##### -# ERA5 is a 2D surface dataset — vertical location is Nothing -dataset_location(::ERA5Dataset, name) = (Center, Center, Nothing) - -# ERA5 global coverage: 0-360 longitude, -90 to 90 latitude at 0.25 degree resolution -longitude_interfaces(::ERA5Metadata) = (0, 360) -latitude_interfaces(::ERA5Metadata) = (-90, 90) - -# ERA5 is a 2D surface dataset, so z is a single level at the surface -z_interfaces(::ERA5Metadata) = (0, 1) - -# ERA5 data is stored as Float32 -eltype(::ERA5Metadata) = Float32 +include("ERA5_single_levels.jl") +include("ERA5_pressure_levels.jl") end # module ERA5 - diff --git a/src/DataWrangling/ERA5/ERA5_pressure_levels.jl b/src/DataWrangling/ERA5/ERA5_pressure_levels.jl new file mode 100644 index 000000000..3be6eadb1 --- /dev/null +++ b/src/DataWrangling/ERA5/ERA5_pressure_levels.jl @@ -0,0 +1,291 @@ +abstract type ERA5PressureLevelsDataset <: ERA5Dataset end + +# ERA5PressureMetadata is a subtype of ERA5Metadata +const ERA5PressureMetadata{D} = Metadata{<:ERA5PressureLevelsDataset, D} +const ERA5PressureMetadatum = Metadatum{<:ERA5PressureLevelsDataset} + +struct ERA5HourlyPressureLevels <: ERA5PressureLevelsDataset + pressure_levels :: Vector{Float64} + z :: Union{Nothing, Vector{Float64}} + mean_geopotential_height :: Bool + ERA5HourlyPressureLevels(pressure_levels, z=nothing; mean_geopotential_height=true) = + new(sort(pressure_levels, rev=true), z, mean_geopotential_height) +end +ERA5HourlyPressureLevels(; pressure_levels=ERA5_all_pressure_levels, z=nothing, mean_geopotential_height=true) = + ERA5HourlyPressureLevels(pressure_levels, z; mean_geopotential_height) + +struct ERA5MonthlyPressureLevels <: ERA5PressureLevelsDataset + pressure_levels :: Vector{Float64} + z :: Union{Nothing, Vector{Float64}} + mean_geopotential_height :: Bool + ERA5MonthlyPressureLevels(pressure_levels, z=nothing; mean_geopotential_height=true) = + new(sort(pressure_levels, rev=true), z, mean_geopotential_height) +end +ERA5MonthlyPressureLevels(; pressure_levels=ERA5_all_pressure_levels, z=nothing, mean_geopotential_height=true) = + ERA5MonthlyPressureLevels(pressure_levels, z; mean_geopotential_height) + +dataset_name(::ERA5HourlyPressureLevels) = "ERA5HourlyPressureLevels" +dataset_name(::ERA5MonthlyPressureLevels) = "ERA5MonthlyPressureLevels" + +##### +##### ERA5 pressure-level data availability +##### + +# ERA5 reanalysis data available from 1940 to present (we use a practical range here) +all_dates(::ERA5HourlyPressureLevels, var) = range(DateTime("1940-01-01"), stop=DateTime("2024-12-31"), step=Hour(1)) +all_dates(::ERA5MonthlyPressureLevels, var) = range(DateTime("1940-01-01"), stop=DateTime("2024-12-01"), step=Month(1)) + +# ERA5 pressure-level data is a spatially 3-D dataset +is_three_dimensional(::ERA5PressureMetadata) = true + +# TODO: drop once Oceananigans.Units exports `hPa` in a tagged release +const hPa = 100 + +const ERA5_all_pressure_levels = [1, 2, 3, 5, 7, 10, 20, 30, 50, 70, 100, 125, 150, + 175, 200, 225, 250, 300, 350, 400, 450, 500, 550, 600, 650, 700, 750, 775, 800, + 825, 850, 875, 900, 925, 950, 975, 1000]hPa + +# ERA5 stores pressure levels bottom-to-top +reversed_vertical_axis(::ERA5PressureLevelsDataset) = false + +Base.size(ds::ERA5PressureLevelsDataset, variable) = (1440, 720, length(ds.pressure_levels)) + +##### +##### ERA5 pressure-level variable name mappings +##### + +ERA5PL_dataset_variable_names = Dict( + :temperature => "temperature", + :eastward_velocity => "u_component_of_wind", + :northward_velocity => "v_component_of_wind", + :vertical_velocity => "vertical_velocity", + :geopotential => "geopotential", + :geopotential_height => "geopotential", + :specific_humidity => "specific_humidity", + :relative_humidity => "relative_humidity", + :vorticity => "vorticity", + :divergence => "divergence", + :potential_vorticity => "potential_vorticity", + :ozone_mass_mixing_ratio => "ozone_mass_mixing_ratio", + :fraction_of_cloud_cover => "fraction_of_cloud_cover", + :specific_cloud_liquid_water_content => "specific_cloud_liquid_water_content", + :specific_cloud_ice_water_content => "specific_cloud_ice_water_content", + :specific_rain_water_content => "specific_rain_water_content", + :specific_snow_water_content => "specific_snow_water_content", +) + +# NetCDF short variable names (what's actually in the downloaded files) +# These differ from the CDS API variable names above +ERA5PL_netcdf_variable_names = Dict( + :temperature => "t", + :eastward_velocity => "u", + :northward_velocity => "v", + :vertical_velocity => "w", + :geopotential => "z", + :geopotential_height => "z", + :specific_humidity => "q", + :relative_humidity => "r", + :vorticity => "vo", + :divergence => "d", + :potential_vorticity => "pv", + :ozone_mass_mixing_ratio => "o3", + :fraction_of_cloud_cover => "cc", + :specific_cloud_liquid_water_content => "clwc", + :specific_cloud_ice_water_content => "ciwc", + :specific_rain_water_content => "crwc", + :specific_snow_water_content => "cswc", +) + +# Variables available for download +available_variables(::ERA5PressureLevelsDataset) = ERA5PL_dataset_variable_names + +# `dataset_variable_name` returns the short name as stored in the NetCDF file +# (e.g. "u"). The CDS API catalog name (e.g. "u_component_of_wind") used in +# download requests is accessed via the `ERA5PL_dataset_variable_names` dict +# directly in `NumericalEarthCDSAPIExt`. +dataset_variable_name(md::ERA5PressureMetadata) = ERA5PL_netcdf_variable_names[md.name] + +conversion_units(md::ERA5PressureMetadata) = + md.name == :geopotential_height ? InverseGravity() : nothing + +default_inpainting(md::ERA5PressureMetadata) = nothing + +""" + retrieve_data(metadata::ERA5PressureMetadatum) + +Retrieve ERA5 pressure-level data from a NetCDF file. +Returns a 3D array (lon, lat, level) with levels ordered bottom-to-top +(highest pressure at k=1, lowest pressure at k=Nz). +""" +function retrieve_data(metadata::ERA5PressureMetadatum) + path = metadata_path(metadata) + name = dataset_variable_name(metadata) + ds = NCDatasets.Dataset(path) + data = ds[name][:, :, :, 1] # (lon, lat, pressure_level, time=1) + close(ds) + return reverse(data, dims=2) # Latitude is stored from 90°N → 90°S +end + +##### +##### Metadata filename construction +##### + +function metadata_prefix(md::ERA5PressureMetadata) + var = ERA5PL_dataset_variable_names[md.name] + dataset = dataset_name(md.dataset) + start_date = start_date_str(md.dates) + end_date = end_date_str(md.dates) + bbox = md.region + + if !isnothing(bbox) + w, e = bbox_strs(bbox.longitude) + s, n = bbox_strs(bbox.latitude) + suffix = string(w, e, s, n) + else + suffix = "" + end + + if start_date == end_date + prefix = string(var, "_", dataset, "_", start_date, suffix) + else + prefix = string(var, "_", dataset, "_", start_date, "_", end_date, suffix) + end + prefix = colon2dash(prefix) + prefix = underscore_spaces(prefix) + return prefix +end + +##### +##### Pressure-level vertical coordinate +##### + +const ERA5_gravitational_acceleration = 9.80665 + +# International Standard Atmosphere height (m) for a given pressure +function standard_atmosphere_geopotential_height(p) + g = ERA5_gravitational_acceleration + T⁰ = 288.15 # K + p⁰ = 101325 + Rᵈ = 287.0528 # J/(kg-K) + + return (Rᵈ * T⁰ / g) * log(p⁰ / p) +end + +# Build z-interfaces (Nz+1 values) from pressure levels. +# Levels may be in any order; output is sorted so k=1 is highest pressure (lowest altitude). +function standard_atmosphere_z_interfaces(levels) + sorted_levels = sort(levels, rev=true) # highest pressure first → k=1 is bottom + heights = standard_atmosphere_geopotential_height.(Float64.(sorted_levels)) + Nz = length(heights) + + interfaces = Vector{Float64}(undef, Nz + 1) + + if Nz == 1 + interfaces[1] = heights[1] - 0.5 + interfaces[2] = heights[1] + 0.5 + else + interfaces[1] = heights[1] - (heights[2] - heights[1]) / 2 + for k in 2:Nz + interfaces[k] = (heights[k-1] + heights[k]) / 2 + end + interfaces[Nz+1] = heights[Nz] + (heights[Nz] - heights[Nz-1]) / 2 + end + + return interfaces +end + +# ERA5 pressure-levels (3-D) data product +function z_interfaces(metadata::ERA5PressureMetadata) + # Return cached z if already set + !isnothing(metadata.dataset.z) && return metadata.dataset.z + + # If mean_geopotential_height is enabled, try to download and compute + if metadata.dataset.mean_geopotential_height + ϕ_metadata = Metadata(:geopotential; dataset=metadata.dataset, + dates=metadata.dates, region=metadata.region, + dir=metadata.dir) + try + download_dataset(ϕ_metadata) + return mean_geopotential_z_interfaces(metadata) + catch e + @warn "Failed to derive geopotential heights; falling back to standard atmosphere" exception=(e, catch_backtrace()) + end + end + + # Fallback + return standard_atmosphere_z_interfaces(metadata.dataset.pressure_levels) +end + +##### +##### mean_geopotential_heights — data-derived static z-coordinate +##### + +""" + mean_geopotential_heights(metadata::ERA5PressureMetadata) + +Compute spatially and temporally averaged geopotential heights (m) for each +pressure level in `metadata`. + +Downloads the `:geopotential` field for every date in `metadata`, divides by g, +averages over the horizontal domain and all dates, and returns one representative +height per pressure level in bottom-to-top order (k=1 is highest pressure). +""" +function mean_geopotential_heights(metadata::ERA5PressureMetadata) + ϕ_metadata = Metadata(:geopotential; dataset=metadata.dataset, + dates=metadata.dates, region=metadata.region, + dir=metadata.dir) + Nz = length(metadata.dataset.pressure_levels) + heights = zeros(Nz) + # average over time + for ϕ_datum in ϕ_metadata + data = retrieve_data(ϕ_datum) ./ Float32(ERA5_gravitational_acceleration) # Φ → Z (m) + if size(data, 3) != Nz + error("Cached geopotential file at $(metadata_path(ϕ_datum)) has " * + "$(size(data, 3)) pressure levels, but the dataset configuration " * + "expects $Nz. This is most likely a stale cache from a previous " * + "run with different `pressure_levels`. Delete the file and re-run.") + end + # average over horizontal dims + data_mean = mean(data; dims=(1, 2)) + heights .+= dropdims(data_mean; dims=(1, 2)) + end + heights ./= length(ϕ_metadata) + + return sort(heights) +end + +function mean_geopotential_z_interfaces(metadata::ERA5PressureMetadata) + return stagger(mean_geopotential_heights(metadata)) +end + +function stagger(zc::AbstractVector) + # heights are ascending (k=1 = highest pressure = lowest altitude, + # consistent with retrieve_data's reverse() and Oceananigans bottom-to-top + # convention); bottom and top interfaces are extrapolated + zf = (zc[1:end-1] .+ zc[2:end]) / 2 # Nz-1 interior interfaces + pushfirst!(zf, zc[1] - (zf[1] - zc[1])) # bottom interface + push!(zf, zc[end] + (zc[end] - zf[end])) # top interface + return zf +end + +##### +##### pressure_field — synthetic pressure coordinate field +##### + +""" + pressure_field(metadata::ERA5PressureMetadatum, arch=CPU(); halo=(3,3,3)) + +Return a `Field{Nothing, Nothing, Center}` on the native grid of `metadata` +holding the pressure value (Pa) at each vertical level. Levels are ordered +bottom-to-top (k=1 is the highest pressure level). The `Nothing` horizontal +locations make this field broadcast against full 3-D fields without copying. +""" +function pressure_field(metadata::ERA5PressureMetadatum, arch=CPU(); halo=(3,3,3)) + grid = native_grid(metadata, arch; halo) + field = Field{Nothing, Nothing, Center}(grid) + reversed_levels = sort(metadata.dataset.pressure_levels, rev=true) # highest pressure → k=1 + set!(field, reversed_levels) + fill_halo_regions!(field) + return field +end + diff --git a/src/DataWrangling/ERA5/ERA5_single_levels.jl b/src/DataWrangling/ERA5/ERA5_single_levels.jl new file mode 100644 index 000000000..5c00626da --- /dev/null +++ b/src/DataWrangling/ERA5/ERA5_single_levels.jl @@ -0,0 +1,182 @@ +struct ERA5HourlySingleLevel <: ERA5Dataset end +struct ERA5MonthlySingleLevel <: ERA5Dataset end + +dataset_name(::ERA5HourlySingleLevel) = "ERA5HourlySingleLevel" +dataset_name(::ERA5MonthlySingleLevel) = "ERA5MonthlySingleLevel" + +# Wave variables are on a 0.5° grid (720×361), atmospheric variables on 0.25° (1440×721) +const ERA5_wave_variables = Set([ + :eastward_stokes_drift, :northward_stokes_drift, + :significant_wave_height, :mean_wave_period, :mean_wave_direction, +]) + +# Mean rate / accumulated variables (CDS "step type" = accum). +# All other single-level variables are instantaneous. +# See ECMWF ERA5 documentation, Tables 3 and 4: +# https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation#ERA5:datadocumentation-Meanrates/fluxesandaccumulations +const ERA5_single_level_accumulated_variables = Set([ + :total_precipitation, + :mean_surface_sensible_heat_flux, + :mean_surface_latent_heat_flux, + :mean_surface_momentum_flux_x, + :mean_surface_momentum_flux_y, + :downwelling_shortwave_radiation, + :downwelling_longwave_radiation, + :evaporation, + :mean_evaporation_rate, +]) + +##### +##### ERA5 single-level data availability +##### + +# ERA5 reanalysis data available from 1940 to present (we use a practical range here) +all_dates(::ERA5HourlySingleLevel, var) = range(DateTime("1940-01-01"), stop=DateTime("2024-12-31"), step=Hour(1)) +all_dates(::ERA5MonthlySingleLevel, var) = range(DateTime("1940-01-01"), stop=DateTime("2024-12-01"), step=Month(1)) + +# ERA5 single-level data is a spatially 2-D dataset +is_three_dimensional(::ERA5Metadata) = false + +function Base.size(::ERA5Dataset, variable) + if variable in ERA5_wave_variables + return (720, 360, 1) + else + return (1440, 720, 1) + end +end + +##### +##### ERA5 single-level variable name mappings +##### + +# Variable name mappings from NumericalEarth names to ERA5/CDS API variable names +ERA5_dataset_variable_names = Dict( + :temperature => "2m_temperature", + :dewpoint_temperature => "2m_dewpoint_temperature", + :eastward_velocity => "10m_u_component_of_wind", + :northward_velocity => "10m_v_component_of_wind", + :surface_pressure => "surface_pressure", + :mean_sea_level_pressure => "mean_sea_level_pressure", + :total_precipitation => "total_precipitation", + :mean_surface_momentum_flux_x => "mean_eastward_turbulent_surface_stress", + :mean_surface_momentum_flux_y => "mean_northward_turbulent_surface_stress", + :sea_surface_temperature => "sea_surface_temperature", + :mean_surface_sensible_heat_flux => "mean_surface_sensible_heat_flux", + :mean_surface_latent_heat_flux => "mean_surface_latent_heat_flux", + :downwelling_shortwave_radiation => "surface_solar_radiation_downwards", + :downwelling_longwave_radiation => "surface_thermal_radiation_downwards", + :total_cloud_cover => "total_cloud_cover", + :evaporation => "evaporation", + :mean_evaporation_rate => "mean_evaporation_rate", + :specific_humidity => "specific_humidity", + :eastward_stokes_drift => "u_component_stokes_drift", + :northward_stokes_drift => "v_component_stokes_drift", + :significant_wave_height => "significant_height_of_combined_wind_waves_and_swell", + :mean_wave_period => "mean_wave_period", + :mean_wave_direction => "mean_wave_direction", +) + +# NetCDF short variable names (what's actually in the downloaded files) +# - These differ from the CDS API variable names above +# - The expected "shortName" (see https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation#heading-Parameterlistings) +# did not always match the netcdf variable names?! In those cases, the longName and units were manually verified. +ERA5_netcdf_variable_names = Dict( + :temperature => "t2m", + :dewpoint_temperature => "d2m", + :eastward_velocity => "u10", + :northward_velocity => "v10", + :surface_pressure => "sp", + :mean_sea_level_pressure => "msl", + :total_precipitation => "tp", + :mean_surface_momentum_flux_x => "avg_iews", # shortName: metss + :mean_surface_momentum_flux_y => "avg_inss", # shortName: mntss + :sea_surface_temperature => "sst", + :mean_surface_sensible_heat_flux => "avg_ishf", # shortName: msshf + :mean_surface_latent_heat_flux => "avg_slhtf", # shortName: mslhf + :downwelling_shortwave_radiation => "ssrd", + :downwelling_longwave_radiation => "strd", + :total_cloud_cover => "tcc", + :evaporation => "e", + :mean_evaporation_rate => "avg_ie", # shortName: mer + :specific_humidity => "q", + :eastward_stokes_drift => "ust", + :northward_stokes_drift => "vst", + :significant_wave_height => "swh", + :mean_wave_period => "mwp", + :mean_wave_direction => "mwd", +) + +# Variables available for download +available_variables(::ERA5Dataset) = ERA5_dataset_variable_names + +# `dataset_variable_name` returns the short name as stored in the NetCDF file +# (e.g. "t2m"). The CDS API catalog name (e.g. "2m_temperature") used in +# download requests is accessed via the `ERA5_dataset_variable_names` dict +# directly in `NumericalEarthCDSAPIExt`. +dataset_variable_name(md::ERA5Metadata) = ERA5_netcdf_variable_names[md.name] + +default_inpainting(md::ERA5Metadata) = nothing + +""" + retrieve_data(metadata::ERA5Metadatum) + +Retrieve ERA5 data from NetCDF file according to `metadata`. +ERA5 is 2D surface data, so we return a 2D array with an added singleton z-dimension. +""" +function retrieve_data(metadata::ERA5Metadatum) + path = metadata_path(metadata) + name = dataset_variable_name(metadata) + + ds = NCDatasets.Dataset(path) + + # ERA5 is 2D + time, we take the first time step + # Data shape is typically (lon, lat) or (lon, lat, time) + raw_data = ds[name] + ndim = ndims(raw_data) + + if ndim == 2 + data_2d = raw_data[:, :] + elseif ndim == 3 + data_2d = raw_data[:, :, 1] + else + error("Unexpected ERA5 data dimensions: $ndim") + end + + close(ds) + + # Latitude is stored from 90°N → 90°S + data_2d = reverse(data_2d, dims=2) + + # Add singleton z-dimension for 3D field compatibility + # Return as (Nx, Ny, 1) + return reshape(data_2d, size(data_2d, 1), size(data_2d, 2), 1) +end + +##### +##### Metadata filename construction +##### + +function metadata_prefix(md::ERA5Metadata) + var = ERA5_dataset_variable_names[md.name] + dataset = dataset_name(md.dataset) + start_date = start_date_str(md.dates) + end_date = end_date_str(md.dates) + bbox = md.region + + if !isnothing(bbox) + w, e = bbox_strs(bbox.longitude) + s, n = bbox_strs(bbox.latitude) + suffix = string(w, e, s, n) + else + suffix = "" + end + + if start_date == end_date + prefix = string(var, "_", dataset, "_", start_date, suffix) + else + prefix = string(var, "_", dataset, "_", start_date, "_", end_date, suffix) + end + prefix = colon2dash(prefix) + prefix = underscore_spaces(prefix) + return prefix +end diff --git a/src/DataWrangling/metadata.jl b/src/DataWrangling/metadata.jl index cb1153e16..a6f3fb092 100644 --- a/src/DataWrangling/metadata.jl +++ b/src/DataWrangling/metadata.jl @@ -405,6 +405,7 @@ struct NanomolePerKilogram end struct NanomolePerLiter end struct InverseSign end +struct InverseGravity end struct GramPerKilogramMinus35 end # Salinity anomaly struct MilliliterPerLiter end # Sometimes for disssolved_oxygen diff --git a/src/DataWrangling/metadata_field.jl b/src/DataWrangling/metadata_field.jl index 6a7c60b4d..d5656f320 100644 --- a/src/DataWrangling/metadata_field.jl +++ b/src/DataWrangling/metadata_field.jl @@ -26,11 +26,16 @@ restrict(::Nothing, interfaces, N) = interfaces, N # TODO support stretched native grids function restrict(bbox_interfaces, interfaces, N) - extent = interfaces[end] - interfaces[1] - rΔ = bbox_interfaces[2] - bbox_interfaces[1] - rN = round(Int, rΔ / extent * N) - rN = max(rN, 1) # at least one cell - return bbox_interfaces, rN + LΔ = interfaces[2] - interfaces[1] + Δ = LΔ / N + grid_interfaces = (bbox_interfaces[1] - Δ/2, + bbox_interfaces[2] + Δ/2) + + rΔ = grid_interfaces[2] - grid_interfaces[1] + ϵ = rΔ / LΔ + rN = ceil(Int, ϵ * N) # Round up to ensure bounding box is covered + + return grid_interfaces, rN end """ @@ -235,7 +240,12 @@ function set!(target_field::Field, metadata::Metadatum; kw...) Lzt = grid.Lz Lzm = meta_field.grid.Lz - if Lzt > Lzm && is_three_dimensional(metadata) + # Allow up to 1% vertical mismatch for pressure-level datasets with time-varying + # geopotential heights — the per-timestep vertical extent can be slightly smaller + # than the temporal-mean extent used for the target grid (e.g. when the atmosphere + # is compressed). Oceananigans' interpolate! does not extrapolate, so target points + # just outside the source domain will use the nearest interior values. + if is_three_dimensional(metadata) && Lzt > Lzm * (1 + 1e-2) throw("The vertical range of the $(metadata.dataset) dataset ($(Lzm) m) is smaller than " * "the target grid ($(Lzt) m). Some vertical levels cannot be filled with data.") end @@ -272,6 +282,16 @@ function column_field_from_file(metadata, arch; halo=(3, 3, 3), kw...) end _, _, Nz, _ = size(metadata) + + # Validate that the cached file's vertical extent matches the dataset + # configuration. A common cause of mismatch is a stale cache from a previous + # run with a different vertical configuration (e.g. ERA5 `pressure_levels`). + if is_three_dimensional(metadata) && length(data_size) >= 3 && data_size[3] != Nz + error("Cached file $(path) has $(data_size[3]) vertical levels, but the " * + "dataset configuration expects $Nz. This is most likely a stale " * + "cache from a previous run with a different vertical configuration. " * + "Delete the file and re-run.") + end z = z_interfaces(metadata) FT = eltype(metadata) @@ -389,9 +409,12 @@ function set_metadata_field!(field, data, metadatum) arch = architecture(grid) Nx, Ny, Nz = size(metadatum) + mangling = if size(data, 2) == Ny-1 + @debug "Shifting field southward" ShiftSouth() elseif size(data, 2) == Ny+1 + @debug "Averaging field in north-south dir" AverageNorthSouth() else nothing @@ -485,6 +508,7 @@ end @inline convert_units(C::FT, ::Union{NanomolePerLiter, NanomolePerKilogram}) where FT = C * convert(FT, 1e-6) @inline convert_units(C::FT, ::MilliliterPerLiter) where FT = C / convert(FT, 22.3916) @inline convert_units(C::FT, ::GramPerKilogramMinus35) where FT = C + convert(FT, 35) +@inline convert_units(Φ::FT, ::InverseGravity) where FT = Φ / convert(FT, 9.80665) @inline convert_units(V::FT, ::CentimetersPerSecond) where FT = V / convert(FT, 100) diff --git a/test/test_cds_downloading.jl b/test/test_cds_downloading.jl index 9802c2be2..e1bd366f1 100644 --- a/test/test_cds_downloading.jl +++ b/test/test_cds_downloading.jl @@ -6,8 +6,16 @@ using Dates using NCDatasets using NumericalEarth.DataWrangling.ERA5 -using NumericalEarth.DataWrangling.ERA5: ERA5Hourly, ERA5Monthly, ERA5_dataset_variable_names -using NumericalEarth.DataWrangling: metadata_path, download_dataset +using NumericalEarth.DataWrangling.ERA5: ERA5HourlySingleLevel, ERA5MonthlySingleLevel, + ERA5_dataset_variable_names, ERA5_netcdf_variable_names +using NumericalEarth.DataWrangling.ERA5: ERA5HourlyPressureLevels, ERA5MonthlyPressureLevels, + ERA5_all_pressure_levels, ERA5PL_dataset_variable_names, + ERA5PL_netcdf_variable_names, pressure_field +using NumericalEarth.DataWrangling: metadata_path, download_dataset, BoundingBox, Column, Linear, Nearest + +# Internal extension module — exposes dispatch helpers and NetCDF utilities +# that are not part of the public API but worth pinning behavior for. +const CDSExt = Base.get_extension(NumericalEarth, :NumericalEarthCDSAPIExt) # Test date: Kyoto Protocol ratification date, February 16, 2005 start_date = DateTime(2005, 2, 16, 12) @@ -15,7 +23,7 @@ start_date = DateTime(2005, 2, 16, 12) @testset "ERA5 data downloading and utilities" begin @info "Testing ERA5 downloading and NetCDF file verification..." - dataset = ERA5Hourly() + dataset = ERA5HourlySingleLevel() # Use a small bounding box to reduce download time region = NumericalEarth.DataWrangling.BoundingBox(longitude=(0, 5), latitude=(40, 45)) @@ -63,8 +71,7 @@ start_date = DateTime(2005, 2, 16, 12) close(ds) - # Clean up - rm(filepath; force=true) + # Note: leave `filepath` in place; downstream surface-level testsets reuse it. end @testset "Availability of ERA5 variables" begin @@ -88,14 +95,14 @@ start_date = DateTime(2005, 2, 16, 12) # Test metadata properties @test metadatum.name == :temperature - @test metadatum.dataset isa ERA5Hourly + @test metadatum.dataset isa ERA5HourlySingleLevel @test metadatum.dates == start_date @test metadatum.region == region # Test size (should be global ERA5 size with 1 time step) Nx, Ny, Nz, Nt = size(metadatum) @test Nx == 1440 # ERA5 longitude points - @test Ny == 721 # ERA5 latitude points + @test Ny == 720 # ERA5 latitude points (poles averaged into adjacent cells) @test Nz == 1 # 2D surface data @test Nt == 1 # Single time step @@ -104,31 +111,31 @@ start_date = DateTime(2005, 2, 16, 12) end @testset "ERA5 wave variable metadata sizes" begin - # Wave variables should be on the 0.5° grid (720×361) + # Wave variables should be on the 0.5° grid (720×360) for wave_var in (:eastward_stokes_drift, :northward_stokes_drift, :significant_wave_height, :mean_wave_period, :mean_wave_direction) metadatum = Metadatum(wave_var; dataset, date=start_date) Nx, Ny, Nz, Nt = size(metadatum) @test Nx == 720 - @test Ny == 361 + @test Ny == 360 @test Nz == 1 @test Nt == 1 end - # Atmospheric variables should remain on the 0.25° grid (1440×721) + # Atmospheric variables should remain on the 0.25° grid (1440×720) for atmos_var in (:temperature, :eastward_velocity, :surface_pressure) metadatum = Metadatum(atmos_var; dataset, date=start_date) Nx, Ny, Nz, Nt = size(metadatum) @test Nx == 1440 - @test Ny == 721 + @test Ny == 720 @test Nz == 1 @test Nt == 1 end end @testset "ERA5 Monthly dataset" begin - monthly_dataset = ERA5Monthly() - @test monthly_dataset isa ERA5Monthly + monthly_dataset = ERA5MonthlySingleLevel() + @test monthly_dataset isa ERA5MonthlySingleLevel # Test that all_dates returns a valid range dates = NumericalEarth.DataWrangling.all_dates(monthly_dataset, :temperature) @@ -136,6 +143,156 @@ start_date = DateTime(2005, 2, 16, 12) @test step(dates) == Month(1) end + @testset "ERA5 single-level all_dates (Hourly)" begin + hourly_dataset = ERA5HourlySingleLevel() + dates = NumericalEarth.DataWrangling.all_dates(hourly_dataset, :temperature) + @test first(dates) == DateTime("1940-01-01") + @test step(dates) == Hour(1) + end + + @testset "ERA5 single-level dispatch helpers" begin + ds = ERA5HourlySingleLevel() + md = Metadatum(:temperature; dataset=ds, region, date=start_date) + + # API-name and netcdf-name dicts cover the same variable set — + # catches forgetting to add a new variable to one of the two + @test keys(ERA5_dataset_variable_names) == keys(ERA5_netcdf_variable_names) + + # available_variables returns the API-name dict (used to build CDS requests), + # not the netcdf short-name dict — guards against the easy swap-mistake + @test NumericalEarth.DataWrangling.available_variables(ds) === ERA5_dataset_variable_names + + # dataset_variable_name returns the netcdf short name (read from file), + # not the API catalog name — same swap risk + @test NumericalEarth.DataWrangling.dataset_variable_name(md) == "t2m" + + # default_inpainting is nothing for ERA5 (vs NearestNeighborInpainting for ECCO); + # accidentally enabling it would massively slow Field construction + @test NumericalEarth.DataWrangling.default_inpainting(md) === nothing + end + + @testset "ERA5 single-level metadata_prefix" begin + ds = ERA5HourlySingleLevel() + + # Single-date metadatum, with region: prefix should not duplicate the date + md_single = Metadatum(:temperature; dataset=ds, region, date=start_date) + prefix_single = NumericalEarth.DataWrangling.ERA5.metadata_prefix(md_single) + @test occursin("2m_temperature", prefix_single) + @test occursin("ERA5HourlySingleLevel", prefix_single) + @test occursin("2005-02-16", prefix_single) + @test count("2005-02-16", prefix_single) == 1 # date appears once for single-date + @test occursin("0.0", prefix_single) # west bound + @test occursin("5.0", prefix_single) # east bound + @test occursin("40.0", prefix_single) # south bound + @test occursin("45.0", prefix_single) # north bound + # Filename safety + @test !occursin(":", prefix_single) # colons replaced by dashes + @test !occursin(" ", prefix_single) # spaces replaced by underscores + + # Single-date metadatum, no region: suffix should be empty + md_no_region = Metadatum(:temperature; dataset=ds, date=start_date) + prefix_no_region = NumericalEarth.DataWrangling.ERA5.metadata_prefix(md_no_region) + @test !occursin("0.0", prefix_no_region) + @test !occursin("nothing", prefix_no_region) + + # Multi-date metadata: prefix should include both start and end dates + end_date = start_date + Hour(2) + md_multi = Metadata(:temperature; dataset=ds, region, + dates=start_date:Hour(1):end_date) + prefix_multi = NumericalEarth.DataWrangling.ERA5.metadata_prefix(md_multi) + @test occursin("2005-02-16T12", prefix_multi) + @test occursin("2005-02-16T14", prefix_multi) + end + + @testset "ERA5HourlyPressureLevels construction and metadata" begin + # Default constructor uses all 37 standard levels + ds_full = ERA5HourlyPressureLevels() + @test ds_full isa ERA5HourlyPressureLevels + @test length(ds_full.pressure_levels) == 37 + @test Base.size(ds_full, :temperature) == (1440, 720, 37) + + # Subset constructor + ds_sub = ERA5HourlyPressureLevels(pressure_levels=[850, 500]hPa) + @test Base.size(ds_sub, :temperature) == (1440, 720, 2) + + # Monthly variant + ds_monthly = ERA5MonthlyPressureLevels() + @test ds_monthly isa ERA5MonthlyPressureLevels + + # Metadatum size propagates Nz correctly + meta = Metadatum(:temperature; dataset=ds_sub, region=region, date=start_date) + Nx, Ny, Nz, Nt = size(meta) + @test Nz == 2 + @test NumericalEarth.DataWrangling.ERA5.is_three_dimensional(meta) == true + + # Variable name lookups + @test ERA5PL_dataset_variable_names[:temperature] == "temperature" + @test ERA5PL_dataset_variable_names[:geopotential_height] == "geopotential" + end + + @testset "ERA5 pressure-level z_interfaces (standard atmosphere)" begin + levels_2 = [850, 500]hPa + z = standard_atmosphere_z_interfaces(levels_2) + @test length(z) == 3 # Nz+1 interfaces + @test issorted(z) # monotonically increasing with altitude + # 850 hPa ≈ 1457 m, 500 hPa ≈ 5575 m + @test z[1] < 1457.0 < z[2] < 5575.0 < z[3] + + # Single level + z1 = standard_atmosphere_z_interfaces([500]hPa) + @test length(z1) == 2 + @test z1[1] < z1[2] + end + + @testset "ERA5 pressure-level constructors sort levels descending" begin + # Pass ASCENDING input so the test fails if the inner constructor's + # `sort(...; rev=true)` regresses to a no-op or different order. + ds_h = ERA5HourlyPressureLevels([500, 850]hPa) + @test ds_h.pressure_levels == [850 * hPa, 500 * hPa] # stored highest-pressure-first + @test ds_h.z === nothing + @test ds_h.mean_geopotential_height == true # default kwarg + + ds_m = ERA5MonthlyPressureLevels([500, 850]hPa) + @test ds_m.pressure_levels == [850 * hPa, 500 * hPa] + @test ds_m.z === nothing + @test ds_m.mean_geopotential_height == true + + # Already-descending input is preserved (sort is a no-op here) + ds_h2 = ERA5HourlyPressureLevels([850, 500]hPa) + @test ds_h2.pressure_levels == [850 * hPa, 500 * hPa] + + # Three-level shuffled input + ds_h3 = ERA5HourlyPressureLevels([500, 850, 700]hPa) + @test ds_h3.pressure_levels == [850 * hPa, 700 * hPa, 500 * hPa] + end + + @testset "ERA5 pressure-level stagger" begin + stagger = NumericalEarth.DataWrangling.ERA5.stagger + + # Two-element input (evenly spaced): bottom and top faces are + # extrapolated symmetrically; result is Nz+1 monotonic. + zf = stagger([0.0, 1.0]) + @test length(zf) == 3 + @test issorted(zf) + @test zf ≈ [-0.5, 0.5, 1.5] + + # Three-element evenly-spaced: every interior interface is the + # midpoint of the adjacent centers; bottom/top are extrapolated. + zf = stagger([1.0, 3.0, 5.0]) + @test length(zf) == 4 + @test zf ≈ [0.0, 2.0, 4.0, 6.0] + + # Three-element irregularly-spaced: interior midpoints honor the + # actual spacing (not assumed-uniform). + zf = stagger([1.0, 3.0, 7.0]) + @test length(zf) == 4 + @test zf[2] ≈ 2.0 # midpoint(1, 3) + @test zf[3] ≈ 5.0 # midpoint(3, 7) + # Boundaries extrapolated at half the adjacent interior spacing + @test zf[1] ≈ 1.0 - (zf[2] - 1.0) + @test zf[4] ≈ 7.0 + (7.0 - zf[3]) + end + for arch in test_architectures A = typeof(arch) @@ -162,10 +319,7 @@ start_date = DateTime(2005, 2, 16, 12) @test !all(iszero, interior(ψ)) end - # Clean up - rm(filepath; force=true) - inpainted_path = NumericalEarth.DataWrangling.inpainted_metadata_path(metadatum) - isfile(inpainted_path) && rm(inpainted_path; force=true) + # Note: cleanup happens in the last surface-level testset below. end @testset "Setting a field from ERA5 metadata on $A" begin @@ -201,4 +355,478 @@ start_date = DateTime(2005, 2, 16, 12) isfile(inpainted_path) && rm(inpainted_path; force=true) end end + + @testset "ERA5 pressure-level download and Field on CPU" begin + arch = CPU() + ds_pl = ERA5HourlyPressureLevels(pressure_levels=[850, 500]hPa) + + @testset "Download and 3D Field" begin + meta = Metadatum(:temperature; dataset=ds_pl, region, date=start_date) + filepath = metadata_path(meta) + isfile(filepath) && rm(filepath; force=true) + + download_dataset(meta) + @test isfile(filepath) + + # Verify the NetCDF has a pressure_level dimension and the right variable + ds_nc = NCDataset(filepath) + @test haskey(ds_nc, "t") + @test haskey(ds_nc, "pressure_level") || haskey(ds_nc, "level") + close(ds_nc) + + f = Field(meta, arch) + @test f isa Field + Nx, Ny, Nz = size(f) + @test Nz == 2 + + @allowscalar begin + @test !all(iszero, interior(f)) + # Temperature at these levels should be in a plausible range (K) + @test all(x -> 180 < x < 340, filter(!isnan, vec(interior(f)))) + end + + rm(filepath; force=true) + inpainted_path = NumericalEarth.DataWrangling.inpainted_metadata_path(meta) + isfile(inpainted_path) && rm(inpainted_path; force=true) + end + + @testset "Geopotential height conversion" begin + meta_z = Metadatum(:geopotential_height; dataset=ds_pl, region, date=start_date) + filepath = metadata_path(meta_z) + + # Field() downloads if needed; the file may already be on disk from + # the previous testset's z_interfaces side-effect. + fz = Field(meta_z, arch) + + @allowscalar begin + max_z = maximum(filter(!isnan, vec(interior(fz)))) + # 500 hPa geopotential height ≈ 5500 m + @test 4000 < max_z < 7000 + end + + rm(filepath; force=true) + inpainted_path = NumericalEarth.DataWrangling.inpainted_metadata_path(meta_z) + isfile(inpainted_path) && rm(inpainted_path; force=true) + end + + @testset "pressure_field" begin + meta = Metadatum(:temperature; dataset=ds_pl, region, date=start_date) + pf = pressure_field(meta, arch) + @test pf isa Field + Nx, Ny, Nz = size(pf) + @test Nz == 2 + + @allowscalar begin + # k=1 should be 850 hPa = 85000 Pa (highest pressure, lowest altitude) + @test interior(pf)[1, 1, 1] ≈ Float32(850hPa) + # k=2 should be 500 hPa = 50000 Pa + @test interior(pf)[1, 1, 2] ≈ Float32(500hPa) + end + end + end +end + +@testset "ERA5 CDSAPIExt dispatch helpers and area construction" begin + sl = ERA5HourlySingleLevel() + pl = ERA5HourlyPressureLevels(pressure_levels=[500hPa, 850hPa]) + + @testset "cds_product / cds_varnames / nc_varnames" begin + @test CDSExt.cds_product(sl) == "reanalysis-era5-single-levels" + @test CDSExt.cds_product(pl) == "reanalysis-era5-pressure-levels" + + @test CDSExt.cds_varnames(sl) === ERA5_dataset_variable_names + @test CDSExt.cds_varnames(pl) === ERA5PL_dataset_variable_names + + @test CDSExt.nc_varnames(sl) === ERA5_netcdf_variable_names + @test CDSExt.nc_varnames(pl) === ERA5PL_netcdf_variable_names + end + + @testset "coord_vars" begin + sl_coords = CDSExt.coord_vars(sl) + pl_coords = CDSExt.coord_vars(pl) + + @test sl_coords isa Set + @test "longitude" in sl_coords + @test "latitude" in sl_coords + @test "valid_time" in sl_coords + @test !("pressure_level" in sl_coords) + + @test "longitude" in pl_coords + @test "pressure_level" in pl_coords + @test "level" in pl_coords + end + + @testset "extra_request_keys!" begin + # ERA5Dataset (single level): no-op + request = Dict{String, Any}("variable" => ["2m_temperature"]) + CDSExt.extra_request_keys!(request, sl) + @test !haskey(request, "pressure_level") + + # ERA5PressureLevelsDataset: populates `pressure_level` (in hPa, as strings) + CDSExt.extra_request_keys!(request, pl) + @test haskey(request, "pressure_level") + @test Set(request["pressure_level"]) == Set(["500", "850"]) + end + + @testset "build_era5_area" begin + # Nothing → nothing + @test CDSExt.build_era5_area(nothing) === nothing + + # BoundingBox with both axes → [N, W, S, E] + bbox = BoundingBox(longitude=(-10.0, 5.0), latitude=(40.0, 50.0)) + @test CDSExt.build_era5_area(bbox) == [50.0, -10.0, 40.0, 5.0] + + # BoundingBox with one axis missing → nothing (CDS gets the global slab) + bbox_no_lat = BoundingBox(longitude=(-10.0, 5.0)) + @test CDSExt.build_era5_area(bbox_no_lat) === nothing + bbox_no_lon = BoundingBox(latitude=(40.0, 50.0)) + @test CDSExt.build_era5_area(bbox_no_lon) === nothing + + # Column with Nearest interpolation → tight ε=1e-3 box around the point + col_nr = Column(-61.5, 18.0; interpolation=Nearest()) + area_nr = CDSExt.build_era5_area(col_nr) + @test length(area_nr) == 4 + # [N, W, S, E] + @test area_nr[1] ≈ 18.0 + 1e-3 # north + @test area_nr[2] ≈ -61.5 - 1e-3 # west + @test area_nr[3] ≈ 18.0 - 1e-3 # south + @test area_nr[4] ≈ -61.5 + 1e-3 # east + + # Column with Linear interpolation → ε=0.3 padding for 2x2 stencil + col_lin = Column(-61.5, 18.0; interpolation=Linear()) + area_lin = CDSExt.build_era5_area(col_lin) + @test area_lin[1] ≈ 18.0 + 0.3 + @test area_lin[2] ≈ -61.5 - 0.3 + @test area_lin[3] ≈ 18.0 - 0.3 + @test area_lin[4] ≈ -61.5 + 0.3 + # Linear box must enclose more than one ERA5 grid cell (0.25°) + @test (area_lin[1] - area_lin[3]) > 0.25 + @test (area_lin[4] - area_lin[2]) > 0.25 + end +end + +@testset "ERA5 CDSAPIExt _group_by_calendar_day" begin + # Single calendar day with multiple hours + same_day = [DateTime(2005, 2, 16, 0), + DateTime(2005, 2, 16, 6), + DateTime(2005, 2, 16, 23)] + g = CDSExt._group_by_calendar_day(same_day) + @test length(g) == 1 + @test Date(2005, 2, 16) in keys(g) + @test length(g[Date(2005, 2, 16)]) == 3 + + # Boundary: 00:00 belongs to its OWN day (not the previous one) + midnight_pair = [DateTime(2005, 2, 16, 23), + DateTime(2005, 2, 17, 0)] + g = CDSExt._group_by_calendar_day(midnight_pair) + @test length(g) == 2 + @test g[Date(2005, 2, 16)] == [DateTime(2005, 2, 16, 23)] + @test g[Date(2005, 2, 17)] == [DateTime(2005, 2, 17, 0)] + + # Multiple days, interleaved order — grouping must be order-independent + mixed = [DateTime(2005, 2, 17, 6), + DateTime(2005, 2, 16, 6), + DateTime(2005, 2, 17, 12), + DateTime(2005, 2, 16, 18)] + g = CDSExt._group_by_calendar_day(mixed) + @test length(g) == 2 + @test Set(g[Date(2005, 2, 16)]) == Set([DateTime(2005, 2, 16, 6), DateTime(2005, 2, 16, 18)]) + @test Set(g[Date(2005, 2, 17)]) == Set([DateTime(2005, 2, 17, 6), DateTime(2005, 2, 17, 12)]) + + # Duplicate datetimes are preserved (CDS will dedupe; we don't) + dups = [DateTime(2005, 2, 16, 12), DateTime(2005, 2, 16, 12)] + g = CDSExt._group_by_calendar_day(dups) + @test length(g) == 1 + @test length(g[Date(2005, 2, 16)]) == 2 + + # Single-element input + g = CDSExt._group_by_calendar_day([DateTime(2005, 2, 16, 12)]) + @test length(g) == 1 + @test g[Date(2005, 2, 16)] == [DateTime(2005, 2, 16, 12)] +end + +@testset "ERA5 CDSAPIExt skip_existing short-circuit" begin + # Build a temporary directory and pre-create the expected output files so + # `download_dataset(...; skip_existing=true)` returns without contacting CDS. + # If the short-circuit ever regresses, these tests will throw a credentials + # error (or 4xx from the CDS API) and fail loudly. + region = NumericalEarth.DataWrangling.BoundingBox(longitude=(0, 5), latitude=(40, 45)) + mktempdir() do tmp + ds_pl = ERA5HourlyPressureLevels(pressure_levels=[850, 500]hPa) + date1 = DateTime(2005, 2, 16, 12) + date2 = DateTime(2005, 2, 16, 18) + names = [:temperature, :eastward_velocity] + + # Helper: pre-create the file that `download_dataset` would write + function touch_expected(name, dataset, date) + md = Metadatum(name; dataset, region, date, dir=tmp) + path = metadata_path(md) + mkpath(dirname(path)) + touch(path) + return path + end + + @testset "multi-variable pressure-level (single date)" begin + paths = [touch_expected(name, ds_pl, date1) for name in names] + meta = Metadatum(:temperature; dataset=ds_pl, region, date=date1, dir=tmp) + + result = download_dataset(names, meta; skip_existing=true) + @test result isa Vector{String} + @test length(result) == length(names) + @test Set(result) == Set(paths) + end + + @testset "single-variable multi-date (download_era5_day)" begin + # All hours of date1, date2 already on disk + ds_sl = ERA5HourlySingleLevel() + for dt in (date1, date2) + touch_expected(:temperature, ds_sl, dt) + end + + # Returns nothing without raising — the early-return guard fires + @test CDSExt.download_era5_day(:temperature, ds_sl, [date1, date2]; + region, dir=tmp, + skip_existing=true, cleanup=true) === nothing + end + + @testset "multi-variable multi-date (download_era5_multivar_day)" begin + ds_sl = ERA5HourlySingleLevel() + for name in names, dt in (date1, date2) + touch_expected(name, ds_sl, dt) + end + + @test CDSExt.download_era5_multivar_day(names, ds_sl, [date1, date2]; + region, dir=tmp, + skip_existing=true, cleanup=true) === nothing + end + end +end + +@testset "ERA5 CDSAPIExt NetCDF copy and split helpers" begin + # Helper: write a synthetic ERA5-like NetCDF with `Nt` timesteps and two + # variables (`u`, `v`) on dims (longitude, latitude, valid_time). + function write_synthetic_era5_nc(path; Nx=2, Ny=2, Nt=3) + NCDatasets.Dataset(path, "c") do ds + NCDatasets.defDim(ds, "longitude", Nx) + NCDatasets.defDim(ds, "latitude", Ny) + NCDatasets.defDim(ds, "valid_time", Nt) + ds.attrib["title"] = "synthetic_era5_test" + + lon = NCDatasets.defVar(ds, "longitude", Float64, ("longitude",)) + lat = NCDatasets.defVar(ds, "latitude", Float64, ("latitude",)) + t = NCDatasets.defVar(ds, "valid_time", Int64, ("valid_time",)) + lon[:] = collect(range(-1.0, 1.0; length=Nx)) + lat[:] = collect(range(40.0, 41.0; length=Ny)) + t[:] = collect(1:Nt) + + # u: includes _FillValue and a custom attribute + u = NCDatasets.defVar(ds, "u", Float32, + ("longitude", "latitude", "valid_time"); + fillvalue=Float32(-9999.0)) + u.attrib["units"] = "m s**-1" + u.attrib["long_name"] = "u_component_of_wind" + for k in 1:Nt, j in 1:Ny, i in 1:Nx + u[i, j, k] = Float32(100k + 10j + i) + end + + # v: no fill value, no extra attributes + v = NCDatasets.defVar(ds, "v", Float32, + ("longitude", "latitude", "valid_time")) + for k in 1:Nt, j in 1:Ny, i in 1:Nx + v[i, j, k] = Float32(-(100k + 10j + i)) + end + end + end + + coord_vars = CDSExt.ERA5_COORD_VARS + + @testset "ncvar_copy! preserves data, attributes, fill value" begin + mktempdir() do dir + src_path = joinpath(dir, "src.nc") + dst_path = joinpath(dir, "dst.nc") + write_synthetic_era5_nc(src_path; Nx=3, Ny=2, Nt=1) + + NCDatasets.Dataset(src_path, "r") do src + NCDatasets.Dataset(dst_path, "c") do dst + for (dname, dlen) in src.dim + NCDatasets.defDim(dst, dname, dlen) + end + CDSExt.ncvar_copy!(dst, src["u"], "u") + end + end + + NCDatasets.Dataset(dst_path, "r") do dst + @test haskey(dst, "u") + @test eltype(dst["u"].var) == Float32 + @test dst["u"].attrib["units"] == "m s**-1" + @test dst["u"].attrib["long_name"] == "u_component_of_wind" + @test dst["u"].attrib["_FillValue"] == Float32(-9999.0) + + NCDatasets.Dataset(src_path, "r") do src + @test dst["u"].var[:] == src["u"].var[:] + end + end + end + end + + @testset "ncvar_copy_tslice! extracts a single timestep" begin + mktempdir() do dir + src_path = joinpath(dir, "src.nc") + dst_path = joinpath(dir, "dst.nc") + write_synthetic_era5_nc(src_path; Nx=2, Ny=2, Nt=3) + + tidx = 2 + time_dimnames = Set(["valid_time"]) + + NCDatasets.Dataset(src_path, "r") do src + NCDatasets.Dataset(dst_path, "c") do dst + for (dname, dlen) in src.dim + out_len = dname in time_dimnames ? 1 : dlen + NCDatasets.defDim(dst, dname, out_len) + end + CDSExt.ncvar_copy_tslice!(dst, src["u"], "u", tidx, time_dimnames) + # `valid_time` is a coord variable in the file — copy that too, + # using the same tslice path. Exercises the has_time branch. + CDSExt.ncvar_copy_tslice!(dst, src["valid_time"], "valid_time", tidx, time_dimnames) + # `longitude` has no time dim — exercises the !has_time branch. + CDSExt.ncvar_copy_tslice!(dst, src["longitude"], "longitude", tidx, time_dimnames) + end + end + + NCDatasets.Dataset(dst_path, "r") do dst + @test dst.dim["valid_time"] == 1 + @test size(dst["u"]) == (2, 2, 1) + @test dst["valid_time"][:] == [tidx] + + NCDatasets.Dataset(src_path, "r") do src + @test dst["u"].var[:, :, 1] == src["u"].var[:, :, tidx] + @test dst["longitude"][:] == src["longitude"][:] + end + end + end + end + + @testset "split_era5_nc produces per-variable files" begin + mktempdir() do dir + src_path = joinpath(dir, "src.nc") + write_synthetic_era5_nc(src_path; Nx=2, Ny=2, Nt=1) + + pairs = [ + ("u", joinpath(dir, "u_only.nc")), + ("v", joinpath(dir, "v_only.nc")), + ("missing_var", joinpath(dir, "should_not_exist.nc")), + ] + + CDSExt.split_era5_nc(src_path, pairs, coord_vars) + + @test !isfile(joinpath(dir, "should_not_exist.nc")) + + for (vname, dst_path) in pairs[1:2] + @test isfile(dst_path) + NCDatasets.Dataset(dst_path, "r") do dst + @test haskey(dst, vname) + other = vname == "u" ? "v" : "u" + @test !haskey(dst, other) + NCDatasets.Dataset(src_path, "r") do src + @test dst[vname].var[:] == src[vname].var[:] + end + end + end + end + end + + @testset "split_era5_nc_multistep produces per-(var,timestep) files" begin + mktempdir() do dir + src_path = joinpath(dir, "src.nc") + write_synthetic_era5_nc(src_path; Nx=2, Ny=2, Nt=3) + + triples = [ + ("u", 1, joinpath(dir, "u_t1.nc")), + ("u", 3, joinpath(dir, "u_t3.nc")), + ("v", 2, joinpath(dir, "v_t2.nc")), + # Variable not present in source — silently skipped, no file. + ("missing_var", 1, joinpath(dir, "should_not_exist.nc")), + ] + time_dimnames = Set(["valid_time"]) + + CDSExt.split_era5_nc_multistep(src_path, triples, coord_vars, time_dimnames) + + # The skipped variable produces no output. + @test !isfile(joinpath(dir, "should_not_exist.nc")) + + for (vname, tidx, dst_path) in triples[1:3] + @test isfile(dst_path) + NCDatasets.Dataset(dst_path, "r") do dst + @test haskey(dst, vname) + @test dst.dim["valid_time"] == 1 + @test haskey(dst, "longitude") + @test haskey(dst, "latitude") + # The other ERA5 variable should not have leaked in. + other = vname == "u" ? "v" : "u" + @test !haskey(dst, other) + + NCDatasets.Dataset(src_path, "r") do src + @test dst[vname].var[:, :, 1] == src[vname].var[:, :, tidx] + end + end + end + end + end +end + +@testset "ERA5 CDSAPIExt is_zip and foreach_nc" begin + @testset "is_zip" begin + mktempdir() do tmp + # File starting with the ZIP magic header + zip_path = joinpath(tmp, "fake.zip") + open(zip_path, "w") do io + write(io, UInt8[0x50, 0x4b, 0x03, 0x04, 0x00, 0x00]) + end + @test CDSExt.is_zip(zip_path) == true + + # File with arbitrary non-magic bytes (NetCDF-3 starts with "CDF\x01") + nc_path = joinpath(tmp, "fake.nc") + open(nc_path, "w") do io + write(io, UInt8[0x43, 0x44, 0x46, 0x01]) + end + @test CDSExt.is_zip(nc_path) == false + + # Short file (<4 bytes) — length check guards against false positives + short_path = joinpath(tmp, "short.bin") + open(short_path, "w") do io + write(io, UInt8[0x50, 0x4b]) # only 2 of the 4 magic bytes + end + @test CDSExt.is_zip(short_path) == false + end + end + + @testset "foreach_nc — non-zip path calls f exactly once" begin + mktempdir() do tmp + nc_path = joinpath(tmp, "data.nc") + touch(nc_path) + + received = String[] + CDSExt.foreach_nc(p -> push!(received, p), nc_path, tmp) + + @test received == [nc_path] + end + end + + @testset "foreach_nc — zip path extracts and visits each .nc" begin + mktempdir() do tmp + # Build a ZIP fixture containing two .nc files (and a non-.nc file + # that should be ignored). + nc1 = joinpath(tmp, "a.nc"); touch(nc1) + nc2 = joinpath(tmp, "b.nc"); touch(nc2) + other = joinpath(tmp, "readme.txt"); touch(other) + + zip_path = joinpath(tmp, "bundle.zip") + run(`zip -j -q $zip_path $nc1 $nc2 $other`) + + received = String[] + CDSExt.foreach_nc(p -> push!(received, basename(p)), zip_path, tmp) + + @test sort(received) == ["a.nc", "b.nc"] # readme.txt filtered out + end + end end diff --git a/test/test_column_field.jl b/test/test_column_field.jl index ea33bad6c..bb9004e15 100644 --- a/test/test_column_field.jl +++ b/test/test_column_field.jl @@ -212,7 +212,7 @@ end @testset "ERA5 Column grid" begin col = Column(200.0, 35.0) - md = Metadatum(:temperature; dataset=ERA5Hourly(), + md = Metadatum(:temperature; dataset=ERA5HourlySingleLevel(), date=DateTime(2020, 1, 1), region=col) grid = native_grid(md) @@ -231,3 +231,50 @@ end @test eltype(grid) == Float32 end end + +@testset "restrict (BoundingBox grid construction helper)" begin + restrict = NumericalEarth.DataWrangling.restrict + + # Identity case: bbox covers the full domain. Grid pads by Δ/2 on each side + # so that face midpoints land on data centers. The padded extent is N+1 + # cells of width Δ exactly, but Float64 rounding can push the ceil one cell + # past that — so allow [N+1, N+2]. + grid_interfaces, rN = restrict((0.0, 360.0), (0.0, 360.0), 1440) + @test grid_interfaces[1] ≈ -0.125 + @test grid_interfaces[2] ≈ 360.125 + @test 1441 <= rN <= 1442 + + # Half-domain bbox: rN should be just over half of N. + _, rN = restrict((0.0, 180.0), (0.0, 360.0), 1440) + @test 720 < rN <= 722 # ceil(0.5 * 1440 + small) = 721 + + # Small bbox (5° wide on a 1440-cell grid): rN should be ceil(20 + small) = 21. + grid_interfaces, rN = restrict((0.0, 5.0), (0.0, 360.0), 1440) + @test grid_interfaces[1] ≈ -0.125 + @test grid_interfaces[2] ≈ 5.125 + @test rN == 21 + + # Off-origin bbox preserves width: 5° wide on a 720-cell, 180°-tall grid → + # rΔ = 5° + Δ = 5.25°, rN = ceil((5.25/180) * 720) = 21. + _, rN_off = restrict((40.0, 45.0), (-90.0, 90.0), 720) + @test rN_off == 21 + + # Pass-through for `nothing` (the no-restriction case). + @test restrict(nothing, (0.0, 360.0), 1440) == ((0.0, 360.0), 1440) +end + +@testset "restrict_location dispatch" begin + bbox = BoundingBox(longitude=(0, 5), latitude=(40, 45)) + col = Column(2.5, 42.5) + + # BoundingBox: locations passed through unchanged + @test restrict_location((Center, Center, Center), bbox) == (Center, Center, Center) + @test restrict_location((Face, Face, Center), bbox) == (Face, Face, Center) + + # Nothing: same — no restriction + @test restrict_location((Center, Center, Center), nothing) == (Center, Center, Center) + + # Column: horizontal locations reduce to Nothing, vertical preserved + @test restrict_location((Center, Center, Center), col) == (Nothing, Nothing, Center) + @test restrict_location((Face, Face, Center), col) == (Nothing, Nothing, Center) +end