4 changes: 0 additions & 4 deletions tests/plugins/xcube/processors/test_mldataset.py
@@ -16,10 +16,6 @@
 from xrlint.plugins.xcube.util import LevelInfo, LevelsMeta
 from xrlint.result import Message
 
-# TODO: This tests requires zarr >=2, <3, because the test used fsspec's
-# memory filesystem, which is not async but zarr wants all filesystems
-# to be async now.
-
 
 class MultiLevelDatasetProcessorTest(TestCase):
     levels_name = "xrlint-test"
36 changes: 36 additions & 0 deletions tests/plugins/xcube/test_util.py
@@ -0,0 +1,36 @@
+# Copyright © 2025 Brockmann Consult GmbH.
+# This software is distributed under the terms and conditions of the
+# MIT license (https://mit-license.org/).
+
+from unittest import TestCase
+
+from xrlint.plugins.xcube.util import is_absolute_path
+from xrlint.plugins.xcube.util import resolve_path
+
+
+class UtilTest(TestCase):
+    def test_is_absolute_path(self):
+        self.assertTrue(is_absolute_path("/home/forman"))
+        self.assertTrue(is_absolute_path("//bcserver2/fs1"))
+        self.assertTrue(is_absolute_path("file://home/forman"))
+        self.assertTrue(is_absolute_path("s3://xcube-data"))
+        self.assertTrue(is_absolute_path(r"C:\Users\Norman"))
+        self.assertTrue(is_absolute_path(r"C:/Users/Norman"))
+        self.assertTrue(is_absolute_path(r"C:/Users/Norman"))
+        self.assertTrue(is_absolute_path(r"\\bcserver2\fs1"))
+
+        self.assertFalse(is_absolute_path(r"data"))
+        self.assertFalse(is_absolute_path(r"./data"))
+        self.assertFalse(is_absolute_path(r"../data"))
+
+    def test_resolve_path(self):
+        self.assertEqual(
+            "/home/forman/data", resolve_path("data", root_path="/home/forman")
+        )
+        self.assertEqual(
+            "/home/forman/data", resolve_path("./data", root_path="/home/forman")
+        )
+        self.assertEqual(
+            "/home/data", resolve_path("../data", root_path="/home/forman")
+        )
+        self.assertEqual("s3://opensr/test.zarr", resolve_path("s3://opensr/test.zarr"))
12 changes: 5 additions & 7 deletions xrlint/_linter/validate.py
@@ -62,13 +62,11 @@ def _open_and_validate_dataset(
         except (OSError, ValueError, TypeError) as e:
             return [new_fatal_message(str(e))]
         access_latency = time.time() - t0
-        return processor_op.postprocess(
-            [
-                _validate_dataset(config_obj, ds, path, i, access_latency)
-                for i, (ds, path) in enumerate(ds_path_list)
-            ],
-            file_path,
-        )
+        messages = [
+            _validate_dataset(config_obj, ds, path, i, access_latency)
+            for i, (ds, path) in enumerate(ds_path_list)
+        ]
+        return processor_op.postprocess(messages, file_path)
     else:
         try:
             dataset, access_latency = _open_dataset(
48 changes: 30 additions & 18 deletions xrlint/plugins/xcube/processors/mldataset.py
@@ -12,11 +12,16 @@
 
 from xrlint.plugins.xcube.constants import ML_FILE_PATTERN, ML_META_FILENAME
 from xrlint.plugins.xcube.plugin import plugin
-from xrlint.plugins.xcube.util import LevelsMeta, attach_dataset_level_infos, norm_path
+from xrlint.plugins.xcube.util import (
+    LevelsMeta,
+    attach_dataset_level_infos,
+    resolve_path,
+)
 from xrlint.processor import ProcessorOp
 from xrlint.result import Message
 
 level_pattern = re.compile(r"^(\d+)(?:\.zarr)?$")
+link_pattern = re.compile(r"^(\d+)(?:\.link)?$")
 
 
 @plugin.define_processor("multi-level-dataset")
@@ -25,7 +30,7 @@ class MultiLevelDatasetProcessor(ProcessorOp):
 
     def preprocess(
         self, file_path: str, opener_options: dict[str, Any]
-    ) -> list[tuple[xr.Dataset, str]]:
+    ) -> list[tuple[xr.Dataset | xr.DataTree, str]]:
         fs, fs_path = get_filesystem(file_path, opener_options)
 
         file_names = [
@@ -40,18 +45,17 @@ def preprocess(
         with fs.open(f"{fs_path}/{ML_META_FILENAME}") as stream:
             meta = LevelsMeta.from_value(json.load(stream))
 
-        # check for optional ".0.link" that locates level 0 somewhere else
-        level_0_path = None
-        if "0.link" in file_names:
-            level_0_path = fs.read_text(f"{fs_path}/0.link")
+        # check for optional ".zgroup"
+        # if ".zgroup" in file_names:
+        #     with fs.open(f"{fs_path}/.zgroup") as stream:
+        #         group_props = json.load(stream)
 
-        level_names, num_levels = parse_levels(file_names, level_0_path)
+        level_paths, num_levels = parse_levels(fs, file_path, file_names)
 
         engine = opener_options.pop("engine", "zarr")
 
         level_datasets: list[xr.Dataset | None] = []
-        for level, level_name in level_names.items():
-            level_path = norm_path(f"{file_path}/{level_name}")
+        for level, level_path in level_paths.items():
             level_dataset = xr.open_dataset(level_path, engine=engine, **opener_options)
             level_datasets.append((level_dataset, level_path))
 
@@ -80,22 +84,30 @@ def get_filesystem(file_path: str, opener_options: dict[str, Any]):
 
 
 def parse_levels(
-    file_names: list[str], level_0_path: str | None
+    fs: fsspec.AbstractFileSystem, dataset_path: str, file_names: list[str]
 ) -> tuple[dict[int, str], int]:
-    level_names: dict[int, str] = {0: level_0_path} if level_0_path else {}
-    num_levels = 0
+    level_paths: dict[int, str] = {}
     for file_name in file_names:
+        # check for optional "<level>.link" that locates a level somewhere else
+        m = link_pattern.match(file_name)
+        if m is not None:
+            level = int(m.group(1))
+            link_path = fs.read_text(f"{dataset_path}/{file_name}")
+            level_paths[level] = resolve_path(link_path, root_path=dataset_path)
+        # check for regular "<level>.zarr"
         m = level_pattern.match(file_name)
         if m is not None:
             level = int(m.group(1))
-            level_names[level] = file_name
-            num_levels = max(num_levels, level + 1)
-    if not level_names:
+            level_paths[level] = f"{dataset_path}/{file_name}"
+
+    if not level_paths:
         raise ValueError("empty multi-level dataset")
-    num_levels = max(level_names.keys()) + 1
+
+    num_levels = max(level_paths.keys()) + 1
     for level in range(num_levels):
-        if level not in level_names:
+        if level not in level_paths:
             raise ValueError(
                 f"missing dataset for level {level} in multi-level dataset"
             )
-    return level_names, num_levels
+
+    return level_paths, num_levels
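
For context, a minimal sketch of the new link handling (not part of this PR; the "/cube.levels" layout and the link-file content are invented for illustration). It builds a fake multi-level dataset on fsspec's memory filesystem, where a "0.link" entry redirects level 0 outside the dataset directory, and runs parse_levels over it:

import fsspec

from xrlint.plugins.xcube.processors.mldataset import parse_levels

fs = fsspec.filesystem("memory")
fs.makedirs("/cube.levels", exist_ok=True)
# "0.link" holds a path (here relative) to where level 0 actually lives
fs.pipe_file("/cube.levels/0.link", b"../base.zarr")
fs.makedirs("/cube.levels/1.zarr", exist_ok=True)

file_names = [p.rsplit("/", 1)[-1] for p in fs.ls("/cube.levels", detail=False)]
level_paths, num_levels = parse_levels(fs, "/cube.levels", file_names)
# level_paths == {0: "/base.zarr", 1: "/cube.levels/1.zarr"}, num_levels == 2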
23 changes: 19 additions & 4 deletions xrlint/plugins/xcube/util.py
@@ -95,11 +95,26 @@ def get_spatial_size(
     return None
 
 
-def norm_path(level_path: str) -> str:
-    parts = level_path.replace("\\", "/").split("/")
-    level_path = "/".join(
+def resolve_path(path: str, root_path: str | None = None) -> str:
+    abs_level_path = path
+    if root_path is not None and not is_absolute_path(path):
+        abs_level_path = f"{root_path}/{path}"
+    parts = abs_level_path.rstrip("/").replace("\\", "/").split("/")
+    return "/".join(
         p
         for i, p in enumerate(parts)
         if p not in (".", "..") and (i == len(parts) - 1 or parts[i + 1] != "..")
     )
-    return level_path
+
+
+def is_absolute_path(path: str) -> bool:
+    return (
+        # Unix abs path
+        path.startswith("/")
+        # URL
+        or "://" in path
+        # Windows abs paths
+        or path.startswith("\\\\")
+        or path.find(":\\", 1) == 1
+        or path.find(":/", 1) == 1
+    )
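
A small usage sketch (not from the diff; the bucket names are invented) of how the two helpers interact: relative targets are joined onto root_path, while URLs, drive letters, and UNC paths pass through unchanged:

from xrlint.plugins.xcube.util import is_absolute_path, resolve_path

# A relative target is resolved against root_path; "." segments are
# dropped and single ".." steps are collapsed in one pass:
resolve_path("../base.zarr", root_path="s3://bucket/cube.levels")
# -> "s3://bucket/base.zarr"

# Absolute targets (URL, drive letter, UNC path) ignore root_path:
is_absolute_path("s3://other-bucket/base.zarr")  # -> True
resolve_path("s3://other-bucket/base.zarr", root_path="s3://bucket/cube.levels")
# -> "s3://other-bucket/base.zarr"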
4 changes: 2 additions & 2 deletions xrlint/processor.py
@@ -18,7 +18,7 @@ class ProcessorOp(ABC):
     @abstractmethod
     def preprocess(
         self, file_path: str, opener_options: dict[str, Any]
-    ) -> list[tuple[xr.Dataset, str]]:
+    ) -> list[tuple[xr.Dataset | xr.DataTree, str]]:
         """Pre-process a dataset given by its `file_path` and `opener_options`.
         In this method you use the `file_path` to read zero, one, or more
         datasets to lint.
@@ -28,7 +28,7 @@ def preprocess(
             opener_options: The configuration's `opener_options`.
 
         Returns:
-            A list of (dataset, file_path) pairs
+            A list of (dataset or datatree, file_path) pairs
         """
 
     @abstractmethod
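
Since preprocess now also admits xr.DataTree, here is a minimal conforming processor as a sketch. It is not from the repository; the postprocess signature is inferred from the validate.py hunk above, where postprocess receives one message list per preprocessed dataset:

from typing import Any

import xarray as xr

from xrlint.processor import ProcessorOp
from xrlint.result import Message


class SingleDatasetProcessor(ProcessorOp):
    """Hypothetical processor: one file, one dataset, no message merging."""

    def preprocess(
        self, file_path: str, opener_options: dict[str, Any]
    ) -> list[tuple[xr.Dataset | xr.DataTree, str]]:
        # Open the file itself as the single dataset to lint.
        engine = opener_options.pop("engine", "zarr")
        dataset = xr.open_dataset(file_path, engine=engine, **opener_options)
        return [(dataset, file_path)]

    def postprocess(
        self, messages: list[list[Message]], file_path: str
    ) -> list[Message]:
        # Flatten the per-dataset message lists into a single report.
        return [m for message_list in messages for m in message_list]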