Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ibicus/__meta__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"""Container for ibicus metadata."""

__name__ = "ibicus"
__version__ = "1.2.0"
__version__ = "2.0.1"
__author__ = "Fiona Spuler, Jakob Wessel & European Centre for Medium-Range Weather Forecasts (ECMWF)"
__author_email__ = "ibicus.py@gmail.com"
__license__ = "Apache License Version 2.0"
Expand Down
202 changes: 105 additions & 97 deletions ibicus/debias/_debiaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,27 +179,61 @@ def _is_correct_type(df):

@staticmethod
def _has_correct_shape(df):
return df.ndim == 3
return df.ndim > 0

@staticmethod
def _have_same_shape(obs, cm_hist, cm_future):
return (
obs.shape[1:] == cm_hist.shape[1:] and obs.shape[1:] == cm_future.shape[1:]
obs.shape[:-1] == cm_hist.shape[:-1] and obs.shape[:-1] == cm_future.shape[:-1]
)

@staticmethod
def _contains_inf_nan(x):
return np.any(np.logical_or(np.isnan(x), np.isinf(x)))

def _not_if_or_nan_vals_outside_reasonable_physical_range(self, x):
if self.reasonable_physical_range is not None:
return not np.all(
(x >= self.reasonable_physical_range[0])
& (x <= self.reasonable_physical_range[1])
| np.isinf(x)
| np.isnan(x)
def _not_if_or_nan_vals_outside_reasonable_physical_range(self, x, vtype):
"""
Combine methods to improver performance

In previous version, had a _contains_inf_nan static method and a
method to check for data within range with secondary check of out
of range data was nan/inf.

Functionality of these methods have been combined to improve
performance so don't have to compute nan/inf masks twice

Arguments:
x: Data values to check
vtype: Label/type of data; e.g., obs, cm_hist, cm_future, etc.
Used for warning labels

"""

nan_inf = np.isinf(x) | np.isnan(x)
if nan_inf.any():
warnings.warn(
f"{vtype} contains inf or nan values. Not all debiasers support missing values and their "
"presence might lead to infs or nans inside of the debiased values. Consider infilling the missing values.",
stacklevel=2,
)
return False

if self.reasonable_physical_range is None:
return False

if np.all(
(x >= self.reasonable_physical_range[0])
& (x <= self.reasonable_physical_range[1])
| nan_inf
):
return False

warnings.warn(
"%s contains values outside the reasonable physical range of %s for the variable: %s. "
"This might be due to different units of to data problems. It is recommended to check the input."
% (vtype, self.reasonable_physical_range, self.variable),
stacklevel=2,
)
return True

@staticmethod
def _has_float_dtype(x):
Expand Down Expand Up @@ -229,8 +263,17 @@ def _fill_masked_array_with_nan(x):
return x.filled(np.nan)

# ----- Input checks ----- #

def _check_inputs_and_convert_if_possible(self, obs, cm_hist, cm_future):
"""
Check inputs are correct

Updates:
Uses the updated
_not_if_or_nan_vals_outside_reasonable_physical_range() method for
speedup

"""

# correct type
if not Debiaser._is_correct_type(obs):
raise TypeError("Wrong type for obs. Needs to be np.ndarray")
Expand Down Expand Up @@ -260,54 +303,21 @@ def _check_inputs_and_convert_if_possible(self, obs, cm_hist, cm_future):

# correct shape
if not Debiaser._has_correct_shape(obs):
raise ValueError("obs needs to have 3 dimensions: time, x, y")
raise ValueError("obs needs to have 3 dimensions: x, y, time")
if not Debiaser._has_correct_shape(cm_hist):
raise ValueError("cm_hist needs to have 3 dimensions: time, x, y")
raise ValueError("cm_hist needs to have 3 dimensions: x, y, time")
if not Debiaser._has_correct_shape(cm_future):
raise ValueError("cm_future needs to have 3 dimensions: time, x, y")
raise ValueError("cm_future needs to have 3 dimensions: x, y, time")

# have shame shape
if not Debiaser._have_same_shape(obs, cm_hist, cm_future):
raise ValueError(
"obs, cm_hist, cm_future need to have same (number of) spatial dimensions. The arrays of obs, cm_hist and cm_future are assumed to have the following structure: [t, x, y] where t is the time dimension and x, y are spatial ones."
"obs, cm_hist, cm_future need to have same (number of) spatial dimensions. The arrays of obs, cm_hist and cm_future are assumed to have the following structure: [x, y, t] where t is the time dimension and x, y are spatial ones."
)

# contains inf or nan
if Debiaser._contains_inf_nan(obs):
warnings.warn(
"obs contains inf or nan values. Not all debiasers support missing values and their presence might lead to infs or nans inside of the debiased values. Consider infilling the missing values.",
stacklevel=2,
)
if Debiaser._contains_inf_nan(cm_hist):
warnings.warn(
"cm_hist contains inf or nan values. Not all debiasers support missing values and their presence might lead to infs or nans inside of the debiased values. Consider infilling the missing values.",
stacklevel=2,
)
if Debiaser._contains_inf_nan(cm_future):
warnings.warn(
"cm_future contains inf or nan values. Not all debiasers support missing values and their presence might lead to infs or nans inside of the debiased values. Consider infilling the missing values.",
stacklevel=2,
)

# in reasonable physical range:
if self._not_if_or_nan_vals_outside_reasonable_physical_range(obs):
warnings.warn(
"obs contains values outside the reasonable physical range of %s for the variable: %s. This might be due to different units of to data problems. It is recommended to check the input."
% (self.reasonable_physical_range, self.variable),
stacklevel=2,
)
if self._not_if_or_nan_vals_outside_reasonable_physical_range(cm_hist):
warnings.warn(
"cm_hist contains values outside the reasonable physical range of %s for the variable: %s. This might be due to different units of to data problems. It is recommended to check the input."
% (self.reasonable_physical_range, self.variable),
stacklevel=2,
)
if self._not_if_or_nan_vals_outside_reasonable_physical_range(cm_future):
warnings.warn(
"cm_future contains values outside the reasonable physical range of %s for the variable: %s. This might be due to different units of to data problems. It is recommended to check the input."
% (self.reasonable_physical_range, self.variable),
stacklevel=2,
)
self._not_if_or_nan_vals_outside_reasonable_physical_range(obs, 'obs')
self._not_if_or_nan_vals_outside_reasonable_physical_range(cm_hist, 'cm_hist')
self._not_if_or_nan_vals_outside_reasonable_physical_range(cm_future, 'cm_future')

# masked arrays
if Debiaser._is_masked_array(obs):
Expand Down Expand Up @@ -345,24 +355,12 @@ def _check_inputs_and_convert_if_possible(self, obs, cm_hist, cm_future):
"cm_future is a masked array, but contains no invalid data. It is converted to a normal numpy array.",
stacklevel=2,
)
cm_future = Debiaser._fill_masked_array_with_nan(cm_future)

return obs, cm_hist, cm_future

def _check_output(self, output):
if Debiaser._contains_inf_nan(output):
warnings.warn(
"The debiaser output contains inf or nan values. This might be due to inf or nan values inside the input, or to a problem of the debiaser for the given dataset at hand. It is recommended to check the output carefully",
stacklevel=2,
)

# in reasonable physical range:
if self._not_if_or_nan_vals_outside_reasonable_physical_range(output):
warnings.warn(
"The debiaser output contains values outside the reasonable physical range of %s for the variable: %s. This might be due to values outside the range in the input, or to a problem of the debiaser for the given dataset at hand. It is recommended to check the output carefully."
% (self.reasonable_physical_range, self.variable),
stacklevel=2,
)
self._not_if_or_nan_vals_outside_reasonable_physical_range(output, "The debiaser output")

# ----- Helpers ----- #

Expand Down Expand Up @@ -402,25 +400,27 @@ def _run_func_on_location_and_catch_error(
@staticmethod
def map_over_locations(
func,
output_size,
obs,
cm_hist,
cm_future,
progressbar=True,
failsafe=False,
**kwargs,
):
output = np.empty(output_size, dtype=cm_future.dtype)
output = np.empty_like(cm_future)

indices = np.ndindex(obs.shape[1:])
# Generator object of indices of all but the time (last) dimension
indices = np.ndindex(obs.shape[:-1])
if progressbar:
indices = tqdm(indices, total=np.prod(obs.shape[1:]))

for i, j in indices:
output[:, i, j] = Debiaser._run_func_on_location_and_catch_error(
obs[:, i, j],
cm_hist[:, i, j],
cm_future[:, i, j],
indices = tqdm(indices, total=np.prod(obs.shape[:-1]))

# Iterate over all indices; applies function to timeseries at each
# location sequentially
for idx in indices:
output[*idx, :] = Debiaser._run_func_on_location_and_catch_error(
obs[*idx, :],
cm_hist[*idx, :],
cm_future[*idx, :],
func,
failsafe=failsafe,
**kwargs,
Expand All @@ -430,37 +430,43 @@ def map_over_locations(
@staticmethod
def parallel_map_over_locations(
func,
output_size,
obs,
cm_hist,
cm_future,
nr_processes=4,
failsafe=False,
**kwargs,
):

# compute results
indices = [(i, j) for i in range(obs.shape[1]) for j in range(obs.shape[2])]
with Pool(processes=nr_processes) as pool:
result = pool.starmap(
partial(
Debiaser._run_func_on_location_and_catch_error,
func=func,
failsafe=failsafe,
**kwargs,
),
[
(obs[:, i, j], cm_hist[:, i, j], cm_future[:, i, j])
for (i, j) in indices
],
func = partial(
Debiaser._run_func_on_location_and_catch_error,
func=func,
failsafe=failsafe,
progressbar=False, # Disable progress bar
**kwargs,
)

# Open pool an pass chunks to it for processing. By passing chunks
# we reduce communcation (slow) with each process and allow them to
# work for a while
pool = Pool(processes=nr_processes)
objs = [
pool.apply_async(
func,
(obs[i, ...], cm_hist[i, ...], cm_future[i, ...]),
)
for i in range(obs.shape[0])
]
pool.close()
pool.join()

# fill output
output = np.empty(output_size, dtype=cm_future.dtype)
for k, index in enumerate(indices):
output[:, index[0], index[1]] = result[k]

# result = np.array(result)
# result = np.moveaxis(result.reshape(obs.shape[1], obs.shape[2], obs.shape[0]), -1, 0)
i = -1
output = np.empty_like(cm_future)
while len(objs) > 0:
output[i, ...] = objs.pop().get()
i -= 1

return output

Expand Down Expand Up @@ -533,13 +539,16 @@ def apply(
obs, cm_hist, cm_future
)

if obs.shape[:-1] == () and parallel:
logger.info("Running on single location, disabling parallel")
parallel = False

if parallel:
if progressbar:
warnings.warn("progressbar argument is ignored when parallel = True.")

output = Debiaser.parallel_map_over_locations(
self.apply_location,
output_size=cm_future.shape,
partial(self.map_over_locations, self.apply_location),
obs=obs,
cm_hist=cm_hist,
cm_future=cm_future,
Expand All @@ -550,7 +559,6 @@ def apply(
else:
output = Debiaser.map_over_locations(
self.apply_location,
output_size=cm_future.shape,
obs=obs,
cm_hist=cm_hist,
cm_future=cm_future,
Expand Down
46 changes: 0 additions & 46 deletions ibicus/debias/_delta_change.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
RunningWindowOverDaysOfYear,
check_time_information_and_raise_error,
day_of_year,
get_library_logger,
infer_and_create_time_arrays_if_not_given,
)
from ..variables import (
Expand Down Expand Up @@ -221,48 +220,3 @@ def apply_location(
return debiased_cm_future
else:
return self._apply_on_within_year_window(obs, cm_hist, cm_future)

def apply(
self,
obs,
cm_hist,
cm_future,
progressbar=True,
parallel=False,
nr_processes=4,
failsafe=False,
**kwargs
):
logger = get_library_logger()
logger.info("----- Running debiasing for variable: %s -----" % self.variable)

obs, cm_hist, cm_future = self._check_inputs_and_convert_if_possible(
obs, cm_hist, cm_future
)

if parallel:
output = Debiaser.parallel_map_over_locations(
self.apply_location,
output_size=obs.shape,
obs=obs,
cm_hist=cm_hist,
cm_future=cm_future,
nr_processes=nr_processes,
failsafe=failsafe,
**kwargs,
)
else:
output = Debiaser.map_over_locations(
self.apply_location,
output_size=obs.shape,
obs=obs,
cm_hist=cm_hist,
cm_future=cm_future,
progressbar=progressbar,
failsafe=failsafe,
**kwargs,
)

self._check_output(output)

return output
Loading