diff --git a/.gitignore b/.gitignore
index fe1919ae..88c4d865 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,3 +26,6 @@ __pycache__/
 Pipfile
 venv
 .venv
+
+# Ontologies
+*.jsonld
diff --git a/docs/reference/index.rst b/docs/reference/index.rst
index 1d60bda7..27c449a0 100644
--- a/docs/reference/index.rst
+++ b/docs/reference/index.rst
@@ -72,6 +72,7 @@ Submodules
    :recursive:
 
    model
+   ontology
    testing
    typing
 
diff --git a/src/scitacean/_dataset_fields.py b/src/scitacean/_dataset_fields.py
index 2e8d6503..5579c270 100644
--- a/src/scitacean/_dataset_fields.py
+++ b/src/scitacean/_dataset_fields.py
@@ -11,6 +11,7 @@
 
 from __future__ import annotations
 
+from collections.abc import Iterable
 from dataclasses import dataclass
 from datetime import UTC, datetime
 from typing import Any, ClassVar, Literal, TypeVar
@@ -29,6 +30,7 @@
     Technique,
     construct,
 )
+from .ontology import find_technique
 from .pid import PID
 
 M = TypeVar("M", bound=BaseModel)
@@ -54,6 +56,12 @@ def _parse_remote_path(path: str | RemotePath | None) -> RemotePath | None:
     return RemotePath(path)
 
 
+def _parse_techniques(arg: Iterable[str | Technique] | None) -> list[Technique] | None:
+    if arg is None:
+        return None
+    return [t if isinstance(t, Technique) else find_technique(t) for t in arg]
+
+
 def _validate_checksum_algorithm(algorithm: str | None) -> str | None:
     if algorithm is None:
         return algorithm
@@ -615,7 +623,7 @@ def __init__(
         source_folder: RemotePath | str | None = None,
         source_folder_host: str | None = None,
         start_time: datetime | None = None,
-        techniques: list[Technique] | None = None,
+        techniques: Iterable[str | Technique] | None = None,
         used_software: list[str] | None = None,
         validation_status: str | None = None,
         meta: dict[str, Any] | None = None,
@@ -656,7 +664,7 @@ def __init__(
         self._source_folder = _parse_remote_path(source_folder)
         self._source_folder_host = source_folder_host
         self._start_time = start_time
-        self._techniques = techniques
+        self._techniques = _parse_techniques(techniques)
         self._used_software = used_software
         self._validation_status = validation_status
         self._api_version = None
@@ -1033,9 +1041,9 @@ def techniques(self) -> list[Technique] | None:
         return self._techniques
 
     @techniques.setter
-    def techniques(self, techniques: list[Technique] | None) -> None:
+    def techniques(self, techniques: Iterable[str | Technique] | None) -> None:
         """Stores the metadata information for techniques."""
-        self._techniques = techniques
+        self._techniques = _parse_techniques(techniques)
 
     @property
     def updated_at(self) -> datetime | None:
diff --git a/src/scitacean/ontology/__init__.py b/src/scitacean/ontology/__init__.py
new file mode 100644
index 00000000..d14b618c
--- /dev/null
+++ b/src/scitacean/ontology/__init__.py
@@ -0,0 +1,122 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2025 SciCat Project (https://github.com/SciCatProject/scitacean)
+"""Tools for working with ontologies."""
+
+import bz2
+import importlib.resources
+import json
+import re
+from functools import cache
+
+from ..model import Technique
+
+
+def _load_ontology(name: str) -> object:
+    """Load an ontology from the package resources.
+
+    Note that the ontology file was generated using a script in
+    ``tools/ontologies`` in the Scitacean repository.
+    """
+    with (
+        importlib.resources.files("scitacean.ontology")
+        .joinpath(f"{name}.json.bz2")
+        .open("rb") as raw_f
+    ):
+        with bz2.open(raw_f, "rb") as f:
+            return json.loads(f.read())
+
+
+@cache
+def expands_techniques() -> dict[str, list[str]]:
+    """Load the ExPaNDS experimental techniques ontology.
+
+    Returns
+    -------
+    :
+        A dict mapping from technique ids (IRIs) to labels.
+        The first element of the list is the primary label.
+        All labels are lowercase and contain no leading or trailing whitespace.
+    """
+    return _load_ontology("expands_techniques")  # type: ignore[return-value]
+
+
+def find_technique(label_or_iri: str) -> Technique:
+    """Construct a Technique model from an ontology label or IRI.
+
+    The argument specifies a technique from the
+    `ExPaNDS experimental techniques ontology <https://expands-eu.github.io/ExPaNDS-experimental-techniques-ontology/index-en.html>`_.
+
+    Parameters
+    ----------
+    label_or_iri:
+        One of:
+
+        - Technique *label* from the ExPaNDS ontology. The input is first converted to
+          lowercase and leading and trailing whitespace is removed.
+        - Technique *IRI* from the ExPaNDS ontology. Must exactly match an IRI in
+          the ontology.
+
+    Returns
+    -------
+    :
+        The loaded technique encoded as:
+
+        .. code-block:: python
+
+            Technique(name=label, pid=iri)
+
+    Raises
+    ------
+    ValueError
+        If the label or IRI is not found in the ontology.
+    """
+    if _is_iri(label_or_iri):
+        return _lookup_iri(label_or_iri)
+    return _lookup_label(label_or_iri)
+
+
+def _lookup_label(label: str) -> Technique:
+    label = label.strip().lower()
+    found = [
+        (iri, labels[0])
+        for iri, labels in expands_techniques().items()
+        if label in labels
+    ]
+    if len(found) == 1:
+        return Technique(pid=found[0][0], name=found[0][1])
+    elif len(found) > 1:
+        raise ValueError(
+            f"Found multiple techniques with label '{label}': {[f[0] for f in found]}. "
+            "Please specify the exact IRI instead or construct a Technique model "
+            "manually.\n"
+            "See the ExPaNDS experimental technique ontology for allowed labels at "
+            "https://expands-eu.github.io/ExPaNDS-experimental-techniques-ontology/index-en.html"
+        )
+    # else: len(found) == 0
+    raise ValueError(
+        f"Unknown technique label: '{label}'\n"
+        "See the ExPaNDS experimental technique ontology for allowed labels at "
+        "https://expands-eu.github.io/ExPaNDS-experimental-techniques-ontology/index-en.html"
+    )
+
+
+def _lookup_iri(iri: str) -> Technique:
+    try:
+        label = expands_techniques()[iri][0]
+    except KeyError:
+        raise ValueError(
+            f"Unknown technique IRI: '{iri}'\n"
+            "See the ExPaNDS experimental technique ontology for allowed labels at "
+            "https://expands-eu.github.io/ExPaNDS-experimental-techniques-ontology/index-en.html"
+        ) from None
+    return Technique(pid=iri, name=label)
+
+
+_IRI_REGEX = re.compile(r"^https?://purl\.org/pan-science/PaNET/PaNET\d+$")
+
+
+def _is_iri(iri: str) -> bool:
+    return bool(_IRI_REGEX.match(iri))
+
+
+__all__ = ["expands_techniques", "find_technique"]
diff --git a/src/scitacean/ontology/expands_techniques.json.bz2 b/src/scitacean/ontology/expands_techniques.json.bz2
new file mode 100644
index 00000000..203a9f08
Binary files /dev/null and b/src/scitacean/ontology/expands_techniques.json.bz2 differ
diff --git a/tests/dataset_fields_test.py b/tests/dataset_fields_test.py
index df3c13e0..a61f44e7 100644
--- a/tests/dataset_fields_test.py
+++ b/tests/dataset_fields_test.py
@@ -18,6 +18,7 @@
     DownloadDataFile,
     DownloadDataset,
     DownloadOrigDatablock,
+    Technique,
     UploadDerivedDataset,
     UploadRawDataset,
 )
@@ -430,12 +431,12 @@ def test_orcid_validation_valid(good_orcid: str) -> None:
     dset = Dataset(
         type="raw",
         name="test ORCID",
-        contact_email="jan-lukas.wynen@ess.eu",
+        contact_email="mail.person@sci.uni",
         creation_location="scitacean/tests",
         creation_time="2142-04-02T16:44:56",
-        owner="Jan-Lukas Wynen",
+        owner="Mustrum Ridcully",
         owner_group="ess",
-        principal_investigator="jan-lukas.wynen@ess.eu",
+        principal_investigator="mail.person@sci.uni",
         source_folder=RemotePath("/hex/source62"),
         orcid_of_owner=good_orcid,
     )
@@ -454,11 +455,11 @@ def test_orcid_validation_valid(good_orcid: str) -> None:
 def test_orcid_validation_missing_url(bad_orcid: str) -> None:
     dset = Dataset(
         type="raw",
-        contact_email="jan-lukas.wynen@ess.eu",
+        contact_email="mail.person@sci.uni",
         creation_time="2142-04-02T16:44:56",
-        owner="Jan-Lukas Wynen",
+        owner="Mustrum Ridcully",
         owner_group="ess",
-        principal_investigator="jan-lukas.wynen@ess.eu",
+        principal_investigator="mail.person@sci.uni",
         source_folder=RemotePath("/hex/source62"),
         orcid_of_owner=bad_orcid,
     )
@@ -466,4 +467,48 @@ def test_orcid_validation_missing_url(bad_orcid: str) -> None:
         dset.make_upload_model()
 
 
-# TODO technique
+def test_technique_set_model() -> None:
+    technique = Technique(pid="test/technique", name="Test Technique")
+    dset = Dataset(
+        type="raw",
+        contact_email="mail.person@sci.uni",
+        creation_time="2142-04-02T16:44:56",
+        owner="Mustrum Ridcully",
+        owner_group="ess",
+        principal_investigator="mail.person@sci.uni",
+        source_folder=RemotePath("/hex/source62"),
+        techniques=[technique],
+    )
+    assert dset.techniques == [technique]
+
+
+def test_technique_set_label() -> None:
+    dset = Dataset(
+        type="raw",
+        contact_email="mail.person@sci.uni",
+        creation_time="2142-04-02T16:44:56",
+        owner="Mustrum Ridcully",
+        owner_group="ess",
+        principal_investigator="mail.person@sci.uni",
+        source_folder=RemotePath("/hex/source62"),
+        techniques=["neutron powder diffraction"],
+    )
+    expected = Technique(
+        pid="http://purl.org/pan-science/PaNET/PaNET01100",
+        name="neutron powder diffraction",
+    )
+    assert dset.techniques == [expected]
+
+
+def test_technique_set_invalid_label_raises_value_error() -> None:
+    dset = Dataset(
+        type="raw",
+        contact_email="mail.person@sci.uni",
+        creation_time="2142-04-02T16:44:56",
+        owner="Mustrum Ridcully",
+        owner_group="ess",
+        principal_investigator="mail.person@sci.uni",
+        source_folder=RemotePath("/hex/source62"),
+    )
+    with pytest.raises(ValueError, match="Unknown technique"):
+        dset.techniques = ["bad technique"]
diff --git a/tests/ontology_test.py b/tests/ontology_test.py
new file mode 100644
index 00000000..8b47f304
--- /dev/null
+++ b/tests/ontology_test.py
@@ -0,0 +1,69 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2025 SciCat Project (https://github.com/SciCatProject/scitacean)
+import pytest
+
+from scitacean import model, ontology
+
+
+def test_can_load_expands_technique_ontology() -> None:
+    techniques = ontology.expands_techniques()
+    assert len(techniques) > 0
+
+    # Check IRI
+    assert all(iri.startswith("http") for iri in techniques.keys())
+
+    # Check labels
+    assert all(
+        all(label.islower() for label in labels) for labels in techniques.values()
+    )
+    assert all(
+        all(label.strip() == label for label in labels)
+        for labels in techniques.values()
+    )
+
+
+def test_can_look_up_technique_by_label() -> None:
+    technique = ontology.find_technique("small angle neutron scattering")
+    expected = model.Technique(
+        pid="http://purl.org/pan-science/PaNET/PaNET01189",
+        name="small angle neutron scattering",
+    )
+    assert technique == expected
+
+
+def test_can_look_up_technique_by_label_is_case_insensitive() -> None:
+    technique = ontology.find_technique("Total Scattering")
+    expected = model.Technique(
+        pid="http://purl.org/pan-science/PaNET/PaNET01190",
+        name="total scattering",
+    )
+    assert technique == expected
+
+
+def test_can_look_up_technique_by_alternative_label() -> None:
+    regular = ontology.find_technique("x-ray single crystal diffraction")
+    alternative1 = ontology.find_technique("SXRD")
+    alternative2 = ontology.find_technique("sxrd")
+    alternative3 = ontology.find_technique("single crystal x-ray diffraction ")
+    expected = model.Technique(
+        pid="http://purl.org/pan-science/PaNET/PaNET01102",
+        name="x-ray single crystal diffraction",
+    )
+    assert regular == expected
+    assert alternative1 == expected
+    assert alternative2 == expected
+    assert alternative3 == expected
+
+
+def test_can_look_up_technique_by_iri() -> None:
+    technique = ontology.find_technique("http://purl.org/pan-science/PaNET/PaNET01239")
+    expected = model.Technique(
+        pid="http://purl.org/pan-science/PaNET/PaNET01239",
+        name="neutron reflectometry",
+    )
+    assert technique == expected
+
+
+def test_lookup_rejects_ambiguous_label() -> None:
+    with pytest.raises(ValueError, match="multiple techniques"):
+        ontology.find_technique("diffraction")
diff --git a/tools/model-generation/spec/dataset-fields.yml b/tools/model-generation/spec/dataset-fields.yml
index a9b4b8ca..5eeb39a5 100644
--- a/tools/model-generation/spec/dataset-fields.yml
+++ b/tools/model-generation/spec/dataset-fields.yml
@@ -34,6 +34,9 @@ conversions:
   sourceFolder:
     func: _parse_remote_path
     arg_type: RemotePath | str
+  techniques:
+    func: _parse_techniques
+    arg_type: Iterable[str | Technique]
 
 # Mark those fields as read-only in addition to those identified as read only from the schema.
 # Read-only fields must be None in uploads.
diff --git a/tools/model-generation/templates/dataset_fields.py.jinja b/tools/model-generation/templates/dataset_fields.py.jinja
index ddd73ce1..0db8b381 100644
--- a/tools/model-generation/templates/dataset_fields.py.jinja
+++ b/tools/model-generation/templates/dataset_fields.py.jinja
@@ -32,6 +32,7 @@ type(None)
 
 from __future__ import annotations
 
+from collections.abc import Iterable
 from dataclasses import dataclass
 from datetime import datetime, UTC
 from typing import Any, ClassVar, Literal, TypeVar
@@ -50,6 +51,7 @@ from .model import (
     Technique,
     construct,
 )
+from .ontology import find_technique
 from .pid import PID
 
 M = TypeVar("M", bound=BaseModel)
@@ -75,6 +77,15 @@ def _parse_remote_path(path: str | RemotePath | None) -> RemotePath | None:
     return RemotePath(path)
 
 
+def _parse_techniques(arg: Iterable[str | Technique] | None) -> list[Technique] | None:
+    if arg is None:
+        return None
+    return [
+        t if isinstance(t, Technique) else find_technique(t)
+        for t in arg
+    ]
+
+
 def _validate_checksum_algorithm(algorithm: str | None) -> str | None:
     if algorithm is None:
         return algorithm
diff --git a/tools/ontologies/expands_techniques.py b/tools/ontologies/expands_techniques.py
new file mode 100644
index 00000000..fe09dc33
--- /dev/null
+++ b/tools/ontologies/expands_techniques.py
@@ -0,0 +1,123 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2025 SciCat Project (https://github.com/SciCatProject/scitacean)
+"""Download and parse the ExPaNDS experimental techniques ontology.
+
+This script extracts a mapping from ids (IRI) to class labels from the ontology.
+The resulting dict has this structure:
+
+.. code-block:: python
+
+    {
+        id: [main-label, alternative-label, ...]
+    }
+
+Labels are converted to lowercase and stripped of leading and trailing whitespace.
+
+The results are saved to a given file as a bz2 compressed JSON file.
+The bz2 format was chosen as it yielded the smallest file among the
+algorithms available in the standard library.
+"""
+# ruff: noqa: T201
+
+import argparse
+import bz2
+import json
+from pathlib import Path
+from typing import Any, TypeAlias
+
+import httpx
+
+SOURCE_URL = "https://expands-eu.github.io/ExPaNDS-experimental-techniques-ontology/ontology.jsonld"
+
+# Prefix used for all ExPaNDS class IRIs:
+IRI_PREFIX = "http://purl.org/pan-science/PaNET/"
+# @type of classes:
+CLASS_TYPE = "http://www.w3.org/2002/07/owl#Class"
+# Key for class labels:
+LABEL_KEY = "http://www.w3.org/2000/01/rdf-schema#label"
+# Key for alternative class labels:
+ALT_LABEL_KEY = "http://www.w3.org/2004/02/skos/core#altLabel"
+
+Ontology: TypeAlias = list[dict[str, Any]]
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("out", type=Path, help="Output file")
+    parser.add_argument(
+        "--load",
+        type=Path,
+        default=None,
+        help="Load ontology from file instead of downloading",
+    )
+    parser.add_argument(
+        "--persist", type=Path, default=None, help="Save full ontology to file"
+    )
+    return parser.parse_args()
+
+
+def download() -> Ontology:
+    print(f"Downloading ontology from {SOURCE_URL}")
+    response = httpx.get(SOURCE_URL)
+    response.raise_for_status()
+    return response.json()  # type: ignore[no-any-return]
+
+
+def load(path: Path | None) -> Ontology:
+    if path is None:
+        return download()
+    else:
+        print(f"Loading ontology from {path}")
+        return json.loads(path.read_text())  # type: ignore[no-any-return]
+
+
+def maybe_save_ontology(ontology: Ontology, target: Path | None) -> None:
+    if target is not None:
+        print(f"Saving ontology to {target}")
+        target.write_text(json.dumps(ontology, indent=2))
+
+
+def filter_techniques(ontology: Ontology) -> Ontology:
+    return [
+        item
+        for item in ontology
+        if item.get("@type") == [CLASS_TYPE]
+        and item.get("@id", "").startswith(IRI_PREFIX)
+    ]
+
+
+def get_labels(item: dict[str, Any]) -> list[str]:
+    [entry] = item[LABEL_KEY]
+    alt_labels = [lab["@value"] for lab in item.get(ALT_LABEL_KEY, [])]
+    return [entry["@value"], *alt_labels]
+
+
+def process_label(raw_label: str) -> str:
+    return raw_label.lower().strip()
+
+
+def parse_to_dict(ontology: Ontology) -> dict[str, list[str]]:
+    return {
+        item["@id"]: [process_label(label) for label in get_labels(item)]
+        for item in ontology
+    }
+
+
+def write_result(mapping: dict[str, list[str]], out: Path) -> None:
+    serialized = json.dumps(mapping).encode("utf-8")
+    # replace all suffixes with `.json.bz2`
+    path = out.parent.joinpath(out.name.split(".", 1)[0] + ".json.bz2")
+    with bz2.open(path, "wb") as f:
+        f.write(serialized)
+
+
+def main() -> None:
+    args = parse_args()
+    ontology = load(args.load)
+    maybe_save_ontology(ontology, args.persist)
+    ids_to_labels = parse_to_dict(filter_techniques(ontology))
+    write_result(ids_to_labels, args.out)
+
+
+if __name__ == "__main__":
+    main()