diff --git a/api/api/settings/local-dist.py b/api/api/settings/local-dist.py
index 047df8bf..934bd1bf 100644
--- a/api/api/settings/local-dist.py
+++ b/api/api/settings/local-dist.py
@@ -15,4 +15,11 @@
     }
 }
 
-STATIC_URL = '{}/static/'.format(DOMAIN)
+STATIC_URL = '/static/'
+
+
+CORS_ORIGIN_WHITELIST = (
+    'localhost:5000',
+    'osf.io',
+    'staging.osf.io'
+)
diff --git a/api/webview/migrations/0001_initial.py b/api/webview/migrations/0001_initial.py
new file mode 100644
index 00000000..89a4c183
--- /dev/null
+++ b/api/webview/migrations/0001_initial.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+
+from django.db import models, migrations
+import django_pgjson.fields
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name='Document',
+            fields=[
+                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
+                ('source', models.CharField(max_length=255)),
+                ('docID', models.TextField()),
+                ('providerUpdatedDateTime', models.DateTimeField(null=True)),
+                ('raw', django_pgjson.fields.JsonField()),
+                ('normalized', django_pgjson.fields.JsonField(null=True)),
+            ],
+        ),
+        migrations.CreateModel(
+            name='HarvesterResponse',
+            fields=[
+                ('key', models.TextField(serialize=False, primary_key=True)),
+                ('method', models.CharField(max_length=8)),
+                ('url', models.TextField()),
+                ('ok', models.NullBooleanField()),
+                ('content', models.BinaryField(null=True)),
+                ('encoding', models.TextField(null=True)),
+                ('headers_str', models.TextField(null=True)),
+                ('status_code', models.IntegerField(null=True)),
+                ('time_made', models.DateTimeField(auto_now=True)),
+            ],
+        ),
+        migrations.CreateModel(
+            name='Person',
+            fields=[
+                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
+                ('raw_name', models.CharField(max_length=255)),
+                ('name', models.CharField(max_length=255)),
+                ('family_name', models.CharField(max_length=255, null=True)),
+                ('given_name', models.CharField(max_length=255, null=True)),
+                ('additional_name', models.CharField(max_length=255, null=True)),
+                ('institution', models.CharField(max_length=255, null=True)),
+                ('id_osf', models.CharField(max_length=10, null=True)),
+                ('id_orcid', models.CharField(max_length=100, null=True)),
+                ('id_email', models.CharField(max_length=255, null=True)),
+                ('raw_orcid', django_pgjson.fields.JsonField(null=True)),
+            ],
+        ),
+        migrations.CreateModel(
+            name='URL',
+            fields=[
+                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
+                ('url', models.CharField(max_length=500)),
+                ('status', django_pgjson.fields.JsonField(null=True)),
+                ('response', models.ForeignKey(related_name='response', to='webview.HarvesterResponse', null=True)),
+            ],
+        ),
+        migrations.AddField(
+            model_name='document',
+            name='contributors',
+            field=models.ManyToManyField(related_name='documents', to='webview.Person'),
+        ),
+        migrations.AddField(
+            model_name='document',
+            name='urls',
+            field=models.ManyToManyField(related_name='urls', to='webview.URL'),
+        ),
+    ]
diff --git a/api/webview/migrations/0002_espersondocument.py b/api/webview/migrations/0002_espersondocument.py
new file mode 100644
index 00000000..c9a76e06
--- /dev/null
+++ b/api/webview/migrations/0002_espersondocument.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+
+from django.db import models, migrations
+import django_pgjson.fields
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('webview', '0001_initial'),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name='ESPersonDocument',
+            fields=[
+                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
+                ('raw_name', models.CharField(max_length=255)),
+                ('name', models.CharField(max_length=255)),
+                ('family_name', models.CharField(max_length=255, null=True)),
+                ('given_name', models.CharField(max_length=255, null=True)),
+                ('additional_name', models.CharField(max_length=255, null=True)),
+                ('institution', models.CharField(max_length=255, null=True)),
+                ('id_osf', models.CharField(max_length=10, null=True)),
+                ('id_orcid', models.CharField(max_length=100, null=True)),
+                ('id_email', models.CharField(max_length=255, null=True)),
+                ('raw_orcid', django_pgjson.fields.JsonField(null=True)),
+                ('source', models.CharField(max_length=255)),
+                ('docID', models.TextField()),
+            ],
+        ),
+    ]
diff --git a/api/webview/migrations/__init__.py b/api/webview/migrations/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/api/webview/models.py b/api/webview/models.py
index e0a9dc1e..d75b0f3e 100644
--- a/api/webview/models.py
+++ b/api/webview/models.py
@@ -2,14 +2,17 @@
 from django_pgjson.fields import JsonField
 
 
-class Document(models.Model):
-    source = models.CharField(max_length=255)
-    docID = models.TextField()
-
-    providerUpdatedDateTime = models.DateTimeField(null=True)
-
-    raw = JsonField()
-    normalized = JsonField(null=True)
+class Person(models.Model):
+    raw_name = models.CharField(max_length=255)  # name exactly as the source provided it
+    name = models.CharField(max_length=255)  # reconstructed: given + additional + family
+    family_name = models.CharField(max_length=255, null=True)
+    given_name = models.CharField(max_length=255, null=True)
+    additional_name = models.CharField(max_length=255, null=True)
+    institution = models.CharField(max_length=255, null=True)
+    id_osf = models.CharField(max_length=10, null=True)
+    id_orcid = models.CharField(max_length=100, null=True)
+    id_email = models.CharField(max_length=255, null=True)
+    raw_orcid = JsonField(null=True)
 
 
 class HarvesterResponse(models.Model):
@@ -26,3 +29,40 @@ class HarvesterResponse(models.Model):
     headers_str = models.TextField(null=True)
     status_code = models.IntegerField(null=True)
     time_made = models.DateTimeField(auto_now=True)
+
+
+class URL(models.Model):
+    url = models.CharField(max_length=500)
+    status = JsonField(null=True)
+    response = models.ForeignKey(HarvesterResponse, related_name='response', null=True)
+
+
+class Document(models.Model):
+    source = models.CharField(max_length=255)
+    docID = models.TextField()
+
+    providerUpdatedDateTime = models.DateTimeField(null=True)
+
+    raw = JsonField()
+    normalized = JsonField(null=True)
+
+    contributors = models.ManyToManyField(Person, related_name='documents')
+    urls = models.ManyToManyField(URL, related_name='urls')
+
+
+class ESPersonDocument(models.Model):
+    # Person
+    raw_name = models.CharField(max_length=255)  # name exactly as the source provided it
+    name = models.CharField(max_length=255)  # reconstructed: given + additional + family
+    family_name = models.CharField(max_length=255, null=True)
+    given_name = models.CharField(max_length=255, null=True)
+    additional_name = models.CharField(max_length=255, null=True)
+    institution = models.CharField(max_length=255, null=True)
+    id_osf = models.CharField(max_length=10, null=True)
+    id_orcid = models.CharField(max_length=100, null=True)
+    id_email = models.CharField(max_length=255, null=True)
+    raw_orcid = JsonField(null=True)
+
+    # Which document they come from
+    source = models.CharField(max_length=255)
+    docID = models.TextField()
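
Note: with the models above, a Document carries its contributors and checked links through ordinary many-to-many fields. A minimal ORM sketch of reading the new relations (the source and docID values are invented):

    from api.webview.models import Document

    doc = Document.objects.get(source='mit', docID='oai:example:1234')  # hypothetical identifiers
    contributor_names = [person.name for person in doc.contributors.all()]
    link_statuses = [url.status for url in doc.urls.all()]
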
diff --git a/scrapi/events.py b/scrapi/events.py
index f6fc8eca..7ab83dab 100644
--- a/scrapi/events.py
+++ b/scrapi/events.py
@@ -22,6 +22,8 @@
 HARVESTER_RUN = 'runHarvester'
 CHECK_ARCHIVE = 'checkArchive'
 NORMALIZATION = 'normalization'
+PROCESSING_URIS = 'processingUris'
+PROCESSING_CONTRIBUTORS = 'processingContributors'
 
 # statuses
 FAILED = 'failed'
diff --git a/scrapi/processing/__init__.py b/scrapi/processing/__init__.py
index 7e9f4046..0dade9ca 100644
--- a/scrapi/processing/__init__.py
+++ b/scrapi/processing/__init__.py
@@ -41,6 +41,18 @@ def process_raw(raw_doc, kwargs):
         get_processor(p).process_raw(raw_doc, **extras)
 
 
+def process_uris(source, docID, uri, uritype, kwargs):
+    for p in settings.POST_PROCESSING:
+        extras = kwargs.get(p, {})
+        get_processor(p).process_uris(source, docID, uri, uritype, **extras)
+
+
+def process_contributors(source, docID, contributor_dict, kwargs):
+    for p in settings.POST_PROCESSING:
+        extras = kwargs.get(p, {})
+        get_processor(p).process_contributors(source, docID, contributor_dict, **extras)
+
+
 HarvesterResponse = get_processor(settings.RESPONSE_PROCESSOR).HarvesterResponseModel
 
 all_processors = map(get_processor, list(set(
diff --git a/scrapi/processing/base.py b/scrapi/processing/base.py
index 1ab45a1a..505b0928 100644
--- a/scrapi/processing/base.py
+++ b/scrapi/processing/base.py
@@ -14,6 +14,12 @@ def process_raw(self, raw_doc, **kwargs):
     def process_normalized(self, raw_doc, normalized, **kwargs):
         pass  # pragma: no cover
 
+    def process_uris(self, source, docID, uri, uritype, **kwargs):
+        pass  # pragma: no cover
+
+    def process_contributors(self, source, docID, contributor_dict, **kwargs):
+        pass  # pragma: no cover
+
     @abstractmethod
     def documents(self, *sources):
         '''
diff --git a/scrapi/processing/helpers.py b/scrapi/processing/helpers.py
new file mode 100644
index 00000000..3261d564
--- /dev/null
+++ b/scrapi/processing/helpers.py
@@ -0,0 +1,28 @@
+from __future__ import absolute_import
+
+import datetime
+import requests
+
+
+def save_status_of_uri(uri, uritype):
+    uri_status = requests.get(uri)
+
+    return {
+        'actual_uri': uri,
+        'uritype': uritype,
+        'resolved_uri': uri_status.url,
+        'resolved_datetime': datetime.datetime.now().isoformat(),
+        'resolved_status': uri_status.status_code,
+    }
+
+
+def query_orcid_api(id_email):
+    orcid_email_query = 'http://pub.orcid.org/v1.2/search/orcid-bio?q=email:{}'.format(id_email)
+
+    orcid_results = requests.get(orcid_email_query, headers={'Accept': 'application/orcid+json'}).json()
+    number_results = orcid_results['orcid-search-results']['num-found']
+
+    if number_results > 0:
+        return number_results
+    else:
+        return None
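
For reference, save_status_of_uri builds the plain dict that ends up in URL.status, and query_orcid_api returns the number of ORCID records matching an email address (or None), not an ORCID iD. A sketch of the status dict, with invented values:

    # save_status_of_uri('http://example.com/paper/1', 'canonicalUri') might return:
    {
        'actual_uri': 'http://example.com/paper/1',
        'uritype': 'canonicalUri',
        'resolved_uri': 'http://example.com/papers/1/',  # final URL after redirects
        'resolved_datetime': '2015-07-14T16:20:00.000000',  # isoformat string, JSON-serializable
        'resolved_status': 200,
    }
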
diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py
index 50b438ad..404c99fa 100644
--- a/scrapi/processing/postgres.py
+++ b/scrapi/processing/postgres.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, unicode_literals
 
 import os
 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings")
@@ -8,12 +8,13 @@
 import django
 
-from api.webview.models import HarvesterResponse, Document
+from api.webview.models import HarvesterResponse, Document, URL, Person
 
 from scrapi import events
 from scrapi.util import json_without_bytes
 from scrapi.linter import RawDocument, NormalizedDocument
 from scrapi.processing import DocumentTuple
+from scrapi.processing.helpers import save_status_of_uri, query_orcid_api
 from scrapi.processing.base import BaseProcessor, BaseHarvesterResponse, BaseDatabaseManager
 
 django.setup()
@@ -109,10 +110,73 @@ def process_normalized(self, raw_doc, normalized):
 
     def _get_by_source_id(self, model, source, docID):
         try:
-            return Document.objects.filter(source=source, docID=docID)[0]
-        except IndexError:
+            return model.objects.get(source=source, docID=docID)
+        except model.DoesNotExist:
             return None
 
+    def process_uris(self, source, docID, uri, uritype, **kwargs):
+        document = Document.objects.get(source=source, docID=docID)
+        status = save_status_of_uri(uri, uritype)
+        url = URL(url=uri, status=status)
+        url.save()
+        document.urls.add(url)
+        document.save()
+
+    def get_person(self, model, **query):
+        # Look up an existing Person matching all the given field values
+        try:
+            return model.objects.get(**query)
+        except model.DoesNotExist:
+            return None
+
+    def process_contributors(self, source, docID, contributor_dict, **kwargs):
+        document = Document.objects.get(source=source, docID=docID)
+
+        id_osf = None
+        id_orcid = None
+        id_email = None
+        if contributor_dict.get('sameAs'):
+            for identifier in contributor_dict['sameAs']:
+                if 'osf.io' in identifier:
+                    id_osf = identifier
+                if 'orcid' in identifier:
+                    id_orcid = identifier
+
+        if contributor_dict.get('email'):
+            id_email = contributor_dict['email']
+
+        reconstructed_name = contributor_dict['givenName']
+        if contributor_dict.get('additionalName'):
+            reconstructed_name = '{} {}'.format(reconstructed_name, contributor_dict['additionalName'])
+        reconstructed_name = '{} {}'.format(reconstructed_name, contributor_dict['familyName'])
+
+        # NB: query_orcid_api returns a count of matching ORCID records, not an iD
+        if not id_orcid and id_email:
+            id_orcid = query_orcid_api(id_email)
+
+        person = self.get_person(
+            Person,
+            name=reconstructed_name,
+            raw_name=contributor_dict['name'],
+            family_name=contributor_dict.get('familyName'),
+            given_name=contributor_dict.get('givenName'),
+            additional_name=contributor_dict.get('additionalName'),
+            id_osf=id_osf,
+            id_email=id_email,
+            id_orcid=id_orcid
+        ) or Person(
+            name=reconstructed_name,
+            raw_name=contributor_dict['name'],
+            family_name=contributor_dict.get('familyName'),
+            given_name=contributor_dict.get('givenName'),
+            additional_name=contributor_dict.get('additionalName'),
+            id_osf=id_osf,
+            id_email=id_email,
+            id_orcid=id_orcid
+        )
+        person.save()
+
+        document.contributors.add(person)
+        document.save()
+
 
 class HarvesterResponseModel(BaseHarvesterResponse):
diff --git a/scrapi/settings/local-dist.py b/scrapi/settings/local-dist.py
index 1f29b226..d6ead1e3 100644
--- a/scrapi/settings/local-dist.py
+++ b/scrapi/settings/local-dist.py
@@ -14,6 +14,7 @@
 RAW_PROCESSING = []
 RESPONSE_PROCESSOR = None
 CANONICAL_PROCESSOR = None
+POST_PROCESSING = []
 
 SENTRY_DSN = None
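
For context, process_contributors takes one contributor entry from a normalized document and splits its identifiers across the Person fields; the keys it reads are name, givenName, additionalName, familyName, email, and sameAs. A sketch of the mapping, with invented values:

    contributor = {
        'name': 'Jane Q. Doe',
        'givenName': 'Jane',
        'additionalName': 'Q.',
        'familyName': 'Doe',
        'email': 'jdoe@example.com',
        'sameAs': ['http://osf.io/abcd3', 'http://orcid.org/0000-0002-1825-0097'],
    }
    # -> Person(raw_name='Jane Q. Doe', name='Jane Q. Doe',
    #           given_name='Jane', additional_name='Q.', family_name='Doe',
    #           id_osf='http://osf.io/abcd3',
    #           id_orcid='http://orcid.org/0000-0002-1825-0097',
    #           id_email='jdoe@example.com')
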
diff --git a/scrapi/tasks.py b/scrapi/tasks.py
index 4ddfaea5..81625921 100644
--- a/scrapi/tasks.py
+++ b/scrapi/tasks.py
@@ -123,6 +123,83 @@ def process_normalized(normalized_doc, raw_doc, **kwargs):
     processing.process_normalized(raw_doc, normalized_doc, kwargs)
 
 
+@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0)
+@events.logged(events.PROCESSING_CONTRIBUTORS, 'contributor_processing')
+def process_contributors(async, **kwargs):
+    settings.CELERY_ALWAYS_EAGER = not async
+
+    all_buckets = []
+    if kwargs.get('source'):
+        source_buckets = util.gather_contributors(kwargs['source'], kwargs.get('production_es'))
+        all_buckets.append(source_buckets)
+    else:
+        for source in registry.keys():
+            source_buckets = util.gather_contributors(source, kwargs.get('production_es'))
+            all_buckets.append(source_buckets)
+
+    for source in all_buckets:
+        process_contributors_from_one_source.delay(source['contributors'], async, kwargs=kwargs)
+
+
+@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0)
+def process_contributors_from_one_source(contributors, async=False, **kwargs):
+    settings.CELERY_ALWAYS_EAGER = not async
+
+    for person in contributors:
+        process_one_person.delay(person)
+
+
+@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0, rate_limit='5/s')
+@events.logged(events.PROCESSING_CONTRIBUTORS, 'contributor_processing')
+def process_one_person(person, **kwargs):
+    processing.process_contributors(
+        source=person['source'],
+        docID=person['docID'],
+        contributor_dict=person['contributor'],
+        kwargs=kwargs
+    )
+
+
+@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0)
+@events.logged(events.PROCESSING_URIS, 'uri_processing')
+def process_uris(async, **kwargs):
+    settings.CELERY_ALWAYS_EAGER = not async
+
+    all_buckets = []
+    if kwargs.get('source'):
+        source_buckets = util.parse_urls_into_groups(kwargs['source'])
+        all_buckets.append(source_buckets)
+    else:
+        for source in registry.keys():
+            source_buckets = util.parse_urls_into_groups(source)
+            all_buckets.append(source_buckets)
+
+    for source_dict in all_buckets:
+        for group in source_dict['uris']:
+            process_uris_at_one_base_uri.delay(group['individual_uris'], async, kwargs=kwargs)
+
+
+@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0)
+@events.logged(events.PROCESSING_URIS, 'uri_processing')
+def process_uris_at_one_base_uri(uri_list, async=False, **kwargs):
+    settings.CELERY_ALWAYS_EAGER = not async
+
+    for uri in uri_list:
+        process_one_uri.delay(uri, kwargs=kwargs)
+
+
+@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0, rate_limit='5/s')
+@events.logged(events.PROCESSING_URIS, 'uri_processing')
+def process_one_uri(uri, **kwargs):
+    processing.process_uris(
+        source=uri['source'],
+        docID=uri['docID'],
+        uri=uri['uri'],
+        uritype=uri['uritype'],
+        kwargs=kwargs
+    )
+
+
 @app.task
 def migrate(migration, source_db=None, sources=tuple(), async=False, dry=True, group_size=1000, **kwargs):
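
The URI tasks above fan out over the bucket structure built by parse_urls_into_groups in scrapi/util.py below: one group per base URI, so all URIs sharing a host are handed together to the rate-limited process_one_uri task. Roughly, with invented values:

    # parse_urls_into_groups('mit') returns something like:
    {
        'source': 'mit',
        'all_bases': ['http://dspace.mit.edu'],
        'uris': [{
            'base_uri': 'http://dspace.mit.edu',
            'individual_uris': [{
                'uri': 'http://dspace.mit.edu/handle/1721.1/1234',
                'source': 'mit',
                'docID': 'oai:example:1234',
                'uritype': 'canonicalUri',
            }],
        }],
    }
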
diff --git a/scrapi/util.py b/scrapi/util.py
index cc74528f..09f0db8c 100644
--- a/scrapi/util.py
+++ b/scrapi/util.py
@@ -1,12 +1,19 @@
 from datetime import datetime
 
+import os
+import re
 import six
 import pytz
 import time
 import logging
+
+os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings")
+from api.webview.models import Document
 
 logger = logging.getLogger()
 
+URL_RE = re.compile(r'(https?:\/\/[^\/]*)')
+
+
 xrange = six.moves.xrange
@@ -59,6 +66,86 @@
     return jobj
 
 
+def gather_contributors(source, production_es):
+    source_contributors = []
+    source_dict = {'source': source}
+
+    if production_es:
+        pass  # TODO: gathering from the production Elasticsearch index is not yet implemented
+    else:
+        for document in Document.objects.filter(source=source):
+            if document.normalized:
+                docID = document.normalized['shareProperties']['docID']
+
+                for person in document.normalized['contributors']:
+                    source_contributors.append({'source': source, 'docID': docID, 'contributor': person})
+
+    source_dict['contributors'] = source_contributors
+    return source_dict
+
+
+def parse_urls_into_groups(source):
+    source_dict = {'source': source, 'uris': [], 'all_bases': []}
+    for document in Document.objects.filter(source=source):
+        if document.normalized:
+            docID = document.normalized['shareProperties']['docID']
+
+            source_dict = uri_processing(
+                document.normalized['uris']['canonicalUri'],
+                source,
+                docID,
+                source_dict,
+                'canonicalUri'
+            )
+
+            if document.normalized['uris'].get('providerUris'):
+                for uri in document.normalized['uris']['providerUris']:
+                    source_dict = uri_processing(uri, source, docID, source_dict, 'providerUris')
+            if document.normalized['uris'].get('descriptorUris'):
+                for uri in document.normalized['uris']['descriptorUris']:
+                    source_dict = uri_processing(uri, source, docID, source_dict, 'descriptorUris')
+            if document.normalized['uris'].get('objectUris'):
+                for uri in document.normalized['uris']['objectUris']:
+                    if uri:
+                        if isinstance(uri, list):
+                            for element in uri:
+                                source_dict = uri_processing(element, source, docID, source_dict, 'objectUris')
+                        else:
+                            source_dict = uri_processing(uri, source, docID, source_dict, 'objectUris')
+
+    return source_dict
+
+
+def uri_processing(uri, source, docID, source_dict, uritype):
+    match = URL_RE.search(uri)
+    if not match:  # no http(s) base to group on; skip this URI
+        return source_dict
+    base_uri = match.group()
+
+    if base_uri in source_dict['all_bases']:
+        for entry in source_dict['uris']:
+            if base_uri == entry['base_uri']:
+                entry['individual_uris'].append({
+                    'uri': uri,
+                    'source': source,
+                    'docID': docID,
+                    'uritype': uritype
+                })
+    else:
+        source_dict['uris'].append({
+            'base_uri': base_uri,
+            'individual_uris': [{
+                'uri': uri,
+                'source': source,
+                'docID': docID,
+                'uritype': uritype
+            }]
+        })
+        source_dict['all_bases'].append(base_uri)
+
+    return source_dict
+
+
 def try_n_times(n, action, *args, **kwargs):
     for _ in xrange(n):
         try:
diff --git a/tasks.py b/tasks.py
index f6fbae4d..7bf5e8d2 100644
--- a/tasks.py
+++ b/tasks.py
@@ -197,6 +197,22 @@ def harvesters(async=False, start=None, end=None):
         logger.exception(e)
 
 
+@task
+def process_uris(async=False, source=None):
+    settings.CELERY_ALWAYS_EAGER = not async
+    from scrapi.tasks import process_uris
+
+    process_uris.delay(async=async, source=source)
+
+
+@task
+def process_contributors(async=False, source=None, production_es=False):
+    settings.CELERY_ALWAYS_EAGER = not async
+    from scrapi.tasks import process_contributors
+
+    process_contributors.delay(async=async, source=source, production_es=production_es)
+
+
 @task
 def lint_all():
     for name in registry.keys():
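
With the invoke tasks registered, a run might look like the following (assuming a source named 'mit'; the exact flag spelling depends on the local invoke version):

    $ invoke process_uris --source=mit
    $ invoke process_contributors --source=mit

    # or equivalently from Python:
    from tasks import process_uris, process_contributors
    process_uris(async=False, source='mit')
    process_contributors(async=False, source='mit', production_es=False)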