diff --git a/fme/ace/testing/__init__.py b/fme/ace/testing/__init__.py
index 034a41a7e..717162fc2 100644
--- a/fme/ace/testing/__init__.py
+++ b/fme/ace/testing/__init__.py
@@ -4,6 +4,7 @@
     FV3GFSData,
     MonthlyReferenceData,
     StatsData,
+    get_nd_dataset,
     save_nd_netcdf,
     save_scalar_netcdf,
 )
diff --git a/scripts/data_process/get_stats.py b/scripts/data_process/get_stats.py
index aabd2deb4..202a5d3b7 100644
--- a/scripts/data_process/get_stats.py
+++ b/scripts/data_process/get_stats.py
@@ -88,11 +88,18 @@ def get_stats(
     debug: bool,
 ):
     # Import dask-related things here to enable testing in environments without dask.
-    import dask
-    import distributed
+    try:
+        import dask
+        import distributed
+
+        client = distributed.Client(n_workers=16)
+    except ImportError as e:
+        # Fall back to serial computation without a distributed client.
+        logging.warning(f"Could not import dask ({e}); chunking is disabled.")
+        client = None
+        dask = None
 
     initial_time = time.time()
-    client = distributed.Client(n_workers=16)
     xr.set_options(keep_attrs=True, display_max_rows=100)
 
     logging.info(f"Reading data from {input_zarr}")
@@ -100,8 +107,11 @@
     # Open data with roughly 128 MiB chunks via dask's automatic chunking. This
     # is useful when opening sharded zarr stores with an inner chunk size of 1,
     # which is otherwise inefficient for the type of computation done here.
-    with dask.config.set({"array.chunk-size": "128MiB"}):
-        ds = xr.open_zarr(input_zarr, chunks={"time": "auto"})
+    if dask is not None:
+        with dask.config.set({"array.chunk-size": "128MiB"}):
+            ds = xr.open_zarr(input_zarr, chunks={"time": "auto"})
+    else:
+        ds = xr.open_zarr(input_zarr)
 
     ds = ds.drop_vars(DROP_VARIABLES, errors="ignore")
     ds = ds.sel(time=slice(config.start_date, config.end_date))
@@ -186,7 +196,8 @@
     total_time = time.time() - initial_time
     logging.info(f"Total time for computing stats: {total_time:0.2f} seconds.")
 
-    client.close()
+    if client is not None:
+        client.close()
     client = None
diff --git a/scripts/time_coarsen/c96-shield-stats.yaml b/scripts/time_coarsen/c96-shield-stats.yaml
new file mode 100644
index 000000000..b46588c7c
--- /dev/null
+++ b/scripts/time_coarsen/c96-shield-stats.yaml
@@ -0,0 +1,12 @@
+runs:
+  ic_0001: ""
+  ic_0002: ""
+data_output_directory: /climate-default/2026-01-28-vertically-resolved-c96-1deg-daily-shield-amip-ensemble-dataset
+stats:
+  output_directory: /climate-default/2026-01-28-vertically-resolved-c96-1deg-daily-shield-amip-ensemble-dataset-stats
+  beaker_dataset: 2026-01-28-vertically-resolved-c96-1deg-daily-shield-amip-ensemble-dataset-stats
+  start_date: "1940-01-01"
+  end_date: "2021-12-31"
+  data_type: FV3GFS
+  exclude_runs:
+    - "ic_0002"
diff --git a/scripts/time_coarsen/c96-shield.yaml b/scripts/time_coarsen/c96-shield.yaml
new file mode 100644
index 000000000..94843e7dd
--- /dev/null
+++ b/scripts/time_coarsen/c96-shield.yaml
@@ -0,0 +1,93 @@
+paths:
+  - input: /climate-default/2026-01-28-vertically-resolved-c96-1deg-shield-amip-ensemble-dataset/ic_0001.zarr
+    output: /climate-default/2026-01-28-vertically-resolved-c96-1deg-daily-shield-amip-ensemble-dataset/ic_0001.zarr
+  - input: /climate-default/2026-01-28-vertically-resolved-c96-1deg-shield-amip-ensemble-dataset/ic_0002.zarr
+    output: /climate-default/2026-01-28-vertically-resolved-c96-1deg-daily-shield-amip-ensemble-dataset/ic_0002.zarr
+coarsen_factor: 4
+snapshot_names:
+  - PRESsfc
+  - Q2m
+  - RH200
+  - RH500
+  - RH850
+  - TMP200
+  - TMP2m
+  - TMP500
+  - TMP850
+  - UGRD1000
+  - UGRD10m
+  - UGRD200
+  - UGRD500
+  - UGRD850
+  - VGRD1000
+  - VGRD10m
+  - VGRD200
+  - VGRD500
+  - VGRD850
+  - air_temperature_0
+  - air_temperature_1
+  - air_temperature_2
+  - air_temperature_3
+  - air_temperature_4
+  - air_temperature_5
+  - air_temperature_6
+  - air_temperature_7
+  - eastward_wind_0
+  - eastward_wind_1
+  - eastward_wind_2
+  - eastward_wind_3
+  - eastward_wind_4
+  - eastward_wind_5
+  - eastward_wind_6
+  - eastward_wind_7
+  - global_mean_co2
+  - h1000
+  - h50
+  - h500
+  - h850
+  - land_fraction
+  - northward_wind_0
+  - northward_wind_1
+  - northward_wind_2
+  - northward_wind_3
+  - northward_wind_4
+  - northward_wind_5
+  - northward_wind_6
+  - northward_wind_7
+  - ocean_fraction
+  - sea_ice_fraction
+  - snow_cover_fraction
+  - soil_moisture_0
+  - soil_moisture_1
+  - soil_moisture_2
+  - soil_moisture_3
+  - specific_total_water_0
+  - specific_total_water_1
+  - specific_total_water_2
+  - specific_total_water_3
+  - specific_total_water_4
+  - specific_total_water_5
+  - specific_total_water_6
+  - specific_total_water_7
+  - surface_temperature
+  - total_water_path
+window_names:
+  - DLWRFsfc
+  - DSWRFsfc
+  - DSWRFtoa
+  - LHTFLsfc
+  - PRATEsfc
+  - SHTFLsfc
+  - ULWRFsfc
+  - ULWRFtoa
+  - USWRFsfc
+  - USWRFtoa
+  - tendency_of_total_water_path
+  - tendency_of_total_water_path_due_to_advection
+  - total_frozen_precipitation_rate
+constant_prefixes:
+  - ak_
+  - bk_
+  - HGTsfc
+  - grid_xt
+  - grid_yt
diff --git a/scripts/time_coarsen/run.sh b/scripts/time_coarsen/run.sh
new file mode 100755
index 000000000..aae3b11d8
--- /dev/null
+++ b/scripts/time_coarsen/run.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+set -e
+
+SCRIPT_PATH=$(git rev-parse --show-prefix) # relative to the root of the repository
+REPO_ROOT=$(git rev-parse --show-toplevel)
+
+cd "$REPO_ROOT"
+
+run_coarsen() {
+    local config_filename="$1"
+    local job_name="$2"
+    local CONFIG_PATH="$SCRIPT_PATH/$config_filename"
+
+    # Extract additional args from config header
+    local extra_args=()
+    while IFS= read -r line; do
+        [[ "$line" =~ ^#\ arg:\ (.*) ]] && extra_args+=(${BASH_REMATCH[1]})
+    done < "$CONFIG_PATH"
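+    # For example, a config header line `# arg: --dry-run` (hypothetical flag)
+    # would be appended to the gantry invocation below as --dry-run.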
+
+    gantry run \
+        --name "$job_name" \
+        --description 'Run ACE time coarsening' \
+        --beaker-image "$(cat $REPO_ROOT/latest_deps_only_image.txt)" \
+        --workspace ai2/climate-titan \
+        --priority urgent \
+        --not-preemptible \
+        --cluster ai2/jupiter \
+        --cluster ai2/ceres \
+        --env GOOGLE_APPLICATION_CREDENTIALS=/tmp/google_application_credentials.json \
+        --dataset-secret google-credentials:/tmp/google_application_credentials.json \
+        --gpus 0 \
+        --shared-memory 400GiB \
+        --weka climate-default:/climate-default \
+        --budget ai2/climate \
+        --system-python \
+        --install "pip install --no-deps ." \
+        "${extra_args[@]}" \
+        -- python $SCRIPT_PATH/time_coarsen.py "$CONFIG_PATH"
+}
+
+base_name="time-coarsen"
+
+run_coarsen "c96-shield.yaml" "$base_name-c96-shield"
diff --git a/scripts/time_coarsen/run_stats.sh b/scripts/time_coarsen/run_stats.sh
new file mode 100755
index 000000000..aa6853e47
--- /dev/null
+++ b/scripts/time_coarsen/run_stats.sh
@@ -0,0 +1,45 @@
+#!/bin/bash
+
+set -e
+
+SCRIPT_PATH=$(git rev-parse --show-prefix) # relative to the root of the repository
+REPO_ROOT=$(git rev-parse --show-toplevel)
+
+cd "$REPO_ROOT"
+
+run_stats() {
+    local config_filename="$1"
+    local job_name="$2"
+    local CONFIG_PATH="$SCRIPT_PATH/$config_filename"
+
+    # Extract additional args from config header
+    local extra_args=()
+    while IFS= read -r line; do
+        [[ "$line" =~ ^#\ arg:\ (.*) ]] && extra_args+=(${BASH_REMATCH[1]})
+    done < "$CONFIG_PATH"
+
+    gantry run \
+        --name "$job_name" \
+        --description 'Run ACE stats computation' \
+        --beaker-image "$(cat $REPO_ROOT/latest_deps_only_image.txt)" \
+        --workspace ai2/climate-titan \
+        --priority urgent \
+        --not-preemptible \
+        --cluster ai2/jupiter \
+        --cluster ai2/ceres \
+        --env GOOGLE_APPLICATION_CREDENTIALS=/tmp/google_application_credentials.json \
+        --dataset-secret google-credentials:/tmp/google_application_credentials.json \
+        --gpus 0 \
+        --shared-memory 400GiB \
+        --weka climate-default:/climate-default \
+        --budget ai2/climate \
+        --system-python \
+        --install "pip install --no-deps ." \
+        "${extra_args[@]}" \
+        -- python $SCRIPT_PATH/../data_process/get_stats.py "$CONFIG_PATH" 0
+}
+
+base_name="time-coarsen-stats"
+
+run_stats "c96-shield-stats.yaml" "$base_name-c96-shield"
+
diff --git a/scripts/time_coarsen/test_time_coarsen.py b/scripts/time_coarsen/test_time_coarsen.py
new file mode 100644
index 000000000..a1450b7e6
--- /dev/null
+++ b/scripts/time_coarsen/test_time_coarsen.py
@@ -0,0 +1,74 @@
+import numpy as np
+import xarray as xr
+from time_coarsen import Config, PathPair, main
+
+from fme.ace.testing import DimSize, DimSizes, get_nd_dataset
+
+
+def test_time_coarsen() -> None:
+    n_input_times = 5
+    nz_interface = 3
+    # Create a small but non-trivial dataset (coords + attrs)
+    ds = get_nd_dataset(
+        dim_sizes=DimSizes(
+            n_time=n_input_times,
+            horizontal=[
+                DimSize(name="lat", size=4),
+                DimSize(name="lon", size=8),
+            ],
+            nz_interface=nz_interface,
+        ),
+        variable_names=["temp", "temp_tendency", "flag"],
+        timestep_days=1.0,
+        include_vertical_coordinate=True,
+    )
+    ds["temp"].attrs["units"] = "K"
+    ds["temp_tendency"].attrs["units"] = "K/day"
+    constant_names = []
+    for name in ds.data_vars:
+        if name.startswith("ak_") or name.startswith("bk_"):
+            constant_names.append(name)
+    assert len(constant_names) == 2 * nz_interface  # sanity check
+    input_path = "memory://test-time-coarsen/dataset.zarr"
+    output_path = "memory://test-time-coarsen/dataset_coarsened.zarr"
+
+    # Write to an in-memory filesystem (xarray recognizes the memory:// protocol)
+    ds.to_zarr(input_path)
+    ds = xr.open_zarr(input_path)  # for comparison later, use a fresh read
+
+    config = Config(
+        paths=[PathPair(input=input_path, output=output_path)],
+        coarsen_factor=2,
+        snapshot_names=["temp"],
+        window_names=["temp_tendency"],
+    )
+    main(config)
+
+    # Read back the coarsened dataset
+    ds_coarsened = xr.open_zarr(output_path)
+    # Note: each timestep stores the tendencies that led up to the current
+    # snapshot alongside the snapshot itself, so the first snapshot of the
+    # coarsened dataset is not the first snapshot of the input dataset.
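+    # For example, with 5 input times and coarsen_factor=2, the output keeps
+    # input times 1 and 3 (0-based) and the trailing odd time is dropped.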
+    assert ds_coarsened.sizes["time"] == n_input_times // config.coarsen_factor
+    expected_snapshot_slice = slice(
+        config.coarsen_factor - 1, None, config.coarsen_factor
+    )
+    np.testing.assert_array_equal(
+        ds_coarsened["time"].values,
+        ds["time"].isel(time=expected_snapshot_slice).values,
+    )
+    np.testing.assert_array_equal(
+        ds_coarsened["temp"].values,
+        ds["temp"].isel(time=expected_snapshot_slice).values,
+    )
+    np.testing.assert_array_equal(
+        ds_coarsened["temp_tendency"].isel(time=0).values,
+        ds["temp_tendency"]
+        .isel(time=slice(0, config.coarsen_factor))
+        .mean("time")
+        .values,
+    )
+    for name in constant_names:
+        np.testing.assert_array_equal(ds_coarsened[name].values, ds[name].values)
diff --git a/scripts/time_coarsen/time_coarsen.py b/scripts/time_coarsen/time_coarsen.py
new file mode 100644
index 000000000..b1a617f0b
--- /dev/null
+++ b/scripts/time_coarsen/time_coarsen.py
@@ -0,0 +1,101 @@
+import argparse
+import dataclasses
+import logging
+import os
+
+import dacite
+import xarray as xr
+import yaml
+
+
+@dataclasses.dataclass
+class PathPair:
+    input: str
+    output: str
+
+
+@dataclasses.dataclass
+class Config:
+    """
+    Configuration for time coarsening of datasets.
+
+    Attributes:
+        paths: List of input/output path pairs; each input dataset is coarsened
+            and written to the corresponding output path as a zarr store.
+        coarsen_factor: Factor by which to coarsen the time dimension.
+        snapshot_names: List of snapshot variable names. These are coarsened by
+            keeping every coarsen_factor-th timestep.
+        window_names: List of window variable names. These are coarsened by
+            averaging over each window of coarsen_factor timesteps.
+        constant_prefixes: List of prefixes for constant data variables to copy without
+            modification. Raises an exception if any of these have a "time" dimension.
+    """
+
+    paths: list[PathPair]
+    coarsen_factor: int
+    snapshot_names: list[str]
+    window_names: list[str]
+    constant_prefixes: list[str] = dataclasses.field(
+        default_factory=lambda: ["ak_", "bk_"]
+    )
+
+
+def main(config: Config):
+    for pair in config.paths:
+        process_path_pair(pair, config)
+
+
+def process_path_pair(pair: PathPair, config: Config):
+    logging.info(f"Processing input: {pair.input} to output: {pair.output}")
+    if os.path.exists(pair.output):
+        logging.warning(f"Output path {pair.output} already exists. Skipping.")
+        return
+    ds = xr.open_dataset(pair.input)
+    constant_names = [
+        name
+        for name in ds.data_vars
+        if any(name.startswith(prefix) for prefix in config.constant_prefixes)
+    ]
+    assert set(constant_names).intersection(set(config.snapshot_names)) == set()
+    assert set(constant_names).intersection(set(config.window_names)) == set()
+    for name in constant_names:
+        if "time" in ds[name].dims:
+            raise ValueError(
+                f"Constant data variable {name} has a 'time' dimension, "
+                "which is not allowed."
+            )
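+    # Each averaged window is paired with the snapshot at the window's final
+    # timestep, so snapshots are taken at input time indices coarsen_factor - 1,
+    # 2 * coarsen_factor - 1, and so on.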
+    ds_constants = ds[constant_names]
+    ds_snapshot = ds[config.snapshot_names].isel(
+        time=slice(config.coarsen_factor - 1, None, config.coarsen_factor)
+    )
+    ds_window = (
+        ds[config.window_names]
+        .coarsen(time=config.coarsen_factor, boundary="trim")
+        .mean()
+        .drop_vars("time")
+    )  # use the time coordinate of the snapshots
+    ds_coarsened = xr.merge([ds_snapshot, ds_window, ds_constants])
+    ds_coarsened.to_zarr(pair.output)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Time Coarsen Script")
+    parser.add_argument("config_yaml", type=str, help="Path to configuration yaml file")
+    args = parser.parse_args()
+    with open(args.config_yaml, "r") as f:
+        config_dict = yaml.safe_load(f)
+    config = dacite.from_dict(
+        data_class=Config, data=config_dict, config=dacite.Config(strict=True)
+    )
+    main(config)
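+    # A minimal config yaml for this script (hypothetical paths) might look like:
+    #
+    #   paths:
+    #     - input: /data/input.zarr
+    #       output: /data/output_daily.zarr
+    #   coarsen_factor: 4
+    #   snapshot_names: [PRESsfc]
+    #   window_names: [PRATEsfc]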