Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 24 additions & 20 deletions ext/NumericalEarthCDSAPIExt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,15 @@ function download_dataset(metadata::ERA5Metadata; skip_existing=true, cleanup=tr
dates = metadata.dates isa AbstractVector ? metadata.dates : [metadata.dates]
grouped = _group_by_calendar_day(dates)

paths = String[]
for day in sort(collect(keys(grouped)))
download_era5_day(metadata.name, metadata.dataset, grouped[day];
region = metadata.region,
dir = metadata.dir,
skip_existing, cleanup)
append!(paths, download_era5_day(metadata.name, metadata.dataset, grouped[day];
region = metadata.region,
dir = metadata.dir,
skip_existing, cleanup))
end

return paths
end

"""
Expand All @@ -160,16 +163,15 @@ function _group_by_calendar_day(datetimes)
end

function download_era5_day(name, dataset, day_dates;
region, dir, skip_existing, cleanup)
region, dir, skip_existing, cleanup)

MDatum = NumericalEarth.DataWrangling.Metadatum
meta_path = NumericalEarth.DataWrangling.metadata_path
meta_filename = NumericalEarth.DataWrangling.metadata_filename

all_pairs = [(dt, meta_path(MDatum(name; dataset, date=dt, region, dir)))
all_pairs = [(dt, joinpath(dir, meta_filename(dataset, name, dt, region)))
for dt in day_dates]

pending = skip_existing ? filter(((_, p),) -> !isfile(p), all_pairs) : all_pairs
isempty(pending) && return nothing
isempty(pending) && return [p for (_, p) in all_pairs]

sorted_dts = sort(unique([dt for (dt, _) in pending]))
hours_str = [lpad(string(Dates.hour(dt)), 2, '0') * ":00" for dt in sorted_dts]
Expand Down Expand Up @@ -210,7 +212,7 @@ function download_era5_day(name, dataset, day_dates;
cleanup && rm(tmp_path; force=true)
end

return nothing
return [p for (_, p) in all_pairs]
end

#####
Expand All @@ -223,10 +225,11 @@ end
Download multiple ERA5 pressure-level variables for each date in `metadata`.
"""
function download_dataset(names::Vector{Symbol}, metadata::ERA5PressureMetadata; kwargs...)
paths = String[]
for metadatum in metadata
download_dataset(names, metadatum; kwargs...)
append!(paths, download_dataset(names, metadatum; kwargs...))
end
return nothing
return paths
end

"""
Expand Down Expand Up @@ -327,11 +330,13 @@ function download_dataset(names::Vector{Symbol},

grouped = _group_by_calendar_day(datetimes)

paths = String[]
for day in sort(collect(keys(grouped)))
download_era5_multivar_day(names, dataset, grouped[day]; region, dir, skip_existing, cleanup)
append!(paths, download_era5_multivar_day(names, dataset, grouped[day];
region, dir, skip_existing, cleanup))
end

return nothing
return paths
end

function download_dataset(name::Symbol,
Expand All @@ -345,16 +350,15 @@ function download_dataset(name::Symbol,
end

function download_era5_multivar_day(names, dataset, day_dates;
region, dir, skip_existing, cleanup)
region, dir, skip_existing, cleanup)

MDatum = NumericalEarth.DataWrangling.Metadatum
meta_path = NumericalEarth.DataWrangling.metadata_path
meta_filename = NumericalEarth.DataWrangling.metadata_filename

all_triples = [(name, dt, meta_path(MDatum(name; dataset, date=dt, region, dir)))
all_triples = [(name, dt, joinpath(dir, meta_filename(dataset, name, dt, region)))
for name in names for dt in day_dates]

pending = skip_existing ? filter(((_, _, p),) -> !isfile(p), all_triples) : all_triples
isempty(pending) && return nothing
isempty(pending) && return [p for (_, _, p) in all_triples]

cds_vars = unique([cds_varnames(dataset)[name] for (name, _, _) in pending])
sorted_dts = sort(unique([dt for (_, dt, _) in pending]))
Expand Down Expand Up @@ -396,7 +400,7 @@ function download_era5_multivar_day(names, dataset, day_dates;
cleanup && rm(tmp_path; force=true)
end

return nothing
return [p for (_, _, p) in all_triples]
end

#####
Expand Down
2 changes: 1 addition & 1 deletion ext/NumericalEarthWOAExt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ function download_dataset(metadata::Metadata{<:WOAClimatology}; skip_existing=tr
cp(source, linkpath)
end

return nothing
return metadata_path(metadata)
end

end # module
2 changes: 1 addition & 1 deletion src/DataWrangling/ECCO/ECCO.jl
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ function download_dataset(metadata::ECCOMetadata)
end
end

return nothing
return metadata_path(metadata)
end

function inpainted_metadata_filename(metadata::ECCOMetadatum)
Expand Down
2 changes: 1 addition & 1 deletion src/DataWrangling/EN4/EN4.jl
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ function download_dataset(metadata::Metadata{<:EN4Monthly})
end
end

return nothing
return metadata_path(metadata)
end

end # Module
3 changes: 1 addition & 2 deletions src/DataWrangling/ERA5/ERA5.jl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ using Oceananigans.Fields: Center, set!
using Oceananigans: Field, fill_halo_regions!, CPU
using NumericalEarth.DataWrangling: Metadata, Metadatum, metadata_path, native_grid, InverseGravity, download_dataset
using Dates
using Dates: DateTime, Day, Month, Hour
using Dates: DateTime, Month, Hour

import NumericalEarth.DataWrangling:
all_dates,
Expand All @@ -31,7 +31,6 @@ import NumericalEarth.DataWrangling:
inpainted_metadata_path,
available_variables,
retrieve_data,
metadata_path,
is_three_dimensional,
reversed_vertical_axis,
reversed_latitude_axis,
Expand Down
29 changes: 0 additions & 29 deletions src/DataWrangling/ERA5/ERA5_pressure_levels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -126,35 +126,6 @@ function retrieve_data(metadata::ERA5PressureMetadatum)
return reverse(data, dims=2) # Latitude is stored from 90°N → 90°S
end

#####
##### Metadata filename construction
#####

function metadata_prefix(md::ERA5PressureMetadata)
var = ERA5PL_dataset_variable_names[md.name]
dataset = dataset_name(md.dataset)
start_date = start_date_str(md.dates)
end_date = end_date_str(md.dates)
bbox = md.region

if !isnothing(bbox)
w, e = bbox_strs(bbox.longitude)
s, n = bbox_strs(bbox.latitude)
suffix = string(w, e, s, n)
else
suffix = ""
end

if start_date == end_date
prefix = string(var, "_", dataset, "_", start_date, suffix)
else
prefix = string(var, "_", dataset, "_", start_date, "_", end_date, suffix)
end
prefix = colon2dash(prefix)
prefix = underscore_spaces(prefix)
return prefix
end

#####
##### Pressure-level vertical coordinate
#####
Expand Down
44 changes: 0 additions & 44 deletions src/DataWrangling/ERA5/ERA5_single_levels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,6 @@ const ERA5_wave_variables = Set([
:significant_wave_height, :mean_wave_period, :mean_wave_direction,
])

# Mean rate / accumulated variables (CDS "step type" = accum).
# All other single-level variables are instantaneous.
# See ECMWF ERA5 documentation, Tables 3 and 4:
# https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation#ERA5:datadocumentation-Meanrates/fluxesandaccumulations
const ERA5_single_level_accumulated_variables = Set([
:total_precipitation,
:mean_surface_sensible_heat_flux,
:mean_surface_latent_heat_flux,
:mean_surface_momentum_flux_x,
:mean_surface_momentum_flux_y,
:downwelling_shortwave_radiation,
:downwelling_longwave_radiation,
:evaporation,
:mean_evaporation_rate,
])

#####
##### ERA5 single-level data availability
#####
Expand Down Expand Up @@ -152,31 +136,3 @@ function retrieve_data(metadata::ERA5Metadatum)
return reshape(data_2d, size(data_2d, 1), size(data_2d, 2), 1)
end

#####
##### Metadata filename construction
#####

function metadata_prefix(md::ERA5Metadata)
var = ERA5_dataset_variable_names[md.name]
dataset = dataset_name(md.dataset)
start_date = start_date_str(md.dates)
end_date = end_date_str(md.dates)
bbox = md.region

if !isnothing(bbox)
w, e = bbox_strs(bbox.longitude)
s, n = bbox_strs(bbox.latitude)
suffix = string(w, e, s, n)
else
suffix = ""
end

if start_date == end_date
prefix = string(var, "_", dataset, "_", start_date, suffix)
else
prefix = string(var, "_", dataset, "_", start_date, "_", end_date, suffix)
end
prefix = colon2dash(prefix)
prefix = underscore_spaces(prefix)
return prefix
end
2 changes: 1 addition & 1 deletion src/DataWrangling/JRA55/JRA55_metadata.jl
Original file line number Diff line number Diff line change
Expand Up @@ -201,5 +201,5 @@ function download_dataset(metadata::JRA55Metadata)
end
end

return nothing
return metadata_path(metadata)
end
4 changes: 2 additions & 2 deletions src/DataWrangling/OSPapa/OSPapa_flux_observations.jl
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ build_filename(::OSPapaFluxHourly, name, dates::AbstractArray, region) =

function download_dataset(md::OSPapaFluxMetadata)
uniform_path = joinpath(md.dir, metadata_filename(md))
isfile(uniform_path) && return nothing
isfile(uniform_path) && return uniform_path

if !(md.dates isa AbstractArray)
error("OSPapaFluxHourly uniform cache $(uniform_path) is missing; " *
Expand All @@ -107,7 +107,7 @@ function download_dataset(md::OSPapaFluxMetadata)
end_date = last(md.dates)
raw_path = download_ospapa_flux(; start_date, end_date, dir=md.dir)
_write_uniform_flux_file(raw_path, uniform_path, start_date, end_date)
return nothing
return uniform_path
end

function _write_uniform_flux_file(raw_path, uniform_path, start_date, end_date)
Expand Down
78 changes: 55 additions & 23 deletions test/test_cds_downloading.jl
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ start_date = DateTime(2005, 2, 16, 12)

@testset "ERA5 single-level metadata_prefix" begin
ds = ERA5HourlySingleLevel()
mp = NumericalEarth.DataWrangling.ERA5.metadata_prefix

# Single-date metadatum, with region: prefix should not duplicate the date
md_single = Metadatum(:temperature; dataset=ds, region, date=start_date)
prefix_single = NumericalEarth.DataWrangling.ERA5.metadata_prefix(md_single)
# Single-date with region: prefix should not duplicate the date
prefix_single = mp(ds, :temperature, start_date, region)
@test occursin("2m_temperature", prefix_single)
@test occursin("ERA5HourlySingleLevel", prefix_single)
@test occursin("2005-02-16", prefix_single)
Expand All @@ -189,17 +189,14 @@ start_date = DateTime(2005, 2, 16, 12)
@test !occursin(":", prefix_single) # colons replaced by dashes
@test !occursin(" ", prefix_single) # spaces replaced by underscores

# Single-date metadatum, no region: suffix should be empty
md_no_region = Metadatum(:temperature; dataset=ds, date=start_date)
prefix_no_region = NumericalEarth.DataWrangling.ERA5.metadata_prefix(md_no_region)
# Single-date, no region: suffix should be empty
prefix_no_region = mp(ds, :temperature, start_date, nothing)
@test !occursin("0.0", prefix_no_region)
@test !occursin("nothing", prefix_no_region)

# Multi-date metadata: prefix should include both start and end dates
# Multi-date: prefix should include both start and end dates
end_date = start_date + Hour(2)
md_multi = Metadata(:temperature; dataset=ds, region,
dates=start_date:Hour(1):end_date)
prefix_multi = NumericalEarth.DataWrangling.ERA5.metadata_prefix(md_multi)
prefix_multi = mp(ds, :temperature, start_date:Hour(1):end_date, region)
@test occursin("2005-02-16T12", prefix_multi)
@test occursin("2005-02-16T14", prefix_multi)
end
Expand Down Expand Up @@ -579,25 +576,60 @@ end
@testset "single-variable multi-date (download_era5_day)" begin
# All hours of date1, date2 already on disk
ds_sl = ERA5HourlySingleLevel()
for dt in (date1, date2)
touch_expected(:temperature, ds_sl, dt)
end
expected = [touch_expected(:temperature, ds_sl, dt) for dt in (date1, date2)]

# Returns nothing without raising — the early-return guard fires
@test CDSExt.download_era5_day(:temperature, ds_sl, [date1, date2];
region, dir=tmp,
skip_existing=true, cleanup=true) === nothing
# Returns the existing paths without raising — the early-return guard fires
result = CDSExt.download_era5_day(:temperature, ds_sl, [date1, date2];
region, dir=tmp,
skip_existing=true, cleanup=true)
@test result isa Vector{String}
@test Set(result) == Set(expected)
end

@testset "multi-variable multi-date (download_era5_multivar_day)" begin
ds_sl = ERA5HourlySingleLevel()
for name in names, dt in (date1, date2)
touch_expected(name, ds_sl, dt)
end
expected = [touch_expected(name, ds_sl, dt) for name in names for dt in (date1, date2)]

result = CDSExt.download_era5_multivar_day(names, ds_sl, [date1, date2];
region, dir=tmp,
skip_existing=true, cleanup=true)
@test result isa Vector{String}
@test Set(result) == Set(expected)
end

# Dates spanning two calendar days — exercises the parents'
# path-collection across multiple `_group_by_calendar_day` groups.
# Catches regressions that drop or overwrite paths from one group.
date_day1 = DateTime(2005, 2, 16, 12)
date_day2 = DateTime(2005, 2, 17, 6)

@testset "ERA5Metadata parent (multi-day)" begin
ds_sl = ERA5HourlySingleLevel()
expected = [touch_expected(:temperature, ds_sl, dt) for dt in (date_day1, date_day2)]
meta = Metadata(:temperature; dataset=ds_sl, dates=[date_day1, date_day2], region, dir=tmp)

result = download_dataset(meta; skip_existing=true)
@test result isa Vector{String}
@test Set(result) == Set(expected)
end

@testset "ERA5PressureMetadata parent (multi-day, multi-name)" begin
expected = [touch_expected(name, ds_pl, dt) for name in names for dt in (date_day1, date_day2)]
meta = Metadata(:temperature; dataset=ds_pl, dates=[date_day1, date_day2], region, dir=tmp)

result = download_dataset(names, meta; skip_existing=true)
@test result isa Vector{String}
@test Set(result) == Set(expected)
end

@test CDSExt.download_era5_multivar_day(names, ds_sl, [date1, date2];
region, dir=tmp,
skip_existing=true, cleanup=true) === nothing
@testset "names + dataset + datetimes convenience overload (multi-day)" begin
ds_sl = ERA5HourlySingleLevel()
expected = [touch_expected(name, ds_sl, dt) for name in names for dt in (date_day1, date_day2)]

result = download_dataset(names, ds_sl, [date_day1, date_day2];
region, dir=tmp, skip_existing=true, cleanup=true)
@test result isa Vector{String}
@test Set(result) == Set(expected)
end
end
end
Expand Down
Loading