Conversation
Add DataSource (CAMS) and ForecastSource (CAMS_FX) for Copernicus Atmosphere Monitoring Service data via the CDS API. CAMS provides atmospheric composition / air quality data not currently available in earth2studio — complementing the existing weather-focused data sources (GFS, IFS, ERA5, etc.). Data sources: - CAMS: EU air quality analysis (0.1 deg, 9 pollutants, 10 height levels) - CAMS_FX: EU + Global forecasts (EU 0.1 deg up to 96h, Global 0.4 deg up to 120h) Variables include: dust, PM2.5, PM10, SO2, NO2, O3, CO, NH3, NO (EU surface and multi-level), plus AOD and total column products (Global). Lexicon: 101 entries covering all 9 pollutants at all 9 EU altitude levels (50-5000m), plus surface and 11 global column/AOD variables. Implementation follows upstream conventions: - Protocol-compliant __call__ and async fetch methods - Badges section for API doc filtering - Time validation, available() classmethod - Lazy CDS client initialization - pathlib-based caching with SHA256 keys - Tests with @pytest.mark.xfail for CI without CDS credentials Requires: cdsapi (already in the 'data' optional dependency group) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… coordinate-based lead-time selection
P1: Use atomic write-then-rename in _download_cams_netcdf to prevent
corrupt partial files from being cached on interrupted downloads.
P1: Fix TypeError in CAMS.available() and CAMS_FX.available() when
called with timezone-aware datetimes (strip tzinfo before comparing
against naive min-time constants, matching _validate_cams_time).
P2: Replace positional lead-time indexing in _extract_field with
coordinate-based selection via forecast_period dimension values,
avoiding silent data misassignment if API reorders slices.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add CAMS to analysis datasources and CAMS_FX to forecast datasources. Add region:europe and product:airquality to badge filters. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Deduplicate api_vars via dict.fromkeys() to avoid duplicate variable names in CDS API requests (CAMS and CAMS_FX) - Use dataset-specific min-time validation in CAMS_FX (EU: 2019-07-01, Global: 2015-01-01) instead of global minimum for all datasets - Sort lead_hours in CAMS_FX cache key so identical lead times in different order produce the same cache hit Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Per reviewer feedback (NickGeneva): - Remove CAMS analysis class (no ML models need it currently) - Remove EU dataset support from CAMS_FX (1:1 mapping with remote store) - Reduce CAMSLexicon to 11 Global variables (AOD, column products) - Update docs and tests accordingly Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
/blossom-ci |
|
/blossom-ci |
Greptile SummaryThis PR introduces Key changes:
The primary concern is that
|
| Filename | Overview |
|---|---|
| earth2studio/data/cams.py | New CAMS_FX forecast data source; missing initialization-hour validation (only 00:00 and 12:00 UTC are valid CAMS init times) and product:airquality badge in class docstring |
| earth2studio/lexicon/cams.py | New CAMSGlobalLexicon mapping e2s variable names to CAMS API names and NetCDF keys; vocabulary format is consistent and the identity modifier pattern matches other lexicons |
| earth2studio/lexicon/base.py | Adds 10 new CAMS-specific variable entries (aerosol AOD and trace gas columns) to E2STUDIO_VOCAB; additions look correct and consistent |
| test/data/test_cams.py | Comprehensive unit and mock tests for CAMS_FX; does not include a test for invalid initialization hours (e.g. 06:00 UTC), which mirrors the missing validation in the source |
| test/lexicon/test_cams_lexicon.py | Well-structured lexicon tests covering surface, AOD, trace-gas, and pressure-level variables; also validates VOCAB entry format |
| test/data/test_cds.py | Adds a monkeypatched CDSAPI_URL fixture so CDS tests target the correct endpoint; straightforward improvement |
| earth2studio/data/init.py | Exports CAMS_FX from the data package; change is minimal and correct |
| earth2studio/lexicon/init.py | Exports CAMSGlobalLexicon from the lexicon package; change is minimal and correct |
| docs/modules/datasources_analysis.rst | Adds region:europe and product:airquality to the badge filter; consistent with the new CAMS data source |
| docs/modules/datasources_forecast.rst | Adds region:europe and product:airquality badge filters and registers CAMS_FX in the forecast source toctree |
Comments Outside Diff (1)
-
earth2studio/data/cams.py, line 514-521 (link)Fixed temp-cache path may conflict across concurrent instances
When
cache=False, the cache property returns the fixed path…/cams/tmp_cams_fx. If twoCAMS_FX(cache=False)instances run concurrently (e.g. in a multi-process workflow), one instance'sshutil.rmtree(self.cache)call in__call__will delete files that the other instance may still be using. The CDS data source in this repo uses the same pattern, but it is worth noting for users running parallel pipelines. A per-instance UUID suffix (e.g.tempfile.mkdtemp) would make the temp directory unique.
Reviews (1): Last reviewed commit: "Merge branch 'main' into pr-780" | Re-trigger Greptile
| "are only available at 3-hourly intervals (0, 3, 6, ...)." | ||
| ) | ||
|
|
||
| # Download surface variables | ||
| surface_ds: xr.Dataset | None = None | ||
| if surface_infos: | ||
| surface_api_vars = list(dict.fromkeys(vi.api_name for vi in surface_infos)) | ||
| nc_path = self._download_cached(time, surface_api_vars, lead_hours) | ||
| surface_ds = xr.open_dataset(nc_path, decode_timedelta=False) | ||
|
|
||
| # Download pressure-level variables (grouped by unique levels) | ||
| pressure_ds: xr.Dataset | None = None | ||
| if pressure_infos: |
There was a problem hiding this comment.
Missing CAMS initialization-hour validation
CAMS Global Atmospheric Composition Forecasts are only produced at 00:00 UTC and 12:00 UTC. The current _validate_time only enforces that minutes and seconds are zero — it accepts any hour (e.g. 06:00, 18:00). Passing a non-supported hour will produce an API error or silently return unexpected data.
A check for the valid initialization hours should be added alongside the existing minute/second check:
| "are only available at 3-hourly intervals (0, 3, 6, ...)." | |
| ) | |
| # Download surface variables | |
| surface_ds: xr.Dataset | None = None | |
| if surface_infos: | |
| surface_api_vars = list(dict.fromkeys(vi.api_name for vi in surface_infos)) | |
| nc_path = self._download_cached(time, surface_api_vars, lead_hours) | |
| surface_ds = xr.open_dataset(nc_path, decode_timedelta=False) | |
| # Download pressure-level variables (grouped by unique levels) | |
| pressure_ds: xr.Dataset | None = None | |
| if pressure_infos: | |
| @classmethod | |
| def _validate_time(cls, times: list[datetime]) -> None: | |
| """Validate that requested times are valid for CAMS Global forecast.""" | |
| for t in times: | |
| t_naive = t.replace(tzinfo=None) if t.tzinfo else t | |
| if t_naive < cls.CAMS_MIN_TIME: | |
| raise ValueError( | |
| f"Requested time {t} is before CAMS Global forecast availability " | |
| f"(earliest: {cls.CAMS_MIN_TIME})" | |
| ) | |
| if t_naive.minute != 0 or t_naive.second != 0: | |
| raise ValueError( | |
| f"Requested time {t} must be on the hour for CAMS Global forecast" | |
| ) | |
| if t_naive.hour not in (0, 12): | |
| raise ValueError( | |
| f"Requested time {t} has unsupported initialization hour " | |
| f"({t_naive.hour:02d}:00 UTC). CAMS Global forecasts are only " | |
| "available at 00:00 and 12:00 UTC." | |
| ) |
| data_arrays.append(da) | ||
|
|
||
| if not self._cache: | ||
| shutil.rmtree(self.cache) |
There was a problem hiding this comment.
Missing
product:airquality badge in class docstring
region:europe and product:airquality were added to the RST badge-filter directives, yet the CAMS_FX class docstring only carries product:wind product:temp product:atmos. The class also exposes aerosol optical depth and trace-gas variables, so product:airquality (and possibly region:europe) should be listed here to ensure the class is surfaced correctly in the documentation filter.
| data_arrays.append(da) | |
| if not self._cache: | |
| shutil.rmtree(self.cache) | |
| Badges | |
| ------ | |
| region:global region:europe dataclass:simulation product:wind product:temp product:atmos product:airquality |
| The API endpoint for this data source varies from the Climate Data Store (CDS), be | ||
| sure your api config has the correct url. | ||
|
|
||
| Badges | ||
| ------ | ||
| region:global dataclass:simulation product:wind product:temp product:atmos | ||
| """ | ||
|
|
||
| MAX_LEAD_HOURS = 120 | ||
| CAMS_MIN_TIME = datetime(2015, 1, 1) | ||
| CAMS_DATASET_URI = "cams-global-atmospheric-composition-forecasts" | ||
| CAMS_LAT = np.linspace(90, -90, 451) | ||
| CAMS_LON = np.linspace(0, 359.6, 900) | ||
|
|
||
| def __init__(self, cache: bool = True, verbose: bool = True): | ||
| self._cache = cache | ||
| self._verbose = verbose | ||
| self._cds_client: "cdsapi.Client | None" = None | ||
|
|
||
| @property | ||
| def _client(self) -> "cdsapi.Client": | ||
| if self._cds_client is None: | ||
| if cdsapi is None: | ||
| raise ImportError( | ||
| "cdsapi is required for CAMS_FX. " | ||
| "Install with: pip install 'earth2studio[data]'" | ||
| ) | ||
| self._cds_client = cdsapi.Client( | ||
| debug=False, quiet=True, wait_until_complete=False |
There was a problem hiding this comment.
Silent fallback
isel[d] = 0 for unexpected dimensions
When a dimension is neither forecast_period nor a pressure-level dimension, the code silently selects index 0 with no warning:
else:
isel[d] = 0If a future CAMS NetCDF adds an unexpected extra dimension (e.g. ensemble member, realisation, step), the code will silently extract only the first slice without alerting the user. Consider emitting a logger.warning in this branch so unexpected dimensions are visible:
else:
logger.warning(
f"Unexpected dimension '{d}' in field '{nc_key}'; selecting index 0."
)
isel[d] = 0|
/blossom-ci |
Earth2Studio Pull Request
Description
Checklist
Dependencies