From a91eec00d1750f0b8c96b8a7ae51aded4c13a838 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Thu, 29 Jan 2026 17:39:15 +0100 Subject: [PATCH 1/7] records_per_wmo in parallel --- .../index/implementations/pandas/index.py | 44 ++++++++++++++++++- .../index/implementations/pyarrow/index.py | 44 ++++++++++++++++++- 2 files changed, 85 insertions(+), 3 deletions(-) diff --git a/argopy/stores/index/implementations/pandas/index.py b/argopy/stores/index/implementations/pandas/index.py index ae43da43e..2ef69d681 100644 --- a/argopy/stores/index/implementations/pandas/index.py +++ b/argopy/stores/index/implementations/pandas/index.py @@ -4,6 +4,7 @@ import gzip from pathlib import Path from typing import List +import concurrent.futures from argopy.options import OPTIONS from argopy.errors import DataNotFound, InvalidDatasetStructure @@ -355,7 +356,7 @@ def read_files(self, index : bool = False, multi : bool = False) -> List[str]: else: return mono2multi(flist, convention=self.convention, sep=sep) - def records_per_wmo(self, index=False): + def records_per_wmo_legacy(self, index=False): """Return the number of records per unique WMOs in search results Fall back on full index if search not triggered @@ -379,6 +380,47 @@ def records_per_wmo(self, index=False): count[wmo] = self.index[search_filter].shape[0] return count + def records_per_wmo(self, index=False): + """Return the number of records per unique WMOs in search results + + Fall back on full index if search not triggered + + Returns + ------- + dict + + Notes + ----- + Computation is parallelized over all WMOs with a ThreadPoolExecutor + """ + + def count_wmo(self, wmo: int, index: bool): + if hasattr(self, "search") and not index: + search_filter = self.search["file"].str.contains( + "/%i/" % wmo, regex=True, case=False + ) + return wmo, self.search[search_filter].shape[0] + else: + search_filter = self.index["file"].str.contains( + "/%i/" % wmo, regex=True, case=False + ) + return wmo, self.index[search_filter].shape[0] + + ulist = self.read_wmo() + count = {} + with concurrent.futures.ThreadPoolExecutor() as executor: + # Submit tasks for each WMO + futures = { + executor.submit(count_wmo, self, wmo, index): wmo for wmo in ulist + } + + # Process results as they complete + for future in concurrent.futures.as_completed(futures): + wmo, cnt = future.result() + count[wmo] = cnt + + return count + def to_indexfile(self, outputfile): """Save search results on file, following the Argo standard index formats diff --git a/argopy/stores/index/implementations/pyarrow/index.py b/argopy/stores/index/implementations/pyarrow/index.py index 37bfdf32d..9dfbfc245 100644 --- a/argopy/stores/index/implementations/pyarrow/index.py +++ b/argopy/stores/index/implementations/pyarrow/index.py @@ -6,6 +6,7 @@ from packaging import version from pathlib import Path from typing import List +import concurrent.futures try: import pyarrow.csv as csv # noqa: F401 @@ -415,7 +416,7 @@ def xmax(xtble): tmax(self.index["date"]), ] - def read_files(self, index : bool = False, multi : bool = False) -> List[str]: + def read_files(self, index: bool = False, multi: bool = False) -> List[str]: sep = self.fs["src"].fs.sep if hasattr(self, "search") and not index: flist = [ @@ -432,7 +433,7 @@ def read_files(self, index : bool = False, multi : bool = False) -> List[str]: else: return mono2multi(flist, convention=self.convention, sep=sep) - def records_per_wmo(self, index=False): + def records_per_wmo_legacy(self, index=False): """Return the number of records per unique WMOs in 
search results

        Fall back on full index if search not triggered

@@ -454,6 +455,45 @@ def records_per_wmo(self, index=False):
                count[wmo] = self.index.filter(search_filter).shape[0]
        return count

+    def records_per_wmo(self, index=False):
+        """Return the number of records per unique WMOs in search results
+
+        Fall back on full index if search not triggered
+
+        Notes
+        -----
+        Computation is parallelized over all WMOs with a ThreadPoolExecutor
+        """
+
+        def count_wmo(self, wmo: int, index: bool):
+            if hasattr(self, "search") and not index:
+                search_filter = pa.compute.match_substring_regex(
+                    self.search["file"], pattern="/%i/" % wmo
+                )
+                return wmo, self.search.filter(search_filter).shape[0]
+            else:
+                if not hasattr(self, "index"):
+                    self.load(nrows=self._nrows_index)
+                search_filter = pa.compute.match_substring_regex(
+                    self.index["file"], pattern="/%i/" % wmo
+                )
+                return wmo, self.index.filter(search_filter).shape[0]
+
+        ulist = self.read_wmo()
+        count = {}
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            # Submit tasks for each WMO
+            futures = {
+                executor.submit(count_wmo, self, wmo, index): wmo for wmo in ulist
+            }
+
+            # Process results as they complete
+            for future in concurrent.futures.as_completed(futures):
+                wmo, cnt = future.result()
+                count[wmo] = cnt
+
+        return count
+
     def to_indexfile(self, file):
         """Save search results on file, following the Argo standard index formats

From e2e8af6f6814995d3d5e0bb3cf0da1ef6b40c5cf Mon Sep 17 00:00:00 2001
From: Guillaume Maze
Date: Fri, 30 Jan 2026 21:25:20 +0100
Subject: [PATCH 2/7] apply multithreading to pyarrow search entries

---
 .../implementations/pyarrow/search_engine.py | 255 ++++++++++++------
 1 file changed, 169 insertions(+), 86 deletions(-)

diff --git a/argopy/stores/index/implementations/pyarrow/search_engine.py b/argopy/stores/index/implementations/pyarrow/search_engine.py
index 7148781ce..5b260abb5 100644
--- a/argopy/stores/index/implementations/pyarrow/search_engine.py
+++ b/argopy/stores/index/implementations/pyarrow/search_engine.py
@@ -1,7 +1,10 @@
 import logging
 import pandas as pd
 import numpy as np
-from typing import List
+from typing import List, Any, Callable, Iterable
+import concurrent.futures
+from functools import lru_cache
+
 try:
     import pyarrow.csv as csv  # noqa: F401
@@ -21,7 +24,48 @@
 log = logging.getLogger("argopy.stores.index.pa")

-@register_ArgoIndex_accessor('query', indexstore)
+@lru_cache(maxsize=25_000)
+def compute_wmo(wmo: int, obj):
+    return pa.compute.match_substring_regex(obj.index["file"], pattern="/%i/" % wmo)
+
+
+@lru_cache(maxsize=25_000)
+def compute_cyc(cyc: int, obj):
+    pattern = "_%0.3d.nc" % cyc
+    if cyc >= 1000:
+        pattern = "_%0.4d.nc" % cyc
+    return pa.compute.match_substring_regex(obj.index["file"], pattern=pattern)
+
+
+@lru_cache(maxsize=25_000)
+def compute_wmo_cyc(wmo: int, obj, cyc=None):
+    filt = []
+    for c in cyc:
+        filt.append(compute_cyc(c, obj))
+    return np.logical_and.reduce([compute_wmo(wmo, obj), np.logical_or.reduce(filt)])
+
+
+@lru_cache(maxsize=1_000)
+def compute_params(param: str, obj):
+    return pa.compute.match_substring_regex(
+        obj.index["parameters"],
+        options=pa.compute.MatchSubstringOptions(param, ignore_case=True),
+    )
+
+
+def pmap(obj, mapper: Callable, a_list: Iterable, kw: dict[Any] = {}) -> list[Any]:
+    """A method to execute some compute in multithreading"""
+    results: list[Any] = []
+
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        futures = {executor.submit(mapper, item, obj, **kw): item for item in a_list}
+        for future in
concurrent.futures.as_completed(futures): + results.append(future.result()) + + return results + + +@register_ArgoIndex_accessor("query", indexstore) class SearchEngine(ArgoIndexSearchEngine): @search_s3 @@ -37,19 +81,13 @@ def checker(WMOs): def namer(WMOs): return {"WMO": WMOs} - def composer(WMOs): - filt = [] - for wmo in WMOs: - filt.append( - pa.compute.match_substring_regex( - self._obj.index["file"], pattern="/%i/" % wmo - ) - ) - return self._obj._reduce_a_filter_list(filt) + def composer(obj, WMOs): + filt = pmap(obj, compute_wmo, WMOs) + return obj._reduce_a_filter_list(filt) WMOs = checker(WMOs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(WMOs) + search_filter = composer(self._obj, WMOs) if not composed: self._obj.search_type = namer(WMOs) self._obj.search_filter = search_filter @@ -76,21 +114,13 @@ def checker(CYCs): def namer(CYCs): return {"CYC": CYCs} - def composer(CYCs): - filt = [] - for cyc in CYCs: - if cyc < 1000: - pattern = "_%0.3d.nc" % (cyc) - else: - pattern = "_%0.4d.nc" % (cyc) - filt.append( - pa.compute.match_substring_regex(self._obj.index["file"], pattern=pattern) - ) - return self._obj._reduce_a_filter_list(filt) + def composer(obj, CYCs): + filt = pmap(obj, compute_cyc, CYCs) + return obj._reduce_a_filter_list(filt) CYCs = checker(CYCs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(CYCs) + search_filter = composer(self._obj.CYCs) if not composed: self._obj.search_type = namer(CYCs) self._obj.search_filter = search_filter @@ -121,24 +151,13 @@ def checker(WMOs, CYCs): def namer(WMOs, CYCs): return {"WMO": WMOs, "CYC": CYCs} - def composer(WMOs, CYCs): - filt = [] - for wmo in WMOs: - for cyc in CYCs: - if cyc < 1000: - pattern = "%i_%0.3d.nc" % (wmo, cyc) - else: - pattern = "%i_%0.4d.nc" % (wmo, cyc) - filt.append( - pa.compute.match_substring_regex( - self._obj.index["file"], pattern=pattern - ) - ) - return self._obj._reduce_a_filter_list(filt) + def composer(obj, WMOs, CYCs): + filt = pmap(obj, compute_wmo_cyc, WMOs, kw={"cyc": tuple(CYCs)}) + return obj._reduce_a_filter_list(filt) WMOs, CYCs = checker(WMOs, CYCs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(WMOs, CYCs) + search_filter = composer(self._obj, WMOs, CYCs) if not composed: self._obj.search_type = namer(WMOs, CYCs) self._obj.search_filter = search_filter @@ -190,7 +209,9 @@ def composer(BOX, key): def lat(self, BOX, nrows=None, composed=False): def checker(BOX): if "latitude" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for latitude in this index") + raise InvalidDatasetStructure( + "Cannot search for latitude in this index" + ) is_indexbox(BOX) log.debug("Argo index searching for latitude in BOX=%s ..." % BOX) @@ -218,7 +239,9 @@ def composer(BOX): def lon(self, BOX, nrows=None, composed=False): def checker(BOX): if "longitude" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for longitude in this index") + raise InvalidDatasetStructure( + "Cannot search for longitude in this index" + ) is_indexbox(BOX) log.debug("Argo index searching for longitude in BOX=%s ..." 
% BOX) @@ -227,12 +250,28 @@ def namer(BOX): def composer(BOX): filt = [] - if OPTIONS['longitude_convention'] == '360': - filt.append(pa.compute.greater_equal(self._obj.index["longitude_360"], conv_lon(BOX[0], '360'))) - filt.append(pa.compute.less_equal(self._obj.index["longitude_360"], conv_lon(BOX[1], '360'))) - elif OPTIONS['longitude_convention'] == '180': - filt.append(pa.compute.greater_equal(self._obj.index["longitude"], conv_lon(BOX[0], '180'))) - filt.append(pa.compute.less_equal(self._obj.index["longitude"], conv_lon(BOX[1], '180'))) + if OPTIONS["longitude_convention"] == "360": + filt.append( + pa.compute.greater_equal( + self._obj.index["longitude_360"], conv_lon(BOX[0], "360") + ) + ) + filt.append( + pa.compute.less_equal( + self._obj.index["longitude_360"], conv_lon(BOX[1], "360") + ) + ) + elif OPTIONS["longitude_convention"] == "180": + filt.append( + pa.compute.greater_equal( + self._obj.index["longitude"], conv_lon(BOX[0], "180") + ) + ) + filt.append( + pa.compute.less_equal( + self._obj.index["longitude"], conv_lon(BOX[1], "180") + ) + ) return self._obj._reduce_a_filter_list(filt, op="and") checker(BOX) @@ -259,12 +298,28 @@ def namer(BOX): def composer(BOX): filt = [] - if OPTIONS['longitude_convention'] == '360': - filt.append(pa.compute.greater_equal(self._obj.index["longitude_360"], conv_lon(BOX[0], '360'))) - filt.append(pa.compute.less_equal(self._obj.index["longitude_360"], conv_lon(BOX[1], '360'))) - elif OPTIONS['longitude_convention'] == '180': - filt.append(pa.compute.greater_equal(self._obj.index["longitude"], conv_lon(BOX[0], '180'))) - filt.append(pa.compute.less_equal(self._obj.index["longitude"], conv_lon(BOX[1], '180'))) + if OPTIONS["longitude_convention"] == "360": + filt.append( + pa.compute.greater_equal( + self._obj.index["longitude_360"], conv_lon(BOX[0], "360") + ) + ) + filt.append( + pa.compute.less_equal( + self._obj.index["longitude_360"], conv_lon(BOX[1], "360") + ) + ) + elif OPTIONS["longitude_convention"] == "180": + filt.append( + pa.compute.greater_equal( + self._obj.index["longitude"], conv_lon(BOX[0], "180") + ) + ) + filt.append( + pa.compute.less_equal( + self._obj.index["longitude"], conv_lon(BOX[1], "180") + ) + ) filt.append(pa.compute.greater_equal(self._obj.index["latitude"], BOX[2])) filt.append(pa.compute.less_equal(self._obj.index["latitude"], BOX[3])) return self._obj._reduce_a_filter_list(filt, op="and") @@ -284,7 +339,9 @@ def composer(BOX): def box(self, BOX, nrows=None, composed=False): def checker(BOX): if "longitude" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for coordinates in this index") + raise InvalidDatasetStructure( + "Cannot search for coordinates in this index" + ) is_indexbox(BOX) log.debug("Argo index searching for lat/lon/date in BOX=%s ..." 
% BOX) return "date" # Return key to use for time axis @@ -294,12 +351,28 @@ def namer(BOX): def composer(BOX, key): filt = [] - if OPTIONS['longitude_convention'] == '360': - filt.append(pa.compute.greater_equal(self._obj.index["longitude_360"], conv_lon(BOX[0], '360'))) - filt.append(pa.compute.less_equal(self._obj.index["longitude_360"], conv_lon(BOX[1], '360'))) - elif OPTIONS['longitude_convention'] == '180': - filt.append(pa.compute.greater_equal(self._obj.index["longitude"], conv_lon(BOX[0], '180'))) - filt.append(pa.compute.less_equal(self._obj.index["longitude"], conv_lon(BOX[1], '180'))) + if OPTIONS["longitude_convention"] == "360": + filt.append( + pa.compute.greater_equal( + self._obj.index["longitude_360"], conv_lon(BOX[0], "360") + ) + ) + filt.append( + pa.compute.less_equal( + self._obj.index["longitude_360"], conv_lon(BOX[1], "360") + ) + ) + elif OPTIONS["longitude_convention"] == "180": + filt.append( + pa.compute.greater_equal( + self._obj.index["longitude"], conv_lon(BOX[0], "180") + ) + ) + filt.append( + pa.compute.less_equal( + self._obj.index["longitude"], conv_lon(BOX[1], "180") + ) + ) filt.append(pa.compute.greater_equal(self._obj.index["latitude"], BOX[2])) filt.append(pa.compute.less_equal(self._obj.index["latitude"], BOX[3])) filt.append( @@ -331,27 +404,23 @@ def composer(BOX, key): def params(self, PARAMs, logical="and", nrows=None, composed=False): def checker(PARAMs): if "parameters" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for parameters in this index (%s: %s)." % (self._obj.convention, self._obj.convention_title)) + raise InvalidDatasetStructure( + "Cannot search for parameters in this index (%s: %s)." + % (self._obj.convention, self._obj.convention_title) + ) log.debug("Argo index searching for parameters in PARAM=%s." 
% PARAMs) return to_list(PARAMs) # Make sure we deal with a list def namer(PARAMs, logical): return {"PARAMS": (PARAMs, logical)} - def composer(PARAMs, logical): - filt = [] - for param in PARAMs: - filt.append( - pa.compute.match_substring_regex( - self._obj.index["parameters"], - options=pa.compute.MatchSubstringOptions(param, ignore_case=True), - ) - ) - return self._obj._reduce_a_filter_list(filt, op=logical) + def composer(obj, PARAMs, logical): + filt = pmap(obj, compute_params, PARAMs) + return obj._reduce_a_filter_list(filt, op=logical) PARAMs = checker(PARAMs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(PARAMs, logical) + search_filter = composer(self._obj, PARAMs, logical) if not composed: self._obj.search_type = namer(PARAMs, logical) self._obj.search_filter = search_filter @@ -361,7 +430,9 @@ def composer(PARAMs, logical): self._obj.search_type.update(namer(PARAMs, logical)) return search_filter - def parameter_data_mode(self, PARAMs: dict, logical="and", nrows=None, composed=False): + def parameter_data_mode( + self, PARAMs: dict, logical="and", nrows=None, composed=False + ): def checker(PARAMs): if self._obj.convention not in [ "ar_index_global_prof", @@ -377,11 +448,13 @@ def checker(PARAMs): ) # Validate PARAMs argument type - [ - PARAMs.update({p: to_list(PARAMs[p])}) for p in PARAMs - ] + [PARAMs.update({p: to_list(PARAMs[p])}) for p in PARAMs] if not np.all( - [v in ["R", "A", "D", "", " "] for vals in PARAMs.values() for v in vals] + [ + v in ["R", "A", "D", "", " "] + for vals in PARAMs.values() + for v in vals + ] ): raise ValueError("Data mode must be a value in 'R', 'A', 'D', ' ', ''") if self._obj.convention in ["argo_aux-profile_index"]: @@ -449,8 +522,12 @@ def fct(this_x, this_y): def profiler_type(self, profiler_type: List[int], nrows=None, composed=False): def checker(profiler_type): if "profiler_type" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for profiler types in this index)") - log.debug("Argo index searching for profiler type in %s ..." % profiler_type) + raise InvalidDatasetStructure( + "Cannot search for profiler types in this index)" + ) + log.debug( + "Argo index searching for profiler type in %s ..." % profiler_type + ) return to_list(profiler_type) def namer(profiler_type): @@ -474,18 +551,24 @@ def composer(profiler_type): return search_filter @search_s3 - def institution_code(self, institution_code: List[str], nrows=None, composed=False): + def institution_code(self, institution_code: List[str], nrows=None, composed=False): def checker(institution_code): if "institution" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for institution codes in this index)") - log.debug("Argo index searching for institution code in %s ..." % institution_code) + raise InvalidDatasetStructure( + "Cannot search for institution codes in this index)" + ) + log.debug( + "Argo index searching for institution code in %s ..." % institution_code + ) institution_code = to_list(institution_code) valid_codes = [] for code in institution_code: - if self._obj.valid('institution_code', code): + if self._obj.valid("institution_code", code): valid_codes.append(code.upper()) if len(valid_codes) == 0: - raise OptionValueError(f"No valid codes found for institution in {institution_code}. Valid codes are: {self._obj.valid.institution_code}") + raise OptionValueError( + f"No valid codes found for institution in {institution_code}. 
Valid codes are: {self._obj.valid.institution_code}"
+                )
             else:
                 return valid_codes

@@ -510,7 +593,7 @@ def composer(institution_code):
         return search_filter

     @search_s3
-    def dac(self, dac: list[str], nrows=None, composed=False):
+    def dac(self, dac: list[str], nrows=None, composed=False):
         def checker(dac):
             if "file" not in self._obj.convention_columns:
                 raise InvalidDatasetStructure("Cannot search for DAC in this index)")
@@ -540,4 +623,4 @@ def composer(DACs):
             return self._obj
         else:
             self._obj.search_type.update(namer(dac))
-            return search_filter
\ No newline at end of file
+            return search_filter

From 755a9dc168d4930840508adaac73ffea477c9cdd Mon Sep 17 00:00:00 2001
From: Guillaume Maze
Date: Mon, 2 Feb 2026 16:08:03 +0100
Subject: [PATCH 3/7] fix bug

---
 argopy/stores/index/implementations/pyarrow/search_engine.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/argopy/stores/index/implementations/pyarrow/search_engine.py b/argopy/stores/index/implementations/pyarrow/search_engine.py
index 5b260abb5..1d43fd3fd 100644
--- a/argopy/stores/index/implementations/pyarrow/search_engine.py
+++ b/argopy/stores/index/implementations/pyarrow/search_engine.py
@@ -120,7 +120,7 @@ def composer(obj, CYCs):

         CYCs = checker(CYCs)
         self._obj.load(nrows=self._obj._nrows_index)
-        search_filter = composer(self._obj.CYCs)
+        search_filter = composer(self._obj, CYCs)
         if not composed:
             self._obj.search_type = namer(CYCs)
             self._obj.search_filter = search_filter

From 5964530a2b6adf3cf40402872ad851e049ef17b1 Mon Sep 17 00:00:00 2001
From: Guillaume Maze
Date: Wed, 4 Feb 2026 15:33:38 +0100
Subject: [PATCH 4/7] Multithreading for ArgoIndex pandas wmo, wmo_cyc, cyc and params search entries

---
 .../implementations/pandas/search_engine.py | 131 ++++++++++--------
 1 file changed, 72 insertions(+), 59 deletions(-)

diff --git a/argopy/stores/index/implementations/pandas/search_engine.py b/argopy/stores/index/implementations/pandas/search_engine.py
index 484496577..6ae21adb0 100644
--- a/argopy/stores/index/implementations/pandas/search_engine.py
+++ b/argopy/stores/index/implementations/pandas/search_engine.py
@@ -1,10 +1,12 @@
 import logging
 import pandas as pd
 import numpy as np
-from typing import List
+from typing import List, Any, Callable, Iterable
+import concurrent.futures
+from functools import lru_cache

-from .....options import OPTIONS
-from .....errors import InvalidDatasetStructure, OptionValueError
+from argopy.options import OPTIONS
+from argopy.errors import InvalidDatasetStructure, OptionValueError
 from .....utils import is_indexbox, check_wmo, check_cyc, to_list, conv_lon
 from ...extensions import register_ArgoIndex_accessor, ArgoIndexSearchEngine
 from ..index_s3 import search_s3
@@ -13,6 +15,44 @@

 log = logging.getLogger("argopy.stores.index.pd")

+@lru_cache(maxsize=25_000)
+def compute_wmo(wmo: int, obj):
+    return obj.index["file"].str.contains("/%i/" % wmo, regex=False, case=True)
+
+
+@lru_cache(maxsize=25_000)
+def compute_cyc(cyc: int, obj):
+    pattern = "_%0.3d.nc" % cyc
+    if cyc >= 1000:
+        pattern = "_%0.4d.nc" % cyc
+    return obj.index["file"].str.contains(pattern, regex=False, case=True)
+
+
+@lru_cache(maxsize=25_000)
+def compute_wmo_cyc(wmo: int, obj, cyc=None):
+    filt = []
+    for c in cyc:
+        filt.append(compute_cyc(c, obj))
+    return np.logical_and.reduce([compute_wmo(wmo, obj), np.logical_or.reduce(filt)])
+
+
+@lru_cache(maxsize=1_000)
+def compute_params(param: str, obj):
+    return obj.index["variables"].apply(lambda x: param in x)
+
+
+def pmap(obj, mapper: Callable, a_list:
Iterable, kw: dict[Any] = {}) -> list[Any]: + """A method to execute some computation with multithreading""" + results: list[Any] = [] + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = {executor.submit(mapper, item, obj, **kw): item for item in a_list} + for future in concurrent.futures.as_completed(futures): + results.append(future.result()) + + return results + + @register_ArgoIndex_accessor("query", indexstore) class SearchEngine(ArgoIndexSearchEngine): @@ -29,19 +69,13 @@ def checker(WMOs): def namer(WMOs): return {"WMO": WMOs} - def composer(WMOs): - filt = [] - for wmo in WMOs: - filt.append( - self._obj.index["file"].str.contains( - "/%i/" % wmo, regex=True, case=False - ) - ) - return self._obj._reduce_a_filter_list(filt, op="or") + def composer(obj, WMOs): + filt = pmap(obj, compute_wmo, WMOs) + return obj._reduce_a_filter_list(filt, op="or") WMOs = checker(WMOs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(WMOs) + search_filter = composer(self._obj, WMOs) if not composed: self._obj.search_type = namer(WMOs) self._obj.search_filter = search_filter @@ -68,23 +102,13 @@ def checker(CYCs): def namer(CYCs): return {"CYC": CYCs} - def composer(CYCs): - filt = [] - for cyc in CYCs: - if cyc < 1000: - pattern = "_%0.3d.nc" % (cyc) - else: - pattern = "_%0.4d.nc" % (cyc) - filt.append( - self._obj.index["file"].str.contains( - pattern, regex=True, case=False - ) - ) - return self._obj._reduce_a_filter_list(filt, op="or") + def composer(obj, CYCs): + filt = pmap(obj, compute_cyc, CYCs) + return obj._reduce_a_filter_list(filt) CYCs = checker(CYCs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(CYCs) + search_filter = composer(self._obj, CYCs) if not composed: self._obj.search_type = namer(CYCs) self._obj.search_filter = search_filter @@ -115,24 +139,13 @@ def checker(WMOs, CYCs): def namer(WMOs, CYCs): return {"WMO": WMOs, "CYC": CYCs} - def composer(WMOs, CYCs): - filt = [] - for wmo in WMOs: - for cyc in CYCs: - if cyc < 1000: - pattern = "%i_%0.3d.nc" % (wmo, cyc) - else: - pattern = "%i_%0.4d.nc" % (wmo, cyc) - filt.append( - self._obj.index["file"].str.contains( - pattern, regex=True, case=False - ) - ) - return self._obj._reduce_a_filter_list(filt, op="or") + def composer(obj, WMOs, CYCs): + filt = pmap(obj, compute_wmo_cyc, WMOs, kw={"cyc": tuple(CYCs)}) + return obj._reduce_a_filter_list(filt) WMOs, CYCs = checker(WMOs, CYCs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(WMOs, CYCs) + search_filter = composer(self._obj, WMOs, CYCs) if not composed: self._obj.search_type = namer(WMOs, CYCs) self._obj.search_filter = search_filter @@ -338,19 +351,17 @@ def checker(PARAMs): def namer(PARAMs, logical): return {"PARAMS": (PARAMs, logical)} - def composer(PARAMs, logical): - filt = [] - self._obj.index["variables"] = self._obj.index["parameters"].apply( + def composer(obj, PARAMs, logical): + obj.index["variables"] = obj.index["parameters"].apply( lambda x: x.split() ) - for param in PARAMs: - filt.append(self._obj.index["variables"].apply(lambda x: param in x)) - self._obj.index = self._obj.index.drop("variables", axis=1) - return self._obj._reduce_a_filter_list(filt, op=logical) + filt = pmap(obj, compute_params, PARAMs) + obj.index = obj.index.drop("variables", axis=1) + return obj._reduce_a_filter_list(filt, op=logical) PARAMs = checker(PARAMs) self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(PARAMs, logical) + search_filter = composer(self._obj, PARAMs, 
logical) if not composed: self._obj.search_type = namer(PARAMs, logical) self._obj.search_filter = search_filter @@ -482,15 +493,21 @@ def composer(profiler_type): def institution_code(self, institution_code: List[str], nrows=None, composed=False): def checker(institution_code): if "institution" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for institution codes in this index)") - log.debug("Argo index searching for institution code in %s ..." % institution_code) + raise InvalidDatasetStructure( + "Cannot search for institution codes in this index)" + ) + log.debug( + "Argo index searching for institution code in %s ..." % institution_code + ) institution_code = to_list(institution_code) valid_codes = [] for code in institution_code: - if self._obj.valid('institution_code', code): + if self._obj.valid("institution_code", code): valid_codes.append(code.upper()) if len(valid_codes) == 0: - raise OptionValueError(f"No valid codes found for institution in {institution_code}. Valid codes are: {self._obj.valid.institution_code}") + raise OptionValueError( + f"No valid codes found for institution in {institution_code}. Valid codes are: {self._obj.valid.institution_code}" + ) else: return valid_codes @@ -498,11 +515,7 @@ def namer(institution_code): return {"INST_CODE": institution_code} def composer(institution_code): - return ( - self._obj.index["institution"] - .fillna("") - .isin(institution_code) - ) + return self._obj.index["institution"].fillna("").isin(institution_code) institution_code = checker(institution_code) self._obj.load(nrows=self._obj._nrows_index) From d79194519bff68ce16fcffe8276e0ece1b174f16 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Wed, 4 Feb 2026 15:33:45 +0100 Subject: [PATCH 5/7] Update search_engine.py --- .../stores/index/implementations/pyarrow/search_engine.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/argopy/stores/index/implementations/pyarrow/search_engine.py b/argopy/stores/index/implementations/pyarrow/search_engine.py index 1d43fd3fd..a3ed6ff98 100644 --- a/argopy/stores/index/implementations/pyarrow/search_engine.py +++ b/argopy/stores/index/implementations/pyarrow/search_engine.py @@ -14,8 +14,8 @@ except ModuleNotFoundError: pass -from .....options import OPTIONS -from .....errors import InvalidDatasetStructure, OptionValueError +from argopy.options import OPTIONS +from argopy.errors import InvalidDatasetStructure, OptionValueError from .....utils import is_indexbox, check_wmo, check_cyc, to_list, conv_lon from ...extensions import register_ArgoIndex_accessor, ArgoIndexSearchEngine from ..index_s3 import search_s3 @@ -54,7 +54,7 @@ def compute_params(param: str, obj): def pmap(obj, mapper: Callable, a_list: Iterable, kw: dict[Any] = {}) -> list[Any]: - """A method to execute some compute in multithreading""" + """A method to execute some computation with multithreading""" results: list[Any] = [] with concurrent.futures.ThreadPoolExecutor() as executor: @@ -83,7 +83,7 @@ def namer(WMOs): def composer(obj, WMOs): filt = pmap(obj, compute_wmo, WMOs) - return obj._reduce_a_filter_list(filt) + return obj._reduce_a_filter_list(filt, op="or") WMOs = checker(WMOs) self._obj.load(nrows=self._obj._nrows_index) From 6a9890ec95180848eb22fe4e19014b0970cb6ff3 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Wed, 4 Feb 2026 16:13:39 +0100 Subject: [PATCH 6/7] refactor pmap to utilities, absolute import in search_engine, long WMO list warning for pandas --- 
.../implementations/pandas/search_engine.py | 33 +++++++++----------
 .../implementations/pyarrow/search_engine.py | 29 +++++++---------
 argopy/utils/monitored_threadpool.py | 14 +++++++-
 3 files changed, 39 insertions(+), 37 deletions(-)

diff --git a/argopy/stores/index/implementations/pandas/search_engine.py b/argopy/stores/index/implementations/pandas/search_engine.py
index 6ae21adb0..172483bc2 100644
--- a/argopy/stores/index/implementations/pandas/search_engine.py
+++ b/argopy/stores/index/implementations/pandas/search_engine.py
@@ -1,16 +1,19 @@
+import warnings
 import logging
 import pandas as pd
 import numpy as np
-from typing import List, Any, Callable, Iterable
-import concurrent.futures
+from typing import List
 from functools import lru_cache

 from argopy.options import OPTIONS
 from argopy.errors import InvalidDatasetStructure, OptionValueError
-from .....utils import is_indexbox, check_wmo, check_cyc, to_list, conv_lon
-from ...extensions import register_ArgoIndex_accessor, ArgoIndexSearchEngine
-from ..index_s3 import search_s3
-from .index import indexstore
+from argopy.utils.monitored_threadpool import pmap
+from argopy.utils.checkers import is_indexbox, check_wmo, check_cyc
+from argopy.utils.casting import to_list
+from argopy.utils.geo import conv_lon
+from argopy.stores.index.extensions import register_ArgoIndex_accessor, ArgoIndexSearchEngine
+from argopy.stores.index.implementations.index_s3 import search_s3
+from argopy.stores.index.implementations.pandas.index import indexstore

 log = logging.getLogger("argopy.stores.index.pd")

@@ -41,18 +44,6 @@ def compute_params(param: str, obj):
     return obj.index["variables"].apply(lambda x: param in x)

-def pmap(obj, mapper: Callable, a_list: Iterable, kw: dict[Any] = {}) -> list[Any]:
-    """A method to execute some computation with multithreading"""
-    results: list[Any] = []
-
-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        futures = {executor.submit(mapper, item, obj, **kw): item for item in a_list}
-        for future in concurrent.futures.as_completed(futures):
-            results.append(future.result())
-
-    return results
-
-
 @register_ArgoIndex_accessor("query", indexstore)
 class SearchEngine(ArgoIndexSearchEngine):

@@ -64,6 +55,8 @@ def checker(WMOs):
                 "Argo index searching for WMOs=[%s] ..."
                 % ";".join([str(wmo) for wmo in WMOs])
             )
+            if len(WMOs) > 30:
+                warnings.warn("Searching a large number of Argo floats with the Pandas backend is quite slow. We strongly recommend installing Pyarrow to improve performance! Pyarrow is about 10 times faster than Pandas for this use case.")
             return WMOs

         def namer(WMOs):
@@ -97,6 +90,8 @@ def checker(CYCs):
                 "Argo index searching for CYCs=[%s] ..."
                 % (";".join([str(cyc) for cyc in CYCs]))
             )
+            if len(CYCs) > 50:
+                warnings.warn("Searching a large number of Argo float cycles with the Pandas backend is quite slow. We strongly recommend installing Pyarrow to improve performance! Pyarrow is about 10 times faster than Pandas for this use case.")
             return CYCs

         def namer(CYCs):
@@ -134,6 +129,8 @@ def checker(WMOs, CYCs):
                     ";".join([str(cyc) for cyc in CYCs]),
                 )
             )
+            if len(WMOs) > 30 or len(CYCs) > 50:
+                warnings.warn("Searching a large number of Argo float cycles with the Pandas backend is quite slow. We strongly recommend installing Pyarrow to improve performance!
Pyarrow is about 10 times faster than Pandas for this use case.")
             return WMOs, CYCs

         def namer(WMOs, CYCs):
diff --git a/argopy/stores/index/implementations/pyarrow/search_engine.py b/argopy/stores/index/implementations/pyarrow/search_engine.py
index a3ed6ff98..b3bed815f 100644
--- a/argopy/stores/index/implementations/pyarrow/search_engine.py
+++ b/argopy/stores/index/implementations/pyarrow/search_engine.py
@@ -1,8 +1,7 @@
 import logging
 import pandas as pd
 import numpy as np
-from typing import List, Any, Callable, Iterable
-import concurrent.futures
+from typing import List
 from functools import lru_cache

@@ -16,10 +15,16 @@
 from argopy.options import OPTIONS
 from argopy.errors import InvalidDatasetStructure, OptionValueError
-from .....utils import is_indexbox, check_wmo, check_cyc, to_list, conv_lon
-from ...extensions import register_ArgoIndex_accessor, ArgoIndexSearchEngine
-from ..index_s3 import search_s3
-from .index import indexstore
+from argopy.utils.monitored_threadpool import pmap
+from argopy.utils.checkers import is_indexbox, check_wmo, check_cyc
+from argopy.utils.casting import to_list
+from argopy.utils.geo import conv_lon
+from argopy.stores.index.extensions import (
+    register_ArgoIndex_accessor,
+    ArgoIndexSearchEngine,
+)
+from argopy.stores.index.implementations.index_s3 import search_s3
+from argopy.stores.index.implementations.pyarrow.index import indexstore

 log = logging.getLogger("argopy.stores.index.pa")

@@ -53,18 +58,6 @@ def compute_params(param: str, obj):
     )

-def pmap(obj, mapper: Callable, a_list: Iterable, kw: dict[Any] = {}) -> list[Any]:
-    """A method to execute some computation with multithreading"""
-    results: list[Any] = []
-
-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        futures = {executor.submit(mapper, item, obj, **kw): item for item in a_list}
-        for future in concurrent.futures.as_completed(futures):
-            results.append(future.result())
-
-    return results
-
-
 @register_ArgoIndex_accessor("query", indexstore)
 class SearchEngine(ArgoIndexSearchEngine):

diff --git a/argopy/utils/monitored_threadpool.py b/argopy/utils/monitored_threadpool.py
index 66e0a9064..5814710fe 100644
--- a/argopy/utils/monitored_threadpool.py
+++ b/argopy/utils/monitored_threadpool.py
@@ -12,7 +12,7 @@
 from concurrent.futures import as_completed
 from threading import Lock
 import logging
-from typing import Union
+from typing import Union, Callable, Iterable, Any
 from abc import ABC, abstractmethod
 import importlib

@@ -589,3 +589,15 @@ def my_final(obj_list, opt=True):
     """
     pass
+
+
+def pmap(obj, mapper: Callable, a_list: Iterable, kw: dict[str, Any] = {}) -> list[Any]:
+    """Execute a mapper over a list of items with a thread pool"""
+    results: list[Any] = []
+
+    with ThreadPoolExecutor() as executor:
+        futures = {executor.submit(mapper, item, obj, **kw): item for item in a_list}
+        for future in as_completed(futures):
+            results.append(future.result())
+
+    return results
\ No newline at end of file

From 5ae4bb3c3f8275eb5998a86b4bdd3e6fe6400c49 Mon Sep 17 00:00:00 2001
From: Guillaume Maze
Date: Wed, 4 Feb 2026 16:13:49 +0100
Subject: [PATCH 7/7] Update test_stores_index.py

---
 argopy/tests/test_stores_index.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/argopy/tests/test_stores_index.py b/argopy/tests/test_stores_index.py
index d6c083fa0..281970b25 100644
--- a/argopy/tests/test_stores_index.py
+++ b/argopy/tests/test_stores_index.py
@@ -279,7 +279,7 @@ def a_store(self, request):
         if not HAS_S3FS and 's3' in fetcher_args['host']:
             xfail, reason
= True, 's3fs not available' elif 's3' in fetcher_args['host']: - xfail, reason = True, 's3 is experimental' + xfail, reason = 0, 's3 is experimental (store)' yield self.create_store(fetcher_args, xfail=xfail, reason=reason).load(nrows=N_RECORDS) @pytest.fixture @@ -298,7 +298,7 @@ def a_search(self, request): if not HAS_S3FS and 's3' in host: xfail, reason = True, 's3fs not available' elif 's3' in host: - xfail, reason = True, 's3 is experimental' + xfail, reason = 0, 's3 is experimental (search)' yield run_a_search(self.new_idx, {"host": host, "cache": True}, srch, xfail=xfail, reason=reason)
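
Note on the fan-out pattern used throughout this series: PATCH 1 parallelizes records_per_wmo by submitting one counting task per WMO to a concurrent.futures.ThreadPoolExecutor and collecting results with as_completed, and PATCH 6 factors the same idiom into argopy.utils.monitored_threadpool.pmap. The sketch below is a self-contained reduction of that pattern; the file list and the count_wmo helper are illustrative stand-ins, not argopy code:

import concurrent.futures

# Illustrative stand-in for an Argo index: profile paths contain /<WMO>/
files = [
    "aoml/1901393/profiles/R1901393_001.nc",
    "aoml/1901393/profiles/R1901393_002.nc",
    "coriolis/6902746/profiles/D6902746_001.nc",
]

def count_wmo(wmo: int) -> tuple[int, int]:
    # One task per WMO: count index entries matching this float
    return wmo, sum("/%i/" % wmo in f for f in files)

count = {}
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit one future per WMO, then consume them in completion order;
    # each result carries its WMO, so the arbitrary ordering does not matter
    futures = {executor.submit(count_wmo, wmo): wmo for wmo in [1901393, 6902746]}
    for future in concurrent.futures.as_completed(futures):
        wmo, cnt = future.result()
        count[wmo] = cnt

print(count)  # {1901393: 2, 6902746: 1}

Threads only pay off here to the extent that the underlying kernels (Arrow compute, pandas string methods) run outside the GIL, which is consistent with the larger speedup observed with the pyarrow backend than with pandas.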
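
A caveat worth keeping in mind with the @lru_cache decorators on compute_wmo, compute_cyc, compute_wmo_cyc and compute_params: the cache key is the argument tuple, e.g. (wmo, obj), so obj must be hashable (plain objects hash by identity) and cached filters are not invalidated if obj.index is reloaded in place. A minimal sketch of that failure mode, with an invented Store class standing in for the index store:

from functools import lru_cache

class Store:
    # Hypothetical stand-in for an index store; hashable by object identity
    def __init__(self, files):
        self.files = files

@lru_cache(maxsize=25_000)
def compute_wmo(wmo: int, obj: Store):
    # Cached per (wmo, obj) pair, like the decorators in this series
    return [f for f in obj.files if "/%i/" % wmo in f]

store = Store(["aoml/1901393/profiles/R1901393_001.nc"])
print(compute_wmo(1901393, store))  # computed: 1 match
store.files = []                    # index mutated in place ...
print(compute_wmo(1901393, store))  # ... but the stale cached match is returned
compute_wmo.cache_clear()           # needed after any in-place reload
print(compute_wmo(1901393, store))  # recomputed: []

As long as a store reloads its index by creating a new object (new identity, hence a new cache key), the caches stay consistent; an in-place reload would need an explicit cache_clear().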
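
From the user side, the API is unchanged; only wall-clock time should improve. Assuming the public ArgoIndex facade keeps exposing the store-level search_wmo and records_per_wmo entry points as in released argopy versions (the index file name below is for illustration):

from argopy import ArgoIndex

idx = ArgoIndex(index_file="ar_index_global_prof.txt")  # core profile index
idx.search_wmo([1901393, 6902746])  # per-WMO filters now fan out via pmap()
print(idx.records_per_wmo())        # per-WMO counts now run in a thread pool

With the pandas backend, lists of more than 30 WMOs (or 50 cycles) additionally trigger the UserWarning added in PATCH 6 recommending the pyarrow backend.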