feat: add CAMS atmospheric composition data source #780
bjoernbethge wants to merge 7 commits into NVIDIA:main from feat/cams-datasource
Conversation
Add DataSource (CAMS) and ForecastSource (CAMS_FX) for Copernicus Atmosphere Monitoring Service data via the CDS API. CAMS provides atmospheric composition / air quality data not currently available in earth2studio, complementing the existing weather-focused data sources (GFS, IFS, ERA5, etc.).

Data sources:
- CAMS: EU air quality analysis (0.1 deg, 9 pollutants, 10 height levels)
- CAMS_FX: EU + Global forecasts (EU 0.1 deg up to 96h, Global 0.4 deg up to 120h)

Variables include dust, PM2.5, PM10, SO2, NO2, O3, CO, NH3, NO (EU surface and multi-level), plus AOD and total column products (Global).

Lexicon: 101 entries covering all 9 pollutants at all 9 EU altitude levels (50-5000m), plus surface and 11 global column/AOD variables.

Implementation follows upstream conventions:
- Protocol-compliant __call__ and async fetch methods
- Badges section for API doc filtering
- Time validation, available() classmethod
- Lazy CDS client initialization
- pathlib-based caching with SHA256 keys
- Tests with @pytest.mark.xfail for CI without CDS credentials

Requires: cdsapi (already in the 'data' optional dependency group)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Greptile Summary

This PR adds CAMS atmospheric composition data sources. Prior review concerns have been fully addressed.
Remaining concerns:
| Filename | Overview |
|---|---|
| earth2studio/data/cams.py | New CAMS/CAMS_FX data source; atomic download, timezone-aware available() and value-based lead-time selection are all correctly implemented. Two remaining concerns: available() for CAMS_FX checks only the global min time so it silently over-reports availability for EU variables pre-2019, and _extract_field falls back to isel[d]=0 for any dimension name other than 'level'/'forecast_period', which could silently return wrong data for CAMS Global NetCDFs that use a different lead-time dimension name. |
| earth2studio/lexicon/cams.py | New CAMSLexicon with 101 entries for EU pollutants across 10 altitude levels and global AOD/column products; consistent vocab format, all levels covered. |
| test/data/test_cams.py | Good unit-test coverage for time validation, available() timezone handling, cache-key ordering, and API variable deduplication; remote/CDS-credential tests are correctly marked xfail. |
| test/lexicon/test_cams_lexicon.py | Lexicon validation tests check vocab format, all expected level entries, and modifier passthrough; comprehensive coverage. |
| earth2studio/data/__init__.py | Single-line change adding CAMS and CAMS_FX to the public data module exports. |
| earth2studio/lexicon/__init__.py | Single-line change adding CAMSLexicon to the public lexicon module exports. |
Reviews (4): Last reviewed commit: "fix: address P2 review findings in CAMS ..."
```python
if cache_path.is_file():
    return cache_path

r = client.retrieve(dataset, request_body)
while True:
    r.update()
    reply = r.reply
    if verbose:
        logger.debug(
            f"Request ID:{reply['request_id']}, state: {reply['state']}"
        )
    if reply["state"] == "completed":
        break
    elif reply["state"] in ("queued", "running"):
        sleep(5.0)
    elif reply["state"] in ("failed",):
        raise RuntimeError(
            f"CAMS request failed for {dataset}: "
            + reply.get("error", {}).get("message", "unknown error")
        )
    else:
        sleep(2.0)
r.download(str(cache_path))
return cache_path
```
Partial file cached as valid on interrupted download
If r.download(str(cache_path)) is interrupted mid-stream (e.g. network error, process kill), a partial .nc file is left at cache_path. On the next call, cache_path.is_file() is True so the function returns the corrupted file immediately without re-downloading. xr.open_dataset will then raise an opaque OSError/ValueError with no indication that a re-download is needed.
The standard remedy is to write to a temporary sibling path and rename atomically once the download is confirmed complete:
```python
import os
import pathlib
import tempfile

# Download to a temporary sibling file, then rename atomically so an
# interrupted transfer never leaves a partial file at cache_path.
tmp_fd, tmp_name = tempfile.mkstemp(dir=cache_path.parent, suffix=".nc.tmp")
try:
    os.close(tmp_fd)
    r.download(tmp_name)
    os.replace(tmp_name, cache_path)  # atomic rename within the same directory
except Exception:
    pathlib.Path(tmp_name).unlink(missing_ok=True)
    raise
return cache_path
```

fix: atomic download, tz-aware available(), and coordinate-based lead-time selection
P1: Use atomic write-then-rename in _download_cams_netcdf to prevent
corrupt partial files from being cached on interrupted downloads.
P1: Fix TypeError in CAMS.available() and CAMS_FX.available() when
called with timezone-aware datetimes (strip tzinfo before comparing
against naive min-time constants, matching _validate_cams_time).
P2: Replace positional lead-time indexing in _extract_field with
coordinate-based selection via forecast_period dimension values,
avoiding silent data misassignment if API reorders slices.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
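The P2 fix described above can be sketched with xarray. This is a minimal, hypothetical example (the variable name, coordinate values, and data are made up for illustration; only the dimension name `forecast_period` comes from the review thread), not the PR's actual code:

```python
import numpy as np
import xarray as xr

# Toy dataset standing in for a CAMS NetCDF: the API may return the
# lead-time slices in any order along "forecast_period".
ds = xr.Dataset(
    {"no2": (("forecast_period", "lat"), np.array([[3.0], [1.0], [2.0]]))},
    coords={"forecast_period": [48, 0, 24], "lat": [50.0]},
)

# Positional indexing trusts slice order: index 0 is the 48 h slice here.
wrong = ds["no2"].isel(forecast_period=0)

# Coordinate-based selection matches on the lead-time value itself,
# so it stays correct no matter how the API ordered the slices.
right = ds["no2"].sel(forecast_period=0)
```

With `sel`, reordering the slices on the server side cannot silently misassign data to the wrong lead time.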
Cherry-pick atomic download, tz-aware available(), and coordinate-based lead-time selection fixes from feat/cams-datasource (NVIDIA#780).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@greptile-apps All three findings (P1: atomic download, P1: tz-aware available(), P2: coordinate-based lead-time selection) have been addressed in commit 5ae645f. Please re-review.
Add CAMS to analysis datasources and CAMS_FX to forecast datasources. Add region:europe and product:airquality to badge filters. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@greptile-ai
- Deduplicate api_vars via dict.fromkeys() to avoid duplicate variable names in CDS API requests (CAMS and CAMS_FX) - Use dataset-specific min-time validation in CAMS_FX (EU: 2019-07-01, Global: 2015-01-01) instead of global minimum for all datasets - Sort lead_hours in CAMS_FX cache key so identical lead times in different order produce the same cache hit Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
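Two of these normalization fixes can be sketched together. The helper name and key layout below are illustrative, not the PR's exact code:

```python
import hashlib

def cams_cache_key(dataset: str, variables: list[str], lead_hours: list[int]) -> str:
    """Illustrative cache key builder (hypothetical helper)."""
    api_vars = list(dict.fromkeys(variables))  # order-preserving dedupe
    leads = sorted(lead_hours)                 # permutation-independent key
    raw = f"{dataset}|{','.join(api_vars)}|{','.join(map(str, leads))}"
    return hashlib.sha256(raw.encode()).hexdigest()
```

With the sorted lead hours, requesting `[24, 0]` and `[0, 24]` produces the same digest and therefore the same cache hit.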
@greptile-apps All three remaining P2 findings addressed in 7f00dc5: deduplicated api_vars, dataset-specific EU min-time validation, sorted lead_hours in cache key. Tests added for each fix.
Nice to see this data source getting added. As for the use case: if it's just data access, I think adding only CAMS_FX is fine, no need for the analysis source. The analysis sources are typically around for model initialization, and right now there are no ML models in the package that would need this. So, two items:
```diff
 .. badge-filter:: region:global region:na region:as region:europe
    dataclass:analysis dataclass:reanalysis dataclass:observation dataclass:simulation
-   product:wind product:precip product:temp product:atmos product:ocean product:land product:veg product:solar product:radar product:sat product:insitu
+   product:wind product:precip product:temp product:atmos product:ocean product:land product:veg product:solar product:radar product:sat product:insitu product:airquality
```
Remove product:airquality; just use product:atmos for these data sources.
- https://ads.atmosphere.copernicus.eu/datasets/cams-europe-air-quality-forecasts
- https://ads.atmosphere.copernicus.eu/datasets/cams-global-atmospheric-composition-forecasts
- https://cds.climate.copernicus.eu/how-to-api
Update the last bullet with an additional note for people using the CDS:

- https://ads.atmosphere.copernicus.eu/how-to-api

The API endpoint for this data source differs from the Climate Data Store (CDS); be sure your API config has the correct URL.
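For reference, a `~/.cdsapirc` pointing at the Atmosphere Data Store would look roughly like this (the key is a placeholder; check the how-to-api page above for the current endpoint):

```yaml
url: https://ads.atmosphere.copernicus.eu/api
key: <your-ads-api-key>
```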
test/data/test_cams.py (outdated)
```python
@pytest.mark.slow
@pytest.mark.xfail
@pytest.mark.timeout(120)
@pytest.mark.parametrize("variable", [["dust", "so2sfc"]])
```
Is so2sfc correct here? It should just be so2, correct?
Per reviewer feedback (NickGeneva):
- Remove CAMS analysis class (no ML models need it currently)
- Remove EU dataset support from CAMS_FX (1:1 mapping with remote store)
- Reduce CAMSLexicon to 11 Global variables (AOD, column products)
- Update docs and tests accordingly

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@NickGeneva Thanks for the feedback! Addressed both items in e59fe35:
CAMSLexicon is reduced to the 11 Global variables (AOD + column products). Net result: -413 lines, +84 lines.
Merged with needed fixes: #790 Thanks!
Summary
- CAMS (DataSource) and CAMS_FX (ForecastSource) for Copernicus Atmosphere Monitoring Service data via the CDS API
- CAMSLexicon with 101 variable entries covering EU air quality (9 pollutants × 10 altitude levels) and global column/AOD products

Data coverage

- CAMS
- CAMS_FX

Implementation

- Protocol-compliant __call__ and async fetch methods
- Badges section for API doc filtering
- available() classmethod
- pathlib-based caching with SHA256 keys
- cdsapi (already in the [data] optional dependency group)
- Follows the e2s-004-data-sources, e2s-008-lexicon-usage, e2s-002-api-documentation rules

Files

- earth2studio/data/cams.py
- earth2studio/lexicon/cams.py
- earth2studio/data/__init__.py
- earth2studio/lexicon/__init__.py
- test/data/test_cams.py
- test/lexicon/test_cams_lexicon.py

Test plan

- ruff check passes on all new files
- Remote tests (@pytest.mark.xfail) require CDS API credentials

🤖 Generated with Claude Code