From a3b0865d961add0268da8ec8ed7e7c88eb13fed1 Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Wed, 16 Apr 2025 15:48:52 +0200 Subject: [PATCH 01/14] Implement asynchronous file deletion --- bin/archive | 14 ++++++++ odp/api/lib/archive.py | 19 ----------- odp/api/routers/package.py | 32 ++++++++++++++---- odp/archive/__init__.py | 36 ++++++++++++++++++++ odp/archive/file_purge.py | 39 ++++++++++++++++++++++ odp/lib/archive/__init__.py | 64 +++++++++++++++++++++++++++--------- odp/lib/archive/filestore.py | 23 +++++++++---- odp/lib/archive/website.py | 9 +++-- 8 files changed, 185 insertions(+), 51 deletions(-) create mode 100644 bin/archive delete mode 100644 odp/api/lib/archive.py create mode 100644 odp/archive/__init__.py create mode 100644 odp/archive/file_purge.py diff --git a/bin/archive b/bin/archive new file mode 100644 index 0000000..61c94c2 --- /dev/null +++ b/bin/archive @@ -0,0 +1,14 @@ +#!/usr/bin/env python + +import pathlib +import sys + +rootdir = pathlib.Path(__file__).parent.parent +sys.path.append(str(rootdir)) + +import odp.archive +import odp.logfile + +if __name__ == '__main__': + odp.logfile.initialize() + odp.archive.run_all() diff --git a/odp/api/lib/archive.py b/odp/api/lib/archive.py deleted file mode 100644 index 255e1c7..0000000 --- a/odp/api/lib/archive.py +++ /dev/null @@ -1,19 +0,0 @@ -from fastapi import HTTPException -from starlette.status import HTTP_404_NOT_FOUND - -from odp.const.db import ArchiveType -from odp.db import Session -from odp.db.models import Archive -from odp.lib.archive import ArchiveAdapter, filestore, website - - -async def get_archive_adapter(archive_id: str) -> ArchiveAdapter: - if not (archive := Session.get(Archive, archive_id)): - raise HTTPException(HTTP_404_NOT_FOUND) - - adapter_cls = { - ArchiveType.filestore: filestore.FilestoreArchiveAdapter, - ArchiveType.website: website.WebsiteArchiveAdapter, - }[archive.type] - - return adapter_cls(archive.download_url, archive.upload_url) diff --git a/odp/api/routers/package.py b/odp/api/routers/package.py index 6ab8117..0b9241b 100644 --- a/odp/api/routers/package.py +++ b/odp/api/routers/package.py @@ -11,7 +11,6 @@ from starlette.status import HTTP_404_NOT_FOUND, HTTP_405_METHOD_NOT_ALLOWED, HTTP_422_UNPROCESSABLE_ENTITY from werkzeug.utils import secure_filename -from odp.api.lib.archive import get_archive_adapter from odp.api.lib.auth import ArchiveAuthorize, Authorize, Authorized, TagAuthorize, UntagAuthorize from odp.api.lib.paging import Paginator from odp.api.lib.schema import get_metadata_validity @@ -510,7 +509,6 @@ async def upload_file( description: str = Query(None, title='Resource description'), unpack: bool = Query(False, title='Unpack zip file into folder'), overwrite: bool = Query(False, title='Overwrite existing file(s)'), # todo - archive_adapter: ArchiveAdapter = Depends(get_archive_adapter), auth: Authorized = Depends(Authorize(ODPScope.PACKAGE_WRITE)), ) -> None: """ @@ -531,7 +529,7 @@ async def upload_file( await _upload_file( package_id, archive_id, folder, file.file, filename, sha256, - title, description, unpack, overwrite, archive_adapter, auth, + title, description, unpack, overwrite, auth, ) @@ -546,7 +544,6 @@ async def _upload_file( description: str | None, unpack: bool, overwrite: bool, - archive_adapter: ArchiveAdapter, auth: Authorized, ): if not (archive := Session.get(Archive, archive_id)): @@ -566,10 +563,11 @@ async def _upload_file( if not (filename := secure_filename(filename)): raise HTTPException(HTTP_422_UNPROCESSABLE_ENTITY, 'invalid filename') - archive_folder = f'{package.key}/{folder}' + archive_path = f'{package.key}/{folder or "."}/{filename}' + archive_adapter = ArchiveAdapter.get_instance(archive) try: file_info_list = await archive_adapter.put( - archive_folder, filename, file, sha256, unpack + archive_path, file, sha256, unpack ) except ArchiveError as e: raise HTTPException(e.status_code, e.error_detail) from e @@ -617,4 +615,24 @@ async def delete_file( resource_id: str, auth: Authorized = Depends(Authorize(ODPScope.PACKAGE_WRITE)), ) -> None: - ... + """Delete a file. + + Flags archive-resource link(s) as `delete_pending`; actual file deletions + are performed by a background service. + + Requires scope `odp.package:write`. The package status must be `pending`. + """ + if not (package := Session.get(Package, package_id)): + raise HTTPException(HTTP_404_NOT_FOUND, 'Package not found') + + auth.enforce_constraint([package.provider_id]) + ensure_status(package, PackageStatus.pending) + + if not (resource := Session.get(Resource, resource_id)): + raise HTTPException(HTTP_404_NOT_FOUND, 'Resource not found') + + if resource.package_id != package_id: + raise HTTPException(HTTP_404_NOT_FOUND, 'Resource not found') + + for archive_resource in resource.archive_resources: + archive_resource.status = ResourceStatus.delete_pending diff --git a/odp/archive/__init__.py b/odp/archive/__init__.py new file mode 100644 index 0000000..645d15f --- /dev/null +++ b/odp/archive/__init__.py @@ -0,0 +1,36 @@ +import logging +from importlib import import_module +from pathlib import Path +from pkgutil import iter_modules +from typing import final + +from odp.db import Session + +logger = logging.getLogger(__name__) + + +class ArchiveModule: + + @final + def run(self): + modname = self.__class__.__name__ + try: + logger.info(f'{modname} started') + self.exec() + Session.commit() + logger.info(f'{modname} completed') + except Exception as e: + Session.rollback() + logger.exception(f'{modname} failed: {e!r}') + + def exec(self): + raise NotImplementedError + + +def run_all(): + archive_dir = str(Path(__file__).parent) + for mod_info in iter_modules([archive_dir]): + mod = import_module(f'odp.archive.{mod_info.name}') + for cls in mod.__dict__.values(): + if isinstance(cls, type) and issubclass(cls, ArchiveModule) and cls is not ArchiveModule: + cls().run() diff --git a/odp/archive/file_purge.py b/odp/archive/file_purge.py new file mode 100644 index 0000000..af3f4da --- /dev/null +++ b/odp/archive/file_purge.py @@ -0,0 +1,39 @@ +import asyncio +import logging + +from sqlalchemy import select + +from odp.archive import ArchiveModule +from odp.const.db import ResourceStatus +from odp.db import Session +from odp.db.models import ArchiveResource +from odp.lib.archive import ArchiveAdapter, ArchiveError + +logger = logging.getLogger(__name__) + + +class FilePurgeModule(ArchiveModule): + + def exec(self): + archive_resources_to_delete = Session.execute( + select(ArchiveResource).where(ArchiveResource.status == ResourceStatus.delete_pending) + ).scalars().all() + + for ar in archive_resources_to_delete: + archive_adapter = ArchiveAdapter.get_instance(ar.archive) + try: + asyncio.run(archive_adapter.delete(ar.path)) + logger.info(f'Deleted {ar.path} in {ar.archive_id}') + + except ArchiveError as e: + if e.status_code == 404: + logger.info(f'Delete {ar.path} in {ar.archive_id}: already gone') + else: + logger.exception(f'{e.status_code}: {e.error_detail}') + continue + + except NotImplementedError: + pass + + ar.delete() + Session.commit() diff --git a/odp/lib/archive/__init__.py b/odp/lib/archive/__init__.py index c3c7aac..71d5a95 100644 --- a/odp/lib/archive/__init__.py +++ b/odp/lib/archive/__init__.py @@ -1,7 +1,12 @@ +from __future__ import annotations + from collections import namedtuple from os import PathLike from typing import BinaryIO +from odp.const.db import ArchiveType +from odp.db.models import Archive + ArchiveFileInfo = namedtuple('ArchiveFileInfo', ( 'path', 'size', 'sha256' )) @@ -21,7 +26,7 @@ def __init__(self, redirect_url: str): class ArchiveError(Exception): - def __init__(self, status_code, error_detail): + def __init__(self, status_code: int, error_detail: str): self.status_code = status_code self.error_detail = error_detail @@ -32,32 +37,59 @@ class ArchiveAdapter: All paths are relative. """ - def __init__(self, download_url: str | None, upload_url: str | None) -> None: + _instance_cache: dict[str, ArchiveAdapter] = {} + + @classmethod + def get_instance(cls, archive: Archive) -> ArchiveAdapter: + from . import filestore, website + + try: + return cls._instance_cache[archive.id] + except KeyError: + pass + + adapter_cls = { + ArchiveType.filestore: filestore.FilestoreArchiveAdapter, + ArchiveType.website: website.WebsiteArchiveAdapter, + }[archive.type] + + cls._instance_cache[archive.id] = instance = adapter_cls(archive.download_url, archive.upload_url) + + return instance + + def __init__( + self, download_url: str | None, upload_url: str | None + ) -> None: self.download_url = download_url self.upload_url = upload_url - async def get(self, path: str | PathLike) -> ArchiveFileResponse | ArchiveRedirectResponse: - """Send the contents of the file at `path` to the client, - or return a redirect.""" + async def get( + self, path: str | PathLike + ) -> ArchiveFileResponse | ArchiveRedirectResponse: + """Return the contents of the file at `path`, or a redirect.""" raise NotImplementedError - async def get_zip(self, *paths: str | PathLike) -> ArchiveFileResponse: - """Send a zip file of the directories (recursively) and - files at `paths` to the client.""" + async def get_zip( + self, *paths: str | PathLike + ) -> ArchiveFileResponse: + """Return a zip file of the directories (recursively) and + files at `paths`.""" raise NotImplementedError async def put( - self, - folder: str, - filename: str, - file: BinaryIO, - sha256: str, - unpack: bool, + self, path: str, file: BinaryIO, sha256: str, unpack: bool ) -> list[ArchiveFileInfo]: - """Add or unpack `file` into `folder` relative to the - archive's upload directory. + """Store `file` at `path` relative to the upload URL. + + If `unpack` is true, `file` is unzipped at the parent of `path`. Return a list of ArchiveFileInfo tuple(path, size, sha256) for each written file. """ raise NotImplementedError + + async def delete( + self, path: str | PathLike + ) -> None: + """Delete the file at `path`.""" + raise NotImplementedError diff --git a/odp/lib/archive/filestore.py b/odp/lib/archive/filestore.py index 995ec3e..ded0667 100644 --- a/odp/lib/archive/filestore.py +++ b/odp/lib/archive/filestore.py @@ -1,3 +1,4 @@ +from os import PathLike from typing import Any, BinaryIO from urllib.parse import urljoin @@ -15,25 +16,24 @@ class FilestoreArchiveAdapter(ArchiveAdapter): which must be running on the server. """ - def __init__(self, download_url: str | None, upload_url: str | None) -> None: - super().__init__(download_url, upload_url) + def __init__(self, *args) -> None: + super().__init__(*args) self.timeout = 3600.0 if config.ODP.ENV == 'development' else 10.0 async def put( self, - folder: str, - filename: str, + path: str, file: BinaryIO, sha256: str, unpack: bool, ) -> list[ArchiveFileInfo]: - params = {'filename': filename, 'sha256': sha256} + params = {'sha256': sha256} if unpack: params |= {'unpack': 1} result = self._send_request( 'PUT', - urljoin(self.upload_url, folder), + urljoin(self.upload_url, path), files={'file': file}, params=params, ) @@ -42,7 +42,16 @@ async def put( for path, info in result.items() ] - def _send_request(self, method, url, files, params) -> Any: + async def delete( + self, + path: str | PathLike, + ) -> None: + self._send_request( + 'DELETE', + urljoin(self.upload_url, path), + ) + + def _send_request(self, method, url, *, params=None, files=None) -> Any: """Send a request to the ODP file storage service and return its JSON response.""" try: diff --git a/odp/lib/archive/website.py b/odp/lib/archive/website.py index 644e10c..bdd3781 100644 --- a/odp/lib/archive/website.py +++ b/odp/lib/archive/website.py @@ -8,6 +8,11 @@ class WebsiteArchiveAdapter(ArchiveAdapter): """Adapter for a read-only archive with its own web interface for accessing data.""" - async def get(self, path: str | PathLike) -> ArchiveRedirectResponse: + async def get( + self, + path: str | PathLike, + ) -> ArchiveRedirectResponse: """Return a redirect to the relevant web page.""" - return ArchiveRedirectResponse(urljoin(self.download_url, path)) + return ArchiveRedirectResponse( + urljoin(self.download_url, path) + ) From 2b4cc0414ef3eb3a386672f46faf82fa8e5b88a7 Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Wed, 16 Apr 2025 18:14:15 +0200 Subject: [PATCH 02/14] Split resource status `delete_pending` is a user- (or client-) driven status and therefore must be on resource, not archive_resource. It applies implicitly to all archive_resources associated with a resource. --- ...4_16_b9774b38ac0b_split_resource_status.py | 35 +++++++++++++++++++ odp/db/models/archive.py | 4 +-- odp/db/models/resource.py | 5 +-- 3 files changed, 40 insertions(+), 4 deletions(-) create mode 100644 migrate/versions/2025_04_16_b9774b38ac0b_split_resource_status.py diff --git a/migrate/versions/2025_04_16_b9774b38ac0b_split_resource_status.py b/migrate/versions/2025_04_16_b9774b38ac0b_split_resource_status.py new file mode 100644 index 0000000..77ebc16 --- /dev/null +++ b/migrate/versions/2025_04_16_b9774b38ac0b_split_resource_status.py @@ -0,0 +1,35 @@ +"""Split resource status + +Revision ID: b9774b38ac0b +Revises: 660293c24387 +Create Date: 2025-04-16 17:49:32.137456 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + + +# revision identifiers, used by Alembic. +revision = 'b9774b38ac0b' +down_revision = '660293c24387' +branch_labels = None +depends_on = None + + +def upgrade(): + op.drop_column('archive_resource', 'status') + op.execute('drop type resourcestatus') + op.execute("create type archiveresourcestatus as enum ('pending', 'valid', 'missing', 'corrupt')") + op.add_column('archive_resource', sa.Column('status', postgresql.ENUM(name='archiveresourcestatus', create_type=False), nullable=False)) + op.execute("create type resourcestatus as enum ('active', 'delete_pending')") + op.add_column('resource', sa.Column('status', postgresql.ENUM(name='resourcestatus', create_type=False), nullable=False)) + + +def downgrade(): + op.drop_column('resource', 'status') + op.execute('drop type resourcestatus') + op.drop_column('archive_resource', 'status') + op.execute('drop type archiveresourcestatus') + op.execute("create type resourcestatus as enum ('pending', 'valid', 'missing', 'corrupt', 'delete_pending')") + op.add_column('archive_resource', sa.Column('status', postgresql.ENUM(name='resourcestatus', create_type=False), nullable=False)) diff --git a/odp/db/models/archive.py b/odp/db/models/archive.py index 367bbe5..9bfd0db 100644 --- a/odp/db/models/archive.py +++ b/odp/db/models/archive.py @@ -1,7 +1,7 @@ from sqlalchemy import CheckConstraint, Column, Enum, ForeignKey, ForeignKeyConstraint, String, TIMESTAMP, UniqueConstraint from sqlalchemy.orm import relationship -from odp.const.db import ArchiveType, ResourceStatus, ScopeType +from odp.const.db import ArchiveResourceStatus, ArchiveType, ScopeType from odp.db import Base @@ -52,7 +52,7 @@ class ArchiveResource(Base): resource = relationship('Resource') path = Column(String, nullable=False) - status = Column(Enum(ResourceStatus), nullable=False) + status = Column(Enum(ArchiveResourceStatus), nullable=False) timestamp = Column(TIMESTAMP(timezone=True), nullable=False) _repr_ = 'archive_id', 'resource_id', 'path', 'status' diff --git a/odp/db/models/resource.py b/odp/db/models/resource.py index a3c41a1..46710bb 100644 --- a/odp/db/models/resource.py +++ b/odp/db/models/resource.py @@ -4,7 +4,7 @@ from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import relationship -from odp.const.db import HashAlgorithm +from odp.const.db import HashAlgorithm, ResourceStatus from odp.db import Base @@ -35,6 +35,7 @@ class Resource(Base): hash_algorithm = Column(Enum(HashAlgorithm)) title = Column(String) description = Column(String) + status = Column(Enum(ResourceStatus), nullable=False) timestamp = Column(TIMESTAMP(timezone=True), nullable=False) package_id = Column(String, ForeignKey('package.id', ondelete='RESTRICT'), nullable=False) @@ -44,4 +45,4 @@ class Resource(Base): archive_resources = relationship('ArchiveResource', viewonly=True) archives = association_proxy('archive_resources', 'archive') - _repr_ = 'id', 'folder', 'filename', 'mimetype', 'size', 'hash', 'package_id' + _repr_ = 'id', 'folder', 'filename', 'mimetype', 'size', 'hash', 'package_id', 'status' From bff39c96ea2df164f58f2b9c3add0d29983e6614 Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Thu, 17 Apr 2025 10:36:33 +0200 Subject: [PATCH 03/14] Use `delete_pending` on resource Follows 2b4cc0414ef3eb3a386672f46faf82fa8e5b88a7 --- odp/api/routers/package.py | 12 ++++++---- odp/archive/file_purge.py | 48 +++++++++++++++++++++----------------- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/odp/api/routers/package.py b/odp/api/routers/package.py index 0b9241b..31cd10f 100644 --- a/odp/api/routers/package.py +++ b/odp/api/routers/package.py @@ -18,7 +18,7 @@ from odp.api.models import PackageDetailModel, PackageModel, PackageModelIn, Page, TagInstanceModel, TagInstanceModelIn from odp.api.routers.resource import output_resource_model from odp.const import ODPScope -from odp.const.db import HashAlgorithm, PackageCommand, PackageStatus, ResourceStatus, SchemaType, TagType +from odp.const.db import ArchiveResourceStatus, HashAlgorithm, PackageCommand, PackageStatus, ResourceStatus, SchemaType, TagType from odp.db import Session from odp.db.models import Archive, ArchiveResource, Package, PackageAudit, Provider, Resource, Schema from odp.lib.archive import ArchiveAdapter, ArchiveError @@ -588,6 +588,7 @@ async def _upload_file( hash_algorithm=HashAlgorithm.sha256, title=title, description=description, + status=ResourceStatus.active, timestamp=(timestamp := datetime.now(timezone.utc)), ) resource.save() @@ -597,7 +598,7 @@ async def _upload_file( archive_id=archive.id, resource_id=resource.id, path=archive_path, - status=ResourceStatus.valid, + status=ArchiveResourceStatus.valid, timestamp=timestamp, ) archive_resource.save() @@ -617,7 +618,7 @@ async def delete_file( ) -> None: """Delete a file. - Flags archive-resource link(s) as `delete_pending`; actual file deletions + Updates the resource status to `delete_pending`; actual file deletions are performed by a background service. Requires scope `odp.package:write`. The package status must be `pending`. @@ -634,5 +635,6 @@ async def delete_file( if resource.package_id != package_id: raise HTTPException(HTTP_404_NOT_FOUND, 'Resource not found') - for archive_resource in resource.archive_resources: - archive_resource.status = ResourceStatus.delete_pending + resource.status = ResourceStatus.delete_pending + resource.timestamp = datetime.now(timezone.utc) + resource.save() diff --git a/odp/archive/file_purge.py b/odp/archive/file_purge.py index af3f4da..3e1e510 100644 --- a/odp/archive/file_purge.py +++ b/odp/archive/file_purge.py @@ -6,7 +6,7 @@ from odp.archive import ArchiveModule from odp.const.db import ResourceStatus from odp.db import Session -from odp.db.models import ArchiveResource +from odp.db.models import Resource from odp.lib.archive import ArchiveAdapter, ArchiveError logger = logging.getLogger(__name__) @@ -15,25 +15,31 @@ class FilePurgeModule(ArchiveModule): def exec(self): - archive_resources_to_delete = Session.execute( - select(ArchiveResource).where(ArchiveResource.status == ResourceStatus.delete_pending) + resources_to_delete = Session.execute( + select(Resource).where(Resource.status == ResourceStatus.delete_pending) ).scalars().all() - for ar in archive_resources_to_delete: - archive_adapter = ArchiveAdapter.get_instance(ar.archive) - try: - asyncio.run(archive_adapter.delete(ar.path)) - logger.info(f'Deleted {ar.path} in {ar.archive_id}') - - except ArchiveError as e: - if e.status_code == 404: - logger.info(f'Delete {ar.path} in {ar.archive_id}: already gone') - else: - logger.exception(f'{e.status_code}: {e.error_detail}') - continue - - except NotImplementedError: - pass - - ar.delete() - Session.commit() + for resource in resources_to_delete: + for ar in resource.archive_resources: + archive_adapter = ArchiveAdapter.get_instance(ar.archive) + try: + asyncio.run(archive_adapter.delete(ar.path)) + logger.info(f'Deleted {ar.path} in {ar.archive_id}') + + except ArchiveError as e: + if e.status_code == 404: + logger.info(f'Delete {ar.path} in {ar.archive_id}: already gone') + else: + logger.exception(f'{e.status_code}: {e.error_detail}') + continue + + except NotImplementedError: + pass + + ar.delete() + Session.commit() + + # Delete resource only if there are no archive_resources left. + if not resource.archive_resources: + resource.delete() + Session.commit() From 501f18d62f8a14d82cc3f02e52155c3e7eb3abd9 Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Mon, 21 Apr 2025 12:40:35 +0200 Subject: [PATCH 04/14] Include status on resource output model --- odp/api/routers/resource.py | 1 + 1 file changed, 1 insertion(+) diff --git a/odp/api/routers/resource.py b/odp/api/routers/resource.py index 9fbce8d..38829cf 100644 --- a/odp/api/routers/resource.py +++ b/odp/api/routers/resource.py @@ -25,6 +25,7 @@ def output_resource_model(resource: Resource) -> ResourceModel: size=resource.size, hash=resource.hash, hash_algorithm=resource.hash_algorithm, + status=resource.status, timestamp=resource.timestamp.isoformat(), package_id=resource.package_id, package_key=resource.package.key, From cb4f4c01f794f17b07a0185d09541d19ff50fe93 Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Mon, 21 Apr 2025 12:41:28 +0200 Subject: [PATCH 05/14] Replace existing objects It's a PUT. This is expected. Also, combine folder and filename into path across the board. This simplifies path integrity checks and validation, and parts can in any case be easily inferred by the client. If in future we need to distinguish file and folder resource types, this can be done explicitly with an enum on resource, instead of implicitly based on folder/filename values. --- ...e8c_merge_folder_and_filename_into_path.py | 36 ++++++ odp/api/routers/package.py | 113 +++++++++--------- odp/api/routers/resource.py | 6 +- odp/archive/__init__.py | 6 +- odp/db/models/resource.py | 13 +- 5 files changed, 103 insertions(+), 71 deletions(-) create mode 100644 migrate/versions/2025_04_21_c8363345ee8c_merge_folder_and_filename_into_path.py diff --git a/migrate/versions/2025_04_21_c8363345ee8c_merge_folder_and_filename_into_path.py b/migrate/versions/2025_04_21_c8363345ee8c_merge_folder_and_filename_into_path.py new file mode 100644 index 0000000..e209fd0 --- /dev/null +++ b/migrate/versions/2025_04_21_c8363345ee8c_merge_folder_and_filename_into_path.py @@ -0,0 +1,36 @@ +"""Merge folder and filename into path + +Revision ID: c8363345ee8c +Revises: b9774b38ac0b +Create Date: 2025-04-21 16:05:08.099042 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'c8363345ee8c' +down_revision = 'b9774b38ac0b' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - adjusted ### + op.add_column('resource', sa.Column('path', sa.String(), nullable=False)) + op.create_unique_constraint('resource_package_id_path_key', 'resource', ['package_id', 'path']) + op.drop_index('resource_package_path_uix', 'resource') + op.drop_column('resource', 'filename') + op.drop_column('resource', 'folder') + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - adjusted ### + op.add_column('resource', sa.Column('folder', sa.VARCHAR(), autoincrement=False, nullable=False)) + op.add_column('resource', sa.Column('filename', sa.VARCHAR(), autoincrement=False, nullable=False)) + op.create_index('resource_package_path_uix', 'resource', [sa.text("(package_id || '/' || folder || '/' || filename)")], unique=True) + op.drop_constraint('resource_package_id_path_key', 'resource', type_='unique') + op.drop_column('resource', 'path') + # ### end Alembic commands ### diff --git a/odp/api/routers/package.py b/odp/api/routers/package.py index 31cd10f..934f163 100644 --- a/odp/api/routers/package.py +++ b/odp/api/routers/package.py @@ -8,7 +8,7 @@ from jschon_translation import remove_empty_children from sqlalchemy import select from sqlalchemy.exc import IntegrityError -from starlette.status import HTTP_404_NOT_FOUND, HTTP_405_METHOD_NOT_ALLOWED, HTTP_422_UNPROCESSABLE_ENTITY +from starlette.status import HTTP_400_BAD_REQUEST, HTTP_404_NOT_FOUND, HTTP_405_METHOD_NOT_ALLOWED, HTTP_422_UNPROCESSABLE_ENTITY from werkzeug.utils import secure_filename from odp.api.lib.auth import ArchiveAuthorize, Authorize, Authorized, TagAuthorize, UntagAuthorize @@ -495,29 +495,29 @@ async def _cancel_package( @router.put( - '/{package_id}/files/{folder:path}', + '/{package_id}/files/{path:path}', dependencies=[Depends(ArchiveAuthorize())], ) async def upload_file( package_id: str, archive_id: str, - folder: str = Path(..., title='Path to containing folder relative to package root'), - file: UploadFile = File(..., title='File upload'), - filename: str = Query(..., title='File name'), + path: str = Path(..., title='File path relative to the package root'), + file: UploadFile = File(..., title='File data'), sha256: str = Query(..., title='SHA-256 checksum'), title: str = Query(None, title='Resource title'), description: str = Query(None, title='Resource description'), - unpack: bool = Query(False, title='Unpack zip file into folder'), - overwrite: bool = Query(False, title='Overwrite existing file(s)'), # todo + unpack: bool = Query(False, title='Unpack zipped file data'), auth: Authorized = Depends(Authorize(ODPScope.PACKAGE_WRITE)), ) -> None: """ Upload a file to an archive and add/unpack it into a package folder. - By default, a single resource is created and associated with the archive - and the package. If unpack is true and the file is a supported zip format, - its contents are unpacked into the folder and, for each unpacked file, a - resource is created and similarly associated. + By default, a single resource is created and associated with the package + and the archive. If `unpack` is True, `file` is assumed to be zipped data + and its contents are unpacked at the parent directory of `path`. For each + unpacked file, a resource is created and similarly associated. + + Existing files are replaced. Requires scope `odp.package:write` along with the scope associated with the archive. The package status must be `pending`. @@ -528,46 +528,42 @@ async def upload_file( ensure_status(package, PackageStatus.pending) await _upload_file( - package_id, archive_id, folder, file.file, filename, sha256, - title, description, unpack, overwrite, auth, + package_id, archive_id, path, file.file, sha256, title, description, unpack, auth, ) async def _upload_file( package_id: str, archive_id: str, - folder: str, + path: str, file: BinaryIO, - filename: str, sha256: str, title: str | None, description: str | None, unpack: bool, - overwrite: bool, auth: Authorized, ): - if not (archive := Session.get(Archive, archive_id)): - raise HTTPException(HTTP_404_NOT_FOUND, 'Archive not found') - if not (package := Session.get(Package, package_id)): raise HTTPException(HTTP_404_NOT_FOUND, 'Package not found') - auth.enforce_constraint([package.provider_id]) + if not (archive := Session.get(Archive, archive_id)): + raise HTTPException(HTTP_404_NOT_FOUND, 'Archive not found') - if '..' in folder: - raise HTTPException(HTTP_422_UNPROCESSABLE_ENTITY, "'..' not allowed in folder") + auth.enforce_constraint([package.provider_id]) - if pathlib.Path(folder).is_absolute(): - raise HTTPException(HTTP_422_UNPROCESSABLE_ENTITY, 'folder must be relative') + path = pathlib.Path(path) + if path.is_absolute(): + raise HTTPException(HTTP_400_BAD_REQUEST, 'path must be relative') - if not (filename := secure_filename(filename)): - raise HTTPException(HTTP_422_UNPROCESSABLE_ENTITY, 'invalid filename') + for part in path.parts: + if part != secure_filename(part): + raise HTTPException(HTTP_400_BAD_REQUEST, 'invalid path') - archive_path = f'{package.key}/{folder or "."}/{filename}' archive_adapter = ArchiveAdapter.get_instance(archive) + archive_resource_path = f'{package.key}/{path}' try: file_info_list = await archive_adapter.put( - archive_path, file, sha256, unpack + archive_resource_path, file, sha256, unpack ) except ArchiveError as e: raise HTTPException(e.status_code, e.error_detail) from e @@ -575,37 +571,41 @@ async def _upload_file( raise HTTPException(HTTP_405_METHOD_NOT_ALLOWED, f'Operation not supported for {archive.id}') for file_info in file_info_list: - archive_path = file_info.path - package_path = file_info.path.removeprefix(f'{package.key}/') - - resource = Resource( - package_id=package.id, - folder=str(pathlib.Path(package_path).parent), - filename=pathlib.Path(file_info.path).name, - mimetype=mimetypes.guess_type(file_info.path, strict=False)[0], - size=file_info.size, - hash=file_info.sha256, - hash_algorithm=HashAlgorithm.sha256, - title=title, - description=description, - status=ResourceStatus.active, - timestamp=(timestamp := datetime.now(timezone.utc)), - ) + archive_resource_path = file_info.path + resource_path = file_info.path.removeprefix(f'{package.key}/') + + if not (resource := Session.execute( + select(Resource) + .where(Resource.package_id == package_id) + .where(Resource.path == resource_path) + ).scalar_one_or_none()): + resource = Resource( + package_id=package_id, + path=resource_path, + ) + + resource.mimetype = mimetypes.guess_type(file_info.path, strict=False)[0] + resource.size = file_info.size + resource.hash = file_info.sha256 + resource.hash_algorithm = HashAlgorithm.sha256 + resource.title = title + resource.description = description + resource.status = ResourceStatus.active + resource.timestamp = (timestamp := datetime.now(timezone.utc)) resource.save() - try: + if not (archive_resource := Session.get(ArchiveResource, (archive_id, resource.id))): archive_resource = ArchiveResource( archive_id=archive.id, resource_id=resource.id, - path=archive_path, - status=ArchiveResourceStatus.valid, - timestamp=timestamp, ) - archive_resource.save() - except IntegrityError as e: - raise HTTPException( - HTTP_422_UNPROCESSABLE_ENTITY, f"Path '{archive_path}' already exists in archive" - ) from e + + archive_resource.path = archive_resource_path + archive_resource.status = ArchiveResourceStatus.valid + archive_resource.timestamp = timestamp + archive_resource.save() + + # TODO: what about existing archive_resource records for other archives? @router.delete( @@ -635,6 +635,7 @@ async def delete_file( if resource.package_id != package_id: raise HTTPException(HTTP_404_NOT_FOUND, 'Resource not found') - resource.status = ResourceStatus.delete_pending - resource.timestamp = datetime.now(timezone.utc) - resource.save() + if resource.status != ResourceStatus.delete_pending: + resource.status = ResourceStatus.delete_pending + resource.timestamp = datetime.now(timezone.utc) + resource.save() diff --git a/odp/api/routers/resource.py b/odp/api/routers/resource.py index 38829cf..a89acec 100644 --- a/odp/api/routers/resource.py +++ b/odp/api/routers/resource.py @@ -1,5 +1,3 @@ -from pathlib import Path - from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy import select from starlette.status import HTTP_404_NOT_FOUND @@ -19,8 +17,7 @@ def output_resource_model(resource: Resource) -> ResourceModel: id=resource.id, title=resource.title, description=resource.description, - folder=resource.folder, - filename=resource.filename, + path=resource.path, mimetype=resource.mimetype, size=resource.size, hash=resource.hash, @@ -29,7 +26,6 @@ def output_resource_model(resource: Resource) -> ResourceModel: timestamp=resource.timestamp.isoformat(), package_id=resource.package_id, package_key=resource.package.key, - package_path=str(Path(resource.folder) / resource.filename), archive_paths={ ar.archive_id: ar.path for ar in resource.archive_resources diff --git a/odp/archive/__init__.py b/odp/archive/__init__.py index 645d15f..cd95bfa 100644 --- a/odp/archive/__init__.py +++ b/odp/archive/__init__.py @@ -28,9 +28,13 @@ def exec(self): def run_all(): + archive_modules = [] archive_dir = str(Path(__file__).parent) for mod_info in iter_modules([archive_dir]): mod = import_module(f'odp.archive.{mod_info.name}') for cls in mod.__dict__.values(): if isinstance(cls, type) and issubclass(cls, ArchiveModule) and cls is not ArchiveModule: - cls().run() + archive_modules += [cls()] + + for archive_module in archive_modules: + archive_module.run() diff --git a/odp/db/models/resource.py b/odp/db/models/resource.py index 46710bb..e69cf2f 100644 --- a/odp/db/models/resource.py +++ b/odp/db/models/resource.py @@ -1,6 +1,6 @@ import uuid -from sqlalchemy import BigInteger, CheckConstraint, Column, Enum, ForeignKey, Index, String, TIMESTAMP, text +from sqlalchemy import BigInteger, CheckConstraint, Column, Enum, ForeignKey, String, TIMESTAMP, UniqueConstraint from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import relationship @@ -15,11 +15,7 @@ class Resource(Base): __tablename__ = 'resource' __table_args__ = ( - Index( - 'resource_package_path_uix', - text("(package_id || '/' || folder || '/' || filename)"), - unique=True, - ), + UniqueConstraint('package_id', 'path'), CheckConstraint( 'hash is null or hash_algorithm is not null', name='resource_hash_algorithm_check', @@ -27,8 +23,7 @@ class Resource(Base): ) id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) - folder = Column(String, nullable=False) - filename = Column(String, nullable=False) + path = Column(String, nullable=False) mimetype = Column(String) size = Column(BigInteger) hash = Column(String) @@ -45,4 +40,4 @@ class Resource(Base): archive_resources = relationship('ArchiveResource', viewonly=True) archives = association_proxy('archive_resources', 'archive') - _repr_ = 'id', 'folder', 'filename', 'mimetype', 'size', 'hash', 'package_id', 'status' + _repr_ = 'id', 'path', 'mimetype', 'size', 'hash', 'package_id', 'status' From 44df73fadfa2e8820499b1eaf7a4bf570380b041 Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Wed, 23 Apr 2025 15:06:17 +0200 Subject: [PATCH 06/14] Upgrade dependencies --- requirements.txt | 51 ++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/requirements.txt b/requirements.txt index 86cad19..af1663d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,9 +14,9 @@ # via -r requirements.in -e file:../odp-ui # via -r requirements.in -alembic==1.14.1 +alembic==1.15.2 # via -r requirements.in -anyio==4.8.0 +anyio==4.9.0 # via # httpx # starlette @@ -26,7 +26,7 @@ argon2-cffi-bindings==21.2.0 # via argon2-cffi async-timeout==5.0.1 # via redis -authlib==1.4.0 +authlib==1.5.2 # via # -r requirements.in # odp @@ -35,7 +35,7 @@ blinker==1.9.0 # via # flask # flask-mail -certifi==2024.12.14 +certifi==2025.1.31 # via # httpcore # httpx @@ -50,9 +50,9 @@ click==8.1.8 # via # flask # uvicorn -coverage==7.6.10 +coverage==7.8.0 # via -r requirements.in -cryptography==44.0.0 +cryptography==44.0.2 # via authlib dnspython==2.7.0 # via email-validator @@ -62,13 +62,13 @@ exceptiongroup==1.2.2 # via # anyio # pytest -factory-boy==3.3.1 +factory-boy==3.3.3 # via -r requirements.in -faker==34.0.2 +faker==37.1.0 # via # -r requirements.in # factory-boy -fastapi==0.115.7 +fastapi==0.115.12 # via -r requirements.in flask==2.3.3 # via @@ -80,7 +80,7 @@ flask-login==0.6.3 # via odp-ui flask-mail==0.10.0 # via -r requirements.in -greenlet==3.1.1 +greenlet==3.2.1 # via sqlalchemy gunicorn==23.0.0 # via -r requirements.in @@ -88,7 +88,7 @@ h11==0.14.0 # via # httpcore # uvicorn -httpcore==1.0.7 +httpcore==1.0.8 # via httpx httpx==0.28.1 # via -r requirements.in @@ -98,15 +98,15 @@ idna==3.10 # email-validator # httpx # requests -iniconfig==2.0.0 +iniconfig==2.1.0 # via pytest itsdangerous==2.2.0 # via # -r requirements.in # flask -jinja2==3.1.5 +jinja2==3.1.6 # via flask -mako==1.3.8 +mako==1.3.10 # via alembic markupsafe==3.0.2 # via @@ -116,7 +116,7 @@ markupsafe==3.0.2 # wtforms ory-hydra-client==1.11.8 # via odp -packaging==24.2 +packaging==25.0 # via # gunicorn # pytest @@ -131,13 +131,11 @@ pydantic[dotenv]==1.10.21 # -r requirements.in # fastapi # odp -pytest==8.3.4 +pytest==8.3.5 # via -r requirements.in python-dateutil==2.9.0.post0 - # via - # faker - # ory-hydra-client -python-dotenv==1.0.1 + # via ory-hydra-client +python-dotenv==1.1.0 # via pydantic python-multipart==0.0.20 # via -r requirements.in @@ -158,33 +156,34 @@ six==1.17.0 # via python-dateutil sniffio==1.3.1 # via anyio -sqlalchemy==2.0.37 +sqlalchemy==2.0.40 # via # -r requirements.in # alembic # sqlalchemy-utils sqlalchemy-utils==0.41.2 # via -r requirements.in -starlette==0.45.2 +starlette==0.46.2 # via # -r requirements.in # fastapi tomli==2.2.1 # via pytest -typing-extensions==4.12.2 +typing-extensions==4.13.2 # via # alembic # anyio - # faker # fastapi # pydantic # sqlalchemy # uvicorn -urllib3==2.3.0 +tzdata==2025.2 + # via faker +urllib3==2.4.0 # via # ory-hydra-client # requests -uvicorn==0.34.0 +uvicorn==0.34.2 # via -r requirements.in werkzeug==2.3.8 # via From fd8fe1ea9efb28ce0f60b80367278ae3def72a78 Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Wed, 23 Apr 2025 18:07:00 +0200 Subject: [PATCH 07/14] Remove obsolete scopes --- migrate/systemdata/clients.yml | 2 +- migrate/systemdata/roles.yml | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/migrate/systemdata/clients.yml b/migrate/systemdata/clients.yml index 64be0fb..d3bacad 100644 --- a/migrate/systemdata/clients.yml +++ b/migrate/systemdata/clients.yml @@ -34,7 +34,7 @@ ODP.Web.UI: - odp.package:sdg - odp.provider:read - odp.record:read - - odp.resource:write + - odp.resource:read - odp.token:read - openid - offline_access diff --git a/migrate/systemdata/roles.yml b/migrate/systemdata/roles.yml index 3ed0dcf..5f3f460 100644 --- a/migrate/systemdata/roles.yml +++ b/migrate/systemdata/roles.yml @@ -19,7 +19,6 @@ SAEON.Staff: - odp.record:read - odp.resource:read - odp.resource:read_all - - odp.resource:write - odp.role:read - odp.schema:read - odp.scope:read From 9d1d6e145718d57a06bbf29df3e2717ce350f03e Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Wed, 23 Apr 2025 18:17:35 +0200 Subject: [PATCH 08/14] Implement file download --- odp/api/routers/package.py | 54 +++++++++++++++++++++++++++++++++++- odp/lib/archive/__init__.py | 24 +++++++++++----- odp/lib/archive/filestore.py | 29 +++++++++++++++---- 3 files changed, 94 insertions(+), 13 deletions(-) diff --git a/odp/api/routers/package.py b/odp/api/routers/package.py index 934f163..dc3f137 100644 --- a/odp/api/routers/package.py +++ b/odp/api/routers/package.py @@ -8,6 +8,7 @@ from jschon_translation import remove_empty_children from sqlalchemy import select from sqlalchemy.exc import IntegrityError +from starlette.responses import StreamingResponse from starlette.status import HTTP_400_BAD_REQUEST, HTTP_404_NOT_FOUND, HTTP_405_METHOD_NOT_ALLOWED, HTTP_422_UNPROCESSABLE_ENTITY from werkzeug.utils import secure_filename @@ -21,7 +22,7 @@ from odp.const.db import ArchiveResourceStatus, HashAlgorithm, PackageCommand, PackageStatus, ResourceStatus, SchemaType, TagType from odp.db import Session from odp.db.models import Archive, ArchiveResource, Package, PackageAudit, Provider, Resource, Schema -from odp.lib.archive import ArchiveAdapter, ArchiveError +from odp.lib.archive import ArchiveAdapter, ArchiveError, ArchiveFileResponse from odp.lib.schema import schema_catalog router = APIRouter() @@ -608,6 +609,57 @@ async def _upload_file( # TODO: what about existing archive_resource records for other archives? +@router.get( + '/{package_id}/files/{resource_id}', + dependencies=[Depends(ArchiveAuthorize())], +) +async def download_file( + package_id: str, + resource_id: str, + archive_id: str, + auth: Authorized = Depends(Authorize(ODPScope.PACKAGE_READ)), +) -> StreamingResponse: + """ + Download a package file from an archive. Requires scope `odp.package:read` + along with the scope associated with the archive. + """ + if not (package := Session.get(Package, package_id)): + raise HTTPException(HTTP_404_NOT_FOUND, 'Package not found') + + auth.enforce_constraint([package.provider_id]) + + if ( + not (resource := Session.get(Resource, resource_id)) or + resource.package_id != package_id or + resource.status != ResourceStatus.active + ): + raise HTTPException(HTTP_404_NOT_FOUND, 'Resource not found') + + if not (archive := Session.get(Archive, archive_id)): + raise HTTPException(HTTP_404_NOT_FOUND, 'Archive not found') + + if not (archive_resource := Session.get(ArchiveResource, (archive_id, resource_id))): + raise HTTPException(HTTP_404_NOT_FOUND, 'Resource not found in archive') + + if archive_resource.status != ArchiveResourceStatus.valid: + raise HTTPException(HTTP_422_UNPROCESSABLE_ENTITY, f'Archived resource is {archive_resource.status}') + + archive_adapter = ArchiveAdapter.get_instance(archive) + archive_response = await archive_adapter.get(archive_resource.path) + + if not isinstance(archive_response, ArchiveFileResponse): + raise HTTPException(HTTP_422_UNPROCESSABLE_ENTITY, 'Resource is not a file') + + filename = pathlib.Path(resource.path).name + return StreamingResponse( + archive_response.file, + media_type=resource.mimetype, + headers={ + 'Content-Disposition': f'attachment; filename="{filename}"' + } + ) + + @router.delete( '/{package_id}/files/{resource_id}', ) diff --git a/odp/lib/archive/__init__.py b/odp/lib/archive/__init__.py index 71d5a95..3120664 100644 --- a/odp/lib/archive/__init__.py +++ b/odp/lib/archive/__init__.py @@ -17,7 +17,8 @@ class ArchiveResponse: class ArchiveFileResponse(ArchiveResponse): - """TODO""" + def __init__(self, file: BinaryIO): + self.file = file class ArchiveRedirectResponse(ArchiveResponse): @@ -58,26 +59,34 @@ def get_instance(cls, archive: Archive) -> ArchiveAdapter: return instance def __init__( - self, download_url: str | None, upload_url: str | None + self, + download_url: str | None, + upload_url: str | None, ) -> None: self.download_url = download_url self.upload_url = upload_url async def get( - self, path: str | PathLike - ) -> ArchiveFileResponse | ArchiveRedirectResponse: + self, + path: str | PathLike, + ) -> ArchiveResponse: """Return the contents of the file at `path`, or a redirect.""" raise NotImplementedError async def get_zip( - self, *paths: str | PathLike + self, + *paths: str | PathLike, ) -> ArchiveFileResponse: """Return a zip file of the directories (recursively) and files at `paths`.""" raise NotImplementedError async def put( - self, path: str, file: BinaryIO, sha256: str, unpack: bool + self, + path: str, + file: BinaryIO, + sha256: str, + unpack: bool, ) -> list[ArchiveFileInfo]: """Store `file` at `path` relative to the upload URL. @@ -89,7 +98,8 @@ async def put( raise NotImplementedError async def delete( - self, path: str | PathLike + self, + path: str | PathLike, ) -> None: """Delete the file at `path`.""" raise NotImplementedError diff --git a/odp/lib/archive/filestore.py b/odp/lib/archive/filestore.py index ded0667..5782a9f 100644 --- a/odp/lib/archive/filestore.py +++ b/odp/lib/archive/filestore.py @@ -1,3 +1,4 @@ +from io import BytesIO from os import PathLike from typing import Any, BinaryIO from urllib.parse import urljoin @@ -5,21 +6,31 @@ import requests from odp.config import config -from odp.lib.archive import ArchiveAdapter, ArchiveError, ArchiveFileInfo +from odp.lib.archive import ArchiveAdapter, ArchiveError, ArchiveFileInfo, ArchiveFileResponse class FilestoreArchiveAdapter(ArchiveAdapter): """Adapter for the ODP file storage service, providing read-write access to Nextcloud or other filesystem-based archives. - Integrates with `ODP Filing `_, - which must be running on the server. + Integrates with `ODP Filing `_. """ def __init__(self, *args) -> None: super().__init__(*args) self.timeout = 3600.0 if config.ODP.ENV == 'development' else 10.0 + async def get( + self, + path: str | PathLike, + ) -> ArchiveFileResponse: + data = self._send_request( + 'GET', + urljoin(self.download_url, path), + return_bytes=True, + ) + return ArchiveFileResponse(BytesIO(data)) + async def put( self, path: str, @@ -51,7 +62,15 @@ async def delete( urljoin(self.upload_url, path), ) - def _send_request(self, method, url, *, params=None, files=None) -> Any: + def _send_request( + self, + method, + url, + *, + params=None, + files=None, + return_bytes=False, + ) -> Any: """Send a request to the ODP file storage service and return its JSON response.""" try: @@ -63,7 +82,7 @@ def _send_request(self, method, url, *, params=None, files=None) -> Any: timeout=self.timeout, ) r.raise_for_status() - return r.json() + return r.content if return_bytes else r.json() except requests.RequestException as e: if e.response is not None: From c3f2451c925b12ed9bd8d3cf2635f67dca3f19fd Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Mon, 5 May 2025 16:49:45 +0200 Subject: [PATCH 09/14] Update archive, resource and package tests --- odp/api/routers/package.py | 28 +++--- test/api/test_archive.py | 4 +- test/api/test_package.py | 201 ++++++++++++++++--------------------- test/api/test_resource.py | 12 +-- test/factories.py | 82 +++++++++------ test/test_db.py | 32 +++--- 6 files changed, 180 insertions(+), 179 deletions(-) diff --git a/odp/api/routers/package.py b/odp/api/routers/package.py index dc3f137..1a9a76f 100644 --- a/odp/api/routers/package.py +++ b/odp/api/routers/package.py @@ -279,12 +279,7 @@ async def delete_package( Delete a provider-accessible package. The package status must be `pending`. Requires scope `odp.package:write`. """ - if not (package := Session.get(Package, package_id)): - raise HTTPException(HTTP_404_NOT_FOUND) - - ensure_status(package, PackageStatus.pending) - - await _delete_package(package_id, auth) + await _delete_package(package_id, auth, True) @router.delete( @@ -297,18 +292,24 @@ async def admin_delete_package( """ Delete any package. Requires scope `odp.package:admin`. """ - await _delete_package(package_id, auth) + await _delete_package(package_id, auth, False) async def _delete_package( package_id: str, auth: Authorized, + check_status: bool, ): + # TODO: allow deletion of package with resources - flag resources as delete_pending + if not (package := Session.get(Package, package_id)): raise HTTPException(HTTP_404_NOT_FOUND) auth.enforce_constraint([package.provider_id]) + if check_status: + ensure_status(package, PackageStatus.pending) + create_audit_record(auth, package, datetime.now(timezone.utc), PackageCommand.delete) try: @@ -356,12 +357,7 @@ async def untag_package( Remove a tag instance set by the calling user. The package status must be `pending`. Requires the scope associated with the tag. """ - if not (package := Session.get(Package, package_id)): - raise HTTPException(HTTP_404_NOT_FOUND) - - ensure_status(package, PackageStatus.pending) - - await _untag_package(package_id, tag_instance_id, auth) + await _untag_package(package_id, tag_instance_id, auth, True) @router.delete( @@ -375,19 +371,23 @@ async def admin_untag_package( """ Remove any tag instance from a package. Requires scope `odp.package:admin`. """ - await _untag_package(package_id, tag_instance_id, auth) + await _untag_package(package_id, tag_instance_id, auth, False) async def _untag_package( package_id: str, tag_instance_id: str, auth: Authorized, + check_status: bool, ) -> None: if not (package := Session.get(Package, package_id)): raise HTTPException(HTTP_404_NOT_FOUND) auth.enforce_constraint([package.provider_id]) + if check_status: + ensure_status(package, PackageStatus.pending) + await Tagger(TagType.package).delete_tag_instance(tag_instance_id, package, auth) diff --git a/test/api/test_archive.py b/test/api/test_archive.py index 99adb6f..c607a4c 100644 --- a/test/api/test_archive.py +++ b/test/api/test_archive.py @@ -31,7 +31,7 @@ def assert_db_state(archives): assert len(result) == len(archives) for n, row in enumerate(result): assert row.id == archives[n].id - assert row.adapter == archives[n].adapter + assert row.type == archives[n].type assert row.download_url == archives[n].download_url assert row.upload_url == archives[n].upload_url assert row.scope_id == archives[n].scope_id @@ -42,9 +42,9 @@ def assert_json_result(response, json, archive): """Verify that the API result matches the given archive object.""" assert response.status_code == 200 assert json['id'] == archive.id + assert json['type'] == archive.type assert json['download_url'] == archive.download_url assert json['upload_url'] == archive.upload_url - assert json['adapter'] == archive.adapter assert json['scope_id'] == archive.scope_id assert json['resource_count'] == archive.resource_count diff --git a/test/api/test_package.py b/test/api/test_package.py index 54a30db..74cab4f 100644 --- a/test/api/test_package.py +++ b/test/api/test_package.py @@ -7,8 +7,15 @@ from odp.const import ODPDateRangeIncType, ODPPackageTag, ODPScope, ODPTagSchema from odp.db.models import Package, PackageAudit, PackageTag, Resource, Scope, Tag, User from test import TestSession -from test.api import test_resource -from test.api.assertions import assert_forbidden, assert_new_timestamp, assert_not_found, assert_ok_null, assert_unprocessable +from test.api import all_scopes, test_resource +from test.api.assertions import ( + assert_forbidden, + assert_method_not_allowed, + assert_new_timestamp, + assert_not_found, + assert_ok_null, + assert_unprocessable, +) from test.api.assertions.tags import ( assert_tag_instance_audit_log, assert_tag_instance_audit_log_empty, @@ -71,10 +78,12 @@ def assert_db_state(packages): for n, row in enumerate(result): assert row.id == packages[n].id assert row.key == packages[n].key - assert row.title == packages[n].title assert row.status == packages[n].status assert_new_timestamp(row.timestamp) assert row.provider_id == packages[n].provider_id + assert row.metadata_ == packages[n].metadata_ + assert row.schema_id == packages[n].schema_id + assert row.schema_type == packages[n].schema_type result = TestSession.execute(select(Resource.package_id, Resource.id)).all() result.sort(key=lambda r: (r.package_id, r.id)) @@ -94,9 +103,9 @@ def assert_audit_log(command, package, grant_type): assert_new_timestamp(result.timestamp) assert result._id == package.id assert result._key == package.key - assert result._title == package.title assert result._status == package.status assert result._provider_id == package.provider_id + assert result._schema_id == package.schema_id assert sorted(result._resources) == sorted(package.resource_ids) @@ -104,46 +113,56 @@ def assert_no_audit_log(): assert TestSession.execute(select(PackageAudit)).first() is None -def assert_json_result(response, json, package, detail=False): +def assert_json_result(response, json, package, detail=False, old_provider_key=None): """Verify that the API result matches the given package object.""" # todo: check linked record assert response.status_code == 200 assert json['id'] == package.id - assert json['key'] == package.key - assert json['title'] == package.title + + # Numeric suffix will differ between factory- and API-generated package key; + # update the factory object to match API output. Pass in old_provider_key for + # updates, since the package key cannot change. + date = datetime.now().strftime('%Y_%m_%d') + assert json['key'].startswith(f'{old_provider_key or package.provider.key}_{date}_') + package.key = json['key'] + assert json['status'] == package.status assert_new_timestamp(datetime.fromisoformat(json['timestamp'])) assert json['provider_id'] == package.provider_id assert json['provider_key'] == package.provider.key assert sorted(json['resource_ids']) == sorted(package.resource_ids) + assert json['schema_id'] == package.schema_id + assert json['schema_uri'] == package.schema.uri + + json_resources = json['resources'] + db_resources = TestSession.execute( + select(Resource).where(Resource.package_id == package.id) + ).scalars().all() + assert len(json_resources) == len(db_resources) + json_resources.sort(key=lambda r: r['id']) + db_resources.sort(key=lambda r: r.id) + for n, json_resource in enumerate(json_resources): + db_resources[n].archive_paths = {} # stub for attr used locally in test_resource + test_resource.assert_json_result(response, json_resource, db_resources[n]) + + json_tags = json['tags'] + db_tags = TestSession.execute( + select(PackageTag, Tag, User).join(Tag).join(User).where(PackageTag.package_id == package.id) + ).all() + assert len(json_tags) == len(db_tags) + json_tags.sort(key=lambda t: t['id']) + db_tags.sort(key=lambda t: t.PackageTag.id) + for n, json_tag in enumerate(json_tags): + assert json_tag['tag_id'] == db_tags[n].PackageTag.tag_id + assert json_tag['user_id'] == db_tags[n].PackageTag.user_id + assert json_tag['user_name'] == db_tags[n].User.name + assert json_tag['data'] == db_tags[n].PackageTag.data + assert_new_timestamp(db_tags[n].PackageTag.timestamp) + assert json_tag['cardinality'] == db_tags[n].Tag.cardinality + assert json_tag['public'] == db_tags[n].Tag.public if detail: - json_resources = json['resources'] - db_resources = TestSession.execute( - select(Resource).where(Resource.package_id == package.id) - ).scalars().all() - assert len(json_resources) == len(db_resources) - json_resources.sort(key=lambda r: r['id']) - db_resources.sort(key=lambda r: r.id) - for n, json_resource in enumerate(json_resources): - db_resources[n].archive_paths = {} # stub for attr used locally in test_resource - test_resource.assert_json_result(response, json_resource, db_resources[n]) - - json_tags = json['tags'] - db_tags = TestSession.execute( - select(PackageTag, Tag, User).join(Tag).join(User).where(PackageTag.package_id == package.id) - ).all() - assert len(json_tags) == len(db_tags) - json_tags.sort(key=lambda t: t['id']) - db_tags.sort(key=lambda t: t.PackageTag.id) - for n, json_tag in enumerate(json_tags): - assert json_tag['tag_id'] == db_tags[n].PackageTag.tag_id - assert json_tag['user_id'] == db_tags[n].PackageTag.user_id - assert json_tag['user_name'] == db_tags[n].User.name - assert json_tag['data'] == db_tags[n].PackageTag.data - assert_new_timestamp(db_tags[n].PackageTag.timestamp) - assert json_tag['cardinality'] == db_tags[n].Tag.cardinality - assert json_tag['public'] == db_tags[n].Tag.public + assert json['metadata'] == package.metadata_ def assert_json_results(response, json, packages): @@ -416,8 +435,8 @@ def _test_create_package( ) r = api(scopes, **api_kwargs).post(route, json=dict( - title=package.title, provider_id=package.provider_id, + schema_id=package.schema_id, )) if authorized: @@ -431,43 +450,10 @@ def _test_create_package( assert_no_audit_log() -@pytest.mark.require_scope(ODPScope.PACKAGE_WRITE) -@pytest.mark.parametrize('package_new_provider', ['same', 'different']) -@pytest.mark.package_batch_with_tags -def test_update_package( - api, - scopes, - package_batch, - client_provider_constraint, - user_provider_constraint, - package_new_provider, -): - api_kwargs = parameterize_api_fixture( - package_batch, - api.grant_type, - client_provider_constraint, - user_provider_constraint, - ) - authorized = ( - ODPScope.PACKAGE_WRITE in scopes and - client_provider_constraint in ('client_provider_any', 'client_provider_match') and - (api.grant_type == 'client_credentials' or user_provider_constraint == 'user_provider_match') - ) - if ( - client_provider_constraint == 'client_provider_match' or - user_provider_constraint == 'user_provider_match' - ): - authorized = authorized and package_new_provider == 'same' - - _test_update_package( - api, - scopes, - package_batch, - package_new_provider, - '/package/', - authorized, - api_kwargs, - ) +def test_update_package(api): + r = api(all_scopes).put('/package/foo') + assert_method_not_allowed(r) + assert_no_audit_log() @pytest.mark.require_scope(ODPScope.PACKAGE_ADMIN) @@ -489,48 +475,25 @@ def test_admin_update_package( ) authorized = ODPScope.PACKAGE_ADMIN in scopes - _test_update_package( - api, - scopes, - package_batch, - package_new_provider, - '/package/admin/', - authorized, - api_kwargs, - ) - - -def _test_update_package( - api, - scopes, - package_batch, - package_new_provider, - route, - authorized, - api_kwargs, -): package_provider = package_batch[2].provider if package_new_provider == 'same' else ProviderFactory() - package_build_args = dict( + package = package_build( id=package_batch[2].id, + key=package_batch[2].key, status=package_batch[2].status, provider=package_provider, + schema_id='SAEON.DataCite4' if package_batch[2].schema_id == 'SAEON.ISO19115' else 'SAEON.ISO19115', + metadata_=package_batch[2].metadata_, + resource_ids=[resource.id for resource in package_batch[2].resources], ) - if package_batch[2].resources: - # key must stay the same if package #2 has any resources - package_build_args |= dict( - key=package_batch[2].key, - resource_ids=[resource.id for resource in package_batch[2].resources], - ) - - package = package_build(**package_build_args) + old_provider_key = package_batch[2].provider.key - r = api(scopes, **api_kwargs).put(f'{route}{package.id}', json=dict( - title=package.title, + r = api(scopes, **api_kwargs).put(f'/package/admin/{package.id}', json=dict( provider_id=package.provider_id, + schema_id=package.schema_id, )) if authorized: - assert_json_result(r, r.json(), package, detail=True) + assert_json_result(r, r.json(), package, detail=True, old_provider_key=old_provider_key) assert_db_state(package_batch[:2] + [package] + package_batch[3:]) assert_audit_log('update', package, api.grant_type) else: @@ -539,15 +502,13 @@ def _test_update_package( assert_no_audit_log() -@pytest.mark.parametrize('route', ['/package/', '/package/admin/']) -def test_update_package_not_found( +def test_admin_update_package_not_found( api, - route, package_batch, client_provider_constraint, user_provider_constraint, ): - scopes = [ODPScope.PACKAGE_ADMIN] if 'admin' in route else [ODPScope.PACKAGE_WRITE] + scopes = [ODPScope.PACKAGE_ADMIN] api_kwargs = parameterize_api_fixture( package_batch, api.grant_type, @@ -556,9 +517,9 @@ def test_update_package_not_found( ) package = package_build(id='foo') - r = api(scopes, **api_kwargs).put(f'{route}{package.id}', json=dict( - title=package.title, + r = api(scopes, **api_kwargs).put(f'/package/admin/{package.id}', json=dict( provider_id=package.provider_id, + schema_id=package.schema_id, )) assert_not_found(r) @@ -654,13 +615,15 @@ def _test_delete_package( deleted_package = PackageFactory.stub( id=package_batch[2].id, key=package_batch[2].key, - title=package_batch[2].title, status=package_batch[2].status, provider_id=package_batch[2].provider_id, + schema_id=package_batch[2].schema_id, resource_ids=[], ) deleted_package_id = deleted_package.id + deleted_status = package_batch[2].status + r = api(scopes, **api_kwargs).delete(f'{route}{deleted_package_id}') changed = False @@ -670,6 +633,8 @@ def _test_delete_package( assert_not_found(r) elif not authorized_constraint: assert_forbidden(r) + elif 'admin' not in route and deleted_status != 'pending': + assert_unprocessable(r, "Package status must be 'pending'") elif error in ('has_resources', 'has_record'): assert_unprocessable(r, 'A package with an associated record or resources cannot be deleted.') else: @@ -717,7 +682,7 @@ def test_tag_package( public=tag.public, ) | keyword_tag_args(tag.vocabulary, 0))) - if authorized: + if authorized and package_batch[2].status == 'pending': assert_tag_instance_output(r, package_tag_1, api.grant_type) assert_tag_instance_db_state('package', api.grant_type, package_id, package_tag_1) assert_tag_instance_audit_log( @@ -808,8 +773,12 @@ def test_tag_package( dict(command='insert', object_id=package_id, tag_instance=package_tag_3), ) - else: # not authorized - assert_forbidden(r) + else: + if not authorized: + assert_forbidden(r) + else: # package_batch[2].status != 'pending' + assert_unprocessable(r, "Package status must be 'pending'") + assert_tag_instance_db_state('package', api.grant_type, package_id) assert_tag_instance_audit_log_empty('package') @@ -918,7 +887,11 @@ def _test_untag_package( r = client.delete(f'{route}{package.id}/tag/{package_tag_1.id}') if authorized: - if not admin_route and not same_user: + if not admin_route and package.status != 'pending': + assert_unprocessable(r, "Package status must be 'pending'") + assert_tag_instance_db_state('package', api.grant_type, package.id, *package_tags, package_tag_1) + assert_tag_instance_audit_log_empty('package') + elif not admin_route and not same_user: assert_forbidden(r) assert_tag_instance_db_state('package', api.grant_type, package.id, *package_tags, package_tag_1) assert_tag_instance_audit_log_empty('package') diff --git a/test/api/test_resource.py b/test/api/test_resource.py index 1127954..61bc923 100644 --- a/test/api/test_resource.py +++ b/test/api/test_resource.py @@ -42,14 +42,14 @@ def assert_db_state(resources): assert len(result) == len(resources) for n, row in enumerate(result): assert row.id == resources[n].id - assert row.title == resources[n].title - assert row.description == resources[n].description - assert row.folder == resources[n].folder - assert row.filename == resources[n].filename + assert row.path == resources[n].path assert row.mimetype == resources[n].mimetype assert row.size == resources[n].size assert row.hash == resources[n].hash assert row.hash_algorithm == resources[n].hash_algorithm + assert row.title == resources[n].title + assert row.description == resources[n].description + assert row.status == resources[n].status assert row.package_id == resources[n].package_id assert_new_timestamp(row.timestamp) @@ -80,12 +80,12 @@ def assert_json_result(response, json, resource): assert json['id'] == resource.id assert json['title'] == resource.title assert json['description'] == resource.description - assert json['folder'] == resource.folder - assert json['filename'] == resource.filename + assert json['path'] == resource.path assert json['mimetype'] == resource.mimetype assert json['size'] == resource.size assert json['hash'] == resource.hash assert json['hash_algorithm'] == resource.hash_algorithm + assert json['status'] == resource.status assert json['package_id'] == resource.package_id assert json['package_key'] == resource.package.key assert json['archive_paths'] == resource.archive_paths diff --git a/test/factories.py b/test/factories.py index 4925586..ae2e92b 100644 --- a/test/factories.py +++ b/test/factories.py @@ -57,30 +57,39 @@ def _sanitize_id(val): return re.sub(r'[^-.:\w]', '_', val) -def create_metadata(record, n): - if record.use_example_metadata: - if record.schema_id == 'SAEON.DataCite4': +def create_metadata(record_or_package, n): + try: + if record_or_package.status == 'pending': + return None + except AttributeError: + pass # only applies to packages + + if record_or_package.use_example_metadata: + if record_or_package.schema_id == 'SAEON.DataCite4': metadata = datacite4_example() - elif record.schema_id == 'SAEON.ISO19115': + elif record_or_package.schema_id == 'SAEON.ISO19115': metadata = iso19115_example() else: metadata = {'foo': f'test-{n}'} - if record.doi: - metadata |= {'doi': record.doi} - else: - metadata.pop('doi', None) - - if record.parent_doi: - metadata.setdefault("relatedIdentifiers", []) - metadata["relatedIdentifiers"] += [{ - "relatedIdentifier": record.parent_doi, - "relatedIdentifierType": "DOI", - "relationType": "IsPartOf" - }] + try: + if record_or_package.doi: + metadata |= {'doi': record_or_package.doi} + else: + metadata.pop('doi', None) + + if record_or_package.parent_doi: + metadata.setdefault("relatedIdentifiers", []) + metadata["relatedIdentifiers"] += [{ + "relatedIdentifier": record_or_package.parent_doi, + "relatedIdentifierType": "DOI", + "relationType": "IsPartOf" + }] + except AttributeError: + pass # only applies to records # non-DOI relatedIdentifierType should be ignored for parent_id calculation - if not record.use_example_metadata and randint(0, 1): + if not record_or_package.use_example_metadata and randint(0, 1): metadata.setdefault("relatedIdentifiers", []) metadata["relatedIdentifiers"] += [{ "relatedIdentifier": "foo", @@ -89,7 +98,7 @@ def create_metadata(record, n): }] # non-IsPartOf relationType should be ignored for parent_id calculation - if not record.use_example_metadata and randint(0, 1): + if not record_or_package.use_example_metadata and randint(0, 1): metadata.setdefault("relatedIdentifiers", []) metadata["relatedIdentifiers"] += [{ "relatedIdentifier": "bar", @@ -100,6 +109,12 @@ def create_metadata(record, n): return metadata +def create_package_key(package, n): + timestamp = datetime.now(timezone.utc) + date = timestamp.strftime('%Y_%m_%d') + return f'{package.provider.key}_{date}_{n:03}' + + def create_keyword_key(kw, n, invalid=False): if kw.vocabulary.schema.uri.endswith('institution'): return -1 if invalid else fake.word() + str(n) @@ -220,13 +235,21 @@ def users(obj, create, users): class PackageFactory(ODPModelFactory): class Meta: model = Package + exclude = ('parent_doi', 'use_example_metadata') id = factory.Faker('uuid4') - key = factory.LazyAttribute(lambda p: re.sub(r'\W', '_', p.title)) - title = factory.Sequence(lambda n: f'{fake.catch_phrase()}.{n}') - status = factory.LazyFunction(lambda: choice(('pending', 'submitted', 'archived', 'deleted'))) - provider = factory.SubFactory(ProviderFactory) + key = factory.LazyAttributeSequence(create_package_key) + status = factory.LazyFunction(lambda: choices(('pending', 'submitted', 'archived', 'deleted'), weights=(12, 4, 3, 1))[0]) timestamp = factory.LazyFunction(lambda: datetime.now(timezone.utc)) + provider = factory.SubFactory(ProviderFactory) + + schema_id = factory.LazyFunction(lambda: choice(('SAEON.DataCite4', 'SAEON.ISO19115'))) + schema_type = 'metadata' + schema = factory.LazyAttribute(lambda p: FactorySession.get(Schema, (p.schema_id, 'metadata')) or + SchemaFactory(id=p.schema_id, type='metadata')) + use_example_metadata = False + metadata_ = factory.LazyAttributeSequence(create_metadata) + validity = factory.LazyAttribute(lambda p: dict(valid=p.use_example_metadata)) class ResourceFactory(ODPModelFactory): @@ -234,16 +257,16 @@ class Meta: model = Resource id = factory.Faker('uuid4') - folder = factory.LazyFunction(lambda: choice(('', f'{fake.uri_path(deep=randint(1, 4))}'))) - filename = factory.Sequence(lambda n: f'{fake.file_name()}.{n}') - title = factory.Faker('catch_phrase') - description = factory.Faker('sentence') + path = factory.Sequence(lambda n: f'{fake.uri(deep=randint(1, 4))}.{n}') mimetype = factory.Faker('mime_type') size = factory.LazyFunction(lambda: randint(1, sys.maxsize)) hash = factory.LazyAttribute(lambda r: fake.md5() if r.hash_algorithm == 'md5' else fake.sha256()) hash_algorithm = factory.LazyFunction(lambda: choice(('md5', 'sha256'))) - package = factory.SubFactory(PackageFactory) + title = factory.Faker('catch_phrase') + description = factory.Faker('sentence') + status = factory.LazyFunction(lambda: choices(('active', 'delete_pending'), weights=(9, 1))[0]) timestamp = factory.LazyFunction(lambda: datetime.now(timezone.utc)) + package = factory.SubFactory(PackageFactory) class CollectionFactory(ODPModelFactory): @@ -438,9 +461,9 @@ class Meta: model = Archive id = factory.Sequence(lambda n: f'{fake.slug()}.{n}') + type = factory.LazyFunction(lambda: choice(('filestore', 'website'))) download_url = factory.Faker('url') upload_url = factory.Faker('url') - adapter = factory.LazyFunction(lambda: choice(('filesystem', 'nextcloud', 'website'))) scope = factory.SubFactory(ScopeFactory, type='odp') @@ -450,5 +473,6 @@ class Meta: archive = factory.SubFactory(ArchiveFactory) resource = factory.SubFactory(ResourceFactory) - path = factory.LazyAttribute(lambda a: f'{fake.uri_path(deep=randint(1, 5))}/{a.resource.filename}') + path = factory.Sequence(lambda n: f'{fake.uri(deep=randint(1, 4))}.{n}') + status = factory.LazyFunction(lambda: choices(('pending', 'valid', 'missing', 'corrupt'), weights=(4, 14, 1, 1))[0]) timestamp = factory.LazyFunction(lambda: datetime.now(timezone.utc)) diff --git a/test/test_db.py b/test/test_db.py index a88b295..b837567 100644 --- a/test/test_db.py +++ b/test/test_db.py @@ -97,15 +97,15 @@ def test_create_archive(): result = TestSession.execute(select(Archive)).scalar_one() assert ( result.id, + result.type, result.download_url, result.upload_url, - result.adapter, result.scope_id, ) == ( archive.id, + archive.type, archive.download_url, archive.upload_url, - archive.adapter, archive.scope_id, ) @@ -201,17 +201,21 @@ def test_create_package(): assert ( result.id, result.key, - result.title, result.status, - result.provider_id, result.timestamp, + result.provider_id, + result.schema_id, + result.metadata_, + result.validity, ) == ( package.id, package.key, - package.title, package.status, - package.provider_id, package.timestamp, + package.provider_id, + package.schema_id, + package.metadata_, + package.validity, ) @@ -296,26 +300,26 @@ def test_create_resource(): result = TestSession.execute(select(Resource)).scalar_one() assert ( result.id, - result.title, - result.description, - result.folder, - result.filename, + result.path, result.mimetype, result.size, result.hash, result.hash_algorithm, + result.title, + result.description, + result.status, result.timestamp, result.package_id, ) == ( resource.id, - resource.title, - resource.description, - resource.folder, - resource.filename, + resource.path, resource.mimetype, resource.size, resource.hash, resource.hash_algorithm, + resource.title, + resource.description, + resource.status, resource.timestamp, resource.package_id, ) From 6a330acd252d1e24eaa8eb3d0b42f7a25ed793f5 Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Mon, 5 May 2025 17:01:20 +0200 Subject: [PATCH 10/14] Check out submodules for CI --- .github/workflows/main.yml | 1 + .gitmodules | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 30798f2..a9ff7c0 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -19,6 +19,7 @@ jobs: uses: actions/checkout@v4 with: path: odp-server + submodules: true - name: Check out odp-core uses: actions/checkout@v4 diff --git a/.gitmodules b/.gitmodules index 6f098c1..ef40206 100644 --- a/.gitmodules +++ b/.gitmodules @@ -5,3 +5,4 @@ [submodule "jschon-translation"] path = jschon-translation url = https://github.com/marksparkza/jschon-translation.git + branch = odp-package-support From 28d62713541249c39c0318570a031da3d0e05463 Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Mon, 5 May 2025 17:04:07 +0200 Subject: [PATCH 11/14] Check out submodules recursively for CI --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a9ff7c0..598ff84 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -19,7 +19,7 @@ jobs: uses: actions/checkout@v4 with: path: odp-server - submodules: true + submodules: recursive - name: Check out odp-core uses: actions/checkout@v4 From b21bf236a3da98451dd5a95a17efc4669e9e94e6 Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Tue, 6 May 2025 16:29:23 +0200 Subject: [PATCH 12/14] Implement asynchronous package deletion Deleting a package sets the status of the package and its resources to delete_pending. Archived resources are deleted via background archival service; purge of delete_pending packages in background packaging service still needs to be done. --- ...5_06_326d3fc9b55b_modify_package_status.py | 31 ++++++++++ odp/api/routers/package.py | 58 +++++++++++-------- 2 files changed, 65 insertions(+), 24 deletions(-) create mode 100644 migrate/versions/2025_05_06_326d3fc9b55b_modify_package_status.py diff --git a/migrate/versions/2025_05_06_326d3fc9b55b_modify_package_status.py b/migrate/versions/2025_05_06_326d3fc9b55b_modify_package_status.py new file mode 100644 index 0000000..9aad0da --- /dev/null +++ b/migrate/versions/2025_05_06_326d3fc9b55b_modify_package_status.py @@ -0,0 +1,31 @@ +"""Modify package status + +Revision ID: 326d3fc9b55b +Revises: c8363345ee8c +Create Date: 2025-05-06 11:16:12.242684 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + + +# revision identifiers, used by Alembic. +revision = '326d3fc9b55b' +down_revision = 'c8363345ee8c' +branch_labels = None +depends_on = None + + +def upgrade(): + op.drop_column('package', 'status') + op.execute('drop type packagestatus') + op.execute("create type packagestatus as enum ('editing', 'submitted', 'in_review', 'archived', 'delete_pending')") + op.add_column('package', sa.Column('status', postgresql.ENUM(name='packagestatus', create_type=False), nullable=False)) + + +def downgrade(): + op.drop_column('package', 'status') + op.execute('drop type packagestatus') + op.execute("create type packagestatus as enum ('pending', 'submitted', 'archived', 'deleted')") + op.add_column('package', sa.Column('status', postgresql.ENUM(name='packagestatus', create_type=False), nullable=False)) diff --git a/odp/api/routers/package.py b/odp/api/routers/package.py index 1a9a76f..a0fc849 100644 --- a/odp/api/routers/package.py +++ b/odp/api/routers/package.py @@ -78,7 +78,7 @@ def create_audit_record( _status=package.status, _provider_id=package.provider_id, _schema_id=package.schema_id, - _resources=[resource.id for resource in package.resources], + _resources=[resource.path for resource in package.resources], ).save() @@ -101,7 +101,10 @@ async def list_packages( """ List provider-accessible packages. Requires scope `odp.package:read`. """ - stmt = select(Package) + stmt = ( + select(Package) + .where(Package.status != PackageStatus.delete_pending) + ) if auth.object_ids != '*': stmt = stmt.where(Package.provider_id.in_(auth.object_ids)) @@ -214,7 +217,7 @@ async def _create_package( try: package = Package( key=f'{provider.key}_{date}_{n:03}', - status=PackageStatus.pending, + status=PackageStatus.editing, timestamp=timestamp, provider_id=package_in.provider_id, schema_id=package_in.schema_id, @@ -276,7 +279,7 @@ async def delete_package( auth: Authorized = Depends(Authorize(ODPScope.PACKAGE_WRITE)), ) -> None: """ - Delete a provider-accessible package. The package status must be `pending`. + Delete a provider-accessible package. The package status must be `editing`. Requires scope `odp.package:write`. """ await _delete_package(package_id, auth, True) @@ -300,25 +303,32 @@ async def _delete_package( auth: Authorized, check_status: bool, ): - # TODO: allow deletion of package with resources - flag resources as delete_pending - if not (package := Session.get(Package, package_id)): raise HTTPException(HTTP_404_NOT_FOUND) auth.enforce_constraint([package.provider_id]) if check_status: - ensure_status(package, PackageStatus.pending) + ensure_status(package, PackageStatus.editing) - create_audit_record(auth, package, datetime.now(timezone.utc), PackageCommand.delete) - - try: - package.delete() - except IntegrityError as e: + if package.package_records: raise HTTPException( HTTP_422_UNPROCESSABLE_ENTITY, - 'A package with an associated record or resources cannot be deleted.', - ) from e + 'A package with an associated record cannot be deleted.', + ) + + timestamp = datetime.now(timezone.utc) + create_audit_record(auth, package, timestamp, PackageCommand.delete) + + package.status = PackageStatus.delete_pending + package.timestamp = timestamp + package.save() + + for resource in package.resources: + if resource.status != ResourceStatus.delete_pending: + resource.status = ResourceStatus.delete_pending + resource.timestamp = timestamp + resource.save() @router.post( @@ -331,7 +341,7 @@ async def tag_package( ) -> TagInstanceModel | None: """ Set a tag instance on a package, returning the created or updated instance, - or null if no change was made. The package status must be `pending`. Requires + or null if no change was made. The package status must be `editing`. Requires the scope associated with the tag. """ if not (package := Session.get(Package, package_id)): @@ -339,7 +349,7 @@ async def tag_package( auth.enforce_constraint([package.provider_id]) - ensure_status(package, PackageStatus.pending) + ensure_status(package, PackageStatus.editing) if package_tag := await Tagger(TagType.package).set_tag_instance(tag_instance_in, package, auth): return output_tag_instance_model(package_tag) @@ -354,7 +364,7 @@ async def untag_package( auth: Authorized = Depends(UntagAuthorize(TagType.package)), ) -> None: """ - Remove a tag instance set by the calling user. The package status must be `pending`. + Remove a tag instance set by the calling user. The package status must be `editing`. Requires the scope associated with the tag. """ await _untag_package(package_id, tag_instance_id, auth, True) @@ -386,7 +396,7 @@ async def _untag_package( auth.enforce_constraint([package.provider_id]) if check_status: - ensure_status(package, PackageStatus.pending) + ensure_status(package, PackageStatus.editing) await Tagger(TagType.package).delete_tag_instance(tag_instance_id, package, auth) @@ -426,7 +436,7 @@ async def _submit_package( auth.enforce_constraint([package.provider_id]) - ensure_status(package, PackageStatus.pending) + ensure_status(package, PackageStatus.editing) tag_patch = [] for package_tag in package.tags: @@ -486,7 +496,7 @@ async def _cancel_package( ensure_status(package, PackageStatus.submitted) - package.status = PackageStatus.pending + package.status = PackageStatus.editing package.metadata_ = None package.validity = None package.timestamp = (timestamp := datetime.now(timezone.utc)) @@ -521,12 +531,12 @@ async def upload_file( Existing files are replaced. Requires scope `odp.package:write` along with the scope associated with - the archive. The package status must be `pending`. + the archive. The package status must be `editing`. """ if not (package := Session.get(Package, package_id)): raise HTTPException(HTTP_404_NOT_FOUND, 'Package not found') - ensure_status(package, PackageStatus.pending) + ensure_status(package, PackageStatus.editing) await _upload_file( package_id, archive_id, path, file.file, sha256, title, description, unpack, auth, @@ -673,13 +683,13 @@ async def delete_file( Updates the resource status to `delete_pending`; actual file deletions are performed by a background service. - Requires scope `odp.package:write`. The package status must be `pending`. + Requires scope `odp.package:write`. The package status must be `editing`. """ if not (package := Session.get(Package, package_id)): raise HTTPException(HTTP_404_NOT_FOUND, 'Package not found') auth.enforce_constraint([package.provider_id]) - ensure_status(package, PackageStatus.pending) + ensure_status(package, PackageStatus.editing) if not (resource := Session.get(Resource, resource_id)): raise HTTPException(HTTP_404_NOT_FOUND, 'Resource not found') From e8e35a7af8f32711c3ddf3aafae38ffc449e1819 Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Wed, 7 May 2025 09:35:24 +0200 Subject: [PATCH 13/14] Generalize background services --- bin/archive | 4 ++-- bin/package | 4 ++-- odp/package/__init__.py | 26 -------------------------- odp/{archive => svc}/__init__.py | 22 ++++++++++++---------- odp/svc/archive/__init__.py | 0 odp/{ => svc}/archive/file_purge.py | 4 ++-- odp/svc/package/__init__.py | 0 odp/{ => svc}/package/date_range.py | 10 +++++----- 8 files changed, 23 insertions(+), 47 deletions(-) delete mode 100644 odp/package/__init__.py rename odp/{archive => svc}/__init__.py (55%) create mode 100644 odp/svc/archive/__init__.py rename odp/{ => svc}/archive/file_purge.py (94%) create mode 100644 odp/svc/package/__init__.py rename odp/{ => svc}/package/date_range.py (92%) diff --git a/bin/archive b/bin/archive index 61c94c2..cc7f4e8 100644 --- a/bin/archive +++ b/bin/archive @@ -6,9 +6,9 @@ import sys rootdir = pathlib.Path(__file__).parent.parent sys.path.append(str(rootdir)) -import odp.archive import odp.logfile +import odp.svc if __name__ == '__main__': odp.logfile.initialize() - odp.archive.run_all() + odp.svc.run_service('archive') diff --git a/bin/package b/bin/package index 7619445..59ae6d3 100644 --- a/bin/package +++ b/bin/package @@ -7,8 +7,8 @@ rootdir = pathlib.Path(__file__).parent.parent sys.path.append(str(rootdir)) import odp.logfile -import odp.package +import odp.svc if __name__ == '__main__': odp.logfile.initialize() - odp.package.run_all() + odp.svc.run_service('package') diff --git a/odp/package/__init__.py b/odp/package/__init__.py deleted file mode 100644 index ea018d9..0000000 --- a/odp/package/__init__.py +++ /dev/null @@ -1,26 +0,0 @@ -import logging - -from odp.db import Session - -logger = logging.getLogger(__name__) - - -class PackageModule: - - def execute(self): - try: - self._internal_execute() - Session.commit() - except Exception as e: - Session.rollback() - logger.exception(f'PACKAGING EXECUTION FAILED: {str(e)}') - - def _internal_execute(self): - raise NotImplementedError - - -def run_all(): - from .date_range import DateRangeInc - - DateRangeInc().execute() - diff --git a/odp/archive/__init__.py b/odp/svc/__init__.py similarity index 55% rename from odp/archive/__init__.py rename to odp/svc/__init__.py index cd95bfa..2f7faf5 100644 --- a/odp/archive/__init__.py +++ b/odp/svc/__init__.py @@ -9,7 +9,8 @@ logger = logging.getLogger(__name__) -class ArchiveModule: +class ServiceModule: + """Abstract base class for a background service module.""" @final def run(self): @@ -27,14 +28,15 @@ def exec(self): raise NotImplementedError -def run_all(): - archive_modules = [] - archive_dir = str(Path(__file__).parent) - for mod_info in iter_modules([archive_dir]): - mod = import_module(f'odp.archive.{mod_info.name}') +def run_service(name: str): + """Run all service modules within `name` directory.""" + modules = [] + dir_ = str(Path(__file__).parent / name) + for mod_info in iter_modules([dir_]): + mod = import_module(f'odp.svc.{name}.{mod_info.name}') for cls in mod.__dict__.values(): - if isinstance(cls, type) and issubclass(cls, ArchiveModule) and cls is not ArchiveModule: - archive_modules += [cls()] + if isinstance(cls, type) and issubclass(cls, ServiceModule) and cls is not ServiceModule: + modules += [cls()] - for archive_module in archive_modules: - archive_module.run() + for module in modules: + module.run() diff --git a/odp/svc/archive/__init__.py b/odp/svc/archive/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/odp/archive/file_purge.py b/odp/svc/archive/file_purge.py similarity index 94% rename from odp/archive/file_purge.py rename to odp/svc/archive/file_purge.py index 3e1e510..3955103 100644 --- a/odp/archive/file_purge.py +++ b/odp/svc/archive/file_purge.py @@ -3,16 +3,16 @@ from sqlalchemy import select -from odp.archive import ArchiveModule from odp.const.db import ResourceStatus from odp.db import Session from odp.db.models import Resource from odp.lib.archive import ArchiveAdapter, ArchiveError +from odp.svc import ServiceModule logger = logging.getLogger(__name__) -class FilePurgeModule(ArchiveModule): +class FilePurgeModule(ServiceModule): def exec(self): resources_to_delete = Session.execute( diff --git a/odp/svc/package/__init__.py b/odp/svc/package/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/odp/package/date_range.py b/odp/svc/package/date_range.py similarity index 92% rename from odp/package/date_range.py rename to odp/svc/package/date_range.py index 9f98fe7..df774c0 100644 --- a/odp/package/date_range.py +++ b/odp/svc/package/date_range.py @@ -5,17 +5,17 @@ from sqlalchemy import and_, select from sqlalchemy.orm import aliased -from odp.const import ODPPackageTag, ODPDateRangeIncType +from odp.const import ODPDateRangeIncType, ODPPackageTag from odp.db import Session -from odp.db.models import PackageTag, Package -from odp.package import PackageModule +from odp.db.models import Package, PackageTag +from odp.svc import ServiceModule logger = logging.getLogger(__name__) -class DateRangeInc(PackageModule): +class DateRangeIncModule(ServiceModule): - def _internal_execute(self): + def exec(self): """ Fetch and increment date of package date range tag according to it's related date range increment tag. """ From 1c913cb208c6d296380ba56f6b196550ee2b6a8e Mon Sep 17 00:00:00 2001 From: Mark Jacobson <52427991+marksparkza@users.noreply.github.com> Date: Thu, 8 May 2025 11:58:02 +0200 Subject: [PATCH 14/14] Purge deleted packages when empty --- odp/svc/package/package_purge.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 odp/svc/package/package_purge.py diff --git a/odp/svc/package/package_purge.py b/odp/svc/package/package_purge.py new file mode 100644 index 0000000..2fda935 --- /dev/null +++ b/odp/svc/package/package_purge.py @@ -0,0 +1,29 @@ +import logging + +from sqlalchemy import select +from sqlalchemy.exc import IntegrityError + +from odp.const.db import PackageStatus +from odp.db import Session +from odp.db.models import Package +from odp.svc import ServiceModule + +logger = logging.getLogger(__name__) + + +class PackagePurgeModule(ServiceModule): + + def exec(self): + packages_to_delete = Session.execute( + select(Package).where(Package.status == PackageStatus.delete_pending) + ).scalars().all() + + for package in packages_to_delete: + # Delete package only after resources have been deleted by the archival service. + if not package.resources: + try: + package.delete() + logger.info(f'{package.key} deleted') + + except IntegrityError as e: + logger.exception(f'{package.key} delete failed: {e!r}')