diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py
index 7fdbd06b..4d5c2998 100644
--- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py
+++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py
@@ -1,6 +1,7 @@
 """Module for processing AORC and NWM data."""
 
 import datetime
+import functools
 import os
 import re
 import typing
@@ -10,6 +11,9 @@ from time import perf_counter
 
 
 import dask
+
+# Use the Error, Warning, and Trapping System Package for logging
+import ewts
 import geopandas as gpd
 import matplotlib.pyplot as plt
 import numpy as np
@@ -27,12 +31,11 @@
 )
 from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import MpiConfig
 
-# Use the Error, Warning, and Trapping System Package for logging
-import ewts
 LOG = ewts.get_logger(ewts.FORCING_ID)
 
 zarr.config.set({"async.concurrency": 100})
 
+
 class BaseProcessor:
     """Base class for data processors."""
 
@@ -343,10 +346,15 @@ def __init__(
     @cached_property
     def src_crs(self) -> CRS:
         """Get source CRS from dataset."""
-        object_store = obstore.store.from_url(
-            self.url(self.years[0]), skip_signature=True
-        )
-        return CRS(xr.open_zarr(ObjectStore(object_store)).rio.crs)
+        # Resolve the URL first so the error message reports the real location.
+        url = self.url(self.years[0])
+        try:
+            object_store = obstore.store.from_url(url, skip_signature=True)
+            return CRS(xr.open_zarr(ObjectStore(object_store)).rio.crs)
+        except Exception as e:
+            raise ValueError(
+                f"Unable to open zarr: {url}. Check that the zarr data on s3 is valid. Error: {e}"
+            ) from e
 
     def url(self, year: str) -> str:
         """Generate AORC S3 zarr URL for current year.
@@ -387,8 +395,16 @@ def s3_lazy_ds(self) -> dict[int, xr.Dataset]:
         """Lazy load dataset from S3."""
         year_datasets = {}
         for year in self.years:
-            object_store = obstore.store.from_url(self.url(year), skip_signature=True)
-            year_datasets[year] = xr.open_zarr(ObjectStore(object_store))
+            # Resolve the URL first so the error message reports the real location.
+            url = self.url(year)
+            try:
+                object_store = obstore.store.from_url(url, skip_signature=True)
+                year_datasets[year] = xr.open_zarr(ObjectStore(object_store))
+            except Exception as e:
+                raise ValueError(
+                    f"Unable to open zarr: {url}. Check that the zarr data on s3 is valid. Error: {e}"
+                ) from e
+
         return year_datasets
 
 
@@ -413,9 +429,14 @@ def __init__(
     @cached_property
     def src_crs(self):
         """Get source CRS from dataset."""
-        object_store = obstore.store.from_url(
-            self.url(self.years[0]), skip_signature=True
-        )
-        return CRS(xr.open_zarr(ObjectStore(object_store)).crs.attrs["spatial_ref"])
+        # Resolve the URL first so the error message reports the real location.
+        url = self.url(self.years[0])
+        try:
+            object_store = obstore.store.from_url(url, skip_signature=True)
+            return CRS(xr.open_zarr(ObjectStore(object_store)).crs.attrs["spatial_ref"])
+        except Exception as e:
+            raise ValueError(
+                f"Unable to open zarr: {url}. Check that the zarr data on s3 is valid. Error: {e}"
+            ) from e
 
     def url(self, date: datetime) -> str:
@@ -525,10 +546,15 @@ def url(self, var: str) -> str:
     @cached_property
     def src_crs(self) -> CRS:
         """Get source CRS from dataset."""
-        object_store = obstore.store.from_url(
-            self.url(self.vars[0]), skip_signature=True
-        )
-        return CRS(xr.open_zarr(ObjectStore(object_store)).crs.attrs["spatial_ref"])
+        # Resolve the URL first so the error message reports the real location.
+        url = self.url(self.vars[0])
+        try:
+            object_store = obstore.store.from_url(url, skip_signature=True)
+            return CRS(xr.open_zarr(ObjectStore(object_store)).crs.attrs["spatial_ref"])
+        except Exception as e:
+            raise ValueError(
+                f"Unable to open zarr: {url}. Check that the zarr data on s3 is valid. Error: {e}"
+            ) from e
 
     @property
     def sliced_ds(self) -> xr.Dataset:
@@ -556,8 +582,15 @@ def s3_lazy_ds(self) -> dict[str, xr.Dataset]:
         """Lazy load dataset from S3."""
         variables = {}
         for var in self.vars:
-            object_store = obstore.store.from_url(self.url(var), skip_signature=True)
-            variables[var] = xr.open_zarr(ObjectStore(object_store))
+            # Resolve the URL first so the error message reports the real location.
+            url = self.url(var)
+            try:
+                object_store = obstore.store.from_url(url, skip_signature=True)
+                variables[var] = xr.open_zarr(ObjectStore(object_store))
+            except Exception as e:
+                raise ValueError(
+                    f"Unable to open zarr: {url}. Check that the zarr data on s3 is valid. Error: {e}"
+                ) from e
         return variables
 
 
@@ -589,8 +622,13 @@ def url(self) -> str:
     @cached_property
     def src_crs(self) -> CRS:
         """Get source CRS from dataset."""
-        object_store = obstore.store.from_url(self.url, skip_signature=True)
-        return CRS(xr.open_zarr(ObjectStore(object_store)).crs.attrs["spatial_ref"])
+        try:
+            object_store = obstore.store.from_url(self.url, skip_signature=True)
+            return CRS(xr.open_zarr(ObjectStore(object_store)).crs.attrs["spatial_ref"])
+        except Exception as e:
+            raise ValueError(
+                f"Unable to open zarr: {self.url}. Check that the zarr data on s3 is valid. Error: {e}"
+            ) from e
 
     @property
     def sliced_ds(self) -> xr.Dataset:
@@ -617,8 +655,13 @@ def sliced_ds(self) -> xr.Dataset:
     @cached_property
     def s3_lazy_ds(self) -> xr.Dataset:
         """Lazy load dataset from S3."""
-        object_store = obstore.store.from_url(self.url, skip_signature=True)
-        return xr.open_zarr(ObjectStore(object_store))
+        try:
+            object_store = obstore.store.from_url(self.url, skip_signature=True)
+            return xr.open_zarr(ObjectStore(object_store))
+        except Exception as e:
+            raise ValueError(
+                f"Unable to open zarr: {self.url}. Check that the zarr data on s3 is valid. Error: {e}"
+            ) from e
 
 
 class NWMV3AlaskaProcessor(NWMV3Processor):
@@ -690,8 +733,13 @@ def s3_lazy_ds(self) -> xr.Dataset:
         coordinates having to be pulled from the geo_grid and mapped to the
         actual data.
         """
-        object_store = obstore.store.from_url(self.url, skip_signature=True)
-        ds = xr.open_zarr(ObjectStore(object_store))
+        try:
+            object_store = obstore.store.from_url(self.url, skip_signature=True)
+            ds = xr.open_zarr(ObjectStore(object_store))
+        except Exception as e:
+            raise ValueError(
+                f"Unable to open zarr: {self.url}. Check that the zarr data on s3 is valid. Error: {e}"
+            ) from e
         ds = ds.assign_coords(
             {"x": self.geo_grid["x"].values, "y": self.geo_grid["y"].values[::-1]}
         )