Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Module for processing AORC and NWM data."""

import datetime
import functools
import os
import re
import typing
Expand All @@ -10,6 +11,9 @@
from time import perf_counter

import dask

# Use the Error, Warning, and Trapping System Package for logging
import ewts
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
Expand All @@ -27,12 +31,11 @@
)
from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import MpiConfig

# Use the Error, Warning, and Trapping System Package for logging
import ewts
LOG = ewts.get_logger(ewts.FORCING_ID)

zarr.config.set({"async.concurrency": 100})


class BaseProcessor:
"""Base class for data processors."""

Expand Down Expand Up @@ -343,10 +346,15 @@ def __init__(
@cached_property
def src_crs(self) -> CRS:
"""Get source CRS from dataset."""
object_store = obstore.store.from_url(
self.url(self.years[0]), skip_signature=True
)
return CRS(xr.open_zarr(ObjectStore(object_store)).rio.crs)
try:
object_store = obstore.store.from_url(
self.url(self.years[0]), skip_signature=True
)
return CRS(xr.open_zarr(ObjectStore(object_store)).rio.crs)
except Exception as e:
raise ValueError(
f"Unable to open zarr: {self.url}. Check that the zarr data on s3 is valid. Error: {e}"
Copy link
Copy Markdown

@mxkpp mxkpp Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error message needs to be improved. self.url is a method not a string. Also in this case, the error could be related to the CRS() call. Please revisit each of them.

Exception handling should be improved too. ValueError is not right, and should preserve the original traceback. Something like this.

except Exception as e:
    raise RuntimeError(f"foobar. Error {e}") from e

)

def url(self, year: str) -> str:
"""Generate AORC S3 zarr URL for current year.
Expand Down Expand Up @@ -387,8 +395,16 @@ def s3_lazy_ds(self) -> dict[int, xr.Dataset]:
"""Lazy load dataset from S3."""
year_datasets = {}
for year in self.years:
object_store = obstore.store.from_url(self.url(year), skip_signature=True)
year_datasets[year] = xr.open_zarr(ObjectStore(object_store))
try:
object_store = obstore.store.from_url(
self.url(year), skip_signature=True
)
year_datasets[year] = xr.open_zarr(ObjectStore(object_store))
except Exception as e:
raise ValueError(
f"Unable to open zarr: {self.url}. Check that the zarr data on s3 is valid. Error: {e}"
)

return year_datasets


Expand All @@ -413,9 +429,14 @@ def __init__(
@cached_property
def src_crs(self):
"""Get source CRS from dataset."""
object_store = obstore.store.from_url(
self.url(self.years[0]), skip_signature=True
)
try:
object_store = obstore.store.from_url(
self.url(self.years[0]), skip_signature=True
)
except Exception as e:
raise ValueError(
f"Unable to open zarr: {self.url}. Check that the zarr data on s3 is valid. Error: {e}"
)
return CRS(xr.open_zarr(ObjectStore(object_store)).crs.attrs["spatial_ref"])

def url(self, date: datetime) -> str:
Expand Down Expand Up @@ -525,10 +546,15 @@ def url(self, var: str) -> str:
@cached_property
def src_crs(self) -> CRS:
"""Get source CRS from dataset."""
object_store = obstore.store.from_url(
self.url(self.vars[0]), skip_signature=True
)
return CRS(xr.open_zarr(ObjectStore(object_store)).crs.attrs["spatial_ref"])
try:
object_store = obstore.store.from_url(
self.url(self.vars[0]), skip_signature=True
)
return CRS(xr.open_zarr(ObjectStore(object_store)).crs.attrs["spatial_ref"])
except Exception as e:
raise ValueError(
f"Unable to open zarr: {self.url}. Check that the zarr data on s3 is valid. Error: {e}"
)

@property
def sliced_ds(self) -> xr.Dataset:
Expand Down Expand Up @@ -556,8 +582,15 @@ def s3_lazy_ds(self) -> dict[str, xr.Dataset]:
"""Lazy load dataset from S3."""
variables = {}
for var in self.vars:
object_store = obstore.store.from_url(self.url(var), skip_signature=True)
variables[var] = xr.open_zarr(ObjectStore(object_store))
try:
object_store = obstore.store.from_url(
self.url(var), skip_signature=True
)
variables[var] = xr.open_zarr(ObjectStore(object_store))
except Exception as e:
raise ValueError(
f"Unable to open zarr: {self.url}. Check that the zarr data on s3 is valid. Error: {e}"
)
return variables


Expand Down Expand Up @@ -589,8 +622,13 @@ def url(self) -> str:
@cached_property
def src_crs(self) -> CRS:
"""Get source CRS from dataset."""
object_store = obstore.store.from_url(self.url, skip_signature=True)
return CRS(xr.open_zarr(ObjectStore(object_store)).crs.attrs["spatial_ref"])
try:
object_store = obstore.store.from_url(self.url, skip_signature=True)
return CRS(xr.open_zarr(ObjectStore(object_store)).crs.attrs["spatial_ref"])
except Exception as e:
raise ValueError(
f"Unable to open zarr: {self.url}. Check that the zarr data on s3 is valid. Error: {e}"
)

@property
def sliced_ds(self) -> xr.Dataset:
Expand All @@ -617,8 +655,13 @@ def sliced_ds(self) -> xr.Dataset:
@cached_property
def s3_lazy_ds(self) -> xr.Dataset:
"""Lazy load dataset from S3."""
object_store = obstore.store.from_url(self.url, skip_signature=True)
return xr.open_zarr(ObjectStore(object_store))
try:
object_store = obstore.store.from_url(self.url, skip_signature=True)
return xr.open_zarr(ObjectStore(object_store))
except Exception as e:
raise ValueError(
f"Unable to open zarr: {self.url}. Check that the zarr data on s3 is valid. Error: {e}"
)


class NWMV3AlaskaProcessor(NWMV3Processor):
Expand Down Expand Up @@ -690,8 +733,13 @@ def s3_lazy_ds(self) -> xr.Dataset:
coordinates having to be pulled from the geo_grid and mapped to the actual
data.
"""
object_store = obstore.store.from_url(self.url, skip_signature=True)
ds = xr.open_zarr(ObjectStore(object_store))
try:
object_store = obstore.store.from_url(self.url, skip_signature=True)
ds = xr.open_zarr(ObjectStore(object_store))
except Exception as e:
raise ValueError(
f"Unable to open zarr: {self.url}. Check that the zarr data on s3 is valid. Error: {e}"
)
ds = ds.assign_coords(
{"x": self.geo_grid["x"].values, "y": self.geo_grid["y"].values[::-1]}
)
Expand Down
Loading