Skip to content
91 changes: 77 additions & 14 deletions datalad_ebrains/fairgraph_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Path,
PurePosixPath,
)
import requests
from unittest.mock import patch
from urllib.parse import (
quote,
Expand Down Expand Up @@ -186,9 +187,24 @@ def get_file_records(self, ds, kg_dsver):
# EBRAINS uses different file repositories that need slightly
# different handling
dvr_url_p = urlparse(dvr.iri.value)
# public data-proxy datasets
if dvr_url_p.netloc == 'data-proxy.ebrains.eu' \
and dvr_url_p.path.startswith('/api/v1/public/buckets/'):
get_fname = _get_fname_dataproxy_v1_bucket
iter_files = partial(
self.iter_files_dp,
dsid=kg_dsver.uuid,
auth=False,
get_fname=_get_fname_dataproxy_v1_bucket_public,
)
# private data-proxy datasets (e.g. human data gateway)
elif dvr_url_p.netloc == 'data-proxy.ebrains.eu' \
and dvr_url_p.path.startswith('/api/v1/buckets/'):
iter_files = partial(
self.iter_files_dp,
dsid=kg_dsver.uuid,
auth=True,
get_fname=_get_fname_dataproxy_v1_bucket_private,
)
elif dvr_url_p.netloc == 'object.cscs.ch' \
and dvr_url_p.query.startswith('prefix='):
# get the repos base url by removing the query string
Expand All @@ -208,36 +224,74 @@ def get_file_records(self, ds, kg_dsver):
# filerepo prefix
dvr_prefix,
)
iter_files = partial(
self.iter_files_kg,
get_fname=get_fname,
)
else:
raise NotImplementedError(
f'Unrecognized file repository pointer {dvr.iri.value}')

for f in self.iter_files(dvr):
# we presently cannot understand non-md5 hashes
assert f.hash.algorithm.lower() == 'md5'
# must yield dict with keys
# (url: str, name: str, md5sum: str, size: int)
yield from iter_files(dvr)

f_url = _file_iri_to_url(f.iri.value)
fname = get_fname(f)
def iter_files_dp(self, dvr, dsid, auth, get_fname=None, chunk_size=10000):
    """Yield file records from a data proxy query

    Parameters
    ----------
    dvr:
      File repository record. Unused; accepted for call-signature
      symmetry with ``iter_files_kg()``.
    dsid: str
      UUID of the dataset version, used to address the dataset in the
      data-proxy API.
    auth: bool
      Whether to send a bearer token taken from the ``KG_AUTH_TOKEN``
      environment variable. Required for private datasets (e.g. human
      data gateway). Must be ``False`` for public datasets: the data
      proxy API responds with HTTP 400 if auth is sent for public
      resources.
    get_fname:
      Unused; accepted for call-signature symmetry with
      ``iter_files_kg()``.
    chunk_size: int
      Maximum number of file records to request per API call.

    Yields
    ------
    dict
      With keys ``url`` (str), ``name`` (str), ``md5sum`` (str), and
      ``size`` (int, assumed to be in bytes).

    Raises
    ------
    requests.HTTPError
      If a data-proxy API request fails.
    KeyError
      If ``auth=True`` but ``KG_AUTH_TOKEN`` is not set in the
      environment.
    """
    dsurl = f'https://data-proxy.ebrains.eu/api/v1/datasets/{dsid}'
    # data proxy API will 400 if auth is sent for public resources
    headers = {
        "Content-Type": "application/json",
        "Authorization": f'Bearer {os.environ["KG_AUTH_TOKEN"]}',
    } if auth else {}
    # the listing endpoint pages Swift-style: request up to
    # `chunk_size` records and continue from the last returned name
    # via the `marker` parameter until a short batch signals the end.
    # Without this, datasets with more than `chunk_size` files would
    # be silently truncated.
    marker = None
    while True:
        params = {'limit': chunk_size}
        if marker is not None:
            params['marker'] = marker
        response = requests.get(dsurl, params=params, headers=headers)
        response.raise_for_status()
        batch = response.json()['objects']
        for f in batch:
            # f is a dict like:
            # {'hash': '16e1594b23e670086383ff7e7151d81a',
            #  'last_modified': '2023-02-06T15:06:59.748510',
            #  'bytes': 194037,
            #  'name': 'EBRAINS-DataDescriptor_JBA-v3.0.1.pdf',
            #  'content_type': 'application/pdf'}
            #
            # we need
            # (url: str, name: str, md5sum: str, size: int)
            yield dict(
                # quote the name to obtain a valid URL also for file
                # names containing spaces or other special characters
                url=f'{dsurl}/{quote(f["name"])}',
                name=f['name'],
                md5sum=f['hash'],
                # assumed to be in bytes
                size=f['bytes'],
            )
        if len(batch) < chunk_size:
            # a short batch means the listing is exhausted
            return
        # continue listing after the last record we have seen
        marker = batch[-1]['name']

# the chunk size is large, because the per-request latency costs
# are enormous
# https://github.com/HumanBrainProject/fairgraph/issues/57
def iter_files_kg(self, dvr, get_fname, chunk_size=10000):
    """Yield file records from a KG query

    Parameters
    ----------
    dvr:
      File repository record to list files for.
    get_fname: callable
      Maps a KG file record to a relative path for the file in the
      dataset.
    chunk_size: int
      Number of file records to request per KG query.

    Yields
    ------
    dict
      With keys ``url`` (str), ``name`` (str), ``md5sum`` (str), and
      ``size`` (int, assumed to be in bytes).
    """
    cur_index = 0
    while True:
        batch = omcore.File.list(
            self.client,
            file_repository=dvr,
            size=chunk_size,
            from_index=cur_index)
        for f in batch:
            # we presently cannot understand non-md5 hashes
            assert f.hash.algorithm.lower() == 'md5'

            f_url = _file_iri_to_url(f.iri.value)
            fname = get_fname(f)
            yield dict(
                url=f_url,
                name=str(fname),
                md5sum=f.hash.digest,
                # assumed to be in bytes
                size=f.storage_size.value,
            )
        if len(batch) < chunk_size:
            # there is no point in asking for another batch
            return
        # advance the paging cursor; without this, repositories with
        # more than `chunk_size` files would re-fetch (and re-yield)
        # the same first batch forever
        cur_index += chunk_size
Expand Down Expand Up @@ -281,7 +335,7 @@ def get_agent_info(self, kg_dsver):
}


def _get_fname_dataproxy_v1_bucket(f):
def _get_fname_dataproxy_v1_bucket_public(f):
f_url_p = urlparse(f.iri.value)
assert f_url_p.netloc == 'data-proxy.ebrains.eu'
assert f_url_p.path.startswith('/api/v1/public/buckets/')
Expand All @@ -290,6 +344,15 @@ def _get_fname_dataproxy_v1_bucket(f):
return Path(*path.parts[6:])


def _get_fname_dataproxy_v1_bucket_private(f):
f_url_p = urlparse(f.iri.value)
assert f_url_p.netloc == 'data-proxy.ebrains.eu'
assert f_url_p.path.startswith('/api/v1/buckets/')
path = PurePosixPath(f_url_p.path)
# take everything past the bucket_id and turn into a Platform native path
return Path(*path.parts[5:])


def _get_fname_cscs_repo(baseurl, prefix, f):
f_url = f.iri.value
# we presently have no better way to determine a relative file path
Expand Down
6 changes: 6 additions & 0 deletions datalad_ebrains/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import os
import pytest
from unittest.mock import patch


@pytest.fixture(scope="session", autouse=True)
def authenticate():
if 'KG_AUTH_TOKEN' in os.environ:
# we seem to have what we need
yield
return

from datalad.api import ebrains_authenticate
token = ebrains_authenticate(
result_renderer='disabled',
Expand Down
2 changes: 1 addition & 1 deletion datalad_ebrains/tests/test_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_clone_invalid_call(tmp_path):
# make sure the parameter validation is working
from datalad.api import ebrains_clone
# always needs a `source`
with pytest.raises(TypeError):
with pytest.raises(ValueError):
ebrains_clone()
# must contain a UUID
with pytest.raises(ValueError):
Expand Down
3 changes: 0 additions & 3 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = True

# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {'https://docs.python.org/': None}

# -- Options for HTML output ----------------------------------------------

# The theme to use for HTML and HTML Help pages. See the documentation for
Expand Down
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ classifiers =
python_requires = >= 3.7
install_requires =
datalad >= 0.17
datalad_next >= 1.0.0b2
datalad_next >= 1.0.0b3
ebrains-kg-core
fairgraph
fairgraph >= 0.11
packages = find_namespace:
include_package_data = True

Expand Down