44 changes: 43 additions & 1 deletion argopy/stores/index/implementations/pandas/index.py
@@ -4,6 +4,7 @@
import gzip
from pathlib import Path
from typing import List
import concurrent.futures

from argopy.options import OPTIONS
from argopy.errors import DataNotFound, InvalidDatasetStructure
@@ -355,7 +356,7 @@ def read_files(self, index : bool = False, multi : bool = False) -> List[str]:
else:
return mono2multi(flist, convention=self.convention, sep=sep)

def records_per_wmo(self, index=False):
def records_per_wmo_legacy(self, index=False):
"""Return the number of records per unique WMOs in search results

Fall back on full index if search not triggered
@@ -379,6 +380,47 @@ def records_per_wmo(self, index=False):
count[wmo] = self.index[search_filter].shape[0]
return count

def records_per_wmo(self, index=False):
"""Return the number of records per unique WMOs in search results

Fall back on full index if search not triggered

Returns
-------
dict

Notes
-----
Computation is parallelized over all WMOs with a ThreadPoolExecutor
"""

def count_wmo(self, wmo: int, index: bool):
if hasattr(self, "search") and not index:
search_filter = self.search["file"].str.contains(
"/%i/" % wmo, regex=True, case=False
)
return wmo, self.search[search_filter].shape[0]
else:
search_filter = self.index["file"].str.contains(
"/%i/" % wmo, regex=True, case=False
)
return wmo, self.index[search_filter].shape[0]

ulist = self.read_wmo()
count = {}
with concurrent.futures.ThreadPoolExecutor() as executor:
# Submit tasks for each WMO
futures = {
executor.submit(count_wmo, self, wmo, index): wmo for wmo in ulist
}

# Process results as they complete
for future in concurrent.futures.as_completed(futures):
wmo, cnt = future.result()
count[wmo] = cnt

return count

def to_indexfile(self, outputfile):
"""Save search results on file, following the Argo standard index formats

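A minimal usage sketch of the parallelized counter introduced above (the ArgoIndex calls and WMO numbers are illustrative, not part of this diff; records_per_wmo is meant as a drop-in replacement for records_per_wmo_legacy and returns the same dict):

# Usage sketch, assuming the public ArgoIndex API with the pandas backend; WMO numbers are made up.
from argopy import ArgoIndex

idx = ArgoIndex().load()                        # pandas backend when pyarrow is not installed
idx.search_wmo([6902746, 6902914])              # restrict the index to a couple of floats
counts = idx.records_per_wmo()                  # {wmo: n_records}, one thread per WMO over the search results
full_counts = idx.records_per_wmo(index=True)   # bypass the search and count over the full index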
136 changes: 73 additions & 63 deletions argopy/stores/index/implementations/pandas/search_engine.py
@@ -1,18 +1,49 @@
import warnings
import logging
import pandas as pd
import numpy as np
from typing import List

from .....options import OPTIONS
from .....errors import InvalidDatasetStructure, OptionValueError
from .....utils import is_indexbox, check_wmo, check_cyc, to_list, conv_lon
from ...extensions import register_ArgoIndex_accessor, ArgoIndexSearchEngine
from ..index_s3 import search_s3
from .index import indexstore
from functools import lru_cache

from argopy.options import OPTIONS
from argopy.errors import InvalidDatasetStructure, OptionValueError
from argopy.utils.monitored_threadpool import pmap
from argopy.utils.checkers import is_indexbox, check_wmo, check_cyc
from argopy.utils.casting import to_list
from argopy.utils.geo import conv_lon
from argopy.stores.index.extensions import register_ArgoIndex_accessor, ArgoIndexSearchEngine
from argopy.stores.index.implementations.index_s3 import search_s3
from argopy.stores.index.implementations.pandas.index import indexstore

log = logging.getLogger("argopy.stores.index.pd")


@lru_cache(maxsize=25_000)
def compute_wmo(wmo: int, obj):
return obj.index["file"].str.contains("/%i/" % wmo, regex=False, case=True)


@lru_cache(maxsize=25_000)
def compute_cyc(cyc: int, obj):
pattern = "_%0.3d.nc" % cyc
if cyc >= 1000:
pattern = "_%0.4d.nc" % cyc
return obj.index["file"].str.contains(pattern, regex=False, case=True)


@lru_cache(maxsize=25_000)
def compute_wmo_cyc(wmo: int, obj, cyc=None):
filt = []
for c in cyc:
filt.append(compute_cyc(c, obj))
return np.logical_and.reduce([compute_wmo(wmo, obj), np.logical_or.reduce(filt)])


@lru_cache(maxsize=1_000)
def compute_params(param: str, obj):
return obj.index["variables"].apply(lambda x: param in x)


@register_ArgoIndex_accessor("query", indexstore)
class SearchEngine(ArgoIndexSearchEngine):

@@ -24,24 +55,20 @@ def checker(WMOs):
"Argo index searching for WMOs=[%s] ..."
% ";".join([str(wmo) for wmo in WMOs])
)
if len(WMOs) > 30:
warnings.warn("Searching a large amount of Argo floats with the Pandas backend is quite slow. We strongly recommend to install Pyarrow to improve performances ! Pyarrow is about 10 times faster than Pandas for this use-case.")
return WMOs

def namer(WMOs):
return {"WMO": WMOs}

def composer(WMOs):
filt = []
for wmo in WMOs:
filt.append(
self._obj.index["file"].str.contains(
"/%i/" % wmo, regex=True, case=False
)
)
return self._obj._reduce_a_filter_list(filt, op="or")
def composer(obj, WMOs):
filt = pmap(obj, compute_wmo, WMOs)
return obj._reduce_a_filter_list(filt, op="or")

WMOs = checker(WMOs)
self._obj.load(nrows=self._obj._nrows_index)
search_filter = composer(WMOs)
search_filter = composer(self._obj, WMOs)
if not composed:
self._obj.search_type = namer(WMOs)
self._obj.search_filter = search_filter
@@ -63,28 +90,20 @@ def checker(CYCs):
"Argo index searching for CYCs=[%s] ..."
% (";".join([str(cyc) for cyc in CYCs]))
)
if len(CYCs) > 50:
warnings.warn("Searching a large amount of Argo float cycles with the Pandas backend is quite slow. We strongly recommend to install Pyarrow to improve performances ! Pyarrow is about 10 times faster than Pandas for this use-case.")
return CYCs

def namer(CYCs):
return {"CYC": CYCs}

def composer(CYCs):
filt = []
for cyc in CYCs:
if cyc < 1000:
pattern = "_%0.3d.nc" % (cyc)
else:
pattern = "_%0.4d.nc" % (cyc)
filt.append(
self._obj.index["file"].str.contains(
pattern, regex=True, case=False
)
)
return self._obj._reduce_a_filter_list(filt, op="or")
def composer(obj, CYCs):
filt = pmap(obj, compute_cyc, CYCs)
return obj._reduce_a_filter_list(filt)

CYCs = checker(CYCs)
self._obj.load(nrows=self._obj._nrows_index)
search_filter = composer(CYCs)
search_filter = composer(self._obj, CYCs)
if not composed:
self._obj.search_type = namer(CYCs)
self._obj.search_filter = search_filter
@@ -110,29 +129,20 @@ def checker(WMOs, CYCs):
";".join([str(cyc) for cyc in CYCs]),
)
)
if len(WMOs) > 30 or len(CYCs) > 50:
warnings.warn("Searching a large amount of Argo float cycles with the Pandas backend is quite slow. We strongly recommend to install Pyarrow to improve performances ! Pyarrow is about 10 times faster than Pandas for this use-case.")
return WMOs, CYCs

def namer(WMOs, CYCs):
return {"WMO": WMOs, "CYC": CYCs}

def composer(WMOs, CYCs):
filt = []
for wmo in WMOs:
for cyc in CYCs:
if cyc < 1000:
pattern = "%i_%0.3d.nc" % (wmo, cyc)
else:
pattern = "%i_%0.4d.nc" % (wmo, cyc)
filt.append(
self._obj.index["file"].str.contains(
pattern, regex=True, case=False
)
)
return self._obj._reduce_a_filter_list(filt, op="or")
def composer(obj, WMOs, CYCs):
filt = pmap(obj, compute_wmo_cyc, WMOs, kw={"cyc": tuple(CYCs)})
return obj._reduce_a_filter_list(filt)

WMOs, CYCs = checker(WMOs, CYCs)
self._obj.load(nrows=self._obj._nrows_index)
search_filter = composer(WMOs, CYCs)
search_filter = composer(self._obj, WMOs, CYCs)
if not composed:
self._obj.search_type = namer(WMOs, CYCs)
self._obj.search_filter = search_filter
@@ -338,19 +348,17 @@ def checker(PARAMs):
def namer(PARAMs, logical):
return {"PARAMS": (PARAMs, logical)}

def composer(PARAMs, logical):
filt = []
self._obj.index["variables"] = self._obj.index["parameters"].apply(
def composer(obj, PARAMs, logical):
obj.index["variables"] = obj.index["parameters"].apply(
lambda x: x.split()
)
for param in PARAMs:
filt.append(self._obj.index["variables"].apply(lambda x: param in x))
self._obj.index = self._obj.index.drop("variables", axis=1)
return self._obj._reduce_a_filter_list(filt, op=logical)
filt = pmap(obj, compute_params, PARAMs)
obj.index = obj.index.drop("variables", axis=1)
return obj._reduce_a_filter_list(filt, op=logical)

PARAMs = checker(PARAMs)
self._obj.load(nrows=self._obj._nrows_index)
search_filter = composer(PARAMs, logical)
search_filter = composer(self._obj, PARAMs, logical)
if not composed:
self._obj.search_type = namer(PARAMs, logical)
self._obj.search_filter = search_filter
@@ -482,27 +490,29 @@ def composer(profiler_type):
def institution_code(self, institution_code: List[str], nrows=None, composed=False):
def checker(institution_code):
if "institution" not in self._obj.convention_columns:
raise InvalidDatasetStructure("Cannot search for institution codes in this index)")
log.debug("Argo index searching for institution code in %s ..." % institution_code)
raise InvalidDatasetStructure(
"Cannot search for institution codes in this index)"
)
log.debug(
"Argo index searching for institution code in %s ..." % institution_code
)
institution_code = to_list(institution_code)
valid_codes = []
for code in institution_code:
if self._obj.valid('institution_code', code):
if self._obj.valid("institution_code", code):
valid_codes.append(code.upper())
if len(valid_codes) == 0:
raise OptionValueError(f"No valid codes found for institution in {institution_code}. Valid codes are: {self._obj.valid.institution_code}")
raise OptionValueError(
f"No valid codes found for institution in {institution_code}. Valid codes are: {self._obj.valid.institution_code}"
)
else:
return valid_codes

def namer(institution_code):
return {"INST_CODE": institution_code}

def composer(institution_code):
return (
self._obj.index["institution"]
.fillna("")
.isin(institution_code)
)
return self._obj.index["institution"].fillna("").isin(institution_code)

institution_code = checker(institution_code)
self._obj.load(nrows=self._obj._nrows_index)
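The composer refactoring above delegates per-item filter construction to module-level functions memoized with functools.lru_cache and mapped in parallel with argopy's pmap helper (imported from argopy.utils.monitored_threadpool; its exact threading behavior is not shown in this diff). The standalone sketch below reproduces the same pattern with stdlib tools only, to make the caching assumption explicit: the store object passed to the cached function must be hashable, so repeated or composed searches can reuse boolean masks instead of rescanning the index.

# Standalone sketch of the cached-filter pattern; TinyStore and wmo_mask are illustrative stand-ins, not argopy objects.
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache

import numpy as np
import pandas as pd


class TinyStore:
    """Bare-bones stand-in for an index store exposing a pandas 'file' column."""
    def __init__(self, files):
        self.index = pd.DataFrame({"file": files})


@lru_cache(maxsize=1_000)
def wmo_mask(wmo: int, store: TinyStore) -> pd.Series:
    # lru_cache keys on (wmo, store); the default object hash of TinyStore is enough
    return store.index["file"].str.contains("/%i/" % wmo, regex=False, case=True)


store = TinyStore([
    "coriolis/6902746/profiles/R6902746_001.nc",
    "coriolis/6902914/profiles/R6902914_003.nc",
])
with ThreadPoolExecutor() as ex:
    masks = list(ex.map(lambda wmo: wmo_mask(wmo, store), [6902746, 6902914]))
search_filter = np.logical_or.reduce(masks)   # same role as _reduce_a_filter_list(filt, op="or")
print(store.index[search_filter])             # both rows match; a second call with the same WMOs hits the cache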
44 changes: 42 additions & 2 deletions argopy/stores/index/implementations/pyarrow/index.py
@@ -6,6 +6,7 @@
from packaging import version
from pathlib import Path
from typing import List
import concurrent.futures

try:
import pyarrow.csv as csv # noqa: F401
@@ -415,7 +416,7 @@ def xmax(xtble):
tmax(self.index["date"]),
]

def read_files(self, index : bool = False, multi : bool = False) -> List[str]:
def read_files(self, index: bool = False, multi: bool = False) -> List[str]:
sep = self.fs["src"].fs.sep
if hasattr(self, "search") and not index:
flist = [
@@ -432,7 +433,7 @@ def read_files(self, index : bool = False, multi : bool = False) -> List[str]:
else:
return mono2multi(flist, convention=self.convention, sep=sep)

def records_per_wmo(self, index=False):
def records_per_wmo_legacy(self, index=False):
"""Return the number of records per unique WMOs in search results

Fall back on full index if search not triggered
@@ -454,6 +455,45 @@ def records_per_wmo(self, index=False):
count[wmo] = self.index.filter(search_filter).shape[0]
return count

def records_per_wmo(self, index=False):
"""Return the number of records per unique WMOs in search results

Fall back on full index if search not triggered

Notes
-----
Computation is parallelized over all WMOs with a ThreadPoolExecutor
"""

def count_wmo(self, wmo: int, index: bool):
if hasattr(self, "search") and not index:
search_filter = pa.compute.match_substring_regex(
self.search["file"], pattern="/%i/" % wmo
)
return wmo, self.search.filter(search_filter).shape[0]
else:
if not hasattr(self, "index"):
self.load(nrows=self._nrows_index)
search_filter = pa.compute.match_substring_regex(
self.index["file"], pattern="/%i/" % wmo
)
return wmo, self.index.filter(search_filter).shape[0]

ulist = self.read_wmo()
count = {}
with concurrent.futures.ThreadPoolExecutor() as executor:
# Submit tasks for each WMO
futures = {
executor.submit(count_wmo, self, wmo, index): wmo for wmo in ulist
}

# Process results as they complete
for future in concurrent.futures.as_completed(futures):
wmo, cnt = future.result()
count[wmo] = cnt

return count

def to_indexfile(self, file):
"""Save search results on file, following the Argo standard index formats

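For the pyarrow backend, the per-WMO count relies on the same substring-regex filter as the legacy loop, just dispatched to worker threads. A small sketch of that filter in isolation (the table rows are made up):

# Sketch of the per-WMO filter used by the threaded count_wmo above; table content is illustrative.
import pyarrow as pa
import pyarrow.compute as pc

tbl = pa.table({"file": [
    "coriolis/6902746/profiles/R6902746_001.nc",
    "coriolis/6902914/profiles/R6902914_003.nc",
]})
mask = pc.match_substring_regex(tbl["file"], pattern="/%i/" % 6902746)
n_records = tbl.filter(mask).shape[0]   # -> 1 record for float 6902746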