|
27 | 27 | from __future__ import absolute_import, print_function |
28 | 28 |
29 | 29 | import copy |
30 | | -import shutil |
31 | | -import tempfile |
32 | 30 | from copy import deepcopy |
33 | 31 | from functools import wraps |
34 | 32 |
|
|
43 | 41 | from invenio_files_rest.errors import MultipartMissingParts |
44 | 42 | from invenio_files_rest.models import Bucket, FileInstance, ObjectVersion |
45 | 43 | from invenio_jsonschemas.errors import JSONSchemaNotFound |
| 44 | +from invenio_pidstore.errors import PIDDoesNotExistError |
46 | 45 | from invenio_records.models import RecordMetadata |
47 | 46 | from invenio_records_files.models import RecordsBuckets |
48 | 47 | from invenio_rest.errors import FieldError |
| 48 | + |
| 49 | +from invenio_sipstore.api import RecordSIP, SIP as SIPApi |
| 50 | +from invenio_sipstore.archivers import BagItArchiver |
| 51 | +from invenio_sipstore.models import SIP as SIPModel, \ |
| 52 | + RecordSIP as RecordSIPModel |
| 53 | + |
49 | 54 | from jsonschema.validators import Draft4Validator, RefResolutionError |
50 | 55 | from sqlalchemy.exc import IntegrityError |
51 | 56 | from sqlalchemy.orm.exc import NoResultFound |
52 | 57 | from werkzeug.local import LocalProxy |
53 | 58 |
54 | | -from cap.config import FILES_URL_MAX_SIZE |
55 | 59 | from cap.modules.records.api import CAPRecord |
56 | 60 | from cap.modules.repoimporter.repo_importer import RepoImporter |
57 | 61 | from cap.modules.schemas.models import Schema |
58 | 62 | from cap.modules.user.errors import DoesNotExistInLDAP |
59 | 63 | from cap.modules.user.utils import (get_existing_or_register_role, |
60 | 64 | get_existing_or_register_user) |
61 | 65 |
62 | | -from .errors import (DepositValidationError, FileUploadError, |
| 66 | +from .errors import (ArchivingError, DepositValidationError, FileUploadError, |
63 | 67 | UpdateDepositPermissionsError) |
64 | 68 | from .fetchers import cap_deposit_fetcher |
65 | 69 | from .minters import cap_deposit_minter |
66 | 70 | from .permissions import (AdminDepositPermission, CloneDepositPermission, |
67 | 71 | DepositAdminActionNeed, DepositReadActionNeed, |
68 | 72 | DepositUpdateActionNeed, UpdateDepositPermission) |
| 73 | +from .utils import compare_files, task_commit, ensure_content_length |
69 | 74 |
70 | 75 | _datastore = LocalProxy(lambda: current_app.extensions['security'].datastore) |
71 | 76 |
|
@@ -197,7 +202,52 @@ def publish(self, *args, **kwargs): |
197 | 202 | if file_.data['checksum'] is None: |
198 | 203 | raise MultipartMissingParts() |
199 | 204 |
200 | | - return super(CAPDeposit, self).publish(*args, **kwargs) |
| 205 | + try: |
| 206 | + _, last_record = self.fetch_published() |
| 207 | + is_first_publishing = False |
| 208 | + fetched_files = last_record.files |
| 209 | + create_sip_files = not compare_files(fetched_files, self.files) |
| 210 | + except (PIDDoesNotExistError, KeyError): |
| 211 | + is_first_publishing = True |
| 212 | + create_sip_files = bool(self.files) |
| 213 | + |
| 214 | + deposit = super(CAPDeposit, self).publish(*args, **kwargs) |
| 215 | + recid, record = deposit.fetch_published() |
| 216 | + sip_patch_of = None |
| 217 | + if not is_first_publishing: |
| 218 | + sip_recid = recid |
| 219 | + |
| 220 | + sip_patch_of = ( |
| 221 | + db.session.query(SIPModel) |
| 222 | + .join(RecordSIPModel, RecordSIPModel.sip_id == SIPModel.id) |
| 223 | + .filter(RecordSIPModel.pid_id == sip_recid.id) |
| 224 | + .order_by(SIPModel.created.desc()) |
| 225 | + .first() |
| 226 | + ) |
| 227 | + |
| 228 | + recordsip = RecordSIP.create( |
| 229 | + recid, record, archivable=True, |
| 230 | + create_sip_files=create_sip_files, |
| 231 | + sip_metadata_type='json', |
| 232 | + user_id=current_user.id, |
| 233 | + agent=None) |
| 234 | + |
| 235 | + archiver = BagItArchiver( |
| 236 | + recordsip.sip, include_all_previous=(not is_first_publishing), |
| 237 | + patch_of=sip_patch_of) |
| 238 | + |
| 239 | + archiver.save_bagit_metadata() |
| 240 | + |
| 241 | + sip = ( |
| 242 | + RecordSIPModel.query |
| 243 | + .filter_by(pid_id=recid.id) |
| 244 | + .order_by(RecordSIPModel.created.desc()) |
| 245 | + .first().sip |
| 246 | + ) |
| 247 | + |
| 248 | + archive_sip.delay(str(sip.id)) |
| 249 | + |
| 250 | + return deposit |
201 | 251 |
202 | 252 | @mark_as_action |
203 | 253 | def upload(self, pid=None, *args, **kwargs): |
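Note on the publish() hunk above: whether the new SIP gets its own file set depends on compare_files, which this change imports from .utils but whose body is not part of the diff. Since publish() calls it as create_sip_files = not compare_files(fetched_files, self.files), the helper is expected to return True when the already-published record's files match the deposit's files. A minimal sketch of such a helper, assuming it compares file keys and checksums (illustration only, not the project's actual implementation):

def compare_files(previous_files, current_files):
    """Return True if both file sets share the same keys and checksums."""
    # Build {key: checksum} maps from the files iterators; the hunk above
    # reads checksums the same way via file_.data['checksum'].
    previous = {f.key: f.data.get('checksum') for f in previous_files}
    current = {f.key: f.data.get('checksum') for f in current_files}
    return previous == current

Under that contract, an unchanged file set between two publications yields a SIP without files (create_sip_files is False), while any change in content or file list triggers a fresh SIP file set.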
@@ -601,32 +651,31 @@ def download_repo(pid, url, filename): |
601 | 651 | task_commit(record, response.raw, filename, total) |
602 | 652 |
603 | 653 |
604 | | -def task_commit(record, response, filename, total): |
605 | | - """Commit file to the record.""" |
606 | | - record.files[filename].file.set_contents( |
607 | | - response, |
608 | | - default_location=record.files.bucket.location.uri, |
609 | | - size=total |
610 | | - ) |
611 | | - db.session.commit() |
612 | | - |
613 | | - |
614 | | -def ensure_content_length( |
615 | | - url, method='GET', |
616 | | - session=None, |
617 | | - max_size=FILES_URL_MAX_SIZE or 2**20, |
618 | | - *args, **kwargs): |
619 | | - """Add Content-Length when no present.""" |
620 | | - kwargs['stream'] = True |
621 | | - session = session or requests.Session() |
622 | | - r = session.request(method, url, *args, **kwargs) |
623 | | - if 'Content-Length' not in r.headers: |
624 | | - # stream content into a temporary file so we can get the real size |
625 | | - spool = tempfile.SpooledTemporaryFile(max_size) |
626 | | - shutil.copyfileobj(r.raw, spool) |
627 | | - r.headers['Content-Length'] = str(spool.tell()) |
628 | | - spool.seek(0) |
629 | | - # replace the original socket with our temporary file |
630 | | - r.raw._fp.close() |
631 | | - r.raw._fp = spool |
632 | | - return r |
| 654 | +@shared_task(ignore_result=True, max_retries=6, |
| 655 | + default_retry_delay=4 * 60 * 60) |
| 656 | +def archive_sip(sip_uuid): |
| 657 | + """Send the SIP for archiving. |
| 658 | +
| 659 | + Retries every 4 hours, six times, which should work for up to 24 hours |
| 660 | + of archiving system downtime. |
| 661 | + :param sip_uuid: UUID of the SIP for archiving. |
| 662 | + :type sip_uuid: str |
| 663 | + """ |
| 664 | + try: |
| 665 | + sip = SIPApi(SIPModel.query.get(sip_uuid)) |
| 666 | + archiver = BagItArchiver(sip) |
| 667 | + bagmeta = archiver.get_bagit_metadata(sip) |
| 668 | + if bagmeta is None: |
| 669 | + raise ArchivingError( |
| 670 | + 'Bagit metadata does not exist for SIP: {0}.'.format(sip.id)) |
| 671 | + if sip.archived: |
| 672 | + raise ArchivingError( |
| 673 | + 'SIP {0} was already archived.'.format(sip.id)) |
| 674 | + archiver.write_all_files() |
| 675 | + sip.archived = True |
| 676 | + db.session.commit() |
| 677 | + except Exception as exc: |
| 678 | + # On ArchivingError (see above), do not retry, but re-raise |
| 679 | + if not isinstance(exc, ArchivingError): |
| 680 | + archive_sip.retry(exc=exc) |
| 681 | + raise |
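The retry policy on archive_sip gives six retries at four-hour intervals, i.e. roughly 24 hours of tolerated archiver downtime before the task gives up for good. If SIPs remain unarchived after that window, they would have to be re-queued by hand; a hypothetical maintenance snippet (the requeue_pending_sips helper below is not part of this change, and it assumes the archived flag on the SIP model, which the task above sets once writing succeeds):

from invenio_sipstore.models import SIP as SIPModel


def requeue_pending_sips():
    """Re-dispatch archiving for every SIP still marked as not archived."""
    pending = SIPModel.query.filter_by(archived=False).all()
    for sip in pending:
        # Same entry point publish() uses: queue the Celery task by SIP UUID.
        archive_sip.delay(str(sip.id))
    return len(pending)

This keeps the recovery path identical to the normal one: the task itself re-checks the archived flag and refuses to archive a SIP twice.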