Skip to content
54 changes: 0 additions & 54 deletions workflow/rules/data.smk
Original file line number Diff line number Diff line change
Expand Up @@ -4,60 +4,6 @@ from pathlib import Path
include: "common.smk"


if "extract_cosmoe" in config.get("include-optional-rules", []):

rule extract_cosmoe:
input:
archive=Path("/archive/mch/msopr/osm/COSMO-E"),
output:
fcts=protected(
directory(Path("/store_new/mch/msopr/ml/COSMO-E/FCST{year}.zarr"))
),
resources:
cpus_per_task=4,
runtime="24h",
params:
year_postfix=lambda wc: f"FCST{wc.year}",
steps="0/120/6",
log:
OUT_ROOT / "logs/extract-cosmoe-fcts-{year}.log",
shell:
"""
python workflow/scripts/extract_baseline_fct.py \
--archive_dir {input.archive}/{params.year_postfix} \
--output_store {output.fcts} \
--steps {params.steps} \
> {log} 2>&1
"""


if "extract_cosmo1e" in config.get("include-optional-rules", []):

rule extract_cosmo1e:
input:
archive=Path("/archive/mch/s83/osm/from_GPFS/COSMO-1E"),
output:
fcts=protected(
directory(Path("/store_new/mch/msopr/ml/COSMO-1E/FCST{year}.zarr"))
),
resources:
cpus_per_task=4,
runtime="24h",
params:
year_postfix=lambda wc: f"FCST{wc.year}",
steps="0/33/1",
log:
OUT_ROOT / "logs/extract-cosmo1e-fcts-{year}.log",
shell:
"""
python workflow/scripts/extract_baseline_fct.py \
--archive_dir {input.archive}/{params.year_postfix} \
--output_store {output.fcts} \
--steps {params.steps} \
> {log} 2>&1
"""


rule download_obs_from_peakweather:
localrule: True
output:
Expand Down
125 changes: 91 additions & 34 deletions workflow/scripts/extract_baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,53 +20,93 @@
)


def get_input(root: Path) -> list[Path]:
"""Get list of tarfiles or directories in root directory."""
input_files = sorted(root.glob("*.tar"))
if not input_files:
gribfiles = sorted(root.glob("*_*/grib/i?eff00000000_000"))
input_files = [f.parent.parent for f in gribfiles]
if not input_files:
raise ValueError(f"No files found in {root}.")
return input_files


def get_reftime(file: Path) -> datetime:
if ".tar" in file.suffixes:
return reftime_from_tarfile(file)
else:
return reftime_from_directory(file)


def reftime_from_directory(directory: Path) -> datetime:
"""Extract reftime from directory name."""
dir_stem = directory.name.rsplit("_", 1)[0]
return datetime.strptime(dir_stem, "%y%m%d%H")


def reftime_from_tarfile(tarfile: Path, suffix: str | None = None) -> datetime:
"""Extract reftime from tarfile name."""
suffix = tarfile.stem[-4:] if suffix is None else suffix
return datetime.strptime(tarfile.stem.removesuffix(suffix), "%y%m%d%H")


def check_reftime_consistency(tarfiles: list[Path], delta_h: int = 12):
def check_reftime_consistency(input: list[Path], delta_h: int = 12):
"""Check that all reftimes are available and every delta_h hours."""

# note the lower case y in the format string, it's for 2-digit years

first_reftime = reftime_from_tarfile(tarfiles[0])
first_reftime = get_reftime(input[0])
expected_reftime = first_reftime
for file in tarfiles:
reftime = reftime_from_tarfile(file)
for file in input:
reftime = get_reftime(file)
if reftime != expected_reftime:
raise ValueError(f"Expected reftime {expected_reftime} but got {reftime}.")
expected_reftime += timedelta(hours=delta_h)
return first_reftime, expected_reftime - timedelta(hours=delta_h)


def extract(
tar: Path, lead_times: list[int], run_id: str, params: list[str]
file: Path, lead_times: list[int], run_id: str, params: list[str]
) -> xr.Dataset:
LOG.info(f"Extracting fields from {tar}.")
reftime = reftime_from_tarfile(tar)
if "COSMO-E" in tar.parts:
LOG.info(f"Extracting fields from {file}.")
reftime = reftime_from_tarfile(file)
if "COSMO-E" in file.parts:
gribname = "ceffsurf"
elif "COSMO-1E" in tar.parts:
elif "COSMO-1E" in file.parts:
gribname = "c1effsurf"
elif "ICON-CH1-EPS" in file.parts:
gribname = "i1eff"
elif "ICON-CH2-EPS" in file.parts:
gribname = "i2eff"
else:
raise ValueError("Currently only COSMO-E and COSMO-1E are supported.")
tar_archive = tarfile.open(tar, mode="r:*")
raise ValueError("Currently only COSMO-E/1E and ICON-CH1/2-EPS are supported.")
out = ekd.SimpleFieldList()
for lt in lead_times:
filename = f"{tar.stem}/grib/{gribname}{lt:03}_{run_id}"
LOG.info(f"Extracting {filename}.")
stream = tar_archive.extractfile(filename)

# LOG.info(f"Reading fields...")
streamfieldlist: StreamFieldList = ekd.from_source("stream", stream)
for field in streamfieldlist:
shortname = field.metadata("shortName")
if shortname in params:
out.append(field)
stream.close()
tar_archive.close()
if ".tar" in file.suffixes:
tar_archive = tarfile.open(file, mode="r:*")
for lt in lead_times:
filename = f"{file.stem}/grib/{gribname}{lt:03}_{run_id}"
LOG.info(f"Extracting {filename}.")
stream = tar_archive.extractfile(filename)

# LOG.info(f"Reading fields...")
streamfieldlist: StreamFieldList = ekd.from_source("stream", stream)
for field in streamfieldlist:
shortname = field.metadata("shortName")
if shortname in params:
out.append(field)
stream.close()
tar_archive.close()
else:
for lt in lead_times:
lh = lt % 24
ld = lt // 24
filepath = file / "grib" / f"{gribname}{ld:02}{lh:02}0000_{run_id}"
LOG.info(f"Extracting {filepath}.")
fields = ekd.from_source("file", filepath)
for field in fields:
shortname = field.metadata("shortName")
if shortname in params:
out.append(field)

out = out.to_xarray(profile="grib")
out = out.expand_dims(
Expand Down Expand Up @@ -95,16 +135,16 @@ def _parse_steps(steps: str) -> int:


def main(cfg: ScriptConfig):
tarfiles = sorted(cfg.archive_dir.glob("*.tar"))
input = get_input(cfg.archive_dir)
delta_h = 12
if "COSMO-1E" in tarfiles[0].parts:
if "COSMO-1E" in input[0].parts or "ICON-CH1-EPS" in input[0].parts:
delta_h = 3
first_reftime, last_reftime = check_reftime_consistency(tarfiles, delta_h)
LOG.info(
f"Found {len(tarfiles)} tar archives from {first_reftime} to {last_reftime}."
)
if "ICON-CH2-EPS" in input[0].parts:
delta_h = 6
first_reftime, last_reftime = check_reftime_consistency(input, delta_h)
LOG.info(f"Found {len(input)} forecasts from {first_reftime} to {last_reftime}.")

reftimes = np.array([reftime_from_tarfile(f) for f in tarfiles], dtype="datetime64")
reftimes = np.array([get_reftime(f) for f in input], dtype="datetime64")
missing = reftimes
if not cfg.overwrite: # only check dataset when we want to append as this is slow
existing_reftimes = np.array([])
Expand All @@ -130,7 +170,7 @@ def main(cfg: ScriptConfig):
_, indices, _ = np.intersect1d(reftimes, missing, return_indices=True)

for i in indices:
file = tarfiles[i]
file = input[i]
ds = extract(file, cfg.steps, cfg.run_id, cfg.params)

LOG.info(f"Extracted: {ds}")
Expand All @@ -144,9 +184,12 @@ def main(cfg: ScriptConfig):
zarr_encoding = {
"forecast_reference_time": {"units": "nanoseconds since 1970-01-01"}
}
cfg.output_store.parent.mkdir(parents=True, exist_ok=True)
if i == 0:
LOG.info(f"Creating new zarr store at {cfg.output_store}.")
ds.to_zarr(cfg.output_store, mode="w", encoding=zarr_encoding)
else:
LOG.info(f"Appending to existing zarr store at {cfg.output_store}.")
ds.to_zarr(cfg.output_store, mode="a", append_dim="forecast_reference_time")


Expand Down Expand Up @@ -186,13 +229,27 @@ def main(cfg: ScriptConfig):

"""
Example usage:
python workflow/scripts/extract_baseline_fct.py \

To submit as a batch job on compute nodes
sbatch --wrap "uv run python ..."

python workflow/scripts/extract_baseline.py \
--archive_dir /archive/mch/msopr/osm/COSMO-E/FCST20 \
--output_store /store_new/mch/msopr/ml/COSMO-E/FCST20.zarr \
--steps 0/120/6

python workflow/scripts/extract_baseline_fct.py \
python workflow/scripts/extract_baseline.py \
--archive_dir /archive/mch/s83/osm/from_GPFS/COSMO-1E/FCST20 \
--output_store /store_new/mch/msopr/ml/COSMO-1E/FCST20.zarr \
--steps 0/33/1

python workflow/scripts/extract_baseline.py \
--archive_dir /store_new/mch/msopr/osm/ICON-CH1-EPS/FCST24 \
--output_store /store_new/mch/msopr/ml/ICON-CH1-EPS/FCST24.zarr \
--steps 0/33/1

python workflow/scripts/extract_baseline.py \
--archive_dir /store_new/mch/msopr/osm/ICON-CH1-EPS/FCST25 \
--output_store /store_new/mch/msopr/ml/ICON-CH1-EPS/FCST25.zarr \
--steps 0/33/1
"""
Loading