diff --git a/pyproject.toml b/pyproject.toml
index b8585569..7d5c4ee5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -20,6 +20,7 @@ dependencies = [
     "obspy",
     "pandas",
     "plotly",
+    "psutil",
     "setuptools<82.0.0",
     "scipy",
     "tqdm",
diff --git a/xdas/__init__.py b/xdas/__init__.py
index 5f281fc4..908e0276 100644
--- a/xdas/__init__.py
+++ b/xdas/__init__.py
@@ -34,6 +34,7 @@
     combine_by_coords,
     combine_by_field,
     concatenate,
+    fit_into_memory,
     open_dataarray,
     open_datacollection,
     open_mfdataarray,
diff --git a/xdas/core/dataarray.py b/xdas/core/dataarray.py
index 5100cee4..e2d5f3df 100644
--- a/xdas/core/dataarray.py
+++ b/xdas/core/dataarray.py
@@ -1,4 +1,5 @@
 import copy
+import os
 import warnings
 
 from functools import partial
@@ -884,6 +885,11 @@ def to_netcdf(self, fname, mode="w", group=None, virtual=None, encoding=None):
             dataset, variable_attrs = coord.to_dataset(dataset, variable_attrs)
 
         # write data
+        if os.path.dirname(fname) != "" and not os.path.exists(
+            os.path.dirname(fname)
+        ):
+            os.makedirs(os.path.dirname(fname), exist_ok=True)
+
         with h5netcdf.File(fname, mode=mode) as file:
             # group
             if group is not None and group not in file:
diff --git a/xdas/core/datacollection.py b/xdas/core/datacollection.py
index 4a95a5be..f9adf14a 100644
--- a/xdas/core/datacollection.py
+++ b/xdas/core/datacollection.py
@@ -239,6 +239,10 @@ def to_netcdf(self, fname, mode="w", group=None, virtual=None, encoding=None):
                 location = "/".join([name, str(key)])
                 if group is not None:
                     location = "/".join([group, location])
+                if os.path.dirname(fname) != "" and not os.path.exists(
+                    os.path.dirname(fname)
+                ):
+                    os.makedirs(os.path.dirname(fname), exist_ok=True)
                 self[key].to_netcdf(
                     fname,
                     mode="a",
diff --git a/xdas/core/routines.py b/xdas/core/routines.py
index c3a75dcb..ffbfca6a 100644
--- a/xdas/core/routines.py
+++ b/xdas/core/routines.py
@@ -9,6 +9,7 @@
 import numpy as np
 import pandas as pd
 import plotly.express as px
+import psutil
 import xarray as xr
 from tqdm import tqdm
 
@@ -743,6 +744,88 @@ def concatenate(objs, dim="first", tolerance=None, virtual=None, verbose=None):
     return DataArray(data, coords, dims, name, attrs)
 
 
+import sys
+from gc import get_referents
+from types import FunctionType, ModuleType
+
+# Do not traverse these when walking referents: instances reference their
+# class, functions reference their module, and modules reference everything.
+BLACKLIST = type, ModuleType, FunctionType
+
+
+def getsize(obj):
+    """
+    Sum the size in bytes of an object and of every object it references.
+
+    See https://stackoverflow.com/a/30316760/12774714.
+    """
+    if isinstance(obj, BLACKLIST):
+        raise TypeError("getsize() does not take argument of type: " + str(type(obj)))
+    seen_ids = set()
+    size = 0
+    objects = [obj]
+    while objects:
+        need_referents = []
+        for obj in objects:
+            if not isinstance(obj, BLACKLIST) and id(obj) not in seen_ids:
+                seen_ids.add(id(obj))
+                size += sys.getsizeof(obj)
+                need_referents.append(obj)
+        objects = get_referents(*need_referents)
+    return size
+
+
+def fit_into_memory(
+    da,
+    RAM_limit: float = 0.8,
+    indices_or_sections="discontinuities",
+    dim="first",
+    tolerance=None,
+):
+    """
+    Split a data array if it is too large to fit into a memory limit.
+
+    By default the number of chunks is derived from the currently available
+    memory so that each chunk uses at most `RAM_limit` of it. If an explicit
+    `indices_or_sections` is given (anything other than the default
+    "discontinuities"), it is forwarded unchanged to `split`.
+
+    Parameters
+    ----------
+    da : DataArray
+        The data array to split.
+    RAM_limit : float, optional
+        Ratio of the available memory that must not be exceeded. Default 0.8.
+    indices_or_sections : str, int or list of int, optional
+        If an integer N, the array is divided into N almost equal parts. If a
+        1-D array of sorted integers, the entries indicate where the array is
+        split along `dim`. If "discontinuities" (default), the number of
+        sections is computed from the memory limit instead.
+    dim : str, optional
+        The dimension along which to split. Default "first".
+    tolerance : float or timedelta64, optional
+        Forwarded to `split`; gaps and overlaps smaller than `tolerance` are
+        not considered discontinuities.
+
+    Returns
+    -------
+    list of DataArray
+        The split data array.
+    """
+    # An explicit request takes precedence over the memory-based heuristic
+    # (previously this argument was accepted but silently ignored).
+    if not (
+        isinstance(indices_or_sections, str)
+        and indices_or_sections == "discontinuities"
+    ):
+        return split(da, indices_or_sections, dim, tolerance)
+
+    available_RAM = psutil.virtual_memory().available  # in bytes
+    # One extra chunk for every full memory budget the array occupies.
+    n_chunks = 1 + int(getsize(da) / (RAM_limit * available_RAM))
+    return split(da, n_chunks, dim, tolerance)
+
+
 def split(da, indices_or_sections="discontinuities", dim="first", tolerance=None):
     """
     Split a data array along a dimension.