Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 238 additions & 92 deletions adsdata/process.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

from datetime import datetime
from collections import defaultdict

Expand All @@ -19,6 +18,8 @@ def __init__(self, compute_metrics=True, compute_CC = False):
self.data_dict = data_files
self.logger = tasks.app.logger
self.readers = {}
self.master_protobuf = self._get_master_nonbib_dict()


def __enter__(self):
self._open_all()
Expand All @@ -27,6 +28,46 @@ def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
    """Leave the context manager by closing everything via ``_close_all``.

    The exception details are ignored and nothing is returned, so an
    exception raised inside the ``with`` body is not suppressed here.
    """
    self._close_all()

def _get_master_nonbib_dict(self):
# Template for the new protobuf structure
return {
"identifier": [], # Master Pipeline
"links": {
"ARXIV": [], # Master Pipeline
"DOI": [], # Master Pipeline
"DATA": {},
"ESOURCE": {},
"ASSOCIATED": {
"url": [],
"title": [],
"count": 0
},
"INSPIRE": {
"url": [],
"title": [],
"count": 0
},
"LIBRARYCATALOG": {
"url": [],
"title": [],
"count": 0
},
"PRESENTATION": {
"url": [],
"title": [],
"count": 0
},
"ABSTRACT": False, # Master Pipeline
"CITATIONS": False, # Master Pipeline
"GRAPHICS": False, # Master Pipeline
"METRICS": False, # Master Pipeline
"OPENURL": False, # Master Pipeline
"REFERENCES": False,# Master Pipeline
"TOC": False, # Master Pipeline
"COREAD": False # Master Pipeline
}
}

def process_bibcodes(self, bibcodes):
"""send nonbib and metrics records to master for the passed bibcodes
for each bibcode
Expand Down Expand Up @@ -54,79 +95,118 @@ def process_bibcodes(self, bibcodes):
tasks.task_output_metrics.delay(metrics_protos)

def _convert(self, passed):
    """Convert the full nonbib dict to what is needed for the nonbib protobuf.

    Data link values are read from separate files and merged into one
    field. The method handles:
        - data link processing and merging
        - property aggregation
        - field summarization and copying
        - computed field generation
        - cleanup of unused fields

    Args:
        passed (dict): Raw data dictionary containing all input fields,
            keyed by file type.

    Returns:
        dict: Processed data ready for the nonbib protobuf. The flat
        ``data_links_rows`` list is folded into the ``links`` structure of
        the master template and removed before returning.
    """
    return_value = {
        'data_links_rows': [],
        'property': set(),
        'esource': set(),
    }

    for filetype, value in passed.items():
        file_properties = self.data_dict[filetype]
        default_value = file_properties.get('default_value')
        extra_values = file_properties.get('extra_values', {})

        # Special cases first: these never produce data links.
        if filetype == 'canonical':
            return_value['bibcode'] = passed['canonical']
            continue

        if filetype == 'relevance':
            # Lift every relevance value to the top level.
            return_value.update(passed[filetype])
            continue

        # Boolean-valued files arrive wrapped as {filetype: <value>}.
        if isinstance(default_value, bool):
            return_value[filetype] = value[filetype]
            value = value[filetype]

        if 'link_type' in extra_values and value != default_value:
            # One or more real data link values: add a row for each.
            if isinstance(value, (bool, dict)):
                return_value['data_links_rows'].append(
                    self._convert_data_link(filetype, value))
            elif isinstance(value, list):
                return_value['data_links_rows'].extend(
                    self._convert_data_link(filetype, v) for v in value)
            else:
                self.logger.error(
                    f'serious error in process._convert with {filetype} {type(value)} {value}')
                continue

            link_type = extra_values['link_type']
            if link_type == 'ESOURCE':
                return_value['esource'].add(extra_values['link_sub_type'])
            return_value['property'].add(link_type)
            return_value['property'].update(extra_values.get('property', []))

        elif extra_values and value != default_value:
            # Not a data link, but the file may still contribute properties.
            if 'property' in extra_values:
                return_value['property'].update(extra_values['property'])

        elif value != default_value or file_properties.get('copy_default', False):
            # Otherwise, copy the value as is.
            return_value[filetype] = passed[filetype]

    # Derived properties and summaries; 'property' must still be a set here
    # because the helpers below call .add() on it.
    self._add_refereed_property(return_value)
    self._add_article_property(return_value, passed)
    self._add_data_summary(return_value)
    self._add_citation_count_fields(return_value, passed)

    return_value['property'] = sorted(return_value['property'])
    return_value['esource'] = sorted(return_value['esource'])

    # Merge the data link rows, then map them into the hierarchical links
    # structure of a fresh master template.
    return_value['data_links_rows'] = self._merge_data_links(return_value['data_links_rows'])
    master_template = self._get_master_nonbib_dict()
    self._populate_new_links_structure(return_value['data_links_rows'], master_template)

    # Computed fields: each converter receives the dict built so far.
    for field_name, field_config in computed_fields.items():
        converter = getattr(self, field_config['converter_function'], None)
        if converter:
            return_value.update(converter(return_value))
        else:
            self.logger.error(
                f'serious error in process._convert, expected converter_function '
                f'{field_config["converter_function"]} for field {field_name} not found')

    # Finally, delete the keys that are not in the nonbib protobuf.
    unused_fields = (
        'author', 'canonical', 'citation', 'deleted', 'deprecated_citation_count',
        'doi', 'download', 'item_count', 'nonarticle', 'ocrabstract', 'preprint',
        'private', 'pub_openaccess', 'pub2arxiv', 'reads', 'refereed',
        'relevance', 'toc',
    )
    for field in unused_fields:
        return_value.pop(field, None)

    return_value.update(master_template)
    return_value.pop('data_links_rows')
    self.logger.debug('Processed nonbib data: %s', return_value)
    return return_value

def _add_citation_count_fields(self, return_value, original):
Expand All @@ -145,11 +225,11 @@ def _add_refereed_property(self, return_value):
if'REFEREED' not in return_value['property']:
return_value['property'].add('NOT REFEREED')

def _add_article_property(self, return_value, d):
x = d.get('nonarticle', False)
if type(x) is dict:
x = x['nonarticle']
if x:
def _add_article_property(self, return_value, passed):
nonarticle_value = passed.get('nonarticle', False)
if isinstance(nonarticle_value, dict):
nonarticle_value = nonarticle_value['nonarticle']
if nonarticle_value:
return_value['property'].add('NONARTICLE')
else:
return_value['property'].add('ARTICLE')
Expand Down Expand Up @@ -199,36 +279,50 @@ def _merge_data_links(self, datalinks):

def _convert_data_link(self, filetype, value):
"""convert one data link row"""
file_properties = self.data_dict[filetype] #data_files[filetype]
d = {}
d['link_type'] = file_properties['extra_values']['link_type']

self.logger.debug('Converting data link: {}'.format(value))
file_properties = self.data_dict[filetype]

link_type = file_properties['extra_values']['link_type']
link_sub_type = file_properties['extra_values'].get('link_sub_type', '')
link_sub_type_suffix = ''
if value is dict and 'subparts' in value and 'item_count' in value['subparts']:
link_sub_type_suffix = ' ' + str(value['subparts']['item_count'])
if value is True:
d['link_sub_type'] = file_properties['extra_values']['link_sub_type'] + link_sub_type_suffix
elif 'link_sub_type' in value:
d['link_sub_type'] = value['link_sub_type'] + link_sub_type_suffix
elif 'link_sub_type' in file_properties['extra_values']:
d['link_sub_type'] = file_properties['extra_values']['link_sub_type'] + link_sub_type_suffix
if type(value) is bool:
d['url'] = ['']
d['title'] = ['']
d['item_count'] = 0
elif type(value) is dict:
d['url'] = value.get('url', [''])
if type(d['url']) is str:
d['url'] = [d['url']]
d['title'] = value.get('title', [''])
if type(d['title']) is str:
d['title'] = [d['title']]
# if d['title'] == ['']:
# d.pop('title') # to match old pipeline
d['item_count'] = value.get('item_count', 0)
else:
self.logger.error('serious error in process.convert_data_link: unexpected type for value, filetype = {}, value = {}, type of value = {}'.format(filetype, value, type(value)))

return d
if isinstance(value, dict) and 'subparts' in value:
link_sub_type_suffix = f" {value['subparts'].get('item_count', '')}".strip()

# Determine the link sub type
if not link_sub_type and isinstance(value, dict) and 'link_sub_type' in value:
link_sub_type = value['link_sub_type']


link_sub_type += link_sub_type_suffix

# Initialize result dictionary
link_data = { 'link_type': link_type,
'link_sub_type': link_sub_type,
"url": [""],
"title": [""],
"item_count": 0
}

if isinstance(value, dict):
link_data['url'] = value.get('url', [''])
link_data['title'] = value.get('title', [''])
link_data['item_count'] = value.get('item_count', 0)

self.logger.debug('Link data before conversion: {}'.format(link_data))
if isinstance(link_data['url'], str):
link_data['url'] = [link_data['url']]
if isinstance(link_data['title'], str):
link_data['title'] = [link_data['title']]
self.logger.debug('Link data after conversion: {}'.format(link_data))
elif not isinstance(value, bool):
self.logger.error(
f"Serious error in process.convert_data_link: unexpected type for value, filetype = {filetype}, "
f"value = {value}, type of value = {type(value)}"
)
self.logger.debug('Converted data link: {}'.format(link_data))
return link_data

def _read_next_bibcode(self, bibcode):
"""read all the info for the passed bibcode into a dict"""
Expand Down Expand Up @@ -324,3 +418,55 @@ def _compute_bibgroup_facet(self, d):
return {}
bibgroup_facet = sorted(list(set(bibgroup)))
return {'bibgroup_facet': bibgroup_facet}

def _populate_new_links_structure(self, data_links_rows, master_template):
"""Populate the new protobuf links structure from data_links_rows.
Maps the flat data_links_rows into the hierarchical links structure."""

self.logger.debug('Populating new links structure: {}'.format(data_links_rows))

# Map for link types that need special handling
link_type_mapping = {
'DATA': 'DATA',
'ESOURCE': 'ESOURCE',
'ASSOCIATED': 'ASSOCIATED',
'INSPIRE': 'INSPIRE',
'LIBRARYCATALOG': 'LIBRARYCATALOG',
'PRESENTATION': 'PRESENTATION'
}

for row in data_links_rows:
link_type = row.get('link_type', '')

# Skip if not in our mapping
if link_type not in link_type_mapping:
continue

mapped_type = link_type_mapping[link_type]

# Handle DATA and ESOURCE which have sub_type structure
if mapped_type in ('DATA', 'ESOURCE'):
sub_type = row.get('link_sub_type', '')
if sub_type not in master_template['links'][mapped_type]:
master_template['links'][mapped_type][sub_type] = {
'url': [],
'title': [],
'count': 0
}
if 'url' in row:
master_template['links'][mapped_type][sub_type]['url'].extend(row['url'])
if 'title' in row:
master_template['links'][mapped_type][sub_type]['title'].extend(row['title'])
if 'item_count' in row:
master_template['links'][mapped_type][sub_type]['count'] = row['item_count']

# Handle other link types with direct structure
else:
if 'url' in row:
master_template['links'][mapped_type]['url'].extend(row['url'])
if 'title' in row:
master_template['links'][mapped_type]['title'].extend(row['title'])
if 'item_count' in row:
master_template['links'][mapped_type]['count'] = row['item_count']
self.logger.debug('Populated new links structure: {}'.format(master_template))
return master_template
Loading