diff --git a/workflow/rules/data.smk b/workflow/rules/data.smk
index 0696657..9892f8f 100644
--- a/workflow/rules/data.smk
+++ b/workflow/rules/data.smk
@@ -4,60 +4,6 @@ from pathlib import Path
 include: "common.smk"
 
 
-if "extract_cosmoe" in config.get("include-optional-rules", []):
-
-    rule extract_cosmoe:
-        input:
-            archive=Path("/archive/mch/msopr/osm/COSMO-E"),
-        output:
-            fcts=protected(
-                directory(Path("/store_new/mch/msopr/ml/COSMO-E/FCST{year}.zarr"))
-            ),
-        resources:
-            cpus_per_task=4,
-            runtime="24h",
-        params:
-            year_postfix=lambda wc: f"FCST{wc.year}",
-            steps="0/120/6",
-        log:
-            OUT_ROOT / "logs/extract-cosmoe-fcts-{year}.log",
-        shell:
-            """
-            python workflow/scripts/extract_baseline_fct.py \
-                --archive_dir {input.archive}/{params.year_postfix} \
-                --output_store {output.fcts} \
-                --steps {params.steps} \
-                > {log} 2>&1
-            """
-
-
-if "extract_cosmo1e" in config.get("include-optional-rules", []):
-
-    rule extract_cosmo1e:
-        input:
-            archive=Path("/archive/mch/s83/osm/from_GPFS/COSMO-1E"),
-        output:
-            fcts=protected(
-                directory(Path("/store_new/mch/msopr/ml/COSMO-1E/FCST{year}.zarr"))
-            ),
-        resources:
-            cpus_per_task=4,
-            runtime="24h",
-        params:
-            year_postfix=lambda wc: f"FCST{wc.year}",
-            steps="0/33/1",
-        log:
-            OUT_ROOT / "logs/extract-cosmo1e-fcts-{year}.log",
-        shell:
-            """
-            python workflow/scripts/extract_baseline_fct.py \
-                --archive_dir {input.archive}/{params.year_postfix} \
-                --output_store {output.fcts} \
-                --steps {params.steps} \
-                > {log} 2>&1
-            """
-
-
 rule download_obs_from_peakweather:
     localrule: True
     output:
diff --git a/workflow/scripts/extract_baseline.py b/workflow/scripts/extract_baseline.py
index f450b82..075ad1d 100644
--- a/workflow/scripts/extract_baseline.py
+++ b/workflow/scripts/extract_baseline.py
@@ -20,21 +20,45 @@
 )
 
 
+def get_input(root: Path) -> list[Path]:
+    """Get list of tarfiles or directories in root directory."""
+    input_files = sorted(root.glob("*.tar"))
+    if not input_files:
+        gribfiles = sorted(root.glob("*_*/grib/i?eff00000000_000"))
+        input_files = [f.parent.parent for f in gribfiles]
+    if not input_files:
+        raise ValueError(f"No files found in {root}.")
+    return input_files
+
+
+def get_reftime(file: Path) -> datetime:
+    if ".tar" in file.suffixes:
+        return reftime_from_tarfile(file)
+    else:
+        return reftime_from_directory(file)
+
+
+def reftime_from_directory(directory: Path) -> datetime:
+    """Extract reftime from directory name."""
+    dir_stem = directory.name.rsplit("_", 1)[0]
+    return datetime.strptime(dir_stem, "%y%m%d%H")
+
+
 def reftime_from_tarfile(tarfile: Path, suffix: str | None = None) -> datetime:
     """Extract reftime from tarfile name."""
     suffix = tarfile.stem[-4:] if suffix is None else suffix
     return datetime.strptime(tarfile.stem.removesuffix(suffix), "%y%m%d%H")
 
 
-def check_reftime_consistency(tarfiles: list[Path], delta_h: int = 12):
+def check_reftime_consistency(input: list[Path], delta_h: int = 12):
     """Check that all reftimes are available and every delta_h hours."""
     # note the lower case y in the format string, it's for 2-digit years
-    first_reftime = reftime_from_tarfile(tarfiles[0])
+    first_reftime = get_reftime(input[0])
     expected_reftime = first_reftime
-    for file in tarfiles:
-        reftime = reftime_from_tarfile(file)
+    for file in input:
+        reftime = get_reftime(file)
         if reftime != expected_reftime:
             raise ValueError(f"Expected reftime {expected_reftime} but got {reftime}.")
         expected_reftime += timedelta(hours=delta_h)
@@ -42,31 +66,47 @@ def check_reftime_consistency(tarfiles: list[Path], delta_h: int = 12):
 
 
 def extract(
-    tar: Path, lead_times: list[int], run_id: str, params: list[str]
+    file: Path, lead_times: list[int], run_id: str, params: list[str]
 ) -> xr.Dataset:
-    LOG.info(f"Extracting fields from {tar}.")
-    reftime = reftime_from_tarfile(tar)
-    if "COSMO-E" in tar.parts:
+    LOG.info(f"Extracting fields from {file}.")
+    reftime = get_reftime(file)
+    if "COSMO-E" in file.parts:
         gribname = "ceffsurf"
-    elif "COSMO-1E" in tar.parts:
+    elif "COSMO-1E" in file.parts:
         gribname = "c1effsurf"
+    elif "ICON-CH1-EPS" in file.parts:
+        gribname = "i1eff"
+    elif "ICON-CH2-EPS" in file.parts:
+        gribname = "i2eff"
     else:
-        raise ValueError("Currently only COSMO-E and COSMO-1E are supported.")
-    tar_archive = tarfile.open(tar, mode="r:*")
+        raise ValueError("Currently only COSMO-E/1E and ICON-CH1/2-EPS are supported.")
     out = ekd.SimpleFieldList()
-    for lt in lead_times:
-        filename = f"{tar.stem}/grib/{gribname}{lt:03}_{run_id}"
-        LOG.info(f"Extracting {filename}.")
-        stream = tar_archive.extractfile(filename)
-
-        # LOG.info(f"Reading fields...")
-        streamfieldlist: StreamFieldList = ekd.from_source("stream", stream)
-        for field in streamfieldlist:
-            shortname = field.metadata("shortName")
-            if shortname in params:
-                out.append(field)
-        stream.close()
-    tar_archive.close()
+    if ".tar" in file.suffixes:
+        tar_archive = tarfile.open(file, mode="r:*")
+        for lt in lead_times:
+            filename = f"{file.stem}/grib/{gribname}{lt:03}_{run_id}"
+            LOG.info(f"Extracting {filename}.")
+            stream = tar_archive.extractfile(filename)
+
+            # LOG.info(f"Reading fields...")
+            streamfieldlist: StreamFieldList = ekd.from_source("stream", stream)
+            for field in streamfieldlist:
+                shortname = field.metadata("shortName")
+                if shortname in params:
+                    out.append(field)
+            stream.close()
+        tar_archive.close()
+    else:
+        for lt in lead_times:
+            lh = lt % 24
+            ld = lt // 24
+            filepath = file / "grib" / f"{gribname}{ld:02}{lh:02}0000_{run_id}"
+            LOG.info(f"Extracting {filepath}.")
+            fields = ekd.from_source("file", filepath)
+            for field in fields:
+                shortname = field.metadata("shortName")
+                if shortname in params:
+                    out.append(field)
 
     out = out.to_xarray(profile="grib")
     out = out.expand_dims(
@@ -95,16 +135,16 @@ def _parse_steps(steps: str) -> int:
 
 def main(cfg: ScriptConfig):
-    tarfiles = sorted(cfg.archive_dir.glob("*.tar"))
+    input = get_input(cfg.archive_dir)
     delta_h = 12
-    if "COSMO-1E" in tarfiles[0].parts:
+    if "COSMO-1E" in input[0].parts or "ICON-CH1-EPS" in input[0].parts:
         delta_h = 3
-    first_reftime, last_reftime = check_reftime_consistency(tarfiles, delta_h)
-    LOG.info(
-        f"Found {len(tarfiles)} tar archives from {first_reftime} to {last_reftime}."
-    )
+    if "ICON-CH2-EPS" in input[0].parts:
+        delta_h = 6
+    first_reftime, last_reftime = check_reftime_consistency(input, delta_h)
+    LOG.info(f"Found {len(input)} forecasts from {first_reftime} to {last_reftime}.")
 
-    reftimes = np.array([reftime_from_tarfile(f) for f in tarfiles], dtype="datetime64")
+    reftimes = np.array([get_reftime(f) for f in input], dtype="datetime64")
     missing = reftimes
     if not cfg.overwrite:
         # only check dataset when we want to append as this is slow
         existing_reftimes = np.array([])
@@ -130,7 +170,7 @@ def main(cfg: ScriptConfig):
 
     _, indices, _ = np.intersect1d(reftimes, missing, return_indices=True)
     for i in indices:
-        file = tarfiles[i]
+        file = input[i]
         ds = extract(file, cfg.steps, cfg.run_id, cfg.params)
         LOG.info(f"Extracted: {ds}")
 
@@ -144,9 +184,12 @@ def main(cfg: ScriptConfig):
         zarr_encoding = {
             "forecast_reference_time": {"units": "nanoseconds since 1970-01-01"}
         }
+        cfg.output_store.parent.mkdir(parents=True, exist_ok=True)
         if i == 0:
+            LOG.info(f"Creating new zarr store at {cfg.output_store}.")
             ds.to_zarr(cfg.output_store, mode="w", encoding=zarr_encoding)
         else:
+            LOG.info(f"Appending to existing zarr store at {cfg.output_store}.")
             ds.to_zarr(cfg.output_store, mode="a", append_dim="forecast_reference_time")
 
 
@@ -186,13 +229,27 @@ def main(cfg: ScriptConfig):
 
 """
 Example usage:
-python workflow/scripts/extract_baseline_fct.py \
+
+To submit as a batch job on compute nodes:
+sbatch --wrap "uv run python ..."
+
+python workflow/scripts/extract_baseline.py \
     --archive_dir /archive/mch/msopr/osm/COSMO-E/FCST20 \
     --output_store /store_new/mch/msopr/ml/COSMO-E/FCST20.zarr \
    --steps 0/120/6
 
-python workflow/scripts/extract_baseline_fct.py \
+python workflow/scripts/extract_baseline.py \
     --archive_dir /archive/mch/s83/osm/from_GPFS/COSMO-1E/FCST20 \
     --output_store /store_new/mch/msopr/ml/COSMO-1E/FCST20.zarr \
     --steps 0/33/1
+
+python workflow/scripts/extract_baseline.py \
+    --archive_dir /store_new/mch/msopr/osm/ICON-CH1-EPS/FCST24 \
+    --output_store /store_new/mch/msopr/ml/ICON-CH1-EPS/FCST24.zarr \
+    --steps 0/33/1
+
+python workflow/scripts/extract_baseline.py \
+    --archive_dir /store_new/mch/msopr/osm/ICON-CH1-EPS/FCST25 \
+    --output_store /store_new/mch/msopr/ml/ICON-CH1-EPS/FCST25.zarr \
+    --steps 0/33/1
 """
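
Illustration (not part of the patch): a minimal sketch of how the new directory-based inputs are resolved, assuming a hypothetical ICON-CH1-EPS forecast directory named 24010100_407 and member run id 000. It mirrors the name parsing in reftime_from_directory and the lead-time-to-filename convention used in the directory branch of extract; the real directory and member naming on the archive may differ.

from datetime import datetime
from pathlib import Path

# Hypothetical forecast directory (pattern: <yymmddHH>_<suffix>) and member run id.
forecast_dir = Path("/store_new/mch/msopr/osm/ICON-CH1-EPS/FCST24/24010100_407")
run_id = "000"

# Reference time, parsed as in reftime_from_directory: drop the trailing "_<suffix>".
reftime = datetime.strptime(forecast_dir.name.rsplit("_", 1)[0], "%y%m%d%H")

# GRIB file for a +33 h lead time, built as in the directory branch of extract():
# <gribname><days:02><hours:02>0000_<run_id>.
lt = 33
grib_path = forecast_dir / "grib" / f"i1eff{lt // 24:02}{lt % 24:02}0000_{run_id}"

print(reftime)    # 2024-01-01 00:00:00
print(grib_path)  # .../24010100_407/grib/i1eff01090000_000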