Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
dabc3df
Add TRE API uploader
milanmlft Jul 9, 2025
b9f79eb
Small fixes
milanmlft Jul 9, 2025
8594e0a
Clean up
milanmlft Jul 9, 2025
397e744
Drive-by: don't call `generate_uid()` twice
milanmlft Jul 9, 2025
6664799
Add tests
milanmlft Jul 9, 2025
111e266
Preserve dir structure when creating zip
milanmlft Jul 9, 2025
1e93239
Pass root dir into `_create_zip_archive`
milanmlft Jul 9, 2025
c2d6c0d
Don't use full `source_root_dir` path for zip filename
milanmlft Jul 9, 2025
0590a98
Ignore ruff `SLF001` for tests
milanmlft Jul 9, 2025
e1cab35
Linter fixes
milanmlft Jul 9, 2025
96c94db
Add ng tube for education project config
stefpiatek Dec 23, 2025
e3548b7
Update secret name for api
stefpiatek Dec 23, 2025
899cff8
Add TRE to destinations
stefpiatek Dec 23, 2025
b535549
Should have read what I was adding
stefpiatek Dec 23, 2025
86444b0
Use 202 as acceptance from API
stefpiatek Dec 23, 2025
9b0213c
Dont flush after each image upload
stefpiatek Dec 23, 2025
dcfacff
Remove extra status checks
stefpiatek Dec 23, 2025
236d1dd
Remote flush in image upload
stefpiatek Dec 30, 2025
da9055e
Update dicom validator
stefpiatek Dec 30, 2025
12b58e9
Pin dicom validator raw install
stefpiatek Dec 30, 2025
c1631b6
Merge branch 'main' into milanmlft/606-add-api-uploader
stefpiatek Dec 30, 2025
aea884a
Use http timeout for file uploading on TRE
stefpiatek Dec 30, 2025
a25595f
Merge branch 'main' into milanmlft/606-add-api-uploader
stefpiatek Dec 30, 2025
0819238
Merge branch 'main' into milanmlft/606-add-api-uploader
stefpiatek Dec 30, 2025
fede2e3
Move TRE config to be in line with other uploaders
stefpiatek Dec 30, 2025
0bd88ca
Use correct destination filename
milanmlft Jan 5, 2026
f1b5b0a
Log successful parquet upload
stefpiatek Jan 6, 2026
bbe8f9d
Merge branch 'milanmlft/606-add-api-uploader' of https://github.com/S…
stefpiatek Jan 6, 2026
e3cd6a4
Fix test
stefpiatek Jan 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cli/tests/test_populate.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
# limitations under the License.
"""Patient queue tests"""

# ruff: noqa: SLF001 allow accessing of private members for mocking

from __future__ import annotations

from typing import TYPE_CHECKING
Expand Down
13 changes: 10 additions & 3 deletions pixl_core/src/core/exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,14 @@ def upload(self) -> None:

else:
uploader = get_uploader(self.project_slug)

msg = f"Uploading parquet files for project {self.project_slug} via '{destination}'"
logger.info(msg)
logger.info(
"Starting upload of parquet files for project {} via '{}'",
self.project_slug,
destination,
)
uploader.upload_parquet_files(self)
logger.success(
"Finished uploading parquet files for project {} via '{}'",
self.project_slug,
destination,
)
1 change: 1 addition & 0 deletions pixl_core/src/core/project_config/pixl_config_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class _DestinationEnum(str, Enum):
ftps = "ftps"
dicomweb = "dicomweb"
xnat = "xnat"
tre = "tre"


class _Destination(BaseModel):
Expand Down
4 changes: 3 additions & 1 deletion pixl_core/src/core/uploader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,21 @@

from ._dicomweb import DicomWebUploader
from ._ftps import FTPSUploader
from ._treapi import TreApiUploader
from ._xnat import XNATUploader

if TYPE_CHECKING:
from core.uploader.base import Uploader


# Intenitonally defined in __init__.py to avoid circular imports
# Intentionally defined in __init__.py to avoid circular imports
def get_uploader(project_slug: str) -> Uploader:
"""Uploader Factory, returns uploader instance based on destination."""
choices: dict[str, type[Uploader]] = {
"ftps": FTPSUploader,
"dicomweb": DicomWebUploader,
"xnat": XNATUploader,
"tre": TreApiUploader,
}
project_config = load_project_config(project_slug)
destination = project_config.destination.dicom
Expand Down
226 changes: 226 additions & 0 deletions pixl_core/src/core/uploader/_treapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
# Copyright (c) University College London Hospitals NHS Foundation Trust
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Uploader subclass for the ARC TRE API."""

from __future__ import annotations

import zipfile
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING

import requests
from decouple import config

from core.uploader._orthanc import StudyTags, get_study_zip_archive
from core.uploader.base import Uploader

if TYPE_CHECKING:
from core.exports import ParquetExport

# API Configuration
TRE_API_URL = "https://api.tre.arc.ucl.ac.uk/v0"
REQUEST_TIMEOUT = 10

# HTTP Status Codes
HTTP_OK = 200


class TreApiUploader(Uploader):
    """
    Uploader for the ARC TRE API.

    This uploader handles uploading DICOM images and parquet files to the ARC TRE
    via their REST API. Files are uploaded to an airlock and then flushed to the
    main project storage.
    """

    def __init__(self, project_slug: str, keyvault_alias: str | None = None) -> None:
        """
        Initialize the TRE API uploader.

        Args:
            project_slug: The project identifier
            keyvault_alias: Optional Azure Key Vault alias for authentication

        """
        super().__init__(project_slug, keyvault_alias)

    def _set_config(self) -> None:
        """Set up authentication configuration from Azure Key Vault."""
        # Use the Azure KV alias as prefix if it exists, otherwise use the project name
        prefix = self.keyvault_alias or self.project_slug
        self.token = self.keyvault.fetch_secret(f"{prefix}--api--token")
        self.headers = {"Authorization": f"Bearer {self.token}"}
        self.host = TRE_API_URL
        # Uploads of large archives may exceed the short REQUEST_TIMEOUT used for
        # control-plane calls, so the upload timeout is configurable separately.
        self.upload_timeout = int(config("HTTP_TIMEOUT", default=30))

    def _upload_dicom_image(self, study_id: str, study_tags: StudyTags) -> None:
        """
        Upload a DICOM image to the TRE API.

        Args:
            study_id: The study identifier
            study_tags: Study metadata containing the pseudo anonymized ID

        """
        zip_content = get_study_zip_archive(study_id)
        self.send_via_api(zip_content, f"{study_tags.pseudo_anon_image_id}.zip")

    def upload_parquet_files(self, parquet_export: ParquetExport) -> None:
        """
        Upload parquet files as a zip archive to the TRE API.

        Args:
            parquet_export: The parquet export containing files to upload

        Raises:
            FileNotFoundError: If no parquet files are found in the export

        """
        source_root_dir = parquet_export.current_extract_base
        source_files = list(source_root_dir.rglob("*.parquet"))

        if not source_files:
            msg = f"No parquet files found in {source_root_dir}"
            raise FileNotFoundError(msg)

        # Create zip file, preserving the directory structure relative to the
        # extract base so files unpack into their original layout.
        zip_filename = f"{source_root_dir.name}.zip"
        zip_file = _create_zip_archive(source_files, source_root_dir, zip_filename)

        # Upload the zip file
        self.send_via_api(BytesIO(zip_file.read_bytes()), zip_file.name)
        self.flush()  # Not ideal, as this may cause multiple flushes in short period

    def send_via_api(self, data: BytesIO, filename: str) -> None:
        """
        Upload data to the TRE API.

        Args:
            data: The data to upload as a BytesIO stream
            filename: The filename for the uploaded data

        Raises:
            RuntimeError: If the token is invalid or upload fails

        """
        # Fail fast with a clear message rather than a confusing 401 mid-upload.
        if not self._is_token_valid():
            msg = f"Token invalid: {self.token}"
            raise RuntimeError(msg)

        self._upload_file(data, filename)

    def _is_token_valid(self) -> bool:
        """
        Check if the current token is valid.

        Returns:
            True if the token is valid, False otherwise

        """
        try:
            response = requests.get(
                url=f"{self.host}/tokens/info",
                headers=self.headers,
                timeout=REQUEST_TIMEOUT,
            )
            response.raise_for_status()
        except requests.RequestException:
            return False
        else:
            return response.status_code == HTTP_OK

    def _upload_file(self, content: BytesIO, filename: str) -> None:
        """
        Upload a file to the TRE airlock.

        Args:
            content: The file content as a BytesIO stream
            filename: The destination filename, appended to the upload URL so the
                airlock stores the file under its intended name

        Raises:
            RuntimeError: If the upload fails

        """
        try:
            response = requests.post(
                # The destination filename is part of the upload path.
                url=f"{self.host}/airlock/upload/{filename}",
                headers=self.headers,
                data=content,
                timeout=self.upload_timeout,
            )
            response.raise_for_status()

        except requests.RequestException as e:
            msg = f"Failed to upload file {filename}: {e}"
            raise RuntimeError(msg) from e

    def flush(self) -> None:
        """
        Flush the TRE airlock to move files to main project storage.

        This operation scans and moves files from quarantine storage to the
        associated project storage. The operation is asynchronous and expensive,
        so it should be called sparingly (e.g., once per day).

        Note: Files are automatically deleted if not moved within 7 days.

        Raises:
            RuntimeError: If the flush operation fails

        """
        try:
            response = requests.put(
                url=f"{self.host}/airlock/flush",
                headers=self.headers,
                timeout=REQUEST_TIMEOUT,
            )
            response.raise_for_status()

        except requests.RequestException as e:
            msg = f"Failed to flush airlock: {e}"
            raise RuntimeError(msg) from e


def _create_zip_archive(files: list[Path], root_dir: Path, zip_filename: str) -> Path:
    """
    Bundle the given files into a single zip archive.

    Args:
        files: Paths of the files to include in the archive
        root_dir: Base directory used to compute each file's relative path,
            so the archive preserves the original directory layout
        zip_filename: Name (or path) of the zip file to write

    Returns:
        Path to the newly created zip file

    Raises:
        OSError: If writing the archive fails

    """
    destination = Path(zip_filename)

    try:
        with zipfile.ZipFile(destination, "w", zipfile.ZIP_DEFLATED) as archive:
            for source in files:
                # Store entries under their path relative to root_dir, not the
                # absolute path on disk.
                archive.write(source, arcname=source.relative_to(root_dir))
    except OSError as exc:
        msg = f"Failed to create zip file {zip_filename}: {exc}"
        raise OSError(msg) from exc

    return destination
4 changes: 2 additions & 2 deletions pixl_core/tests/patient_queue/test_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ async def test_create(mock_message) -> None:
# Cancel before assertion so the task doesn't hang
task.cancel()
# need to close the connection and channel
await consumer._channel.close() # noqa: SLF001
await consumer._connection.close() # noqa: SLF001
await consumer._channel.close()
await consumer._connection.close()
consume.assert_called_once()
# Fail on purpose to check async test awaited
raise ExpectedTestError
8 changes: 4 additions & 4 deletions pixl_core/tests/uploader/test_dicomweb.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def dicomweb_uploader() -> MockDicomWebUploader:

def test_dicomweb_server_config(run_containers, dicomweb_uploader) -> None:
"""Tests that the DICOMWeb server is configured correctly in Orthanc"""
dicomweb_uploader._setup_dicomweb_credentials() # noqa: SLF001, private method
dicomweb_uploader._setup_dicomweb_credentials()
servers_response = requests.get(
ORTHANC_ANON_URL + "/dicom-web/servers",
auth=(ORTHANC_USERNAME, ORTHANC_PASSWORD),
Expand Down Expand Up @@ -99,7 +99,7 @@ def test_upload_dicom_image(
pseudo_anon_image_id=not_yet_exported_dicom_image.pseudo_study_uid,
patient_id="patient",
)
dicomweb_uploader._upload_dicom_image( # noqa: SLF001
dicomweb_uploader._upload_dicom_image(
study_id,
study_tags,
)
Expand All @@ -119,12 +119,12 @@ def test_dicomweb_upload_fails_with_wrong_credentials(
dicomweb_uploader.endpoint_password = "wrong"

with pytest.raises(requests.exceptions.ConnectionError):
dicomweb_uploader._setup_dicomweb_credentials() # noqa: SLF001, private method
dicomweb_uploader._setup_dicomweb_credentials()


def test_dicomweb_upload_fails_with_wrong_url(study_id, run_containers, dicomweb_uploader) -> None:
"""Tests that the DICOMWeb uploader fails when given wrong URL."""
dicomweb_uploader.endpoint_url = "http://wrong"

with pytest.raises(requests.exceptions.ConnectionError):
dicomweb_uploader._setup_dicomweb_credentials() # noqa: SLF001, private method
dicomweb_uploader._setup_dicomweb_credentials()
16 changes: 7 additions & 9 deletions pixl_core/tests/uploader/test_ftps.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,14 @@ def test_update_exported_and_save(rows_in_session) -> None:
"""Tests that the exported_at field is updated when a file is uploaded"""
# ARRANGE
expected_export_time = datetime.now(tz=UTC)
uid = generate_uid(entropy_srcs=["not_yet_exported"])

# ACT
update_exported_at(generate_uid(entropy_srcs=["not_yet_exported"]), expected_export_time)
new_row = (
rows_in_session.query(Image)
.filter(Image.pseudo_study_uid == generate_uid(entropy_srcs=["not_yet_exported"]))
.one()
)
actual_export_time = new_row.exported_at.replace(tzinfo=UTC)
# Act
update_exported_at(uid, expected_export_time)

# Retrieve updated record
updated_record = rows_in_session.query(Image).filter(Image.pseudo_study_uid == uid).one()
actual_export_time = updated_record.exported_at.replace(tzinfo=UTC)

# ASSERT
assert actual_export_time == expected_export_time
Expand All @@ -129,7 +128,6 @@ def parquet_export(export_dir) -> ParquetExport:
def test_upload_parquet(parquet_export, ftps_home_dir, ftps_uploader) -> None:
"""Tests that parquet files are uploaded to the correct location (but ignore their contents)"""
# ARRANGE

parquet_export.copy_to_exports(Path(__file__).parents[3] / "test" / "resources" / "omop")
parquet_export.export_radiology_linker(pd.DataFrame(list("dummy"), columns=["D"]))

Expand Down
Loading
Loading