diff --git a/argopy/stores/index/implementations/pandas/index.py b/argopy/stores/index/implementations/pandas/index.py index ae43da43e..2ef69d681 100644 --- a/argopy/stores/index/implementations/pandas/index.py +++ b/argopy/stores/index/implementations/pandas/index.py @@ -4,6 +4,7 @@ import gzip from pathlib import Path from typing import List +import concurrent.futures from argopy.options import OPTIONS from argopy.errors import DataNotFound, InvalidDatasetStructure @@ -355,7 +356,7 @@ def read_files(self, index : bool = False, multi : bool = False) -> List[str]: else: return mono2multi(flist, convention=self.convention, sep=sep) - def records_per_wmo(self, index=False): + def records_per_wmo_legacy(self, index=False): """Return the number of records per unique WMOs in search results Fall back on full index if search not triggered @@ -379,6 +380,47 @@ def records_per_wmo(self, index=False): count[wmo] = self.index[search_filter].shape[0] return count + def records_per_wmo(self, index=False): + """Return the number of records per unique WMOs in search results + + Fall back on full index if search not triggered + + Returns + ------- + dict + + Notes + ----- + Computation is parallelized over all WMOs with a ThreadPoolExecutor + """ + + def count_wmo(self, wmo: int, index: bool): + if hasattr(self, "search") and not index: + search_filter = self.search["file"].str.contains( + "/%i/" % wmo, regex=True, case=False + ) + return wmo, self.search[search_filter].shape[0] + else: + search_filter = self.index["file"].str.contains( + "/%i/" % wmo, regex=True, case=False + ) + return wmo, self.index[search_filter].shape[0] + + ulist = self.read_wmo() + count = {} + with concurrent.futures.ThreadPoolExecutor() as executor: + # Submit tasks for each WMO + futures = { + executor.submit(count_wmo, self, wmo, index): wmo for wmo in ulist + } + + # Process results as they complete + for future in concurrent.futures.as_completed(futures): + wmo, cnt = future.result() + count[wmo] = cnt + + return count + def to_indexfile(self, outputfile): """Save search results on file, following the Argo standard index formats diff --git a/argopy/stores/index/implementations/pandas/search_engine.py b/argopy/stores/index/implementations/pandas/search_engine.py index 484496577..172483bc2 100644 --- a/argopy/stores/index/implementations/pandas/search_engine.py +++ b/argopy/stores/index/implementations/pandas/search_engine.py @@ -1,18 +1,49 @@ +import warnings import logging import pandas as pd import numpy as np from typing import List - -from .....options import OPTIONS -from .....errors import InvalidDatasetStructure, OptionValueError -from .....utils import is_indexbox, check_wmo, check_cyc, to_list, conv_lon -from ...extensions import register_ArgoIndex_accessor, ArgoIndexSearchEngine -from ..index_s3 import search_s3 -from .index import indexstore +from functools import lru_cache + +from argopy.options import OPTIONS +from argopy.errors import InvalidDatasetStructure, OptionValueError +from argopy.utils.monitored_threadpool import pmap +from argopy.utils.checkers import is_indexbox, check_wmo, check_cyc +from argopy.utils.casting import to_list +from argopy.utils.geo import conv_lon +from argopy.stores.index.extensions import register_ArgoIndex_accessor, ArgoIndexSearchEngine +from argopy.stores.index.implementations.index_s3 import search_s3 +from argopy.stores.index.implementations.pandas.index import indexstore log = logging.getLogger("argopy.stores.index.pd") +@lru_cache(maxsize=25_000) +def compute_wmo(wmo: int, 
obj): + return obj.index["file"].str.contains("/%i/" % wmo, regex=False, case=True) + + +@lru_cache(maxsize=25_000) +def compute_cyc(cyc: int, obj): + pattern = "_%0.3d.nc" % cyc + if cyc >= 1000: + pattern = "_%0.4d.nc" % cyc + return obj.index["file"].str.contains(pattern, regex=False, case=True) + + +@lru_cache(maxsize=25_000) +def compute_wmo_cyc(wmo: int, obj, cyc=None): + filt = [] + for c in cyc: + filt.append(compute_cyc(c, obj)) + return np.logical_and.reduce([compute_wmo(wmo, obj), np.logical_or.reduce(filt)]) + + +@lru_cache(maxsize=1_000) +def compute_params(param: str, obj): + return obj.index["variables"].apply(lambda x: param in x) + + @register_ArgoIndex_accessor("query", indexstore) class SearchEngine(ArgoIndexSearchEngine): @@ -24,24 +55,20 @@ def checker(WMOs): "Argo index searching for WMOs=[%s] ..." % ";".join([str(wmo) for wmo in WMOs]) ) + if len(WMOs) > 30: + warnings.warn("Searching a large amount of Argo floats with the Pandas backend is quite slow. We strongly recommend to install Pyarrow to improve performances ! Pyarrow is about 10 times faster than Pandas for this use-case.") return WMOs def namer(WMOs): return {"WMO": WMOs} - def composer(WMOs): - filt = [] - for wmo in WMOs: - filt.append( - self._obj.index["file"].str.contains( - "/%i/" % wmo, regex=True, case=False - ) - ) - return self._obj._reduce_a_filter_list(filt, op="or") + def composer(obj, WMOs): + filt = pmap(obj, compute_wmo, WMOs) + return obj._reduce_a_filter_list(filt, op="or") WMOs = checker(WMOs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(WMOs) + search_filter = composer(self._obj, WMOs) if not composed: self._obj.search_type = namer(WMOs) self._obj.search_filter = search_filter @@ -63,28 +90,20 @@ def checker(CYCs): "Argo index searching for CYCs=[%s] ..." % (";".join([str(cyc) for cyc in CYCs])) ) + if len(CYCs) > 50: + warnings.warn("Searching a large amount of Argo float cycles with the Pandas backend is quite slow. We strongly recommend to install Pyarrow to improve performances ! Pyarrow is about 10 times faster than Pandas for this use-case.") return CYCs def namer(CYCs): return {"CYC": CYCs} - def composer(CYCs): - filt = [] - for cyc in CYCs: - if cyc < 1000: - pattern = "_%0.3d.nc" % (cyc) - else: - pattern = "_%0.4d.nc" % (cyc) - filt.append( - self._obj.index["file"].str.contains( - pattern, regex=True, case=False - ) - ) - return self._obj._reduce_a_filter_list(filt, op="or") + def composer(obj, CYCs): + filt = pmap(obj, compute_cyc, CYCs) + return obj._reduce_a_filter_list(filt) CYCs = checker(CYCs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(CYCs) + search_filter = composer(self._obj, CYCs) if not composed: self._obj.search_type = namer(CYCs) self._obj.search_filter = search_filter @@ -110,29 +129,20 @@ def checker(WMOs, CYCs): ";".join([str(cyc) for cyc in CYCs]), ) ) + if len(WMOs) > 30 or len(CYCs) > 50: + warnings.warn("Searching a large amount of Argo float cycles with the Pandas backend is quite slow. We strongly recommend to install Pyarrow to improve performances ! 
Pyarrow is about 10 times faster than Pandas for this use-case.") return WMOs, CYCs def namer(WMOs, CYCs): return {"WMO": WMOs, "CYC": CYCs} - def composer(WMOs, CYCs): - filt = [] - for wmo in WMOs: - for cyc in CYCs: - if cyc < 1000: - pattern = "%i_%0.3d.nc" % (wmo, cyc) - else: - pattern = "%i_%0.4d.nc" % (wmo, cyc) - filt.append( - self._obj.index["file"].str.contains( - pattern, regex=True, case=False - ) - ) - return self._obj._reduce_a_filter_list(filt, op="or") + def composer(obj, WMOs, CYCs): + filt = pmap(obj, compute_wmo_cyc, WMOs, kw={"cyc": tuple(CYCs)}) + return obj._reduce_a_filter_list(filt) WMOs, CYCs = checker(WMOs, CYCs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(WMOs, CYCs) + search_filter = composer(self._obj, WMOs, CYCs) if not composed: self._obj.search_type = namer(WMOs, CYCs) self._obj.search_filter = search_filter @@ -338,19 +348,17 @@ def checker(PARAMs): def namer(PARAMs, logical): return {"PARAMS": (PARAMs, logical)} - def composer(PARAMs, logical): - filt = [] - self._obj.index["variables"] = self._obj.index["parameters"].apply( + def composer(obj, PARAMs, logical): + obj.index["variables"] = obj.index["parameters"].apply( lambda x: x.split() ) - for param in PARAMs: - filt.append(self._obj.index["variables"].apply(lambda x: param in x)) - self._obj.index = self._obj.index.drop("variables", axis=1) - return self._obj._reduce_a_filter_list(filt, op=logical) + filt = pmap(obj, compute_params, PARAMs) + obj.index = obj.index.drop("variables", axis=1) + return obj._reduce_a_filter_list(filt, op=logical) PARAMs = checker(PARAMs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(PARAMs, logical) + search_filter = composer(self._obj, PARAMs, logical) if not composed: self._obj.search_type = namer(PARAMs, logical) self._obj.search_filter = search_filter @@ -482,15 +490,21 @@ def composer(profiler_type): def institution_code(self, institution_code: List[str], nrows=None, composed=False): def checker(institution_code): if "institution" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for institution codes in this index)") - log.debug("Argo index searching for institution code in %s ..." % institution_code) + raise InvalidDatasetStructure( + "Cannot search for institution codes in this index)" + ) + log.debug( + "Argo index searching for institution code in %s ..." % institution_code + ) institution_code = to_list(institution_code) valid_codes = [] for code in institution_code: - if self._obj.valid('institution_code', code): + if self._obj.valid("institution_code", code): valid_codes.append(code.upper()) if len(valid_codes) == 0: - raise OptionValueError(f"No valid codes found for institution in {institution_code}. Valid codes are: {self._obj.valid.institution_code}") + raise OptionValueError( + f"No valid codes found for institution in {institution_code}. 
Valid codes are: {self._obj.valid.institution_code}" + ) else: return valid_codes @@ -498,11 +512,7 @@ def namer(institution_code): return {"INST_CODE": institution_code} def composer(institution_code): - return ( - self._obj.index["institution"] - .fillna("") - .isin(institution_code) - ) + return self._obj.index["institution"].fillna("").isin(institution_code) institution_code = checker(institution_code) self._obj.load(nrows=self._obj._nrows_index) diff --git a/argopy/stores/index/implementations/pyarrow/index.py b/argopy/stores/index/implementations/pyarrow/index.py index 37bfdf32d..9dfbfc245 100644 --- a/argopy/stores/index/implementations/pyarrow/index.py +++ b/argopy/stores/index/implementations/pyarrow/index.py @@ -6,6 +6,7 @@ from packaging import version from pathlib import Path from typing import List +import concurrent.futures try: import pyarrow.csv as csv # noqa: F401 @@ -415,7 +416,7 @@ def xmax(xtble): tmax(self.index["date"]), ] - def read_files(self, index : bool = False, multi : bool = False) -> List[str]: + def read_files(self, index: bool = False, multi: bool = False) -> List[str]: sep = self.fs["src"].fs.sep if hasattr(self, "search") and not index: flist = [ @@ -432,7 +433,7 @@ def read_files(self, index : bool = False, multi : bool = False) -> List[str]: else: return mono2multi(flist, convention=self.convention, sep=sep) - def records_per_wmo(self, index=False): + def records_per_wmo_legacy(self, index=False): """Return the number of records per unique WMOs in search results Fall back on full index if search not triggered @@ -454,6 +455,45 @@ def records_per_wmo(self, index=False): count[wmo] = self.index.filter(search_filter).shape[0] return count + def records_per_wmo(self, index=False): + """Return the number of records per unique WMOs in search results + + Fall back on full index if search not triggered + + Notes + ----- + Computation is parallelized over all WMOs with a ThreadPoolExecutor + """ + + def count_wmo(self, wmo: int, index: bool): + if hasattr(self, "search") and not index: + search_filter = pa.compute.match_substring_regex( + self.search["file"], pattern="/%i/" % wmo + ) + return wmo, self.search.filter(search_filter).shape[0] + else: + if not hasattr(self, "index"): + self.load(nrows=self._nrows_index) + search_filter = pa.compute.match_substring_regex( + self.index["file"], pattern="/%i/" % wmo + ) + return wmo, self.index.filter(search_filter).shape[0] + + ulist = self.read_wmo() + count = {} + with concurrent.futures.ThreadPoolExecutor() as executor: + # Submit tasks for each WMO + futures = { + executor.submit(count_wmo, self, wmo, index): wmo for wmo in ulist + } + + # Process results as they complete + for future in concurrent.futures.as_completed(futures): + wmo, cnt = future.result() + count[wmo] = cnt + + return count + def to_indexfile(self, file): """Save search results on file, following the Argo standard index formats diff --git a/argopy/stores/index/implementations/pyarrow/search_engine.py b/argopy/stores/index/implementations/pyarrow/search_engine.py index 7148781ce..b3bed815f 100644 --- a/argopy/stores/index/implementations/pyarrow/search_engine.py +++ b/argopy/stores/index/implementations/pyarrow/search_engine.py @@ -2,6 +2,8 @@ import pandas as pd import numpy as np from typing import List +from functools import lru_cache + try: import pyarrow.csv as csv # noqa: F401 @@ -11,17 +13,52 @@ except ModuleNotFoundError: pass -from .....options import OPTIONS -from .....errors import InvalidDatasetStructure, OptionValueError -from 
.....utils import is_indexbox, check_wmo, check_cyc, to_list, conv_lon -from ...extensions import register_ArgoIndex_accessor, ArgoIndexSearchEngine -from ..index_s3 import search_s3 -from .index import indexstore +from argopy.options import OPTIONS +from argopy.errors import InvalidDatasetStructure, OptionValueError +from argopy.utils.monitored_threadpool import pmap +from argopy.utils.checkers import is_indexbox, check_wmo, check_cyc +from argopy.utils.casting import to_list +from argopy.utils.geo import conv_lon +from argopy.stores.index.extensions import ( + register_ArgoIndex_accessor, + ArgoIndexSearchEngine, +) +from argopy.stores.index.implementations.index_s3 import search_s3 +from argopy.stores.index.implementations.pyarrow.index import indexstore log = logging.getLogger("argopy.stores.index.pa") -@register_ArgoIndex_accessor('query', indexstore) +@lru_cache(maxsize=25_000) +def compute_wmo(wmo: int, obj): + return pa.compute.match_substring_regex(obj.index["file"], pattern="/%i/" % wmo) + + +@lru_cache(maxsize=25_000) +def compute_cyc(cyc: int, obj): + pattern = "_%0.3d.nc" % cyc + if cyc >= 1000: + pattern = "_%0.4d.nc" % cyc + return pa.compute.match_substring_regex(obj.index["file"], pattern=pattern) + + +@lru_cache(maxsize=25_000) +def compute_wmo_cyc(wmo: int, obj, cyc=None): + filt = [] + for c in cyc: + filt.append(compute_cyc(c, obj)) + return np.logical_and.reduce([compute_wmo(wmo, obj), np.logical_or.reduce(filt)]) + + +@lru_cache(maxsize=1_000) +def compute_params(param: str, obj): + return pa.compute.match_substring_regex( + obj.index["parameters"], + options=pa.compute.MatchSubstringOptions(param, ignore_case=True), + ) + + +@register_ArgoIndex_accessor("query", indexstore) class SearchEngine(ArgoIndexSearchEngine): @search_s3 @@ -37,19 +74,13 @@ def checker(WMOs): def namer(WMOs): return {"WMO": WMOs} - def composer(WMOs): - filt = [] - for wmo in WMOs: - filt.append( - pa.compute.match_substring_regex( - self._obj.index["file"], pattern="/%i/" % wmo - ) - ) - return self._obj._reduce_a_filter_list(filt) + def composer(obj, WMOs): + filt = pmap(obj, compute_wmo, WMOs) + return obj._reduce_a_filter_list(filt, op="or") WMOs = checker(WMOs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(WMOs) + search_filter = composer(self._obj, WMOs) if not composed: self._obj.search_type = namer(WMOs) self._obj.search_filter = search_filter @@ -76,21 +107,13 @@ def checker(CYCs): def namer(CYCs): return {"CYC": CYCs} - def composer(CYCs): - filt = [] - for cyc in CYCs: - if cyc < 1000: - pattern = "_%0.3d.nc" % (cyc) - else: - pattern = "_%0.4d.nc" % (cyc) - filt.append( - pa.compute.match_substring_regex(self._obj.index["file"], pattern=pattern) - ) - return self._obj._reduce_a_filter_list(filt) + def composer(obj, CYCs): + filt = pmap(obj, compute_cyc, CYCs) + return obj._reduce_a_filter_list(filt) CYCs = checker(CYCs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(CYCs) + search_filter = composer(self._obj, CYCs) if not composed: self._obj.search_type = namer(CYCs) self._obj.search_filter = search_filter @@ -121,24 +144,13 @@ def checker(WMOs, CYCs): def namer(WMOs, CYCs): return {"WMO": WMOs, "CYC": CYCs} - def composer(WMOs, CYCs): - filt = [] - for wmo in WMOs: - for cyc in CYCs: - if cyc < 1000: - pattern = "%i_%0.3d.nc" % (wmo, cyc) - else: - pattern = "%i_%0.4d.nc" % (wmo, cyc) - filt.append( - pa.compute.match_substring_regex( - self._obj.index["file"], pattern=pattern - ) - ) - return self._obj._reduce_a_filter_list(filt) 
+ def composer(obj, WMOs, CYCs): + filt = pmap(obj, compute_wmo_cyc, WMOs, kw={"cyc": tuple(CYCs)}) + return obj._reduce_a_filter_list(filt) WMOs, CYCs = checker(WMOs, CYCs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(WMOs, CYCs) + search_filter = composer(self._obj, WMOs, CYCs) if not composed: self._obj.search_type = namer(WMOs, CYCs) self._obj.search_filter = search_filter @@ -190,7 +202,9 @@ def composer(BOX, key): def lat(self, BOX, nrows=None, composed=False): def checker(BOX): if "latitude" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for latitude in this index") + raise InvalidDatasetStructure( + "Cannot search for latitude in this index" + ) is_indexbox(BOX) log.debug("Argo index searching for latitude in BOX=%s ..." % BOX) @@ -218,7 +232,9 @@ def composer(BOX): def lon(self, BOX, nrows=None, composed=False): def checker(BOX): if "longitude" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for longitude in this index") + raise InvalidDatasetStructure( + "Cannot search for longitude in this index" + ) is_indexbox(BOX) log.debug("Argo index searching for longitude in BOX=%s ..." % BOX) @@ -227,12 +243,28 @@ def namer(BOX): def composer(BOX): filt = [] - if OPTIONS['longitude_convention'] == '360': - filt.append(pa.compute.greater_equal(self._obj.index["longitude_360"], conv_lon(BOX[0], '360'))) - filt.append(pa.compute.less_equal(self._obj.index["longitude_360"], conv_lon(BOX[1], '360'))) - elif OPTIONS['longitude_convention'] == '180': - filt.append(pa.compute.greater_equal(self._obj.index["longitude"], conv_lon(BOX[0], '180'))) - filt.append(pa.compute.less_equal(self._obj.index["longitude"], conv_lon(BOX[1], '180'))) + if OPTIONS["longitude_convention"] == "360": + filt.append( + pa.compute.greater_equal( + self._obj.index["longitude_360"], conv_lon(BOX[0], "360") + ) + ) + filt.append( + pa.compute.less_equal( + self._obj.index["longitude_360"], conv_lon(BOX[1], "360") + ) + ) + elif OPTIONS["longitude_convention"] == "180": + filt.append( + pa.compute.greater_equal( + self._obj.index["longitude"], conv_lon(BOX[0], "180") + ) + ) + filt.append( + pa.compute.less_equal( + self._obj.index["longitude"], conv_lon(BOX[1], "180") + ) + ) return self._obj._reduce_a_filter_list(filt, op="and") checker(BOX) @@ -259,12 +291,28 @@ def namer(BOX): def composer(BOX): filt = [] - if OPTIONS['longitude_convention'] == '360': - filt.append(pa.compute.greater_equal(self._obj.index["longitude_360"], conv_lon(BOX[0], '360'))) - filt.append(pa.compute.less_equal(self._obj.index["longitude_360"], conv_lon(BOX[1], '360'))) - elif OPTIONS['longitude_convention'] == '180': - filt.append(pa.compute.greater_equal(self._obj.index["longitude"], conv_lon(BOX[0], '180'))) - filt.append(pa.compute.less_equal(self._obj.index["longitude"], conv_lon(BOX[1], '180'))) + if OPTIONS["longitude_convention"] == "360": + filt.append( + pa.compute.greater_equal( + self._obj.index["longitude_360"], conv_lon(BOX[0], "360") + ) + ) + filt.append( + pa.compute.less_equal( + self._obj.index["longitude_360"], conv_lon(BOX[1], "360") + ) + ) + elif OPTIONS["longitude_convention"] == "180": + filt.append( + pa.compute.greater_equal( + self._obj.index["longitude"], conv_lon(BOX[0], "180") + ) + ) + filt.append( + pa.compute.less_equal( + self._obj.index["longitude"], conv_lon(BOX[1], "180") + ) + ) filt.append(pa.compute.greater_equal(self._obj.index["latitude"], BOX[2])) 
filt.append(pa.compute.less_equal(self._obj.index["latitude"], BOX[3])) return self._obj._reduce_a_filter_list(filt, op="and") @@ -284,7 +332,9 @@ def composer(BOX): def box(self, BOX, nrows=None, composed=False): def checker(BOX): if "longitude" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for coordinates in this index") + raise InvalidDatasetStructure( + "Cannot search for coordinates in this index" + ) is_indexbox(BOX) log.debug("Argo index searching for lat/lon/date in BOX=%s ..." % BOX) return "date" # Return key to use for time axis @@ -294,12 +344,28 @@ def namer(BOX): def composer(BOX, key): filt = [] - if OPTIONS['longitude_convention'] == '360': - filt.append(pa.compute.greater_equal(self._obj.index["longitude_360"], conv_lon(BOX[0], '360'))) - filt.append(pa.compute.less_equal(self._obj.index["longitude_360"], conv_lon(BOX[1], '360'))) - elif OPTIONS['longitude_convention'] == '180': - filt.append(pa.compute.greater_equal(self._obj.index["longitude"], conv_lon(BOX[0], '180'))) - filt.append(pa.compute.less_equal(self._obj.index["longitude"], conv_lon(BOX[1], '180'))) + if OPTIONS["longitude_convention"] == "360": + filt.append( + pa.compute.greater_equal( + self._obj.index["longitude_360"], conv_lon(BOX[0], "360") + ) + ) + filt.append( + pa.compute.less_equal( + self._obj.index["longitude_360"], conv_lon(BOX[1], "360") + ) + ) + elif OPTIONS["longitude_convention"] == "180": + filt.append( + pa.compute.greater_equal( + self._obj.index["longitude"], conv_lon(BOX[0], "180") + ) + ) + filt.append( + pa.compute.less_equal( + self._obj.index["longitude"], conv_lon(BOX[1], "180") + ) + ) filt.append(pa.compute.greater_equal(self._obj.index["latitude"], BOX[2])) filt.append(pa.compute.less_equal(self._obj.index["latitude"], BOX[3])) filt.append( @@ -331,27 +397,23 @@ def composer(BOX, key): def params(self, PARAMs, logical="and", nrows=None, composed=False): def checker(PARAMs): if "parameters" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for parameters in this index (%s: %s)." % (self._obj.convention, self._obj.convention_title)) + raise InvalidDatasetStructure( + "Cannot search for parameters in this index (%s: %s)." + % (self._obj.convention, self._obj.convention_title) + ) log.debug("Argo index searching for parameters in PARAM=%s." 
% PARAMs) return to_list(PARAMs) # Make sure we deal with a list def namer(PARAMs, logical): return {"PARAMS": (PARAMs, logical)} - def composer(PARAMs, logical): - filt = [] - for param in PARAMs: - filt.append( - pa.compute.match_substring_regex( - self._obj.index["parameters"], - options=pa.compute.MatchSubstringOptions(param, ignore_case=True), - ) - ) - return self._obj._reduce_a_filter_list(filt, op=logical) + def composer(obj, PARAMs, logical): + filt = pmap(obj, compute_params, PARAMs) + return obj._reduce_a_filter_list(filt, op=logical) PARAMs = checker(PARAMs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(PARAMs, logical) + search_filter = composer(self._obj, PARAMs, logical) if not composed: self._obj.search_type = namer(PARAMs, logical) self._obj.search_filter = search_filter @@ -361,7 +423,9 @@ def composer(PARAMs, logical): self._obj.search_type.update(namer(PARAMs, logical)) return search_filter - def parameter_data_mode(self, PARAMs: dict, logical="and", nrows=None, composed=False): + def parameter_data_mode( + self, PARAMs: dict, logical="and", nrows=None, composed=False + ): def checker(PARAMs): if self._obj.convention not in [ "ar_index_global_prof", @@ -377,11 +441,13 @@ def checker(PARAMs): ) # Validate PARAMs argument type - [ - PARAMs.update({p: to_list(PARAMs[p])}) for p in PARAMs - ] + [PARAMs.update({p: to_list(PARAMs[p])}) for p in PARAMs] if not np.all( - [v in ["R", "A", "D", "", " "] for vals in PARAMs.values() for v in vals] + [ + v in ["R", "A", "D", "", " "] + for vals in PARAMs.values() + for v in vals + ] ): raise ValueError("Data mode must be a value in 'R', 'A', 'D', ' ', ''") if self._obj.convention in ["argo_aux-profile_index"]: @@ -449,8 +515,12 @@ def fct(this_x, this_y): def profiler_type(self, profiler_type: List[int], nrows=None, composed=False): def checker(profiler_type): if "profiler_type" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for profiler types in this index)") - log.debug("Argo index searching for profiler type in %s ..." % profiler_type) + raise InvalidDatasetStructure( + "Cannot search for profiler types in this index)" + ) + log.debug( + "Argo index searching for profiler type in %s ..." % profiler_type + ) return to_list(profiler_type) def namer(profiler_type): @@ -474,18 +544,24 @@ def composer(profiler_type): return search_filter @search_s3 - def institution_code(self, institution_code: List[str], nrows=None, composed=False): + def institution_code(self, institution_code: List[str], nrows=None, composed=False): def checker(institution_code): if "institution" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for institution codes in this index)") - log.debug("Argo index searching for institution code in %s ..." % institution_code) + raise InvalidDatasetStructure( + "Cannot search for institution codes in this index)" + ) + log.debug( + "Argo index searching for institution code in %s ..." % institution_code + ) institution_code = to_list(institution_code) valid_codes = [] for code in institution_code: - if self._obj.valid('institution_code', code): + if self._obj.valid("institution_code", code): valid_codes.append(code.upper()) if len(valid_codes) == 0: - raise OptionValueError(f"No valid codes found for institution in {institution_code}. Valid codes are: {self._obj.valid.institution_code}") + raise OptionValueError( + f"No valid codes found for institution in {institution_code}. 
Valid codes are: {self._obj.valid.institution_code}" + ) else: return valid_codes @@ -510,7 +586,7 @@ def composer(institution_code): return search_filter @search_s3 - def dac(self, dac: list[str], nrows=None, composed=False): + def dac(self, dac: list[str], nrows=None, composed=False): def checker(dac): if "file" not in self._obj.convention_columns: raise InvalidDatasetStructure("Cannot search for DAC in this index)") @@ -540,4 +616,4 @@ def composer(DACs): return self._obj else: self._obj.search_type.update(namer(dac)) - return search_filter \ No newline at end of file + return search_filter diff --git a/argopy/tests/test_stores_index.py b/argopy/tests/test_stores_index.py index d6c083fa0..281970b25 100644 --- a/argopy/tests/test_stores_index.py +++ b/argopy/tests/test_stores_index.py @@ -279,7 +279,7 @@ def a_store(self, request): if not HAS_S3FS and 's3' in fetcher_args['host']: xfail, reason = True, 's3fs not available' elif 's3' in fetcher_args['host']: - xfail, reason = True, 's3 is experimental' + xfail, reason = False, 's3 is experimental (store)' yield self.create_store(fetcher_args, xfail=xfail, reason=reason).load(nrows=N_RECORDS) @pytest.fixture @@ -298,7 +298,7 @@ def a_search(self, request): if not HAS_S3FS and 's3' in host: xfail, reason = True, 's3fs not available' elif 's3' in host: - xfail, reason = True, 's3 is experimental' + xfail, reason = False, 's3 is experimental (search)' yield run_a_search(self.new_idx, {"host": host, "cache": True}, srch, xfail=xfail, reason=reason) diff --git a/argopy/utils/monitored_threadpool.py b/argopy/utils/monitored_threadpool.py index 66e0a9064..5814710fe 100644 --- a/argopy/utils/monitored_threadpool.py +++ b/argopy/utils/monitored_threadpool.py @@ -12,7 +12,7 @@ from concurrent.futures import as_completed from threading import Lock import logging -from typing import Union +from typing import Union, Callable, Iterable, Any from abc import ABC, abstractmethod import importlib @@ -589,3 +589,15 @@ def my_final(obj_list, opt=True): """ pass + + +def pmap(obj, mapper: Callable, a_list: Iterable, kw: dict[str, Any] = {}) -> list[Any]: + """Map a callable over an iterable of items with a ThreadPoolExecutor, passing a shared object to each call; results are returned in completion order""" + results: list[Any] = [] + + with ThreadPoolExecutor() as executor: + futures = {executor.submit(mapper, item, obj, **kw): item for item in a_list} + for future in as_completed(futures): + results.append(future.result()) + + return results \ No newline at end of file
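
The composer functions above delegate the per-WMO and per-cycle mask computation to the new pmap() helper added in argopy/utils/monitored_threadpool.py. The following is a minimal, self-contained sketch of that fan-out/gather pattern; pmap_sketch and is_even are illustrative names, not part of the patch:

from concurrent.futures import ThreadPoolExecutor, as_completed

def pmap_sketch(obj, mapper, a_list, kw=None):
    # Submit one task per item, passing the shared store object to every call,
    # then gather the results as workers finish.
    kw = kw or {}
    results = []
    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(mapper, item, obj, **kw): item for item in a_list}
        for future in as_completed(futures):
            results.append(future.result())
    return results

def is_even(item, obj):
    # Stand-in for compute_wmo/compute_cyc: one boolean "mask" per item
    return item % 2 == 0

print(pmap_sketch(None, is_even, range(5)))  # five booleans, in completion order

Because as_completed() yields futures in completion order, the masks can come back in any order; this is harmless for the wmo, cyc and params composers since the masks are then combined with _reduce_a_filter_list using "and"/"or", both of which are order-independent.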
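
compute_wmo(), compute_cyc(), compute_wmo_cyc() and compute_params() are module-level helpers memoized with functools.lru_cache and keyed on the (value, store object) pair, so a given mask is computed at most once per value and per store instance; this relies on the store object being hashable. A small sketch of that behaviour, where ToyStore is a hypothetical stand-in for the real indexstore:

from functools import lru_cache
import pandas as pd

class ToyStore:
    # Hypothetical stand-in for the indexstore instance passed as 'obj'
    def __init__(self):
        self.index = pd.DataFrame(
            {"file": ["dac/6901234/profiles/R6901234_001.nc",
                      "dac/5905678/profiles/R5905678_002.nc"]}
        )

@lru_cache(maxsize=25_000)
def compute_wmo(wmo: int, obj):
    # Same shape as the pandas backend helper: one boolean mask per (wmo, store) pair
    return obj.index["file"].str.contains("/%i/" % wmo, regex=False)

store = ToyStore()
first = compute_wmo(6901234, store)
second = compute_wmo(6901234, store)  # served from the cache
print(first is second, compute_wmo.cache_info().hits)  # True 1

Note that the cache is tied to a store instance: if a store's .index is replaced under the same object, previously cached masks are returned unchanged.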
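
compute_wmo_cyc() builds its mask by OR-ing the per-cycle masks and AND-ing the result with the WMO mask through numpy reductions. The same logic applied to a throw-away pandas Series, with made-up file names:

import numpy as np
import pandas as pd

files = pd.Series([
    "dac/6901234/profiles/R6901234_001.nc",
    "dac/6901234/profiles/R6901234_002.nc",
    "dac/5905678/profiles/R5905678_001.nc",
])

wmo_mask = files.str.contains("/6901234/", regex=False)
cyc_masks = [files.str.contains("_%0.3d.nc" % c, regex=False) for c in (1, 2)]

# Keep rows matching the WMO AND any of the requested cycles
combined = np.logical_and.reduce([wmo_mask, np.logical_or.reduce(cyc_masks)])
print(combined)  # the first two rows match, the third does not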
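
records_per_wmo() in both backends now fans the per-WMO counting out over a ThreadPoolExecutor instead of looping serially, while keeping the same return value. An illustrative end-to-end use, assuming the public ArgoIndex entry point and its query accessor registered by this patch (the WMOs are placeholders and the index is downloaded on first use):

from argopy import ArgoIndex

idx = ArgoIndex()                   # default GDAC host and core-profile index
idx.query.wmo([6902746, 6902755])   # search handled by the SearchEngine accessor above
counts = idx.records_per_wmo()      # {wmo: number of records}, counted concurrently
print(counts)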