diff --git a/README.md b/README.md index a5874e6..62e626d 100755 --- a/README.md +++ b/README.md @@ -1,7 +1,38 @@ # babel_datapipeline Data processing pipeline for Babel -Luigi tasks are in the luigi_pipeline file. +### Command to run +`luigi --module babel_datapipeline tasks.[task_file].[specific_task] --local-scheduler --date [yyyy-mm-dd]` + +This is the command to run locally. `[task_file]` and `[specific_task]` refer to the tasks listed in Current Tasks. + +### Current Tasks + +**Infomap** +- PajekFactory +- InfomapTask + +**IO** +- LocalTargetInputs +- AminerS3Targets +- DynamoOutputTask +- *Future dataset S3 targets would go here* + +**Parsers** +- AMinerParse +- *Future dataset parser tasks go here* + +**Recommenders** +- CocitationTask +- BibcoupleTask +- EFTask + +### Configuration +Luigi specific configuration as described [here](http://luigi.readthedocs.org/en/stable/configuration.html) can be found in configs/luigi.cfg. Dataset configuration can be found in configs/default.cfg. + +### Output files +Currently the datapipeline dumps the outputs of various steps of the pipeline in the folder in which the command is run. In doing so it will create folders named `citation_dict`, `infomap_output`, `pajek_files`, and `recs`. + +### Requirements +This module, in addition to the packages noted in the setup.py and requirements.txt files, requires that [Infomap](http://www.mapequation.org/code.html#Installation) is installed and included in PATH. Additionally, in order to use any of the AWS IO tasks, [boto3 credentials](http://boto3.readthedocs.org/en/latest/guide/configuration.html) must be configured. So far `region`, `aws_access_key_id`, and `aws_secret_access_key` are required. -Current command to run: -`luigi --module luigi_pipeline [parser or recommender task] --local-scheduler --date [yyyy-mm-dd]` diff --git a/babel_datapipeline/database/storage.py b/babel_datapipeline/database/storage.py deleted file mode 100755 index 05a5bce..0000000 --- a/babel_datapipeline/database/storage.py +++ /dev/null @@ -1,193 +0,0 @@ -#!/usr/bin/env python - -import boto.dynamodb2.table -from boto.dynamodb2.fields import HashKey, RangeKey -from boto.dynamodb2.types import NUMBER -import boto.dynamodb2 -import random -import hashlib -import logging -from collections import defaultdict - -def create_hash(doi, rec_type): - return "|".join((doi, rec_type)) - -class Table(): - def __init__(self, connection, max_results, table_name, hash_key, range_key, rec_attribute, rec_types, publisher): - self.table_name = table_name - self.hash_key = hash_key - self.range_key = range_key - self.rec_attribute = rec_attribute - self.rec_types = rec_types - self.connection = connection - self.max_results = max_results - self.table = boto.dynamodb2.table.Table(table_name, connection=connection) - self.publisher = publisher - - def create(self, read=5, write=5): - return self.table.create(self.table_name, - schema=[HashKey(self.hash_key), - RangeKey(self.range_key, - data_type=NUMBER)], - throughput={'read':read, 'write':write}, - connection=self.connection) - def delete(self): - return self.table.delete() - - def get_batch_put_context(self): - """ - Returns the context manager to use for batched put requests. - - See http://boto.readthedocs.org/en/latest/dynamodb2_tut.html#batch-writing for details. - """ - return self.table.batch_write() - - def put_entry(self, entry): - return self.table.put_item(entry) - - def update_throughput(self, read=5, write=5): - return self.table.update(throughput={'read':read, 'write':write}) - - def get_rec_set(self, ids, limit=10, rec_type="classic"): - """ - Given a set of IDs from a publisher return a dictionary of recommendations - """ -#TODO: Make this limit better, currently 100 - limit = min(limit, self.max_results*10) - id_set = set(ids) - uuid = hashlib.md5() - if rec_type not in self.rec_types: - raise ValueError("Invalid recommendation type: " + rec_type) - - #TODO: Use an ordered set so we have a stable hash - papers = {} - for cid in id_set: - uuid.update(cid) - index = create_hash(cid, rec_type) - query_results = self.table.query(composite_doi__eq=index, limit=limit) - for qr in query_results: - for pid in qr[self.rec_attribute]: - if pid in papers and papers[pid] != qr["score"]: - logging.warn("{0} present already but with a different score: {1} vs {2}".format(pid, papers[pid], qr["score"])) - if qr["score"] > papers[pid]: - papers[pid] = qr["score"] - else: - papers[pid] = qr["score"] - - recs_set = defaultdict(set) - for pid, score in papers.iteritems(): - recs_set[score].add(pid) - recs = [(paper_ids, score) for score, paper_ids in recs_set.iteritems()] - recs.sort(key=lambda rec: rec[1], reverse=True) - - return self._build_rec_list(recs, hashobj=uuid, limit=limit) - - def _build_rec_list(self, recs, hashobj=None, limit=10): - results = [] - for rec, _ in recs: - - if hashobj: - [hashobj.update(paper) for paper in rec] - - if len(rec) == 1: - # Only one result in the score bucket, just add it - results.extend(rec) - else: - # A score tie. Randomly choose from the tied elements - # You cannot sample > len(pop) - sample = min(len(rec), limit-len(results)) - results.extend(random.sample(rec, sample)) - - if limit - len(results) <= 0: - break - - results_dict = [{"id": paper, "publisher":self.publisher} for paper in results] - if len(results_dict): - return {"recommendations" : results_dict, "transaction_id" : hashobj.hexdigest()} - else: - return {} - - def get_recommendation(self, doi, rec_type=None, limit=100): - """ - Given a doi, return a dictionary of recommendations. - - Returns: - A dictionary with keys corresponding to the recommendation types - and values are lists of recommended document DOIs, descending - (best recommendation -> worst). - - In the event that no recommendations exist an empty dictionary - will be returned. - """ - results = dict() - if rec_type is None: - # If no type is specified look for them all - for t in self.rec_types: - results.update(self.get_recommendation(doi, - rec_type=t, - limit=limit)) - return results - else: - if rec_type not in self.rec_types: - raise ValueError("Invalid recommendation type: " + rec_type) - - limit = min(limit, self.max_results) - index = create_hash(doi, rec_type) - query_results = self.table.query(composite_doi__eq=index, limit=limit) - uuid = hashlib.md5() - uuid.update(index) - recs = list() - for query_result in query_results: - result_set = query_result[self.rec_attribute] - - for result in result_set: - uuid.update(result) - - if len(result_set) == 1: - # Only one result in the score bucket, just add it - recs.extend(result_set) - else: - # A score tie. Randomly choose from the tied elements - # You cannot sample > len(pop) - sample = min(len(result_set), limit-len(recs)) - recs.extend(random.sample(result_set, sample)) - - if limit - len(recs) <= 0: - break - - recs = [{"id":x, "publisher":self.publisher} for x in recs] - - if len(recs) > 0: - return {rec_type: {"recommendations" : recs, "transaction_id" : uuid.hexdigest()}} - else: - return dict() - -class Storage(): - def __init__(self, storage_conf): - - self.conf = storage_conf - self.max_results = storage_conf["max_results"] - self.publishers = self.conf["publishers"] - - if self.conf["region"] == "localhost": - from boto.dynamodb2.layer1 import DynamoDBConnection - self.connection = DynamoDBConnection( - host='localhost', - port=8000, - aws_secret_access_key='anything', - is_secure=False) - - else: - self.connection = boto.dynamodb2.connect_to_region(self.conf["region"]) - - self.tables = dict() - for prod in self.publishers: - self.tables[prod] = Table(self.connection, max_results=self.max_results, **self.conf[prod]) - - def close(self): - """ Closes the connection. This allows you to use with contextlib's closing. - Mostly necessary for the test DB which seems to only allow a single connection. - """ - - self.connection.close() - diff --git a/babel_datapipeline/database/transformer.py b/babel_datapipeline/database/transformer.py old mode 100755 new mode 100644 index 910042c..88b8484 --- a/babel_datapipeline/database/transformer.py +++ b/babel_datapipeline/database/transformer.py @@ -1,131 +1,99 @@ -#!/usr/bin/env python - -from __future__ import print_function -from configobj import ConfigObj -from storage import Storage -from contextlib import closing -import itertools -from decimal import Decimal -from boto.exception import JSONResponseError -from tree_transform import make_tree_rec, process_tree +from babel_util.storage.dynamo import Table, TABLE_DEFINITION, DATASETS +import boto3 import time import logging +from itertools import groupby +from decimal import Decimal -REC_TYPE_MAP = {"1" : "classic", "2" : "expert"} - -def process_dict_stream(stream, conf): - filtered_stream = itertools.ifilter(lambda e: e["rectype"] in REC_TYPE_MAP, stream) # Only include the rec types we know and love - hashkey_stream = itertools.groupby(filtered_stream, lambda e: "|".join((e["targetdoi"], REC_TYPE_MAP[e["rectype"]], e["EF"]))) - for (key, stream) in hashkey_stream: - # Boto doesn't support lists, ony sets - recs = set([s["doi"] for s in stream]) - yield debucketer(key, recs, conf) - -def make_key(e): - return "|".join((e.target_doi, e.rec_type, str(e.score))) - -def process_record_stream(stream, conf): - for group in stream: - hashkey_stream = itertools.groupby(group, make_key) - for (key, stream) in hashkey_stream: - # Boto doesn't support lists, ony sets - recs = set([s.doi for s in stream]) - yield debucketer(key, recs, conf) - -def debucketer(key, value, conf): - (hash_key, ef) = key.rsplit("|", 1) - # Boto's handling of float's is poor. Decimals, however, work fine. - # See https://github.com/boto/boto/issues/2413 - return {conf["hash_key"] : hash_key, - conf["range_key"] : Decimal(ef), - conf["rec_attribute"] : value} - -def get_configs(): - import os, babel_datapipeline - from validate import Validator - - vtor = Validator() - config = ConfigObj(infile=os.path.join(babel_datapipeline.__path__[0], 'configs', 'default.cfg'), - unrepr=True, interpolation="template", - configspec=os.path.join(babel_datapipeline.__path__[0], 'configs', 'confspec.cfg')) - - config.validate(vtor) - config = config.dict() - # print(config) - return(config) - -def main(publisher, filename, create=False, flush=False, dryrun=False, verbose=False, skip=False): - import sys - from collections import deque - from csv import reader - - print(filename) - - config = get_configs() - - if publisher not in config['storage']['publishers']: - raise ValueError('Publisher is not valid') - - with closing(Storage(config["storage"])) as c: - table = c.tables[publisher] - if flush: - logging.info("Deleting table: " + table.table_name) - try: - if dryrun is False: - table.delete() - time.sleep(20) - except JSONResponseError as e: - pass # Table doesn't exist, be cool. - - if create: - logging.info("Creating table: " + table.table_name) - if dryrun is False: - table.create(write=2000) # Just don't forget to turn it back down - time.sleep(20) # This call is async, so chill for a bit - - entries = 0 - start = time.time() - - if skip: - logging.info("Skipping the first line") - filename.next() - - reader = reader(filename, delimiter=config["metadata"][publisher]["tree"]["delimiter"]) - record_reader = itertools.imap(make_tree_rec, reader) - entry_stream = process_record_stream(process_tree(record_reader), config["storage"][publisher]) - rate = deque(maxlen=20) - with table.get_batch_put_context() as batch: - for entry in entry_stream: - if verbose: - print(entry) - if dryrun is False: - batch.put_item(entry) - entries += 1 - if entries % 50000 == 0: - current_time = time.time() - current_rate = entries/(current_time - start) - rate.append(current_rate) - sys.stdout.flush() - end = time.time() - print("\nProcessed {0:,} entries in {1:.0f} seconds: {2:.2f} entries/sec".format(entries, end-start, entries/(end-start))) - - if dryrun is False: - table.update_throughput() + +def id_and_ef(line): + line = line.split() + return line[0] + '|' + line[2] + + +def process_edgelist(stream, rec_type): + for key, group in groupby(stream, lambda line: id_and_ef(line)): + key, ef = key.split('|') + group = map(str.split, group) + recs = set([s[1] for s in group]) + yield debucketer(key, rec_type, ef, recs) + + +def debucketer(key, rec_type, ef, recs): + hash_key = "%s|%s" % (key, rec_type) + return {TABLE_DEFINITION["hash_key"]: hash_key, + TABLE_DEFINITION["range_key"]: Decimal(ef), + TABLE_DEFINITION["rec_attribute"]: recs} + + +def main(dataset, expert, classic, region, create=False, flush=False, dryrun=False, verbose=False): + if region == "localhost": + client = boto3.resource('dynamodb', endpoint_url="http://localhost:8000") + else: + client = boto3.resource('dynamodb') + + t = Table(client, dataset) + + if flush: + logging.info("Deleting table: " + t.table_name) + if not dryrun: + t.delete() + + if create: + logging.info("Creating table: " + t.table_name) + if not dryrun: + t.create(write=2000) + + entries = 0 + start = time.time() + + with t.get_batch_put_context() as batch: + print("Generating expert recommendations...") + for expert_rec in process_edgelist(expert, 'expert'): + if verbose: + print(expert_rec) + if not dryrun: + batch.put_item(expert_rec) + entries += 1 + if entries % 50000 == 0: + current_time = time.time() + current_rate = entries/(current_time - start) + print("\nProcessed {0:,} entries in {1:.0f} seconds: {2:.2f} entries/sec".format(entries, time.time()-start, entries/(time.time()-start))) + sys.stdout.flush() + + # Reset for the second pass + print("Generating classic recommendations...") + for classic_rec in process_edgelist(classic, 'classic'): + if verbose: + print(classic_rec) + if not dryrun: + batch.put_item(classic_rec) + entries += 1 + if entries % 50000 == 0: + current_time = time.time() + current_rate = entries/(current_time - start) + print("\nProcessed {0:,} entries in {1:.0f} seconds: {2:.2f} entries/sec".format(entries, time.time()-start, entries/(time.time()-start))) + sys.stdout.flush() + end = time.time() + print("\nProcessed {0:,} entries in {1:.0f} seconds: {2:.2f} entries/sec".format(entries, end-start, entries/(end-start))) + + if not dryrun: + t.update_throughput() if __name__ == '__main__': import argparse + import sys - PRODUCTS = ("plos", "jstor", "arxiv", "pubmed", "dblp", "ssrn", "mas", "aminer") - parser = argparse.ArgumentParser(description="Transform reccomender output to DynamoDB") - parser.add_argument("publisher", help="which publisher", choices=PRODUCTS) - parser.add_argument("filename", help="file to transform", type=argparse.FileType('r')) + parser = argparse.ArgumentParser(description="Transform recommender output to DynamoDB") + parser.add_argument("dataset", help="Dataset", choices=DATASETS) + parser.add_argument("expert", help=" expert file to transform", type=argparse.FileType('r')) + parser.add_argument("classic", help="classic file to transform", type=argparse.FileType('r')) + parser.add_argument("--region", help="Region to connect to", default="localhost") parser.add_argument("-c", "--create", help="create table in database", action="store_true") parser.add_argument("-f", "--flush", help="flush database.", action="store_true") parser.add_argument("-d", "--dryrun", help="Process data, but don't insert into DB", action="store_true") parser.add_argument("-v", "--verbose", action="store_true") - parser.add_argument("-s", "--skip", help="Skip first line", action="store_true") args = parser.parse_args() - main(args.publisher, args.filename, args.create, args.flush, args.dryrun, args.verbose, args.skip) - + main(args.dataset, args.expert, args.classic, args.region, args.create, args.flush, args.dryrun, args.verbose) diff --git a/babel_datapipeline/database/tree_transform.py b/babel_datapipeline/database/tree_transform.py deleted file mode 100755 index 7236c68..0000000 --- a/babel_datapipeline/database/tree_transform.py +++ /dev/null @@ -1,136 +0,0 @@ -#!/usr/bin/env python -from __future__ import print_function -import itertools -from csv import reader - -class TreeRecord(object): - __slots__ = ("doi", "local", "score", "parent") - def __init__(self, cluster, doi, score): - cluster = cluster.split(CLUSTER_DELIMITER) - - try: - cluster.pop() # Remove local order - self.local = CLUSTER_DELIMITER.join(cluster) - except IndexError: - self.local = None - try: - cluster.pop() # Remove local-cluster id - self.parent = CLUSTER_DELIMITER.join(cluster) - except IndexError: - self.parent = None - - score = float(score) - if score == 0: - score = -1.0 #Dynamo doesn't understand inf - - # Strip whitespace and any quotes - self.doi = doi.strip().strip('"') - self.score = score - - def __eq__(self, other): - return self.doi == other.doi and self.local == other.local and self.parent == other.parent - - def __ne__(self, other): - return not self == other - -class Recommendation(object): - rec_type = None - __slots__ = ("target_doi", "doi", "score") - - def __init__(self, target_doi, doi, score): - self.target_doi = target_doi - self.score = score - self.doi = doi - - def __str__(self): - return "%s\t%s\t%s\t%s" % (self.target_doi, self.rec_type, self.doi, self.score) - - def __repr__(self): - return "<%s,%s,%s,%s>" % (self.target_doi, self.rec_type, self.doi, self.score) - -class ClassicRec(Recommendation): - rec_type = "classic" - -class ExpertRec(Recommendation): - rec_type = "expert" - - -CLUSTER_DELIMITER = ':' -def require_local(e): - return e.local != None -def get_local(e): - return e.local -def get_parent(e): - return e.parent -def get_score(e): - return e.score -def make_tree_rec(entry): - """Transforms a raw entry to a TreeRecord""" - return TreeRecord(entry[0], entry[2], entry[1]) - -def make_expert_rec(stream, rec_limit=5): - filtered_stream = itertools.ifilter(require_local, stream) - expert_stream = itertools.groupby(filtered_stream, get_local) - - for (_, stream) in expert_stream: - expert_papers = [e for e in stream] - - expert_rec = list() - for p in expert_papers: - count = 0 - for r in expert_papers: - if count >= rec_limit: - break - - if r == p: - continue - - expert_rec.append(ExpertRec(p.doi, r.doi, r.score)) - count = count + 1 - - #NOTE: Yielding an empty list means no recommendations - yield expert_rec, expert_papers - -def process_tree(stream, rec_limit=5): - """Given a stream of TreeRecord, convert them to ClassicRec and ExpertRec""" - classic_stream = itertools.groupby(stream, get_parent) - for (classic_key, stream) in classic_stream: - - classic_recs = list() - expert_recs = list() - expert_papers = list() - - for (recs, papers) in make_expert_rec(stream): - expert_papers.extend(papers) - if recs != []: - expert_recs.extend(recs) - - if classic_key != None: - expert_sorted = sorted(expert_papers, key=get_score, reverse=True) - for p in expert_sorted: - count = 0 - for r in expert_sorted: - if count >= rec_limit: - break - - if p == r: - continue - - classic_recs.append(ClassicRec(p.doi, r.doi, r.score)) - count = count + 1 - - expert_recs.extend(classic_recs) - yield expert_recs - -if __name__ == '__main__': - import argparse - - parser = argparse.ArgumentParser(description="Transform tree file to rec file") - parser.add_argument("filename", help="file to transform", type=argparse.FileType('r')) - - args = parser.parse_args() - - reader = reader(args.filename, delimiter=' ') - record_reader = itertools.imap(make_tree_rec, reader) - for recs in process_tree(record_reader): - map(print, recs) diff --git a/babel_datapipeline/debug/__init__.py b/babel_datapipeline/debug/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/babel_datapipeline/debug/small.tree b/babel_datapipeline/debug/small.tree deleted file mode 100755 index 682eaf9..0000000 --- a/babel_datapipeline/debug/small.tree +++ /dev/null @@ -1,3 +0,0 @@ -1:1:1:1:1 4.11772e-06 "10.1016/s0896-6273(01)00251-3" -1:1:1:1:2 3.83271e-06 "10.1016/j.conb.2006.06.009" -1:1:1:1:3 3.08197e-06 "10.1371/journal.pbio.0050305" diff --git a/babel_datapipeline/tasks/infomap.py b/babel_datapipeline/tasks/infomap.py index bb14708..351a4ec 100644 --- a/babel_datapipeline/tasks/infomap.py +++ b/babel_datapipeline/tasks/infomap.py @@ -3,6 +3,8 @@ import os from babel_datapipeline.util.misc import * from subprocess import check_output, check_call, STDOUT +import luigi +import luigi.s3 as s3 class PajekFactory(luigi.Task): @@ -52,10 +54,10 @@ def run(self): stderr=STDOUT, shell=True) s3client = s3.S3Client() - s3client.put_string(infomap_log, 'S3://babel-logging/%s_infomap_output_%s.txt' % (self.dataset, self.date)) + s3client.put_string(infomap_log, 'S3://babel-logging/%s_infomap_output_%s.txt' % (self.dataset, self.date)) # TODO for extension in ('tree', 'bftree', 'map'): file_path = '%s_%s.%s' % (self.generic_path, self.date, extension) check_call(['gzip', file_path]) file_name = '%s_pajek_%s.%s.gz' % (self.dataset, self.date, extension) - s3client.put('%s.gz' % file_path, 'S3://babel-processing/%s' % file_name) \ No newline at end of file + s3client.put('%s.gz' % file_path, 'S3://babel-processing/%s' % file_name) # TODO diff --git a/babel_datapipeline/tasks/io.py b/babel_datapipeline/tasks/io.py index 317049a..01c5a14 100644 --- a/babel_datapipeline/tasks/io.py +++ b/babel_datapipeline/tasks/io.py @@ -1,7 +1,7 @@ import luigi import luigi.s3 as s3 -from recommenders import * import datetime +from babel_datapipeline.tasks.recommenders import EFTask class LocalTargetInputs(luigi.ExternalTask): @@ -25,5 +25,5 @@ def requires(self): def run(self): from babel_datapipeline.database.transformer import main - for infile in self.input(): - main('aminer', open(infile.path, 'r'), create=True,flush=True) \ No newline at end of file + main('aminer', open(self.input()[1].path, 'r'), open(self.input()[0].path, 'r'), 'localhost', create=True, + flush=True) \ No newline at end of file diff --git a/babel_datapipeline/tasks/parsers.py b/babel_datapipeline/tasks/parsers.py index fd39116..17af689 100644 --- a/babel_datapipeline/tasks/parsers.py +++ b/babel_datapipeline/tasks/parsers.py @@ -1,13 +1,13 @@ import datetime import luigi -from io import * -from babel_datapipeline.util.misc import * +from babel_datapipeline.util.misc import makedir class AMinerParse(luigi.Task): date = luigi.DateParameter(default=datetime.date.today()) def requires(self): + from babel_datapipeline.tasks.io import AminerS3Targets return AminerS3Targets() def output(self): diff --git a/setup.py b/setup.py index 85fac20..a84823c 100644 --- a/setup.py +++ b/setup.py @@ -56,7 +56,7 @@ # your project is installed. For an analysis of "install_requires" vs pip's # requirements files see: # https://packaging.python.org/en/latest/requirements.html - install_requires=['scipy', 'networkx', 'boto', 'configobj', 'luigi', 'babel_util'], + install_requires=['scipy', 'networkx', 'boto3', 'boto', 'configobj', 'luigi', 'babel_util'], ) with open('requirements.txt', 'r') as f: