From c3854bbb0dc39917c8f0e7d95dfbc3d9359d9011 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 28 Jul 2015 17:54:40 -0400 Subject: [PATCH 01/71] Start adding model for url logs --- api/webview/models.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/webview/models.py b/api/webview/models.py index 65d58765..34bb826d 100644 --- a/api/webview/models.py +++ b/api/webview/models.py @@ -11,3 +11,5 @@ class Document(models.Model): raw = JsonField() normalized = JsonField() + + url_logs = JsonField() From 050e4ecd745eee6b2900b408b1cd961cb7ecb467 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 28 Jul 2015 18:45:32 -0400 Subject: [PATCH 02/71] Remove the url log column - just adding to normalized --- api/webview/models.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/webview/models.py b/api/webview/models.py index 34bb826d..65d58765 100644 --- a/api/webview/models.py +++ b/api/webview/models.py @@ -11,5 +11,3 @@ class Document(models.Model): raw = JsonField() normalized = JsonField() - - url_logs = JsonField() From 16c05c57aece30938ffe3e6ffa4e07edd4123bf4 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 29 Jul 2015 10:49:16 -0400 Subject: [PATCH 03/71] Add url processor to postgres processor --- scrapi/processing/postgres.py | 64 ++++++++++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index 3686e9c3..b936f03e 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -1,7 +1,9 @@ from __future__ import absolute_import import os -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") +import json +import datetime +import requests import copy import logging @@ -9,6 +11,8 @@ from scrapi import events from scrapi.processing.base import BaseProcessor +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") +from django.core.exceptions import ObjectDoesNotExist from api.webview.models import Document logger = logging.getLogger(__name__) @@ -45,3 +49,61 @@ def _get_by_source_id(self, model, source, docID): return Document.objects.filter(source=source, docID=docID)[0] except IndexError: return None + + +class UriProcessor(BaseProcessor): + NAME = 'postgres_uri' + + def process_raw(self, raw_doc): + pass + + def process_normalized(self, raw_doc, normalized): + try: + document = Document.objects.get(source=raw_doc['source'], docID=raw_doc['docID']) + normalized_document = json.loads(document.normalized.attributes) + + processed_normalized = self.save_status_of_canonical_uri(normalized_document) + processed_normalized = self.save_status_of_object_uris(processed_normalized) + + document.normalized = processed_normalized + + document.save() + except ObjectDoesNotExist: + pass + + def save_status_of_canonical_uri(self, normalized): + cannonical_uri_status = requests.get(normalized['uris']['canonicalUri']) + + cannonical_status = { + 'resolved_uri': cannonical_uri_status.url, + 'resolved_datetime': datetime.datetime.now(), + 'resolved_status': cannonical_uri_status.status_code, + 'is_doi': True if 'dx.doi.org' in normalized['uris']['canonicalUri'] else False + } + + try: + normalized['shareProperties']['uri_logs']['cannonical_status'].append(cannonical_status) + except KeyError: + normalized['shareProperties']['uri_logs']['cannonical_status'] = [cannonical_status] + + return normalized + + def save_status_of_object_uris(self, normalized): + all_object_uris = normalized['uris']['object_uris'] + + for uri in all_object_uris: + current_list 
= [] + uri_resolved = requests.get(uri) + + uri_status = { + 'resolved_uri': uri_resolved.url, + 'resolved_datetime': datetime.datetime.now(), + 'resolved_status': uri_resolved.status_code, + 'is_doi': True if 'dx.doi.org' in uri else False + } + current_list.append(uri_status) + + try: + normalized['shareProperties']['uri_logs']['object_status'].append(current_list) + except KeyError: + normalized['shareProperties']['uri_logs']['object_status'] = [current_list] From 0921cc0c1de61976b78eb7b1870266a743eaadfb Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 29 Jul 2015 11:34:07 -0400 Subject: [PATCH 04/71] A few key error fixes --- scrapi/processing/postgres.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index b936f03e..1d261660 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -1,7 +1,6 @@ from __future__ import absolute_import import os -import json import datetime import requests @@ -60,9 +59,9 @@ def process_raw(self, raw_doc): def process_normalized(self, raw_doc, normalized): try: document = Document.objects.get(source=raw_doc['source'], docID=raw_doc['docID']) - normalized_document = json.loads(document.normalized.attributes) + # normalized_document = json.loads(document.normalized) - processed_normalized = self.save_status_of_canonical_uri(normalized_document) + processed_normalized = self.save_status_of_canonical_uri(document.normalized) processed_normalized = self.save_status_of_object_uris(processed_normalized) document.normalized = processed_normalized @@ -84,12 +83,17 @@ def save_status_of_canonical_uri(self, normalized): try: normalized['shareProperties']['uri_logs']['cannonical_status'].append(cannonical_status) except KeyError: + normalized['shareProperties']['uri_logs'] = {} normalized['shareProperties']['uri_logs']['cannonical_status'] = [cannonical_status] return normalized def save_status_of_object_uris(self, normalized): - all_object_uris = normalized['uris']['object_uris'] + try: + all_object_uris = normalized['uris']['object_uris'] + except KeyError: + all_object_uris = [] + current_list = [] for uri in all_object_uris: current_list = [] @@ -106,4 +110,5 @@ def save_status_of_object_uris(self, normalized): try: normalized['shareProperties']['uri_logs']['object_status'].append(current_list) except KeyError: + normalized['shareProperties']['uri_logs'] = {} normalized['shareProperties']['uri_logs']['object_status'] = [current_list] From 63243b7dce0f6e21075594c6b225bb969cca635b Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 29 Jul 2015 11:56:22 -0400 Subject: [PATCH 05/71] Add invoke task to reset postgres and elasticsearch --- tasks.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tasks.py b/tasks.py index 09f22b68..418a536c 100644 --- a/tasks.py +++ b/tasks.py @@ -1,3 +1,4 @@ +import os import base64 import logging import platform @@ -220,3 +221,12 @@ def provider_map(delete=False): refresh=True ) print(es.count('share_providers', body={'query': {'match_all': {}}})['count']) + + +@task +def reset_all(): + os.system('psql -c "DROP DATABASE scrapi;"') + os.system('psql -c "CREATE DATABASE scrapi;"') + os.system('python manage.py migrate') + + os.system("curl -XDELETE '{}/share*'".format(settings.ELASTIC_URI)) From 66c10595bb7049d07c17a6519eee2ae1aa9d713c Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 29 Jul 2015 13:04:56 -0400 Subject: [PATCH 06/71] return things properly duh --- scrapi/processing/postgres.py | 10 
+++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index 1d261660..6d1caf67 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -53,13 +53,9 @@ def _get_by_source_id(self, model, source, docID): class UriProcessor(BaseProcessor): NAME = 'postgres_uri' - def process_raw(self, raw_doc): - pass - def process_normalized(self, raw_doc, normalized): try: document = Document.objects.get(source=raw_doc['source'], docID=raw_doc['docID']) - # normalized_document = json.loads(document.normalized) processed_normalized = self.save_status_of_canonical_uri(document.normalized) processed_normalized = self.save_status_of_object_uris(processed_normalized) @@ -92,8 +88,7 @@ def save_status_of_object_uris(self, normalized): try: all_object_uris = normalized['uris']['object_uris'] except KeyError: - all_object_uris = [] - current_list = [] + return normalized for uri in all_object_uris: current_list = [] @@ -110,5 +105,6 @@ def save_status_of_object_uris(self, normalized): try: normalized['shareProperties']['uri_logs']['object_status'].append(current_list) except KeyError: - normalized['shareProperties']['uri_logs'] = {} normalized['shareProperties']['uri_logs']['object_status'] = [current_list] + + return normalized From 3f88feeba1d46a3f32815af638e9c82244da5730 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 29 Jul 2015 14:31:31 -0400 Subject: [PATCH 07/71] Add inv alias to reset task --- tasks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tasks.py b/tasks.py index 418a536c..3fccd3e5 100644 --- a/tasks.py +++ b/tasks.py @@ -230,3 +230,5 @@ def reset_all(): os.system('python manage.py migrate') os.system("curl -XDELETE '{}/share*'".format(settings.ELASTIC_URI)) + os.system("invoke alias share share_v2") + os.system("invoke provider_map") From a16502f5ffd707ea18f0aa826b1e25a2ef43e2a0 Mon Sep 17 00:00:00 2001 From: erinspace Date: Thu, 30 Jul 2015 10:02:02 -0400 Subject: [PATCH 08/71] Add actual uri to uri log --- scrapi/processing/postgres.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index 6d1caf67..cef8ef5f 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -1,10 +1,9 @@ from __future__ import absolute_import import os +import copy import datetime import requests - -import copy import logging from scrapi import events @@ -70,6 +69,7 @@ def save_status_of_canonical_uri(self, normalized): cannonical_uri_status = requests.get(normalized['uris']['canonicalUri']) cannonical_status = { + 'actual_uri': normalized['uris']['canonicalUri'], 'resolved_uri': cannonical_uri_status.url, 'resolved_datetime': datetime.datetime.now(), 'resolved_status': cannonical_uri_status.status_code, @@ -95,6 +95,7 @@ def save_status_of_object_uris(self, normalized): uri_resolved = requests.get(uri) uri_status = { + 'actual_uri': uri, 'resolved_uri': uri_resolved.url, 'resolved_datetime': datetime.datetime.now(), 'resolved_status': uri_resolved.status_code, From 9d91fe3e4ce4b116edadf2fefc9856c98315046c Mon Sep 17 00:00:00 2001 From: erinspace Date: Fri, 31 Jul 2015 11:13:22 -0400 Subject: [PATCH 09/71] Add post_processing flag, so we can more closely control post processing incl url gathering --- scrapi/processing/__init__.py | 6 ++++++ scrapi/processing/base.py | 3 +++ scrapi/settings/local-dist.py | 1 + 3 files changed, 10 insertions(+) diff --git a/scrapi/processing/__init__.py 
b/scrapi/processing/__init__.py index b08de00c..4a516c99 100644 --- a/scrapi/processing/__init__.py +++ b/scrapi/processing/__init__.py @@ -34,3 +34,9 @@ def process_raw(raw_doc, kwargs): for p in settings.RAW_PROCESSING: extras = kwargs.get(p, {}) get_processor(p).process_raw(raw_doc, **extras) + + +def post_process(raw_doc, normalized, kwargs): + for p in settings.POST_PROCESSING: + extras = kwargs.get(p, {}) + get_processor(p).process_normalized(raw_doc, normalized, **extras) diff --git a/scrapi/processing/base.py b/scrapi/processing/base.py index 155736e7..c8794b93 100644 --- a/scrapi/processing/base.py +++ b/scrapi/processing/base.py @@ -6,3 +6,6 @@ def process_raw(self, raw_doc, **kwargs): def process_normalized(self, raw_doc, normalized, **kwargs): pass # pragma: no cover + + def post_process(raw_doc, normalized, kwargs): + pass diff --git a/scrapi/settings/local-dist.py b/scrapi/settings/local-dist.py index 826d3528..5d6ada4a 100644 --- a/scrapi/settings/local-dist.py +++ b/scrapi/settings/local-dist.py @@ -12,6 +12,7 @@ NORMALIZED_PROCESSING = [] RAW_PROCESSING = [] +POST_PROCESSING = [] SENTRY_DSN = None From 1d9aae6a8c20904aa21080f011f6d270c05ca720 Mon Sep 17 00:00:00 2001 From: erinspace Date: Fri, 31 Jul 2015 11:25:07 -0400 Subject: [PATCH 10/71] Update local-dist to have correct static files --- api/api/settings/local-dist.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/api/settings/local-dist.py b/api/api/settings/local-dist.py index 047df8bf..6bebde93 100644 --- a/api/api/settings/local-dist.py +++ b/api/api/settings/local-dist.py @@ -15,4 +15,4 @@ } } -STATIC_URL = '{}/static/'.format(DOMAIN) +STATIC_URL = '/static/' From 581df5e141cca0f437ed444ff99b9ec1759bfc26 Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 3 Aug 2015 14:53:47 -0400 Subject: [PATCH 11/71] Move uri logging processor out of postgres processor --- scrapi/processing/postgres.py | 65 --------------------------- scrapi/processing/uri_logging.py | 76 ++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 65 deletions(-) create mode 100644 scrapi/processing/uri_logging.py diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index cef8ef5f..82a78009 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -2,15 +2,12 @@ import os import copy -import datetime -import requests import logging from scrapi import events from scrapi.processing.base import BaseProcessor os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") -from django.core.exceptions import ObjectDoesNotExist from api.webview.models import Document logger = logging.getLogger(__name__) @@ -47,65 +44,3 @@ def _get_by_source_id(self, model, source, docID): return Document.objects.filter(source=source, docID=docID)[0] except IndexError: return None - - -class UriProcessor(BaseProcessor): - NAME = 'postgres_uri' - - def process_normalized(self, raw_doc, normalized): - try: - document = Document.objects.get(source=raw_doc['source'], docID=raw_doc['docID']) - - processed_normalized = self.save_status_of_canonical_uri(document.normalized) - processed_normalized = self.save_status_of_object_uris(processed_normalized) - - document.normalized = processed_normalized - - document.save() - except ObjectDoesNotExist: - pass - - def save_status_of_canonical_uri(self, normalized): - cannonical_uri_status = requests.get(normalized['uris']['canonicalUri']) - - cannonical_status = { - 'actual_uri': normalized['uris']['canonicalUri'], - 'resolved_uri': 
cannonical_uri_status.url, - 'resolved_datetime': datetime.datetime.now(), - 'resolved_status': cannonical_uri_status.status_code, - 'is_doi': True if 'dx.doi.org' in normalized['uris']['canonicalUri'] else False - } - - try: - normalized['shareProperties']['uri_logs']['cannonical_status'].append(cannonical_status) - except KeyError: - normalized['shareProperties']['uri_logs'] = {} - normalized['shareProperties']['uri_logs']['cannonical_status'] = [cannonical_status] - - return normalized - - def save_status_of_object_uris(self, normalized): - try: - all_object_uris = normalized['uris']['object_uris'] - except KeyError: - return normalized - - for uri in all_object_uris: - current_list = [] - uri_resolved = requests.get(uri) - - uri_status = { - 'actual_uri': uri, - 'resolved_uri': uri_resolved.url, - 'resolved_datetime': datetime.datetime.now(), - 'resolved_status': uri_resolved.status_code, - 'is_doi': True if 'dx.doi.org' in uri else False - } - current_list.append(uri_status) - - try: - normalized['shareProperties']['uri_logs']['object_status'].append(current_list) - except KeyError: - normalized['shareProperties']['uri_logs']['object_status'] = [current_list] - - return normalized diff --git a/scrapi/processing/uri_logging.py b/scrapi/processing/uri_logging.py new file mode 100644 index 00000000..67e0f8f1 --- /dev/null +++ b/scrapi/processing/uri_logging.py @@ -0,0 +1,76 @@ +from __future__ import absolute_import + +import os +import datetime +import requests +import logging + +from scrapi.processing.base import BaseProcessor + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") +from django.core.exceptions import ObjectDoesNotExist +from api.webview.models import Document + +logger = logging.getLogger(__name__) + + +class UriProcessor(BaseProcessor): + NAME = 'postgres_uri' + + def process_normalized(self, raw_doc, normalized): + try: + document = Document.objects.get(source=raw_doc['source'], docID=raw_doc['docID']) + + processed_normalized = self.save_status_of_canonical_uri(document.normalized) + processed_normalized = self.save_status_of_object_uris(processed_normalized) + + document.normalized = processed_normalized + + document.save() + except ObjectDoesNotExist: + pass + + def save_status_of_canonical_uri(self, normalized): + cannonical_uri_status = requests.get(normalized['uris']['canonicalUri']) + + cannonical_status = { + 'actual_uri': normalized['uris']['canonicalUri'], + 'resolved_uri': cannonical_uri_status.url, + 'resolved_datetime': datetime.datetime.now(), + 'resolved_status': cannonical_uri_status.status_code, + 'is_doi': True if 'dx.doi.org' in normalized['uris']['canonicalUri'] else False + } + + try: + normalized['shareProperties']['uri_logs']['cannonical_status'].append(cannonical_status) + except KeyError: + normalized['shareProperties']['uri_logs'] = {} + normalized['shareProperties']['uri_logs']['cannonical_status'] = [cannonical_status] + + return normalized + + def save_status_of_object_uris(self, normalized): + try: + all_object_uris = normalized['uris']['object_uris'] + except KeyError: + return normalized + + for uri in all_object_uris: + current_list = [] + uri_resolved = requests.get(uri) + + uri_status = { + 'actual_uri': uri, + 'resolved_uri': uri_resolved.url, + 'resolved_datetime': datetime.datetime.now(), + 'resolved_status': uri_resolved.status_code, + 'is_doi': True if 'dx.doi.org' in uri else False + } + current_list.append(uri_status) + + try: + normalized['shareProperties']['uri_logs']['object_status'].append(current_list) + 
except KeyError: + normalized['shareProperties']['uri_logs']['object_status'] = [current_list] + + return normalized From 4bacb91987e4135fb4d1bfd9e3d7e98eec9019c6 Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 3 Aug 2015 14:54:30 -0400 Subject: [PATCH 12/71] Change name of uri processor --- scrapi/processing/uri_logging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapi/processing/uri_logging.py b/scrapi/processing/uri_logging.py index 67e0f8f1..e17ec1d5 100644 --- a/scrapi/processing/uri_logging.py +++ b/scrapi/processing/uri_logging.py @@ -15,7 +15,7 @@ class UriProcessor(BaseProcessor): - NAME = 'postgres_uri' + NAME = 'uri_logging' def process_normalized(self, raw_doc, normalized): try: From 48d3d8975c1977fcbaf84eef5f525952b267989c Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 3 Aug 2015 15:25:42 -0400 Subject: [PATCH 13/71] Add basic processing tasks that do nothing right now --- scrapi/tasks.py | 6 ++++++ tasks.py | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index c196d84e..200bf87c 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -101,6 +101,12 @@ def process_raw(raw_doc, **kwargs): processing.process_raw(raw_doc, kwargs) +@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=settings.CELERY_MAX_RETRIES) +@events.logged(events.PROCESSSING_URIS, 'post_processing') +def process_uris(): + processing.process_uris() + + @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=settings.CELERY_MAX_RETRIES, throws=events.Skip) @events.logged(events.NORMALIZATION) def normalize(raw_doc, harvester_name): diff --git a/tasks.py b/tasks.py index eb16633c..3807846f 100644 --- a/tasks.py +++ b/tasks.py @@ -228,6 +228,14 @@ def provider_map(delete=False): print(es.count('share_providers', body={'query': {'match_all': {}}})['count']) +@task +def process_uris(async=False): + settings.CELERY_ALWAYS_EAGER = not async + from scrapi.tasks import process_uris + + process_uris.delay() + + @task def reset_all(): os.system('psql -c "DROP DATABASE scrapi;"') From 0c88127febb567ae7f7013ae352d97bf77d7425b Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 3 Aug 2015 15:26:17 -0400 Subject: [PATCH 14/71] update name --- scrapi/processing/uri_logging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapi/processing/uri_logging.py b/scrapi/processing/uri_logging.py index e17ec1d5..e4ea4215 100644 --- a/scrapi/processing/uri_logging.py +++ b/scrapi/processing/uri_logging.py @@ -17,7 +17,7 @@ class UriProcessor(BaseProcessor): NAME = 'uri_logging' - def process_normalized(self, raw_doc, normalized): + def process_uris(self, raw_doc, normalized): try: document = Document.objects.get(source=raw_doc['source'], docID=raw_doc['docID']) From 4350322b59bf7a5f251db766859c44c0d7f8b750 Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 3 Aug 2015 15:26:29 -0400 Subject: [PATCH 15/71] Add post log event --- scrapi/events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scrapi/events.py b/scrapi/events.py index f6fc8eca..db943b83 100644 --- a/scrapi/events.py +++ b/scrapi/events.py @@ -22,6 +22,7 @@ HARVESTER_RUN = 'runHarvester' CHECK_ARCHIVE = 'checkArchive' NORMALIZATION = 'normalization' +PROCESSSING_URIS = 'processingUris' # statuses FAILED = 'failed' From 6436e7723f86a78c7c26177caaa1cba76f7b632b Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 4 Aug 2015 11:41:35 -0400 Subject: [PATCH 16/71] Add inv task for process uris --- tasks.py | 16 ++++++++-------- 1 file 
changed, 8 insertions(+), 8 deletions(-) diff --git a/tasks.py b/tasks.py index 3807846f..97ccc266 100644 --- a/tasks.py +++ b/tasks.py @@ -187,6 +187,14 @@ def harvesters(async=False, start=None, end=None): logger.exception(e) +@task +def process_uris(async=False, start=None, end=None): + settings.CELERY_ALWAYS_EAGER = not async + from scrapi.tasks import process_uris + + process_uris.delay(async=async, start=start, end=None) + + @task def lint_all(): for name in registry.keys(): @@ -228,14 +236,6 @@ def provider_map(delete=False): print(es.count('share_providers', body={'query': {'match_all': {}}})['count']) -@task -def process_uris(async=False): - settings.CELERY_ALWAYS_EAGER = not async - from scrapi.tasks import process_uris - - process_uris.delay() - - @task def reset_all(): os.system('psql -c "DROP DATABASE scrapi;"') From 558b8536a07857e19e95b76e3f42bdd1928b96f5 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 4 Aug 2015 11:42:00 -0400 Subject: [PATCH 17/71] Add celery task for process uri that does all documents --- scrapi/tasks.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index 200bf87c..2b7a3fb3 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -1,3 +1,4 @@ +import os import logging import functools from itertools import islice @@ -101,12 +102,6 @@ def process_raw(raw_doc, **kwargs): processing.process_raw(raw_doc, kwargs) -@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=settings.CELERY_MAX_RETRIES) -@events.logged(events.PROCESSSING_URIS, 'post_processing') -def process_uris(): - processing.process_uris() - - @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=settings.CELERY_MAX_RETRIES, throws=events.Skip) @events.logged(events.NORMALIZATION) def normalize(raw_doc, harvester_name): @@ -131,6 +126,16 @@ def process_normalized(normalized_doc, raw_doc, **kwargs): processing.process_normalized(raw_doc, normalized_doc, kwargs) +@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=settings.CELERY_MAX_RETRIES) +@events.logged(events.PROCESSSING_URIS, 'post_processing') +def process_uris(**kwargs): + os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") + from api.webview.models import Document + + for document in Document.objects.all(): + processing.process_uris(document, kwargs) + + @app.task def migrate(migration, sources=tuple(), async=False, dry=True, group_size=1000, **kwargs): from scrapi.migrations import documents From 51be35f38fac4a3beee6774305a4d7a96e9daf5e Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 4 Aug 2015 11:42:42 -0400 Subject: [PATCH 18/71] Add try except for document normalized instead of the doc not existing --- scrapi/processing/uri_logging.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/scrapi/processing/uri_logging.py b/scrapi/processing/uri_logging.py index e4ea4215..b1ae7bd9 100644 --- a/scrapi/processing/uri_logging.py +++ b/scrapi/processing/uri_logging.py @@ -8,8 +8,8 @@ from scrapi.processing.base import BaseProcessor os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") -from django.core.exceptions import ObjectDoesNotExist -from api.webview.models import Document +# from django.core.exceptions import ObjectDoesNotExist +# from api.webview.models import Document logger = logging.getLogger(__name__) @@ -17,17 +17,15 @@ class UriProcessor(BaseProcessor): NAME = 'uri_logging' - def process_uris(self, raw_doc, normalized): + def 
process_uris(self, document): try: - document = Document.objects.get(source=raw_doc['source'], docID=raw_doc['docID']) - processed_normalized = self.save_status_of_canonical_uri(document.normalized) processed_normalized = self.save_status_of_object_uris(processed_normalized) document.normalized = processed_normalized document.save() - except ObjectDoesNotExist: + except TypeError: pass def save_status_of_canonical_uri(self, normalized): From f264fb4ec177bde2e2c057742c775abeead5265a Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 4 Aug 2015 11:45:29 -0400 Subject: [PATCH 19/71] Remove unused os and django settings --- scrapi/processing/uri_logging.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/scrapi/processing/uri_logging.py b/scrapi/processing/uri_logging.py index b1ae7bd9..9e51af55 100644 --- a/scrapi/processing/uri_logging.py +++ b/scrapi/processing/uri_logging.py @@ -1,15 +1,11 @@ from __future__ import absolute_import -import os import datetime import requests import logging from scrapi.processing.base import BaseProcessor -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") -# from django.core.exceptions import ObjectDoesNotExist -# from api.webview.models import Document logger = logging.getLogger(__name__) From 5f50306932edb4615c51cd8ef30a5e55441d04b0 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 4 Aug 2015 11:46:05 -0400 Subject: [PATCH 20/71] change args and kwargs for processor method --- scrapi/processing/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scrapi/processing/__init__.py b/scrapi/processing/__init__.py index 4a516c99..bd4e4ee5 100644 --- a/scrapi/processing/__init__.py +++ b/scrapi/processing/__init__.py @@ -36,7 +36,7 @@ def process_raw(raw_doc, kwargs): get_processor(p).process_raw(raw_doc, **extras) -def post_process(raw_doc, normalized, kwargs): +def process_uris(document, kwargs): for p in settings.POST_PROCESSING: extras = kwargs.get(p, {}) - get_processor(p).process_normalized(raw_doc, normalized, **extras) + get_processor(p).process_uris(document, **extras) From 9db7b50649858aa63e0cbd4413892c6417914930 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 4 Aug 2015 11:46:25 -0400 Subject: [PATCH 21/71] Add cross origin settings to local dist --- api/api/settings/local-dist.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/api/api/settings/local-dist.py b/api/api/settings/local-dist.py index 6bebde93..934bd1bf 100644 --- a/api/api/settings/local-dist.py +++ b/api/api/settings/local-dist.py @@ -16,3 +16,10 @@ } STATIC_URL = '/static/' + + +CORS_ORIGIN_WHITELIST = ( + 'localhost:5000', + 'osf.io', + 'staging.osf.io' +) From 8542dc07cd3945eb8c90a5c9a521283e47a9b0fc Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 4 Aug 2015 13:38:32 -0400 Subject: [PATCH 22/71] Now specifiy source to just resolve urls in that source --- scrapi/tasks.py | 9 ++++++--- tasks.py | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index 2b7a3fb3..4724993a 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -131,9 +131,12 @@ def process_normalized(normalized_doc, raw_doc, **kwargs): def process_uris(**kwargs): os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") from api.webview.models import Document - - for document in Document.objects.all(): - processing.process_uris(document, kwargs) + if kwargs.get('source'): + for document in Document.objects.filter(source=kwargs['source']): + processing.process_uris(document, kwargs) + else: + for 
document in Document.objects.all(): + processing.process_uris(document, kwargs) @app.task diff --git a/tasks.py b/tasks.py index 97ccc266..182d1ab4 100644 --- a/tasks.py +++ b/tasks.py @@ -188,11 +188,11 @@ def harvesters(async=False, start=None, end=None): @task -def process_uris(async=False, start=None, end=None): +def process_uris(async=False, start=None, end=None, source=None): settings.CELERY_ALWAYS_EAGER = not async from scrapi.tasks import process_uris - process_uris.delay(async=async, start=start, end=None) + process_uris.delay(async=async, start=start, end=end, source=source) @task From ab0239661cc469d44dd5475181f1f8ecb42524d1 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 5 Aug 2015 09:37:05 -0400 Subject: [PATCH 23/71] Massive if statement to organize source uris into common buckets --- scrapi/tasks.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index 4724993a..d0579333 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -1,4 +1,5 @@ import os +import re import logging import functools from itertools import islice @@ -126,11 +127,32 @@ def process_normalized(normalized_doc, raw_doc, **kwargs): processing.process_normalized(raw_doc, normalized_doc, kwargs) -@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=settings.CELERY_MAX_RETRIES) +@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) @events.logged(events.PROCESSSING_URIS, 'post_processing') def process_uris(**kwargs): os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") from api.webview.models import Document + + URL_RE = re.compile(r'(https?:\/\/[^\/]*)') + + if kwargs.get('async'): + uri_buckets = [] + for source in registry.keys(): + source_dict = {'source': source, 'uris': [{}]} + for document in Document.objects.filter(source=source): + if document.normalized: + cannonical_uri = document.normalized['canonicalUri'] + base_uri = URL_RE.search(cannonical_uri).group() + for entry in source_dict['uris']: + if base_uri == entry.get('base_uri'): + entry['base_uri']['individual_uris'].append(cannonical_uri) + else: + entry['base_uri'] = base_uri + entry['individual_uris'] = [cannonical_uri] + uri_buckets.append(source_dict) + + print(uri_buckets) + if kwargs.get('source'): for document in Document.objects.filter(source=kwargs['source']): processing.process_uris(document, kwargs) From 970cae2a7744b9199b720cff2ffafbcc0f5555de Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 10 Aug 2015 11:03:14 -0400 Subject: [PATCH 24/71] Update logging to be by document instead of all at once --- scrapi/processing/uri_logging.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/scrapi/processing/uri_logging.py b/scrapi/processing/uri_logging.py index 9e51af55..36a3952f 100644 --- a/scrapi/processing/uri_logging.py +++ b/scrapi/processing/uri_logging.py @@ -1,11 +1,15 @@ from __future__ import absolute_import +import os import datetime import requests import logging from scrapi.processing.base import BaseProcessor +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") +from api.webview.models import Document + logger = logging.getLogger(__name__) @@ -13,10 +17,11 @@ class UriProcessor(BaseProcessor): NAME = 'uri_logging' - def process_uris(self, document): + def process_uris(self, source, docID, uri): try: - processed_normalized = self.save_status_of_canonical_uri(document.normalized) - processed_normalized = 
self.save_status_of_object_uris(processed_normalized) + document = Document.objects.get(source=source, docID=docID) + processed_normalized = self.save_status_of_canonical_uri(document.normalized, uri) + # processed_normalized = self.save_status_of_object_uris(processed_normalized) document.normalized = processed_normalized @@ -24,11 +29,11 @@ def process_uris(self, document): except TypeError: pass - def save_status_of_canonical_uri(self, normalized): - cannonical_uri_status = requests.get(normalized['uris']['canonicalUri']) + def save_status_of_canonical_uri(self, normalized, uri): + cannonical_uri_status = requests.get(uri) cannonical_status = { - 'actual_uri': normalized['uris']['canonicalUri'], + 'actual_uri': uri, 'resolved_uri': cannonical_uri_status.url, 'resolved_datetime': datetime.datetime.now(), 'resolved_status': cannonical_uri_status.status_code, From 73d00bc6cce45d01f01841ed3f99b30a7a7958f6 Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 10 Aug 2015 11:03:53 -0400 Subject: [PATCH 25/71] Start breaking into smaller chunks --- scrapi/tasks.py | 55 ++++++++++++++++++++++--------------------------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index d0579333..6b222e28 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -1,5 +1,6 @@ import os -import re +import re +import json import logging import functools from itertools import islice @@ -127,38 +128,32 @@ def process_normalized(normalized_doc, raw_doc, **kwargs): processing.process_normalized(raw_doc, normalized_doc, kwargs) -@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) -@events.logged(events.PROCESSSING_URIS, 'post_processing') +# @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) +# @events.logged(events.PROCESSSING_URIS, 'post_processing') def process_uris(**kwargs): - os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") - from api.webview.models import Document - - URL_RE = re.compile(r'(https?:\/\/[^\/]*)') if kwargs.get('async'): - uri_buckets = [] - for source in registry.keys(): - source_dict = {'source': source, 'uris': [{}]} - for document in Document.objects.filter(source=source): - if document.normalized: - cannonical_uri = document.normalized['canonicalUri'] - base_uri = URL_RE.search(cannonical_uri).group() - for entry in source_dict['uris']: - if base_uri == entry.get('base_uri'): - entry['base_uri']['individual_uris'].append(cannonical_uri) - else: - entry['base_uri'] = base_uri - entry['individual_uris'] = [cannonical_uri] - uri_buckets.append(source_dict) - - print(uri_buckets) - - if kwargs.get('source'): - for document in Document.objects.filter(source=kwargs['source']): - processing.process_uris(document, kwargs) - else: - for document in Document.objects.all(): - processing.process_uris(document, kwargs) + all_buckets = [] + if kwargs.get('source'): + uri_buckets = util.parse_urls_into_groups(kwargs.get('source')) + all_buckets.append(uri_buckets) + else: + for source in registry.keys(): + uri_buckets = util.parse_urls_into_groups(source) + all_buckets.append(uri_buckets) + + print(json.dumps(all_buckets, indent=4)) + + for source_dict in uri_buckets: + for group in source_dict['uris']: + process_uris_at_one_base_uri(group['individual_uris']) + + +@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) +@events.logged(events.PROCESSSING_URIS, 'post_processing') +def process_uris_at_one_base_uri(uri_list): + for uri in uri_list: + 
processing.process_uris(source=uri['source'], docID=uri['docID'], uri=uri['uri']) @app.task From da4699961a42a8bb233a66dbe8bc877a1f46bdff Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 10 Aug 2015 11:04:17 -0400 Subject: [PATCH 26/71] Breaking URIs into smaller groups now by source instead of all one --- scrapi/util.py | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/scrapi/util.py b/scrapi/util.py index 0116141d..0dbed8b6 100644 --- a/scrapi/util.py +++ b/scrapi/util.py @@ -1,8 +1,15 @@ from datetime import datetime +import os +import re import six import pytz +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") +from api.webview.models import Document + +URL_RE = re.compile(r'(https?:\/\/[^\/]*)') + def timestamp(): return pytz.utc.localize(datetime.utcnow()).isoformat() @@ -51,3 +58,58 @@ def json_without_bytes(jobj): if isinstance(v, six.binary_type): jobj[k] = v.decode('utf8') return jobj + + +def parse_urls_into_groups(source): + + uri_buckets = [] + for document in Document.objects.filter(source=source): + if document.normalized: + cannonical_uri = document.normalized['uris']['canonicalUri'] + provider_uris = document.normalized['uris'].get('providerUris') + descriptor_uris = document.normalized['uris'].get('descriptorUris') + object_uris = document.normalized['uris'].get('objectUris') + + uri_buckets.append(cannonical_uri_processing(cannonical_uri, document.normalized)) + + if provider_uris: + uri_buckets = other_uri_processing(provider_uris, 'providerUris', uri_buckets) + if descriptor_uris: + uri_buckets = other_uri_processing(descriptor_uris, 'descriptorUris', uri_buckets) + if object_uris: + uri_buckets = other_uri_processing(object_uris, 'objectUris', uri_buckets) + + return uri_buckets + + +def other_uri_processing(uri, uritype, uri_buckets): + pass + + +def cannonical_uri_processing(cannonical_uri, normalized): + source_dict = {'source': source, 'uris': [{}]} + base_uri = URL_RE.search(cannonical_uri).group() + for entry in source_dict['uris']: + if base_uri == entry.get('base_uri'): + if entry.get('individual_uris'): + entry['individual_uris'].append({ + 'uri': cannonical_uri, + 'source': normalized['shareProperties']['source'], + 'docID': normalized['shareProperties']['docID'], + 'type': 'cannonicalUri' + }) + else: + entry['individual_uris'] = [{ + 'uri': cannonical_uri, + 'source': normalized['shareProperties']['source'], + 'docID': normalized['shareProperties']['docID'], + 'type': 'cannonicalUri' + }] + else: + entry['base_uri'] = base_uri + entry['individual_uris'] = [{ + 'uri': cannonical_uri, + 'source': normalized['shareProperties']['source'], + 'docID': normalized['shareProperties']['docID'], + 'type': 'cannonicalUri' + }] From 6af8a54454e01068fcdeadfe52bd257c4e9ef5cd Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 11 Aug 2015 10:36:44 -0400 Subject: [PATCH 27/71] Fix url parsing for all different types of URIs --- scrapi/util.py | 92 +++++++++++++++++++++++++------------------------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/scrapi/util.py b/scrapi/util.py index 0dbed8b6..3ebea623 100644 --- a/scrapi/util.py +++ b/scrapi/util.py @@ -62,54 +62,54 @@ def json_without_bytes(jobj): def parse_urls_into_groups(source): - uri_buckets = [] + source_dict = {'source': source, 'uris': [], 'all_bases': []} for document in Document.objects.filter(source=source): if document.normalized: - cannonical_uri = document.normalized['uris']['canonicalUri'] - provider_uris = 
document.normalized['uris'].get('providerUris') - descriptor_uris = document.normalized['uris'].get('descriptorUris') - object_uris = document.normalized['uris'].get('objectUris') - - uri_buckets.append(cannonical_uri_processing(cannonical_uri, document.normalized)) - - if provider_uris: - uri_buckets = other_uri_processing(provider_uris, 'providerUris', uri_buckets) - if descriptor_uris: - uri_buckets = other_uri_processing(descriptor_uris, 'descriptorUris', uri_buckets) - if object_uris: - uri_buckets = other_uri_processing(object_uris, 'objectUris', uri_buckets) - - return uri_buckets - - -def other_uri_processing(uri, uritype, uri_buckets): - pass - - -def cannonical_uri_processing(cannonical_uri, normalized): - source_dict = {'source': source, 'uris': [{}]} - base_uri = URL_RE.search(cannonical_uri).group() - for entry in source_dict['uris']: - if base_uri == entry.get('base_uri'): - if entry.get('individual_uris'): + docID = document.normalized['shareProperties']['docID'] + + source_dict = uri_processing( + document.normalized['uris']['canonicalUri'], + source, + docID, + source_dict, + 'cannonicalUri' + ) + + if document.normalized['uris'].get('providerUris'): + for uri in document.normalized['uris']['providerUris']: + source_dict = uri_processing(uri, source, docID, source_dict, 'providerUris') + if document.normalized['uris'].get('descriptorUris'): + for uri in document.normalized['uris']['descriptorUris']: + source_dict = uri_processing(uri, source, docID, source_dict, 'descriptorUris') + if document.normalized['uris'].get('objectUris'): + for uri in document.normalized['uris']['objectUris']: + source_dict = uri_processing(uri, source, docID, source_dict, 'objectUris') + + return source_dict + + +def uri_processing(uri, source, docID, source_dict, uritype): + base_uri = URL_RE.search(uri).group() + + if base_uri in source_dict['all_bases']: + for entry in source_dict['uris']: + if base_uri == entry['base_uri']: entry['individual_uris'].append({ - 'uri': cannonical_uri, - 'source': normalized['shareProperties']['source'], - 'docID': normalized['shareProperties']['docID'], - 'type': 'cannonicalUri' + 'uri': uri, + 'source': source, + 'docID': docID, + 'type': uritype }) - else: - entry['individual_uris'] = [{ - 'uri': cannonical_uri, - 'source': normalized['shareProperties']['source'], - 'docID': normalized['shareProperties']['docID'], - 'type': 'cannonicalUri' - }] - else: - entry['base_uri'] = base_uri - entry['individual_uris'] = [{ - 'uri': cannonical_uri, - 'source': normalized['shareProperties']['source'], - 'docID': normalized['shareProperties']['docID'], - 'type': 'cannonicalUri' + else: + source_dict['uris'].append({ + 'base_uri': base_uri, + 'individual_uris': [{ + 'uri': uri, + 'source': source, + 'docID': docID, + 'type': uritype }] + }) + source_dict['all_bases'].append(base_uri) + + return source_dict From 2d8b6dac31f4dc9fd25e40a12bda70b9c8d043b8 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 11 Aug 2015 10:47:30 -0400 Subject: [PATCH 28/71] Change to uritype --- scrapi/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scrapi/util.py b/scrapi/util.py index 3ebea623..59cafffe 100644 --- a/scrapi/util.py +++ b/scrapi/util.py @@ -98,7 +98,7 @@ def uri_processing(uri, source, docID, source_dict, uritype): 'uri': uri, 'source': source, 'docID': docID, - 'type': uritype + 'uritype': uritype }) else: source_dict['uris'].append({ @@ -107,7 +107,7 @@ def uri_processing(uri, source, docID, source_dict, uritype): 'uri': uri, 'source': source, 
'docID': docID, - 'type': uritype + 'uritype': uritype }] }) source_dict['all_bases'].append(base_uri) From 8bac5758ee641b9c32648308a4202d3f1aebf085 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 11 Aug 2015 11:05:00 -0400 Subject: [PATCH 29/71] Update uri processing processor to be more universal --- scrapi/processing/uri_logging.py | 45 +++++++------------------------- 1 file changed, 10 insertions(+), 35 deletions(-) diff --git a/scrapi/processing/uri_logging.py b/scrapi/processing/uri_logging.py index 36a3952f..df43579a 100644 --- a/scrapi/processing/uri_logging.py +++ b/scrapi/processing/uri_logging.py @@ -17,10 +17,10 @@ class UriProcessor(BaseProcessor): NAME = 'uri_logging' - def process_uris(self, source, docID, uri): + def process_uris(self, source, docID, uri, uritype): try: document = Document.objects.get(source=source, docID=docID) - processed_normalized = self.save_status_of_canonical_uri(document.normalized, uri) + processed_normalized = self.save_status_of_uri(document.normalized, uri, uritype) # processed_normalized = self.save_status_of_object_uris(processed_normalized) document.normalized = processed_normalized @@ -29,47 +29,22 @@ def process_uris(self, source, docID, uri): except TypeError: pass - def save_status_of_canonical_uri(self, normalized, uri): - cannonical_uri_status = requests.get(uri) + def save_status_of_uri(self, normalized, uri, uritype): + uri_status = requests.get(uri) - cannonical_status = { + status = { 'actual_uri': uri, - 'resolved_uri': cannonical_uri_status.url, + 'uritype': uritype, + 'resolved_uri': uri_status.url, 'resolved_datetime': datetime.datetime.now(), - 'resolved_status': cannonical_uri_status.status_code, + 'resolved_status': uri_status.status_code, 'is_doi': True if 'dx.doi.org' in normalized['uris']['canonicalUri'] else False } try: - normalized['shareProperties']['uri_logs']['cannonical_status'].append(cannonical_status) + normalized['shareProperties']['uri_logs']['status'].append(status) except KeyError: normalized['shareProperties']['uri_logs'] = {} - normalized['shareProperties']['uri_logs']['cannonical_status'] = [cannonical_status] - - return normalized - - def save_status_of_object_uris(self, normalized): - try: - all_object_uris = normalized['uris']['object_uris'] - except KeyError: - return normalized - - for uri in all_object_uris: - current_list = [] - uri_resolved = requests.get(uri) - - uri_status = { - 'actual_uri': uri, - 'resolved_uri': uri_resolved.url, - 'resolved_datetime': datetime.datetime.now(), - 'resolved_status': uri_resolved.status_code, - 'is_doi': True if 'dx.doi.org' in uri else False - } - current_list.append(uri_status) - - try: - normalized['shareProperties']['uri_logs']['object_status'].append(current_list) - except KeyError: - normalized['shareProperties']['uri_logs']['object_status'] = [current_list] + normalized['shareProperties']['uri_logs']['status'] = [status] return normalized From 98b2be5b2921882f02d6a8d35455896be39b08cc Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 11 Aug 2015 11:05:54 -0400 Subject: [PATCH 30/71] Celery task for processing uris updated --- scrapi/tasks.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index 6b222e28..d65876c9 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -128,23 +128,20 @@ def process_normalized(normalized_doc, raw_doc, **kwargs): processing.process_normalized(raw_doc, normalized_doc, kwargs) -# @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, 
max_retries=0) -# @events.logged(events.PROCESSSING_URIS, 'post_processing') +@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) +@events.logged(events.PROCESSSING_URIS, 'post_processing') def process_uris(**kwargs): - if kwargs.get('async'): all_buckets = [] if kwargs.get('source'): - uri_buckets = util.parse_urls_into_groups(kwargs.get('source')) - all_buckets.append(uri_buckets) + source_buckets = util.parse_urls_into_groups(kwargs['source']) + all_buckets.append(source_buckets) else: for source in registry.keys(): - uri_buckets = util.parse_urls_into_groups(source) - all_buckets.append(uri_buckets) - - print(json.dumps(all_buckets, indent=4)) + source_buckets = util.parse_urls_into_groups(source) + all_buckets.append(source_buckets) - for source_dict in uri_buckets: + for source_dict in all_buckets: for group in source_dict['uris']: process_uris_at_one_base_uri(group['individual_uris']) @@ -153,7 +150,12 @@ def process_uris(**kwargs): @events.logged(events.PROCESSSING_URIS, 'post_processing') def process_uris_at_one_base_uri(uri_list): for uri in uri_list: - processing.process_uris(source=uri['source'], docID=uri['docID'], uri=uri['uri']) + processing.process_uris( + source=uri['source'], + docID=uri['docID'], + uri=uri['uri'], + uritype=uri['uritype'] + ) @app.task From d6d96f9754838cfa3823b6f60949a56cdc8f99f8 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 11 Aug 2015 11:06:11 -0400 Subject: [PATCH 31/71] Remove unecessary args from tasks --- tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tasks.py b/tasks.py index 182d1ab4..627c0659 100644 --- a/tasks.py +++ b/tasks.py @@ -188,11 +188,11 @@ def harvesters(async=False, start=None, end=None): @task -def process_uris(async=False, start=None, end=None, source=None): +def process_uris(async=False, source=None): settings.CELERY_ALWAYS_EAGER = not async from scrapi.tasks import process_uris - process_uris.delay(async=async, start=start, end=end, source=source) + process_uris.delay(async=async, source=source) @task From 77b875c0b6f281bed5470e5a9e18eaa6eaecdfc0 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 12 Aug 2015 10:43:03 -0400 Subject: [PATCH 32/71] Remove old comments --- scrapi/processing/uri_logging.py | 1 - 1 file changed, 1 deletion(-) diff --git a/scrapi/processing/uri_logging.py b/scrapi/processing/uri_logging.py index df43579a..f9779262 100644 --- a/scrapi/processing/uri_logging.py +++ b/scrapi/processing/uri_logging.py @@ -21,7 +21,6 @@ def process_uris(self, source, docID, uri, uritype): try: document = Document.objects.get(source=source, docID=docID) processed_normalized = self.save_status_of_uri(document.normalized, uri, uritype) - # processed_normalized = self.save_status_of_object_uris(processed_normalized) document.normalized = processed_normalized From f380406df02b1e56d9ae3dd2439458a67c5be6a5 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 12 Aug 2015 10:43:36 -0400 Subject: [PATCH 33/71] Add invoke apiserver task --- tasks.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tasks.py b/tasks.py index 627c0659..89b4a312 100644 --- a/tasks.py +++ b/tasks.py @@ -236,6 +236,11 @@ def provider_map(delete=False): print(es.count('share_providers', body={'query': {'match_all': {}}})['count']) +@task +def apiserver(): + os.system('python manage.py runserver') + + @task def reset_all(): os.system('psql -c "DROP DATABASE scrapi;"') From 1384544d62e8a7d155ef1241d97c58326ddb6d7b Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 12 Aug 2015 10:43:53 
-0400 Subject: [PATCH 34/71] Add proces one URI task that can be rate limited --- scrapi/tasks.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index d65876c9..4688b6ad 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -1,6 +1,3 @@ -import os -import re -import json import logging import functools from itertools import islice @@ -150,12 +147,18 @@ def process_uris(**kwargs): @events.logged(events.PROCESSSING_URIS, 'post_processing') def process_uris_at_one_base_uri(uri_list): for uri in uri_list: - processing.process_uris( - source=uri['source'], - docID=uri['docID'], - uri=uri['uri'], - uritype=uri['uritype'] - ) + process_one_uri(uri) + + +@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0, rate_limit='5/s') +@events.logged(events.PROCESSSING_URIS, 'post_processing') +def process_one_uri(uri): + processing.process_uris( + source=uri['source'], + docID=uri['docID'], + uri=uri['uri'], + uritype=uri['uritype'] + ) @app.task From 3d851e1c0e5bf9be8862429b7e4eb1ee713cd196 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 12 Aug 2015 15:42:48 -0400 Subject: [PATCH 35/71] Allow proper not async settings --- scrapi/tasks.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index 4688b6ad..aeb0f3d4 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -128,26 +128,31 @@ def process_normalized(normalized_doc, raw_doc, **kwargs): @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) @events.logged(events.PROCESSSING_URIS, 'post_processing') def process_uris(**kwargs): - if kwargs.get('async'): - all_buckets = [] - if kwargs.get('source'): - source_buckets = util.parse_urls_into_groups(kwargs['source']) + all_buckets = [] + if kwargs.get('source'): + source_buckets = util.parse_urls_into_groups(kwargs['source']) + all_buckets.append(source_buckets) + else: + for source in registry.keys(): + source_buckets = util.parse_urls_into_groups(source) all_buckets.append(source_buckets) - else: - for source in registry.keys(): - source_buckets = util.parse_urls_into_groups(source) - all_buckets.append(source_buckets) - for source_dict in all_buckets: - for group in source_dict['uris']: + for source_dict in all_buckets: + for group in source_dict['uris']: + if kwargs.get('async'): + process_uris_at_one_base_uri.delay(group['individual_uris'], kwargs['async']) + else: process_uris_at_one_base_uri(group['individual_uris']) @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) @events.logged(events.PROCESSSING_URIS, 'post_processing') -def process_uris_at_one_base_uri(uri_list): +def process_uris_at_one_base_uri(uri_list, async=False): for uri in uri_list: - process_one_uri(uri) + if async: + process_one_uri.delay(uri) + else: + process_one_uri(uri) @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0, rate_limit='5/s') From 89219ac9a64d01f6d9798ff6d4b5fb8bddb1fcf2 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 12 Aug 2015 17:39:23 -0400 Subject: [PATCH 36/71] Update reset_all task to incl a confirmation --- tasks.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tasks.py b/tasks.py index 89b4a312..3dba470e 100644 --- a/tasks.py +++ b/tasks.py @@ -243,6 +243,12 @@ def apiserver(): @task def reset_all(): + try: + input = raw_input + except Exception: + pass + if input('Are you sure? 
y/N ') != 'y': + return os.system('psql -c "DROP DATABASE scrapi;"') os.system('psql -c "CREATE DATABASE scrapi;"') os.system('python manage.py migrate') From c44722cc39f6b7f3cbcec5748dd726947a18b242 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 12 Aug 2015 17:39:46 -0400 Subject: [PATCH 37/71] Use Celery always eager setting for async control --- scrapi/tasks.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index aeb0f3d4..5a69da04 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -126,8 +126,10 @@ def process_normalized(normalized_doc, raw_doc, **kwargs): @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) -@events.logged(events.PROCESSSING_URIS, 'post_processing') -def process_uris(**kwargs): +@events.logged(events.PROCESSSING_URIS, 'uri_processing') +def process_uris(async, **kwargs): + settings.CELERY_ALWAYS_EAGER = not async + all_buckets = [] if kwargs.get('source'): source_buckets = util.parse_urls_into_groups(kwargs['source']) @@ -139,30 +141,27 @@ def process_uris(**kwargs): for source_dict in all_buckets: for group in source_dict['uris']: - if kwargs.get('async'): - process_uris_at_one_base_uri.delay(group['individual_uris'], kwargs['async']) - else: - process_uris_at_one_base_uri(group['individual_uris']) + process_uris_at_one_base_uri.delay(group['individual_uris'], async, kwargs=kwargs) @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) -@events.logged(events.PROCESSSING_URIS, 'post_processing') -def process_uris_at_one_base_uri(uri_list, async=False): +@events.logged(events.PROCESSSING_URIS, 'uri_processing') +def process_uris_at_one_base_uri(uri_list, async=False, **kwargs): + settings.CELERY_ALWAYS_EAGER = not async + for uri in uri_list: - if async: - process_one_uri.delay(uri) - else: - process_one_uri(uri) + process_one_uri.delay(uri, kwargs=kwargs) @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0, rate_limit='5/s') -@events.logged(events.PROCESSSING_URIS, 'post_processing') -def process_one_uri(uri): +@events.logged(events.PROCESSSING_URIS, 'uri_processing') +def process_one_uri(uri, **kwargs): processing.process_uris( source=uri['source'], docID=uri['docID'], uri=uri['uri'], - uritype=uri['uritype'] + uritype=uri['uritype'], + kwargs=kwargs ) From 86180f11ed5eebb81ce9cc60352f4855e35f16f3 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 12 Aug 2015 17:40:36 -0400 Subject: [PATCH 38/71] Pass around correct params to uri logging --- scrapi/processing/__init__.py | 4 ++-- scrapi/processing/base.py | 4 ++-- scrapi/processing/uri_logging.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/scrapi/processing/__init__.py b/scrapi/processing/__init__.py index bd4e4ee5..7a1918dd 100644 --- a/scrapi/processing/__init__.py +++ b/scrapi/processing/__init__.py @@ -36,7 +36,7 @@ def process_raw(raw_doc, kwargs): get_processor(p).process_raw(raw_doc, **extras) -def process_uris(document, kwargs): +def process_uris(source, docID, uri, uritype, kwargs): for p in settings.POST_PROCESSING: extras = kwargs.get(p, {}) - get_processor(p).process_uris(document, **extras) + get_processor(p).process_uris(source, docID, uri, uritype, **extras) diff --git a/scrapi/processing/base.py b/scrapi/processing/base.py index c8794b93..87bcb869 100644 --- a/scrapi/processing/base.py +++ b/scrapi/processing/base.py @@ -7,5 +7,5 @@ def process_raw(self, raw_doc, **kwargs): def process_normalized(self, 
raw_doc, normalized, **kwargs): pass # pragma: no cover - def post_process(raw_doc, normalized, kwargs): - pass + def process_uris(self, source, docID, uri, uritype, **kwargs): + pass # pragma: no cover diff --git a/scrapi/processing/uri_logging.py b/scrapi/processing/uri_logging.py index f9779262..a6812189 100644 --- a/scrapi/processing/uri_logging.py +++ b/scrapi/processing/uri_logging.py @@ -17,7 +17,7 @@ class UriProcessor(BaseProcessor): NAME = 'uri_logging' - def process_uris(self, source, docID, uri, uritype): + def process_uris(self, source, docID, uri, uritype, **kwargs): try: document = Document.objects.get(source=source, docID=docID) processed_normalized = self.save_status_of_uri(document.normalized, uri, uritype) From 397af5ddce18c7b7b0705944b656326a3cb4768b Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 17 Aug 2015 15:34:12 -0400 Subject: [PATCH 39/71] Add basic scraping to uri pipeline --- scrapi/processing/uri_logging.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/scrapi/processing/uri_logging.py b/scrapi/processing/uri_logging.py index a6812189..16bc8abd 100644 --- a/scrapi/processing/uri_logging.py +++ b/scrapi/processing/uri_logging.py @@ -4,7 +4,9 @@ import datetime import requests import logging +# from furl import furl +from scrapi.processing import scrapers from scrapi.processing.base import BaseProcessor os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") @@ -46,4 +48,12 @@ def save_status_of_uri(self, normalized, uri, uritype): normalized['shareProperties']['uri_logs'] = {} normalized['shareProperties']['uri_logs']['status'] = [status] + extra_info = scrapers.collect_scraped(uri) + + if extra_info: + try: + normalized['shareProperties']['scraped_properties'].append(extra_info) + except KeyError: + normalized['shareProperties']['scraped_properties'] = [extra_info] + return normalized From b1a3aec924c9f388a54ab7bf347b2e71fa4040ad Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 17 Aug 2015 15:34:42 -0400 Subject: [PATCH 40/71] Add basic scrapers for springer and science direct --- scrapi/processing/scrapers.py | 67 +++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 scrapi/processing/scrapers.py diff --git a/scrapi/processing/scrapers.py b/scrapi/processing/scrapers.py new file mode 100644 index 00000000..367c54e7 --- /dev/null +++ b/scrapi/processing/scrapers.py @@ -0,0 +1,67 @@ +from furl import furl +import requests +from lxml import etree +from bs4 import BeautifulSoup + + +def collect_scraped(uri): + + base = furl(uri).host.replace('www.', '') + if base == 'sciencedirect.com': + info = science_direct(base) + elif base == 'link.springer.com': + info = springer_link(base) + else: + info = {} + + return info + + +# Generic helpers + +def get_elements_from_link(link): + content = requests.get(link).content + return etree.HTML(content) + + +def get_html_from_link(link): + return requests.get(link).content + + +# Science Direct + +def science_direct(uri): + soup = BeautifulSoup(get_html_from_link(uri)) + all_authors = soup.find_all("a", class_="authorName") + + return parse_sd_author_list(all_authors) + + +# Springer Link + +def springer_link(uri): + element = get_elements_from_link(uri) + return {'open_access': get_springer_open_access(element)} + + +def get_springer_open_access(element): + links = element.xpath('//a') + words = [] + for link in links: + if 'viewtype' in link.keys(): + if 'webtrekk-track' in link.get('class'): + words.append(link.get('viewtype')) + if 'Denial' in words: + 
return False + else: + return True + + +def parse_sd_author_list(author_list): + all_authors = [] + for author in author_list: + author_info = author.attrs + author_info['full_name'] = author.text + all_authors.append(author_info) + + return all_authors From 83540d05785e15b2d0c6baf06c3bfce14b4efdb2 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 18 Aug 2015 16:46:50 -0400 Subject: [PATCH 41/71] Move scrapers around, add more utilty --- scrapi/processing/scrapers.py | 67 ------------------------ scrapi/scrapers/__init__.py | 0 scrapi/scrapers/scrapers.py | 95 +++++++++++++++++++++++++++++++++++ scrapi/scrapers/utils.py | 26 ++++++++++ 4 files changed, 121 insertions(+), 67 deletions(-) delete mode 100644 scrapi/processing/scrapers.py create mode 100644 scrapi/scrapers/__init__.py create mode 100644 scrapi/scrapers/scrapers.py create mode 100644 scrapi/scrapers/utils.py diff --git a/scrapi/processing/scrapers.py b/scrapi/processing/scrapers.py deleted file mode 100644 index 367c54e7..00000000 --- a/scrapi/processing/scrapers.py +++ /dev/null @@ -1,67 +0,0 @@ -from furl import furl -import requests -from lxml import etree -from bs4 import BeautifulSoup - - -def collect_scraped(uri): - - base = furl(uri).host.replace('www.', '') - if base == 'sciencedirect.com': - info = science_direct(base) - elif base == 'link.springer.com': - info = springer_link(base) - else: - info = {} - - return info - - -# Generic helpers - -def get_elements_from_link(link): - content = requests.get(link).content - return etree.HTML(content) - - -def get_html_from_link(link): - return requests.get(link).content - - -# Science Direct - -def science_direct(uri): - soup = BeautifulSoup(get_html_from_link(uri)) - all_authors = soup.find_all("a", class_="authorName") - - return parse_sd_author_list(all_authors) - - -# Springer Link - -def springer_link(uri): - element = get_elements_from_link(uri) - return {'open_access': get_springer_open_access(element)} - - -def get_springer_open_access(element): - links = element.xpath('//a') - words = [] - for link in links: - if 'viewtype' in link.keys(): - if 'webtrekk-track' in link.get('class'): - words.append(link.get('viewtype')) - if 'Denial' in words: - return False - else: - return True - - -def parse_sd_author_list(author_list): - all_authors = [] - for author in author_list: - author_info = author.attrs - author_info['full_name'] = author.text - all_authors.append(author_info) - - return all_authors diff --git a/scrapi/scrapers/__init__.py b/scrapi/scrapers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/scrapi/scrapers/scrapers.py b/scrapi/scrapers/scrapers.py new file mode 100644 index 00000000..1fe307a1 --- /dev/null +++ b/scrapi/scrapers/scrapers.py @@ -0,0 +1,95 @@ +from furl import furl +from bs4 import BeautifulSoup + +from scrapi.scrapers import utils + + +def collect_scraped(uri): + + base = furl(uri).host.replace('www.', '') + if base == 'sciencedirect.com': + info = science_direct(uri) + elif base == 'link.springer.com': + info = springer_link(uri) + else: + info = {} + + return info + + +# Science Direct + + +def science_direct(uri): + ''' + Future potential +
+
  • + ''' + + return parse_sd_author_list(uri) + + +def parse_sd_author_list(uri): + + soup = BeautifulSoup(utils.get_html_from_link(uri), "lxml") + + auth_affil = soup.find_all("ul", class_="authorGroup") + lis = [item.find_all('li') for item in auth_affil][0] + theas = [thing.find_all('a') for thing in lis] + + full_author_info = [] + for author_pair in theas: + author_info = {} + for author_part in author_pair: + author_info = utils.merge_dicts(author_part.attrs, author_info) + author_info['matcher'] = author_info['href'][0] + + full_author_info.append(author_info) + + # Get affiliations and put them with the original author dict + outer_affilations = soup.find_all("ul", class_="authAffil") + affiliations = [item.find_all('li') for item in outer_affilations] + + affils = [] + for person in affiliations: + for result in person: + d = result.attrs + d['institution'] = result.text + d['matcher'] = d['id'] + affils.append(d) + + all_authors = [] + for author in full_author_info: + del author['data-pos'] + del author['data-t'] + del author['data-tb'] + + for affil in affils: + if author['matcher'].replace('#', '') == affil['matcher']: + combined = author.copy() + combined.update(affil) + all_authors.append(combined) + del author['matcher'] + + return all_authors + + +# Springer Link + +def springer_link(uri): + element = utils.get_elements_from_link(uri) + return {'open_access': get_springer_open_access(element)} + + +def get_springer_open_access(element): + links = element.xpath('//a') + words = [] + for link in links: + if 'viewtype' in link.keys(): + if 'webtrekk-track' in link.get('class'): + words.append(link.get('viewtype')) + if 'Denial' in words: + return False + else: + return True diff --git a/scrapi/scrapers/utils.py b/scrapi/scrapers/utils.py new file mode 100644 index 00000000..f02c8699 --- /dev/null +++ b/scrapi/scrapers/utils.py @@ -0,0 +1,26 @@ +import requests +from lxml import etree + + +def merge_dicts(*dicts): + d = {} + for dict in dicts: + for key in dict: + try: + d[key].append(dict[key]) + except KeyError: + d[key] = [dict[key]] + for key in d: + if len(d[key]) == 1: + d[key] = d[key][0] + + return d + + +def get_elements_from_link(link): + content = requests.get(link).content + return etree.HTML(content) + + +def get_html_from_link(link): + return requests.get(link).content From 8c6f410ac22f84ca56b679d7a7fd19cad05b3f1e Mon Sep 17 00:00:00 2001 From: erinspace Date: Fri, 21 Aug 2015 10:14:59 -0400 Subject: [PATCH 42/71] Leave things in an experimental state to get uri list --- scrapi/processing/uri_logging.py | 14 +++++++------- scrapi/tasks.py | 10 +++++++--- scrapi/util.py | 7 ++++++- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/scrapi/processing/uri_logging.py b/scrapi/processing/uri_logging.py index 16bc8abd..bbc10aa9 100644 --- a/scrapi/processing/uri_logging.py +++ b/scrapi/processing/uri_logging.py @@ -6,7 +6,7 @@ import logging # from furl import furl -from scrapi.processing import scrapers +# from scrapi.processing import scrapers from scrapi.processing.base import BaseProcessor os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") @@ -48,12 +48,12 @@ def save_status_of_uri(self, normalized, uri, uritype): normalized['shareProperties']['uri_logs'] = {} normalized['shareProperties']['uri_logs']['status'] = [status] - extra_info = scrapers.collect_scraped(uri) + # extra_info = scrapers.collect_scraped(uri) - if extra_info: - try: - normalized['shareProperties']['scraped_properties'].append(extra_info) - except KeyError: - 
normalized['shareProperties']['scraped_properties'] = [extra_info] + # if extra_info: + # try: + # normalized['shareProperties']['scraped_properties'].append(extra_info) + # except KeyError: + # normalized['shareProperties']['scraped_properties'] = [extra_info] return normalized diff --git a/scrapi/tasks.py b/scrapi/tasks.py index 5a69da04..948605e2 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -1,4 +1,5 @@ import logging +import json import functools from itertools import islice from datetime import date, timedelta @@ -139,9 +140,12 @@ def process_uris(async, **kwargs): source_buckets = util.parse_urls_into_groups(source) all_buckets.append(source_buckets) - for source_dict in all_buckets: - for group in source_dict['uris']: - process_uris_at_one_base_uri.delay(group['individual_uris'], async, kwargs=kwargs) + with open('all_sources.json', 'w') as outfile: + json.dump(all_buckets, outfile) + + # for source_dict in all_buckets: + # for group in source_dict['uris']: + # process_uris_at_one_base_uri.delay(group['individual_uris'], async, kwargs=kwargs) @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) diff --git a/scrapi/util.py b/scrapi/util.py index 59cafffe..86bf0b08 100644 --- a/scrapi/util.py +++ b/scrapi/util.py @@ -83,7 +83,12 @@ def parse_urls_into_groups(source): source_dict = uri_processing(uri, source, docID, source_dict, 'descriptorUris') if document.normalized['uris'].get('objectUris'): for uri in document.normalized['uris']['objectUris']: - source_dict = uri_processing(uri, source, docID, source_dict, 'objectUris') + if uri: + if isinstance(uri, list): + for element in uri: + source_dict = uri_processing(element, source, docID, source_dict, 'objectUris') + else: + source_dict = uri_processing(uri, source, docID, source_dict, 'objectUris') return source_dict From 4475e5de3ae5e7332d81a54fc76189a81e061160 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 12:15:35 -0400 Subject: [PATCH 43/71] Add models for a person and a URL harvest --- api/webview/models.py | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/api/webview/models.py b/api/webview/models.py index e0a9dc1e..6a044ac9 100644 --- a/api/webview/models.py +++ b/api/webview/models.py @@ -2,14 +2,11 @@ from django_pgjson.fields import JsonField -class Document(models.Model): - source = models.CharField(max_length=255) - docID = models.TextField() - - providerUpdatedDateTime = models.DateTimeField(null=True) - - raw = JsonField() - normalized = JsonField(null=True) +class Person(models.Model): + name = models.CharField(max_length=255) + institution = models.CharField(max_length=255, null=True) + email = models.CharField(max_length=100) + ids = JsonField(null=True) class HarvesterResponse(models.Model): @@ -26,3 +23,22 @@ class HarvesterResponse(models.Model): headers_str = models.TextField(null=True) status_code = models.IntegerField(null=True) time_made = models.DateTimeField(auto_now=True) + + +class URL(models.Model): + url = models.CharField(max_length=500) + status = JsonField(null=True) + response = models.ForeignKey(HarvesterResponse, related_name='response') + + +class Document(models.Model): + source = models.CharField(max_length=255) + docID = models.TextField() + + providerUpdatedDateTime = models.DateTimeField(null=True) + + raw = JsonField() + normalized = JsonField(null=True) + + contributors = models.ManyToManyField(Person, related_name='contributors') + urls = models.ManyToManyField(URL, related_name='urls') From 
386f85df74cd015ac0785593300c511a8ff5af9e Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 12:17:02 -0400 Subject: [PATCH 44/71] Make sure things are null that may be --- api/webview/models.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/api/webview/models.py b/api/webview/models.py index 6a044ac9..b67edb16 100644 --- a/api/webview/models.py +++ b/api/webview/models.py @@ -5,7 +5,6 @@ class Person(models.Model): name = models.CharField(max_length=255) institution = models.CharField(max_length=255, null=True) - email = models.CharField(max_length=100) ids = JsonField(null=True) @@ -28,7 +27,7 @@ class HarvesterResponse(models.Model): class URL(models.Model): url = models.CharField(max_length=500) status = JsonField(null=True) - response = models.ForeignKey(HarvesterResponse, related_name='response') + response = models.ForeignKey(HarvesterResponse, related_name='response', null=True) class Document(models.Model): From 12c0928803dca90f43974c8fbcd43d429ae4746f Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 12:55:46 -0400 Subject: [PATCH 45/71] Remove generic id field and add some specific ones --- api/webview/models.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/api/webview/models.py b/api/webview/models.py index b67edb16..1a6119b3 100644 --- a/api/webview/models.py +++ b/api/webview/models.py @@ -5,7 +5,9 @@ class Person(models.Model): name = models.CharField(max_length=255) institution = models.CharField(max_length=255, null=True) - ids = JsonField(null=True) + id_osf = models.CharField(max_length=10, null=True) + id_orcid = models.CharField(max_length=100, null=True) + id_email = models.CharField(max_length=255, null=True) class HarvesterResponse(models.Model): From 20dcac61e78d8ed7d155c0b4462dc9c3986200c0 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 13:06:14 -0400 Subject: [PATCH 46/71] Add back HarvesterResponse loss from bad merge --- scrapi/processing/postgres.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index 89fb7656..50b438ad 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -8,7 +8,7 @@ import django -from api.webview.models import Document +from api.webview.models import HarvesterResponse, Document from scrapi import events from scrapi.util import json_without_bytes From 93292c0addbf5b3deabf7900d86bba9b8a5a73f4 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 15:34:37 -0400 Subject: [PATCH 47/71] Rename URI processing to helpers because it will happen in the processors --- scrapi/processing/{uri_logging.py => helpers.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename scrapi/processing/{uri_logging.py => helpers.py} (100%) diff --git a/scrapi/processing/uri_logging.py b/scrapi/processing/helpers.py similarity index 100% rename from scrapi/processing/uri_logging.py rename to scrapi/processing/helpers.py From 690d089abef2709d1a7e6affc2d9f65b98d3d1d0 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 15:35:21 -0400 Subject: [PATCH 48/71] Add the processing helpers to do some manipulating --- scrapi/processing/helpers.py | 61 ++++++------------------------------ 1 file changed, 9 insertions(+), 52 deletions(-) diff --git a/scrapi/processing/helpers.py b/scrapi/processing/helpers.py index bbc10aa9..1254a89a 100644 --- a/scrapi/processing/helpers.py +++ b/scrapi/processing/helpers.py @@ -1,59 +1,16 @@ from __future__ import absolute_import 
-import os import datetime import requests -import logging -# from furl import furl -# from scrapi.processing import scrapers -from scrapi.processing.base import BaseProcessor -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") -from api.webview.models import Document +def save_status_of_uri(uri, uritype): + uri_status = requests.get(uri) - -logger = logging.getLogger(__name__) - - -class UriProcessor(BaseProcessor): - NAME = 'uri_logging' - - def process_uris(self, source, docID, uri, uritype, **kwargs): - try: - document = Document.objects.get(source=source, docID=docID) - processed_normalized = self.save_status_of_uri(document.normalized, uri, uritype) - - document.normalized = processed_normalized - - document.save() - except TypeError: - pass - - def save_status_of_uri(self, normalized, uri, uritype): - uri_status = requests.get(uri) - - status = { - 'actual_uri': uri, - 'uritype': uritype, - 'resolved_uri': uri_status.url, - 'resolved_datetime': datetime.datetime.now(), - 'resolved_status': uri_status.status_code, - 'is_doi': True if 'dx.doi.org' in normalized['uris']['canonicalUri'] else False - } - - try: - normalized['shareProperties']['uri_logs']['status'].append(status) - except KeyError: - normalized['shareProperties']['uri_logs'] = {} - normalized['shareProperties']['uri_logs']['status'] = [status] - - # extra_info = scrapers.collect_scraped(uri) - - # if extra_info: - # try: - # normalized['shareProperties']['scraped_properties'].append(extra_info) - # except KeyError: - # normalized['shareProperties']['scraped_properties'] = [extra_info] - - return normalized + return { + 'actual_uri': uri, + 'uritype': uritype, + 'resolved_uri': uri_status.url, + 'resolved_datetime': datetime.datetime.now(), + 'resolved_status': uri_status.status_code, + } From 73ac7f5f8e7d0732f9dc8db4a2532a03c1e972af Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 15:35:48 -0400 Subject: [PATCH 49/71] Add process_uris to postgres processor Addresses [#SHARE-105] --- scrapi/processing/postgres.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index 50b438ad..f25d9e7e 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -8,12 +8,13 @@ import django -from api.webview.models import HarvesterResponse, Document +from api.webview.models import HarvesterResponse, Document, URL from scrapi import events from scrapi.util import json_without_bytes from scrapi.linter import RawDocument, NormalizedDocument from scrapi.processing import DocumentTuple +from scrapi.processing.helpers import save_status_of_uri from scrapi.processing.base import BaseProcessor, BaseHarvesterResponse, BaseDatabaseManager django.setup() @@ -113,6 +114,17 @@ def _get_by_source_id(self, model, source, docID): except IndexError: return None + def process_uris(self, source, docID, uri, uritype, **kwargs): + document = Document.objects.get(source=source, docID=docID) + status = save_status_of_uri(uri, uritype) + url = URL(url=uri, status=status) + url.save() + document.urls.add(url) + document.save() + + def process_contributors(self): + pass + class HarvesterResponseModel(BaseHarvesterResponse): From d2c8820e082c577a9f6890af62a6b87c232ab946 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 15:36:58 -0400 Subject: [PATCH 50/71] Add back real processing, but leave testing writing to file just in case --- scrapi/tasks.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 
deletions(-) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index 8f9c5b47..acb95c02 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -1,5 +1,4 @@ import logging -import json import functools from itertools import islice from datetime import date, timedelta @@ -138,12 +137,13 @@ def process_uris(async, **kwargs): source_buckets = util.parse_urls_into_groups(source) all_buckets.append(source_buckets) - with open('all_sources.json', 'w') as outfile: - json.dump(all_buckets, outfile) + # import json + # with open('all_sources.json', 'w') as outfile: + # json.dump(all_buckets, outfile) - # for source_dict in all_buckets: - # for group in source_dict['uris']: - # process_uris_at_one_base_uri.delay(group['individual_uris'], async, kwargs=kwargs) + for source_dict in all_buckets: + for group in source_dict['uris']: + process_uris_at_one_base_uri.delay(group['individual_uris'], async, kwargs=kwargs) @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) From 926dc1f8a5e8344ea5c265f02309872b48263d0b Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 15:46:21 -0400 Subject: [PATCH 51/71] Fix error in requirements --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 14748db9..0a3355f9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,7 +16,6 @@ furl==0.4.4 jsonschema==2.4.0 jsonpointer==1.7 pycountry==1.10 -<<<<<<< Temporary merge branch 1 djangorestframework==3.1.3 Django==1.8.2 django-pgjson==0.3.1 From c1d7e3880648aad12f8c785e823d2be121ee178f Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 15:52:39 -0400 Subject: [PATCH 52/71] Move scrapers for now --- scrapi/scrapers/__init__.py | 0 scrapi/scrapers/scrapers.py | 95 ------------------------------------- scrapi/scrapers/utils.py | 26 ---------- 3 files changed, 121 deletions(-) delete mode 100644 scrapi/scrapers/__init__.py delete mode 100644 scrapi/scrapers/scrapers.py delete mode 100644 scrapi/scrapers/utils.py diff --git a/scrapi/scrapers/__init__.py b/scrapi/scrapers/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/scrapi/scrapers/scrapers.py b/scrapi/scrapers/scrapers.py deleted file mode 100644 index 1fe307a1..00000000 --- a/scrapi/scrapers/scrapers.py +++ /dev/null @@ -1,95 +0,0 @@ -from furl import furl -from bs4 import BeautifulSoup - -from scrapi.scrapers import utils - - -def collect_scraped(uri): - - base = furl(uri).host.replace('www.', '') - if base == 'sciencedirect.com': - info = science_direct(uri) - elif base == 'link.springer.com': - info = springer_link(uri) - else: - info = {} - - return info - - -# Science Direct - - -def science_direct(uri): - ''' - Future potential -
    -
  • - ''' - - return parse_sd_author_list(uri) - - -def parse_sd_author_list(uri): - - soup = BeautifulSoup(utils.get_html_from_link(uri), "lxml") - - auth_affil = soup.find_all("ul", class_="authorGroup") - lis = [item.find_all('li') for item in auth_affil][0] - theas = [thing.find_all('a') for thing in lis] - - full_author_info = [] - for author_pair in theas: - author_info = {} - for author_part in author_pair: - author_info = utils.merge_dicts(author_part.attrs, author_info) - author_info['matcher'] = author_info['href'][0] - - full_author_info.append(author_info) - - # Get affiliations and put them with the original author dict - outer_affilations = soup.find_all("ul", class_="authAffil") - affiliations = [item.find_all('li') for item in outer_affilations] - - affils = [] - for person in affiliations: - for result in person: - d = result.attrs - d['institution'] = result.text - d['matcher'] = d['id'] - affils.append(d) - - all_authors = [] - for author in full_author_info: - del author['data-pos'] - del author['data-t'] - del author['data-tb'] - - for affil in affils: - if author['matcher'].replace('#', '') == affil['matcher']: - combined = author.copy() - combined.update(affil) - all_authors.append(combined) - del author['matcher'] - - return all_authors - - -# Springer Link - -def springer_link(uri): - element = utils.get_elements_from_link(uri) - return {'open_access': get_springer_open_access(element)} - - -def get_springer_open_access(element): - links = element.xpath('//a') - words = [] - for link in links: - if 'viewtype' in link.keys(): - if 'webtrekk-track' in link.get('class'): - words.append(link.get('viewtype')) - if 'Denial' in words: - return False - else: - return True diff --git a/scrapi/scrapers/utils.py b/scrapi/scrapers/utils.py deleted file mode 100644 index f02c8699..00000000 --- a/scrapi/scrapers/utils.py +++ /dev/null @@ -1,26 +0,0 @@ -import requests -from lxml import etree - - -def merge_dicts(*dicts): - d = {} - for dict in dicts: - for key in dict: - try: - d[key].append(dict[key]) - except KeyError: - d[key] = [dict[key]] - for key in d: - if len(d[key]) == 1: - d[key] = d[key][0] - - return d - - -def get_elements_from_link(link): - content = requests.get(link).content - return etree.HTML(content) - - -def get_html_from_link(link): - return requests.get(link).content From 838b37587dea6b77001d9a8ca1ebf3ac7546cc5f Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 17:35:58 -0400 Subject: [PATCH 53/71] Add processing contributors to postgres --- scrapi/processing/postgres.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index f25d9e7e..a2af049a 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -8,7 +8,7 @@ import django -from api.webview.models import HarvesterResponse, Document, URL +from api.webview.models import HarvesterResponse, Document, URL, Person from scrapi import events from scrapi.util import json_without_bytes @@ -122,8 +122,28 @@ def process_uris(self, source, docID, uri, uritype, **kwargs): document.urls.add(url) document.save() - def process_contributors(self): - pass + def process_contributor(self, source, docID, contributor_dict): + document = Document.objects.get(source=source, docID=docID) + + id_osf = None + id_orcid = None + id_email = None + if contributor_dict.get('sameAs'): + for identifier in contributor_dict['sameAs']: + if 'osf.io' in identifier: + id_osf = identifier + if 'orcid' 
in identifier: + id_orcid = identifier + + if contributor_dict.get('email'): + id_email = contributor_dict['email'] + + # TODO check to see if the person exists first, if they do, don't make a new one + person = Person(name=contributor_dict['name'], id_osf=id_osf, id_email=id_email, id_orcid=id_orcid) + person.save() + + document.contributors.add(person) + document.save() class HarvesterResponseModel(BaseHarvesterResponse): From a4ca45b0ad5ecb06d361288ba3ef1c3bb2ad37f0 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 17:36:22 -0400 Subject: [PATCH 54/71] Add task to process contributors --- scrapi/tasks.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index acb95c02..aa913532 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -123,6 +123,43 @@ def process_normalized(normalized_doc, raw_doc, **kwargs): processing.process_normalized(raw_doc, normalized_doc, kwargs) +@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) +@events.logged(events.PROCESSSING_URIS, 'uri_processing') +def process_contributors(async, **kwargs): + settings.CELERY_ALWAYS_EAGER = not async + + all_buckets = [] + if kwargs.get('source'): + source_buckets = util.gather_contributors(kwargs['source']) + all_buckets.append(source_buckets) + else: + for source in registry.keys(): + source_buckets = util.gather_contributors(source) + all_buckets.append(source_buckets) + + for source in all_buckets: + process_contributors_from_one_source.delay(source['contributors'], async, kwargs=kwargs) + + +@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) +def process_contributors_from_one_source(contributors, async=False, **kwargs): + settings.CELERY_ALWAYS_EAGER = not async + + for person in contributors: + process_one_person.delay(person) + + +@task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0, rate_limit='5/s') +@events.logged(events.PROCESSSING_URIS, 'uri_processing') +def process_one_person(person, **kwargs): + processing.process_contributors( + source=person['source'], + docID=person['docID'], + contributor_dict=person['contributor'], + kwargs=kwargs + ) + + @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0) @events.logged(events.PROCESSSING_URIS, 'uri_processing') def process_uris(async, **kwargs): From 7712ff7ec6532beea4a5657ebf888c44f43f3644 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 17:36:50 -0400 Subject: [PATCH 55/71] Add util function to gather contributors into one place for a source --- scrapi/util.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/scrapi/util.py b/scrapi/util.py index 7c5528d5..e49d1530 100644 --- a/scrapi/util.py +++ b/scrapi/util.py @@ -66,8 +66,21 @@ def json_without_bytes(jobj): return jobj -def parse_urls_into_groups(source): +def gather_contributors(source): + source_contributors = [] + source_dict = {'source': source} + for document in Document.objects.filter(source=source): + if document.normalized: + docID = document.normalized['shareProperties']['docID'] + + for person in document.normalized['contributors']: + source_contributors.append({'source': source, 'docID': docID, 'contributor': person}) + source_dict['contributors'] = source_contributors + return source_dict + + +def parse_urls_into_groups(source): source_dict = {'source': source, 'uris': [], 'all_bases': []} for document in Document.objects.filter(source=source): if 
document.normalized: From eaacfd2ba93863ebb1b7d63fa5e1be8a3addc0a2 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 6 Oct 2015 17:44:49 -0400 Subject: [PATCH 56/71] Add invoke task for process contributors --- tasks.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tasks.py b/tasks.py index e6888aa2..13894697 100644 --- a/tasks.py +++ b/tasks.py @@ -205,6 +205,14 @@ def process_uris(async=False, source=None): process_uris.delay(async=async, source=source) +@task +def process_contributors(async=False, source=None): + settings.CELERY_ALWAYS_EAGER = not async + from scrapi.tasks import process_contributors + + process_contributors.delay(async=async, source=source) + + @task def lint_all(): for name in registry.keys(): From 51e40e61cd8f32bbf43e88c8dfd774044a5177b3 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 7 Oct 2015 14:04:10 -0400 Subject: [PATCH 57/71] Update related name and add field for reconstructed name --- api/webview/models.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/webview/models.py b/api/webview/models.py index 1a6119b3..e7d744ce 100644 --- a/api/webview/models.py +++ b/api/webview/models.py @@ -4,6 +4,7 @@ class Person(models.Model): name = models.CharField(max_length=255) + reconstructed_name = models.CharField(max_length=255) institution = models.CharField(max_length=255, null=True) id_osf = models.CharField(max_length=10, null=True) id_orcid = models.CharField(max_length=100, null=True) @@ -41,5 +42,5 @@ class Document(models.Model): raw = JsonField() normalized = JsonField(null=True) - contributors = models.ManyToManyField(Person, related_name='contributors') + contributors = models.ManyToManyField(Person, related_name='documents') urls = models.ManyToManyField(URL, related_name='urls') From 7d2c06094544ec2012488cbff8cc020e150a5aaf Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 7 Oct 2015 14:04:59 -0400 Subject: [PATCH 58/71] Add contrib processig to events --- scrapi/events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scrapi/events.py b/scrapi/events.py index db943b83..7ab83dab 100644 --- a/scrapi/events.py +++ b/scrapi/events.py @@ -23,6 +23,7 @@ CHECK_ARCHIVE = 'checkArchive' NORMALIZATION = 'normalization' PROCESSSING_URIS = 'processingUris' +PROCESSSING_CONTRIBUTORS = 'processingContributors' # statuses FAILED = 'failed' From 6934afb8265cc355e8e1c3fca30aa849bd345513 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 7 Oct 2015 14:05:29 -0400 Subject: [PATCH 59/71] Add process_contributors to base processor --- scrapi/processing/__init__.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/scrapi/processing/__init__.py b/scrapi/processing/__init__.py index 899e7b6b..0dade9ca 100644 --- a/scrapi/processing/__init__.py +++ b/scrapi/processing/__init__.py @@ -47,6 +47,12 @@ def process_uris(source, docID, uri, uritype, kwargs): get_processor(p).process_uris(source, docID, uri, uritype, **extras) +def process_contributors(source, docID, contributor_dict, kwargs): + for p in settings.POST_PROCESSING: + extras = kwargs.get(p, {}) + get_processor(p).process_contributors(source, docID, contributor_dict, **extras) + + HarvesterResponse = get_processor(settings.RESPONSE_PROCESSOR).HarvesterResponseModel all_processors = map(get_processor, list(set( From d0d1c84ad0f5e406f5f83e15b48c32ce3fc84f37 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 7 Oct 2015 14:05:52 -0400 Subject: [PATCH 60/71] Add process_contributors to base hold --- scrapi/processing/base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/scrapi/processing/base.py b/scrapi/processing/base.py index 038fcb69..505b0928 100644 --- a/scrapi/processing/base.py +++ b/scrapi/processing/base.py @@ -17,6 +17,9 @@ def process_normalized(self, raw_doc, normalized, **kwargs): def process_uris(self, source, docID, uri, uritype, **kwargs): pass # pragma: no cover + def process_contributors(self, source, docID, contributor_dict, **kwargs): + pass # pragma: no cover + @abstractmethod def documents(self, *sources): ''' From 2dabdbfa97ebf3c0df00172acb00534e5ad70164 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 7 Oct 2015 14:06:35 -0400 Subject: [PATCH 61/71] Add person gettr, and more to process contributors --- scrapi/processing/postgres.py | 39 ++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index a2af049a..e48546aa 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import absolute_import, unicode_literals import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") @@ -110,8 +110,8 @@ def process_normalized(self, raw_doc, normalized): def _get_by_source_id(self, model, source, docID): try: - return Document.objects.filter(source=source, docID=docID)[0] - except IndexError: + return model.objects.get(source=source, docID=docID) + except model.DoesNotExist: return None def process_uris(self, source, docID, uri, uritype, **kwargs): @@ -122,7 +122,19 @@ def process_uris(self, source, docID, uri, uritype, **kwargs): document.urls.add(url) document.save() - def process_contributor(self, source, docID, contributor_dict): + def get_person(self, model, name, reconstructed_name, id_osf, id_email, id_orcid): + try: + return model.objects.get( + name=name, + reconstructed_name=reconstructed_name, + id_osf=id_osf, + id_email=id_email, + id_orcid=id_orcid + ) + except model.DoesNotExist: + return None + + def process_contributors(self, source, docID, contributor_dict): document = Document.objects.get(source=source, docID=docID) id_osf = None @@ -138,8 +150,25 @@ def process_contributor(self, source, docID, contributor_dict): if contributor_dict.get('email'): id_email = contributor_dict['email'] + reconstructed_name = contributor_dict['givenName'] + if contributor_dict.get('additionalName'): + reconstructed_name = '{} {}'.format(reconstructed_name, contributor_dict['additionalName']) + reconstructed_name = '{} {}'.format(reconstructed_name, contributor_dict['familyName']) + # TODO check to see if the person exists first, if they do, don't make a new one - person = Person(name=contributor_dict['name'], id_osf=id_osf, id_email=id_email, id_orcid=id_orcid) + person = self.get_person( + Person, + name=contributor_dict['name'], + reconstructed_name=reconstructed_name, + id_osf=id_osf, + id_email=id_email, + id_orcid=id_orcid) or Person( + name=contributor_dict['name'], + reconstructed_name=reconstructed_name, + id_osf=id_osf, + id_email=id_email, + id_orcid=id_orcid + ) person.save() document.contributors.add(person) From eb87e3d27595cbbc075d3dee3429661ab3e7a2b0 Mon Sep 17 00:00:00 2001 From: erinspace Date: Wed, 7 Oct 2015 14:07:21 -0400 Subject: [PATCH 62/71] Fix process contribs nvoke task --- scrapi/tasks.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index aa913532..1259b0d3 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -150,7 +150,7 @@ def 
process_contributors_from_one_source(contributors, async=False, **kwargs): @task_autoretry(default_retry_delay=settings.CELERY_RETRY_DELAY, max_retries=0, rate_limit='5/s') -@events.logged(events.PROCESSSING_URIS, 'uri_processing') +@events.logged(events.PROCESSSING_CONTRIBUTORS, 'contributor_processing') def process_one_person(person, **kwargs): processing.process_contributors( source=person['source'], @@ -174,10 +174,6 @@ def process_uris(async, **kwargs): source_buckets = util.parse_urls_into_groups(source) all_buckets.append(source_buckets) - # import json - # with open('all_sources.json', 'w') as outfile: - # json.dump(all_buckets, outfile) - for source_dict in all_buckets: for group in source_dict['uris']: process_uris_at_one_base_uri.delay(group['individual_uris'], async, kwargs=kwargs) From 8fa87793a5ba75c8ca6f02eb80e389c99d955d98 Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 19 Oct 2015 16:20:51 -0400 Subject: [PATCH 63/71] Update models for person with names and orcid blobs --- api/webview/models.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/api/webview/models.py b/api/webview/models.py index e7d744ce..0d92b136 100644 --- a/api/webview/models.py +++ b/api/webview/models.py @@ -3,12 +3,16 @@ class Person(models.Model): - name = models.CharField(max_length=255) - reconstructed_name = models.CharField(max_length=255) + raw_name = models.CharField(max_length=255) # source name we got + name = models.CharField(max_length=255) # reconstructed - given+add+fam + family_name = models.CharField(max_length=255, null=True) + given_name = models.CharField(max_length=255, null=True) + additional_name = models.CharField(max_length=255, null=True) institution = models.CharField(max_length=255, null=True) id_osf = models.CharField(max_length=10, null=True) id_orcid = models.CharField(max_length=100, null=True) id_email = models.CharField(max_length=255, null=True) + raw_orcid = JsonField(null=True) class HarvesterResponse(models.Model): From 9f8b4d1f2a534199ee39141f5d4d2ea2285fe28e Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 19 Oct 2015 16:21:27 -0400 Subject: [PATCH 64/71] Add query orcid helper --- scrapi/processing/helpers.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/scrapi/processing/helpers.py b/scrapi/processing/helpers.py index 1254a89a..3261d564 100644 --- a/scrapi/processing/helpers.py +++ b/scrapi/processing/helpers.py @@ -14,3 +14,15 @@ def save_status_of_uri(uri, uritype): 'resolved_datetime': datetime.datetime.now(), 'resolved_status': uri_status.status_code, } + + +def query_orcid_api(id_email): + orcid_email_query = 'http://pub.orcid.org/v1.2/search/orcid-bio?q=email:{}'.format(id_email) + + orcid_results = requests.get(orcid_email_query, headers={'Accept': 'application/orcid+json'}).json() + number_results = orcid_results['orcid-search-results']['num-found'] + + if number_results > 0: + return number_results + else: + return None From 228111ddce4d97d4fee55a246be7a494d48a59e7 Mon Sep 17 00:00:00 2001 From: erinspace Date: Mon, 19 Oct 2015 16:22:35 -0400 Subject: [PATCH 65/71] Update the person save method with new fields from model --- scrapi/processing/postgres.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index e48546aa..404c99fa 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -14,7 +14,7 @@ from scrapi.util import json_without_bytes from scrapi.linter import RawDocument, 
NormalizedDocument from scrapi.processing import DocumentTuple -from scrapi.processing.helpers import save_status_of_uri +from scrapi.processing.helpers import save_status_of_uri, query_orcid_api from scrapi.processing.base import BaseProcessor, BaseHarvesterResponse, BaseDatabaseManager django.setup() @@ -155,16 +155,24 @@ def process_contributors(self, source, docID, contributor_dict): reconstructed_name = '{} {}'.format(reconstructed_name, contributor_dict['additionalName']) reconstructed_name = '{} {}'.format(reconstructed_name, contributor_dict['familyName']) - # TODO check to see if the person exists first, if they do, don't make a new one + if not id_orcid: + id_orcid = query_orcid_api(id_email) + person = self.get_person( Person, - name=contributor_dict['name'], - reconstructed_name=reconstructed_name, + name=reconstructed_name, + raw_name=contributor_dict['name'], + family_name=contributor_dict.get('familyName'), + given_name=contributor_dict.get('givenName'), + additional_name=contributor_dict.get('additionalName'), id_osf=id_osf, id_email=id_email, id_orcid=id_orcid) or Person( - name=contributor_dict['name'], - reconstructed_name=reconstructed_name, + name=reconstructed_name, + raw_name=contributor_dict['name'], + family_name=contributor_dict.get('familyName'), + given_name=contributor_dict.get('givenName'), + additional_name=contributor_dict.get('additionalName'), id_osf=id_osf, id_email=id_email, id_orcid=id_orcid From 50d94c6d8c50999b45d038565e681b047051871d Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 20 Oct 2015 10:04:50 -0400 Subject: [PATCH 66/71] Add some migrateions --- api/webview/migrations/0001_initial.py | 74 ++++++++++++++++++++++++++ api/webview/migrations/__init__.py | 0 2 files changed, 74 insertions(+) create mode 100644 api/webview/migrations/0001_initial.py create mode 100644 api/webview/migrations/__init__.py diff --git a/api/webview/migrations/0001_initial.py b/api/webview/migrations/0001_initial.py new file mode 100644 index 00000000..89a4c183 --- /dev/null +++ b/api/webview/migrations/0001_initial.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import models, migrations +import django_pgjson.fields + + +class Migration(migrations.Migration): + + dependencies = [ + ] + + operations = [ + migrations.CreateModel( + name='Document', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('source', models.CharField(max_length=255)), + ('docID', models.TextField()), + ('providerUpdatedDateTime', models.DateTimeField(null=True)), + ('raw', django_pgjson.fields.JsonField()), + ('normalized', django_pgjson.fields.JsonField(null=True)), + ], + ), + migrations.CreateModel( + name='HarvesterResponse', + fields=[ + ('key', models.TextField(serialize=False, primary_key=True)), + ('method', models.CharField(max_length=8)), + ('url', models.TextField()), + ('ok', models.NullBooleanField()), + ('content', models.BinaryField(null=True)), + ('encoding', models.TextField(null=True)), + ('headers_str', models.TextField(null=True)), + ('status_code', models.IntegerField(null=True)), + ('time_made', models.DateTimeField(auto_now=True)), + ], + ), + migrations.CreateModel( + name='Person', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('raw_name', models.CharField(max_length=255)), + ('name', models.CharField(max_length=255)), + ('family_name', models.CharField(max_length=255, null=True)), + 
('given_name', models.CharField(max_length=255, null=True)), + ('additional_name', models.CharField(max_length=255, null=True)), + ('institution', models.CharField(max_length=255, null=True)), + ('id_osf', models.CharField(max_length=10, null=True)), + ('id_orcid', models.CharField(max_length=100, null=True)), + ('id_email', models.CharField(max_length=255, null=True)), + ('raw_orcid', django_pgjson.fields.JsonField(null=True)), + ], + ), + migrations.CreateModel( + name='URL', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('url', models.CharField(max_length=500)), + ('status', django_pgjson.fields.JsonField(null=True)), + ('response', models.ForeignKey(related_name='response', to='webview.HarvesterResponse', null=True)), + ], + ), + migrations.AddField( + model_name='document', + name='contributors', + field=models.ManyToManyField(related_name='documents', to='webview.Person'), + ), + migrations.AddField( + model_name='document', + name='urls', + field=models.ManyToManyField(related_name='urls', to='webview.URL'), + ), + ] diff --git a/api/webview/migrations/__init__.py b/api/webview/migrations/__init__.py new file mode 100644 index 00000000..e69de29b From 9869f94dccded459f17aa0f5938c4affd56eae58 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 20 Oct 2015 11:05:33 -0400 Subject: [PATCH 67/71] Add a postgres model for data collected from ES --- api/webview/models.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/api/webview/models.py b/api/webview/models.py index 0d92b136..d75b0f3e 100644 --- a/api/webview/models.py +++ b/api/webview/models.py @@ -48,3 +48,21 @@ class Document(models.Model): contributors = models.ManyToManyField(Person, related_name='documents') urls = models.ManyToManyField(URL, related_name='urls') + + +class ESPersonDocument(models.Model): + # Person + raw_name = models.CharField(max_length=255) # source name we got + name = models.CharField(max_length=255) # reconstructed - given+add+fam + family_name = models.CharField(max_length=255, null=True) + given_name = models.CharField(max_length=255, null=True) + additional_name = models.CharField(max_length=255, null=True) + institution = models.CharField(max_length=255, null=True) + id_osf = models.CharField(max_length=10, null=True) + id_orcid = models.CharField(max_length=100, null=True) + id_email = models.CharField(max_length=255, null=True) + raw_orcid = JsonField(null=True) + + # Which document they come from + source = models.CharField(max_length=255) + docID = models.TextField() From ad79ad0e6f7e4b0e30adc71f30f594dd0d6d30d0 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 20 Oct 2015 13:07:53 -0400 Subject: [PATCH 68/71] Add from prod es arg --- tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tasks.py b/tasks.py index 13894697..7bf5e8d2 100644 --- a/tasks.py +++ b/tasks.py @@ -206,11 +206,11 @@ def process_uris(async=False, source=None): @task -def process_contributors(async=False, source=None): +def process_contributors(async=False, source=None, production_es=False): settings.CELERY_ALWAYS_EAGER = not async from scrapi.tasks import process_contributors - process_contributors.delay(async=async, source=source) + process_contributors.delay(async=async, source=source, production_es=production_es) @task From 2219f5a3b628af32d3bfaf0fba23f4eef5710e74 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 20 Oct 2015 13:08:46 -0400 Subject: [PATCH 69/71] Add from prod es arg --- scrapi/tasks.py | 4 
++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index 1259b0d3..81625921 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -130,11 +130,11 @@ def process_contributors(async, **kwargs): all_buckets = [] if kwargs.get('source'): - source_buckets = util.gather_contributors(kwargs['source']) + source_buckets = util.gather_contributors(kwargs['source'], kwargs.get('production_es')) all_buckets.append(source_buckets) else: for source in registry.keys(): - source_buckets = util.gather_contributors(source) + source_buckets = util.gather_contributors(source, kwargs.get('production_es')) all_buckets.append(source_buckets) for source in all_buckets: From 246589c38e167e2fb49e07a29f8bf9ee3ce44957 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 20 Oct 2015 13:09:17 -0400 Subject: [PATCH 70/71] Placehlder for process from production es --- scrapi/util.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/scrapi/util.py b/scrapi/util.py index e49d1530..09f0db8c 100644 --- a/scrapi/util.py +++ b/scrapi/util.py @@ -6,6 +6,7 @@ import pytz import time import logging +import sharefg os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings") from api.webview.models import Document @@ -66,17 +67,21 @@ def json_without_bytes(jobj): return jobj -def gather_contributors(source): +def gather_contributors(source, production_es): source_contributors = [] source_dict = {'source': source} - for document in Document.objects.filter(source=source): - if document.normalized: - docID = document.normalized['shareProperties']['docID'] - for person in document.normalized['contributors']: - source_contributors.append({'source': source, 'docID': docID, 'contributor': person}) + if production_es: + pass + else: + for document in Document.objects.filter(source=source): + if document.normalized: + docID = document.normalized['shareProperties']['docID'] + + for person in document.normalized['contributors']: + source_contributors.append({'source': source, 'docID': docID, 'contributor': person}) - source_dict['contributors'] = source_contributors + source_dict['contributors'] = source_contributors return source_dict From c8a38864240968f315beba03f373287a53cffda6 Mon Sep 17 00:00:00 2001 From: erinspace Date: Tue, 20 Oct 2015 13:10:31 -0400 Subject: [PATCH 71/71] Add a migration for es prod documents --- .../migrations/0002_espersondocument.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 api/webview/migrations/0002_espersondocument.py diff --git a/api/webview/migrations/0002_espersondocument.py b/api/webview/migrations/0002_espersondocument.py new file mode 100644 index 00000000..c9a76e06 --- /dev/null +++ b/api/webview/migrations/0002_espersondocument.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import models, migrations +import django_pgjson.fields + + +class Migration(migrations.Migration): + + dependencies = [ + ('webview', '0001_initial'), + ] + + operations = [ + migrations.CreateModel( + name='ESPersonDocument', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('raw_name', models.CharField(max_length=255)), + ('name', models.CharField(max_length=255)), + ('family_name', models.CharField(max_length=255, null=True)), + ('given_name', models.CharField(max_length=255, null=True)), + ('additional_name', models.CharField(max_length=255, null=True)), + ('institution', 
models.CharField(max_length=255, null=True)),
+                ('id_osf', models.CharField(max_length=10, null=True)),
+                ('id_orcid', models.CharField(max_length=100, null=True)),
+                ('id_email', models.CharField(max_length=255, null=True)),
+                ('raw_orcid', django_pgjson.fields.JsonField(null=True)),
+                ('source', models.CharField(max_length=255)),
+                ('docID', models.TextField()),
+            ],
+        ),
+    ]
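
Note: the production_es branch added to gather_contributors in PATCH 70 is left as a bare placeholder (if production_es: pass). The sketch below is one rough way that branch could be filled in, assuming the elasticsearch-py client and scan helper; the function name gather_contributors_from_es, the host URL, the index name 'share', and the exact field layout of the indexed documents are assumptions for illustration and do not come from these patches.

# Minimal sketch, not part of the patch series above.
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan


def gather_contributors_from_es(source, es_url='http://localhost:9200', index='share'):
    """Build the same {'source', 'docID', 'contributor'} dicts the postgres
    branch builds, but by scrolling a production Elasticsearch index."""
    es = Elasticsearch(es_url)
    query = {'query': {'term': {'source': source}}}

    source_contributors = []
    for hit in scan(es, index=index, query=query):
        doc = hit['_source']
        # Assumed to mirror the normalized document shape used in the postgres branch.
        doc_id = doc.get('shareProperties', {}).get('docID')
        for person in doc.get('contributors', []):
            source_contributors.append({'source': source, 'docID': doc_id, 'contributor': person})

    return {'source': source, 'contributors': source_contributors}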