diff --git a/.gitignore b/.gitignore
index 6e2ca78..207f31f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,4 @@
 *.pyc
 *.egg-info
 docs/build
+.vscode
diff --git a/README.md b/README.md
index 667f107..c20605f 100644
--- a/README.md
+++ b/README.md
@@ -1,30 +1,49 @@
-DPimport: A command line glob importer for DPdash
-=================================================
+# DPimport: A command line glob importer for DPdash
+
 DPimport is a command line tool for importing files into DPdash using a
-simple [`glob`](https://en.wikipedia.org/wiki/Glob_(programming)) expression.
+simple [`glob`](https://en.wikipedia.org/wiki/Glob_(programming)) expression.
 
 ## Table of contents
+
 1. [Installation](#installation)
 2. [Configuration](#configuration)
 3. [Usage](#usage)
-4. [MongoDB](#mongodb)
 
 ## Installation
-Just use `pip`
+
+Option 1: Install via `pip`
+
 ```bash
 pip install https://github.com/AMP-SCZ/dpimport.git
 ```
+Option 2: Clone the repository and run manually
+
+```bash
+git clone https://github.com/AMP-SCZ/dpimport.git
+cd dpimport
+python -m venv venv
+source venv/bin/activate
+pip install -r requirements.txt
+python scripts/import.py -c config.yml '/PHOENIX/GENERAL/STUDY_A/SUB_001/DATA_TYPE/processed/*.csv'
+```
+
 ## Configuration
+
 DPimport requires a configuration file in YAML format, passed as a command
-line argument with `-c|--config`, for establishing a MongoDB database
-connection. You will find an example configuration file in the `examples`
-directory within this repository.
+line argument with `-c|--config`, for connecting with an instance of the DPdash
+application API. The configuration file should contain the following fields:
+
+- `api_url` - Endpoint for the DPdash API
+- `api_user` - Username for the DPdash API
+- `api_key` - API key for the DPdash API
+- `verify_ssl` - Whether to verify SSL certificates (default: True)
 
 ## Usage
+
 The main command line tool is `import.py`. You can use this tool to import any
-DPdash-compatible CSV files or metadata files using the direct path to a file
+DPdash-compatible CSV files or metadata files using the direct path to a file
 or a glob expression (use single quotes to avoid shell expansion)
 
 ```bash
@@ -32,9 +51,6 @@ import.py -c config.yml '/PHOENIX/GENERAL/STUDY_A/SUB_001/DATA_TYPE/processed/*.
 import.py -c config.yml '/PHOENIX/GENERAL/STUDY_A/SUB_001/DATA_TYPE/processed/*.csv' -n 8
 ```
 
-`-n 8` is for parallelly importing 8 files. The default is `-n 1`.
-
-
 You may also now use the `**` recursive glob expression, for example:
 
 ```bash
@@ -55,18 +71,9 @@ and so on.
 `directory/*/*.csv` matches only `directory/[subdirectory]/[filename].csv`.
 With a [recursive glob pattern](https://docs.python.org/3/library/glob.html#glob.glob),
 `directory/**/*.csv` will additionally match:
-* `directory/[filename].csv` (no subdirectory)
-* `directory/[subdirectory1]/[subdirectory2]/[filename].csv` (sub-subdirectory)
+- `directory/[filename].csv` (no subdirectory)
+- `directory/[subdirectory1]/[subdirectory2]/[filename].csv` (sub-subdirectory)
 
 and so on, for as many levels deep as exist in the directory tree.
-
-
-
-## MongoDB
-
-This tool requires MongoDB to be running and accessible with the credentials you
-supply in the `config.yml` file. For tips on MongoDB as it is used in DPdash and DPimport,
-see [the DPdash wiki](https://github.com/PREDICT-DPACC/dpdash/wiki/MongoDB-Tips).
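For reference, the recursive-glob behavior documented in the README above maps directly onto Python's built-in `glob` module, which `scripts/import.py` uses to expand the expression. A minimal sketch; the PHOENIX paths are illustrative, not real data:

```python
import glob

# Fixed-depth pattern: each '*' matches exactly one path component
single_level = glob.glob('/PHOENIX/GENERAL/STUDY_A/*/DATA_TYPE/processed/*.csv')

# Recursive pattern: '**' (with recursive=True) also matches zero or more
# intermediate directories, as described in the README
all_levels = glob.glob('/PHOENIX/GENERAL/**/processed/*.csv', recursive=True)

for path in all_levels:
    print(path)
```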
- diff --git a/dpimport/__init__.py b/dpimport/__init__.py deleted file mode 100644 index e530fc8..0000000 --- a/dpimport/__init__.py +++ /dev/null @@ -1,104 +0,0 @@ -import os -import re -import hashlib -import logging -import mimetypes as mt -from . import patterns -from dppylib import diff_files - -logger = logging.getLogger(__name__) - -class Role: - UNKNOWN = 'unknown' - METADATA = 'metadata' - DATAFILE = 'data' - -def probe(path): - ''' - Check file for DPdash compatibility and return a file - information object. - - :param path: File path - :type path: str - ''' - if not os.path.exists(path): - logger.debug('file not found %s', path) - return None - dirname = os.path.dirname(path) - basename = os.path.basename(path) - # match file and get re match object and file role - role,match = match_file(basename) - if role == Role.UNKNOWN: - return None - # initialize info object - info = match.groupdict() - info['glob'] = path - if role == Role.DATAFILE: - info.update(init_datafile(info)) - info['glob'] = get_glob(path) - # add other necessary information to info object - mimetype,encoding = mt.guess_type(path) - stat = os.stat(path) - - with open(path) as f: - content = f.read().strip() - content_hash = hashlib.sha256(content.encode()).hexdigest() - - info.update({ - 'path' : path, - 'filetype' : mimetype, - 'encoding' : encoding, - 'basename' : basename, - 'dirname' : dirname, - 'dirty' : True, - 'synced' : False, - 'mtime' : stat.st_mtime, - 'size' : stat.st_size, - 'uid' : stat.st_uid, - 'gid' : stat.st_gid, - 'mode' : stat.st_mode, - 'role': role, - 'content': content_hash - }) - return info - -def match_file(f): - match = patterns.DATAFILE.match(f) - if match: - return Role.DATAFILE, match - match = patterns.METADATA.match(f) - if match: - return Role.METADATA, match - return Role.UNKNOWN, None - -def init_datafile(info): - string = '{STUDY}{SUBJECT}{ASSESSMENT}'.format( - STUDY=info['study'], - SUBJECT=info['subject'], - ASSESSMENT=info['assessment'] - ) - hash = hashlib.sha256(string.encode('utf-8')) - return { - 'collection': hash.hexdigest(), - 'subject' : info['subject'], - 'assessment' : info['assessment'], - 'time_units' : str(info['units']), - 'time_start' : int(info['start']), - 'time_end' : int(info['end']) - } - -def get_glob(f): - basename = os.path.basename(f) - dirname = os.path.dirname(f) - glob = patterns.GLOB_SUB.sub('\\1*\\2', basename) - return os.path.join(dirname, glob) - -def import_file(db, file_info): - if file_info['role'] == 'data': - collection = db['toc'] - elif file_info['role'] == 'metadata': - collection = db['metadata'] - else: - logger.error('incompatible file %s', file_info['path']) - return - diff_files(db, collection, file_info) diff --git a/dpimport/__version__.py b/dpimport/__version__.py deleted file mode 100644 index def0ea4..0000000 --- a/dpimport/__version__.py +++ /dev/null @@ -1,6 +0,0 @@ -__title__ = 'dpimport' -__description__ = 'DPdash importer' -__url__ = 'https://github.com/harvard-nrg/dpimport' -__version__ = '0.0.1' -__author__ = 'Neuroinformatics Research Group' -__author_email__ = 'info@neuroinfo.org' diff --git a/dpimport/database/__init__.py b/dpimport/database/__init__.py deleted file mode 100644 index e2fc40a..0000000 --- a/dpimport/database/__init__.py +++ /dev/null @@ -1,102 +0,0 @@ -import ssl -import fnmatch -import logging -from pymongo import MongoClient -from bson.json_util import dumps - -logger = logging.getLogger(__name__) - -class Database(object): - def __init__(self, config, dbname): - self.config = config - 
self.dbname = dbname - self.client = None - self.db = None - - def connect(self): - uri = 'mongodb://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{AUTH_SOURCE}' - uri = uri.format( - USERNAME=self.config['username'], - PASSWORD=self.config['password'], - HOST=self.config['hostname'], - PORT=self.config['port'], - AUTH_SOURCE=self.config['auth_source'] - ) - self.client = MongoClient( - uri, - ssl=True, - ssl_cert_reqs=ssl.CERT_REQUIRED, - ssl_certfile=self.config['ssl_certfile'], - ssl_keyfile=self.config['ssl_keyfile'], - ssl_ca_certs=self.config['ssl_ca_certs'], - serverSelectionTimeoutMS=300000 - ) - self.db = self.client[self.dbname] - return self - - def remove_unsynced(self, expr): - ''' - Remove all documents with sync: false matching the - input shell-style expression. - - :param expr: shell-style expression - :type expr: str - ''' - regex = fnmatch.translate(expr) - cursor = self.db.toc.find({ - 'path': { - '$regex': regex, - }, - 'synced': False - }, { - 'collection': True - }) - for doc in cursor: - _id = doc['_id'] - collection = doc['collection'] - # todo: wrap in a transaction, requires MongoDB 4.x - logger.debug('dropping collection %s', collection) - self.db[collection].drop() - logger.debug('deleting toc document %s', _id) - self.db.toc.remove({ '_id': _id }) - - def exists(self, probe): - ''' - Check if file exists in the database - - :param probe: File probe - :type probe: dict - ''' - doc = self.db.toc.find_one({ - 'content': probe['content'] - }) - if not doc: - # since it's not in data, check if it's in metadata - doc = self.db.metadata.find_one({ - 'content': probe['content'] - }) - - if doc: - return True - return False - - def unsync(self, expr): - ''' - Convert shell-style expression to a regular expression and - use that to match TOC entries for files stored in the - database and mark them as un-synced. - - :param expr: shell-style regular expression - :type expr: str - ''' - regex = fnmatch.translate(expr) - docs = self.db.toc.update_many({ - 'path': { - '$regex': regex - } - }, { - '$set': { - 'synced': False - } - }) - diff --git a/dpimport/importer/__init__.py b/dpimport/importer/__init__.py deleted file mode 100644 index 5e99f8d..0000000 --- a/dpimport/importer/__init__.py +++ /dev/null @@ -1,111 +0,0 @@ -import uuid -import logging - -logger = logging.getLogger(__name__) - -def import_file(db, file_info): - if file_info['role'] == 'data': - collection = db['toc'] - elif file_info['role'] == 'metadata': - collection = db['metadata'] - else: - logger.error('{FILE} is not compatible with DPdash. Exiting import.'.format(FILE=file_info['path'])) - return - diff_files(db, collection, file_info) - -# Match the file info with the record stored in the database -def diff_files(db, collection, file_info): - file_path = file_info['path'] - db_data = collection.find_one({ 'path' : file_path }) - if not db_data: - logger.info('{FILE} does not exist in the database. Importing.'.format(FILE=file_path)) - import_data(db, collection, file_info) - else: - if db_data['mtime'] != file_info['mtime'] or db_data['size'] != file_info['size']: - logger.info('{FILE} has been modified. Re-importing.'.format(FILE=file_path)) - dbtools.remove_doc(db, collection, db_data, file_info['role']) - import_data(db, collection, file_info) - else: - logger.info('Database already has {FILE}. 
Skipping.'.format(FILE=file_path)) - logged = log_success(collection, db_data['_id']) - if logged == 0: - logger.info('Journaling complete for {FILE}'.format(FILE=file_info['path'])) - -# Import data into the database -def import_data(db, ref_collection, file_info): - if file_info['role'] == 'metadata': - file_info.update({'collection': str(uuid.uuid4())}) - ref_id = insert_reference(ref_collection, file_info) - if ref_id is None: - logger.error('Unable to import {FILE}'.format(FILE=file_info['path'])) - return - - inserted = insert_data(db, file_info) - if inserted == 0: - logger.info('Import success for {FILE}'.format(FILE=file_info['path'])) - - logged = log_success(ref_collection, ref_id) - if logged == 0: - logger.info('Journaling complete for {FILE}'.format(FILE=file_info['path'])) - -# Mark the sync as successful -def log_success(ref_collection, ref_id): - update_ref = { - '$set' : { - 'dirty': False, - 'synced' : True, - 'updated' : datetime.utcnow() - } - } - - try: - ref_collection.update({ - '_id' : ref_id - }, update_ref) - return 0 - except Exception as e: - logger.error(e) - return 1 - -# Insert the reference doc, returns the inserted id -def insert_reference(collection, reference): - try: - ref_id = collection.insert_one(reference).inserted_id - return ref_id - except Exception as e: - logger.error(e) - return None - -# Insert the data -def insert_data(db, file_info): - try: - # Import data - data_blob = [] - import_collection = db[file_info['collection']] - for chunk in reader.read_csv(file_info['path']): - if len(chunk) > 0: - if file_info['role'] != 'metadata': - chunk_columns = sanitize_columns(chunk.columns.values.tolist()) - chunk.columns = chunk_columns - chunk['path'] = file_info['path'] - data_blob.extend(chunk.round(4).to_dict('records')) - - if len(data_blob) >= 100000: - import_collection.insert_many(data_blob, False) - data_blob = [] - if data_blob: - import_collection.insert_many(data_blob, False) - return 0 - except Exception as e: - logger.error(e) - logger.error('Unable to import {FILE}'.format(FILE=file_info['path'])) - -# Rename columns to encode special characters -def sanitize_columns(columns): - new_columns = [] - for column in columns: - new_column = quote(unicode(column).encode('utf-8'), safe='~()*!.\'').replace('.', '%2E') - new_columns.append(new_column) - - return new_columns - diff --git a/dpimport/patterns/__init__.py b/dpimport/patterns/__init__.py deleted file mode 100644 index d14e73a..0000000 --- a/dpimport/patterns/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -import re - -DATAFILE = re.compile(r'(?P\w+)\-(?P\w+)\-(?P\w+)\-(?Pday)(?P[+-]?\d+(?:\.\d+)?)to(?P[+-]?\d+(?:\.\d+)?)(?P.csv)') - -METADATA = re.compile(r'(?P\w+)\_metadata(?P.csv)') - -GLOB_SUB = re.compile(r'(\w+\-\w+\-\w+\-day)[+-]?\d+(?:\.\d+)?to[+-]?\d+(?:\.\d+)?(.*)') diff --git a/dppylib/__init__.py b/dppylib/__init__.py deleted file mode 100644 index bcd053c..0000000 --- a/dppylib/__init__.py +++ /dev/null @@ -1,248 +0,0 @@ -import os -import re -import logging -import hashlib -import uuid -import mimetypes as mt -from datetime import datetime -from urllib.parse import quote - -from tools import database as dbtools -from tools import reader -from time import sleep - -TIME_UNITS = { - 'day' : 'days', - 'hr' : 'hours' -} - -_UNITS = '|'.join(TIME_UNITS.keys()) -_EXTENSION = '.csv' - -FILE_REGEX = re.compile(r'(?P\w+)\-(?P\w+)\-(?P\w+)\-(?P{UNITS})(?P[+-]?\d+(?:\.\d+)?)to(?P[+-]?\d+(?:\.\d+)?)(?P{EXTENSION})'.format(UNITS=_UNITS, EXTENSION=_EXTENSION)) -FILE_SUB = 
re.compile(r'(\w+\-\w+\-\w+\-{UNITS})[+-]?\d+(?:\.\d+)?to[+-]?\d+(?:\.\d+)?(.*)'.format(UNITS=_UNITS)) -METADATA_REGEX = re.compile(r'(?P\w+)\_metadata(?P{EXTENSION})'.format(EXTENSION='.csv')) - -logger = logging.getLogger(__name__) - -# Verify if a file is DPdash-compatible file, and return file info if so. -def stat_file(import_dir, file_name, file_path): - file_info = match_file(file_name, import_dir) - if not file_info: - return None - - filetype,encoding = guess_type(file_info['extension']) - if not os.path.exists(file_path): - return None - - with open(file_path) as f: - content = f.read().strip() - content_hash = hashlib.sha256(content.encode()).hexdigest() - - file_stat = os.stat(file_path) - file_info.update({ - 'path' : file_path, - 'filetype' : filetype, - 'encoding' : encoding, - 'basename' : file_name, - 'dirname' : import_dir, - 'dirty' : True, - 'synced' : False, - 'mtime' : file_stat.st_mtime, - 'size' : file_stat.st_size, - 'uid' : file_stat.st_uid, - 'gid' : file_stat.st_gid, - 'mode' : file_stat.st_mode, - 'content' : content_hash - }) - - return file_info - -def import_file(db, file_info): - if file_info['role'] == 'data': - collection = db['toc'] - elif file_info['role'] == 'metadata': - collection = db['metadata'] - else: - logger.error('{FILE} is not compatible with DPdash. Exiting import.'.format(FILE=file_info['path'])) - return - - diff_files(db, collection, file_info) - -# Match the file info with the record stored in the database -def diff_files(db, collection, file_info): - file_path = file_info['path'] - db_data = collection.find_one({ 'path' : file_path }) - if not db_data: - logger.info('{FILE} does not exist in the database. Importing.'.format(FILE=file_path)) - import_data(db, collection, file_info) - else: - if db_data['content'] != file_info['content']: - logger.info('{FILE} has been modified. Re-importing.'.format(FILE=file_path)) - dbtools.remove_doc(db, collection, db_data, file_info['role']) - import_data(db, collection, file_info) - else: - logger.info('Database already has {FILE}. 
Skipping.'.format(FILE=file_path)) - logged = log_success(collection, db_data['_id']) - if logged == 0: - logger.info('Journaling complete for {FILE}'.format(FILE=file_info['path'])) - -# Import data into the database -def import_data(db, ref_collection, file_info): - if file_info['role'] == 'metadata': - file_info.update({'collection': str(uuid.uuid4())}) - ref_id = insert_reference(ref_collection, file_info) - if ref_id is None: - logger.error('Unable to import {FILE}'.format(FILE=file_info['path'])) - return - - inserted = insert_data(db, file_info) - if inserted == 0: - logger.info('Import success for {FILE}'.format(FILE=file_info['path'])) - - logged = log_success(ref_collection, ref_id) - if logged == 0: - logger.info('Journaling complete for {FILE}'.format(FILE=file_info['path'])) - -# Mark the sync as successful -def log_success(ref_collection, ref_id): - update_ref = { - '$set' : { - 'dirty': False, - 'synced' : True, - 'updated' : datetime.utcnow() - } - } - - try: - ref_collection.update({ - '_id' : ref_id - }, update_ref) - return 0 - except Exception as e: - logger.error(e) - return 1 - -# Insert the reference doc, returns the inserted id -def insert_reference(collection, reference): - try: - try: - ref_id = collection.insert_one(reference).inserted_id - except: - sleep(60) - logger.info('Retrying reference insertion of {FILE}'.format(FILE=reference['path'])) - ref_id = collection.insert_one(reference).inserted_id - return ref_id - except Exception as e: - logger.error(e) - return None - -# Insert the data -def insert_data(db, file_info): - try: - # Import data - data_blob = [] - import_collection = db[file_info['collection']] - - # https://github.com/AMP-SCZ/dpimport/blob/52a0b80e704b20297e2239597d7145db1ae7c7f8/tools/reader/__init__.py#L9 - # reader.read_csv() has specified chunksize=1 - # so each chunk is one row of the csv file at a time - # https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-chunking - for chunk in reader.read_csv(file_info['path']): - if len(chunk) > 0: - if file_info['role'] != 'metadata': - chunk_columns = sanitize_columns(chunk.columns.values.tolist()) - chunk.columns = chunk_columns - chunk['path'] = file_info['path'] - - # each data_blob is json representation of one row of the csv file at a time - data_blob.extend(chunk.to_dict('records')) - - if len(data_blob) >= 10: - try: - import_collection.insert_many(data_blob, False) - except: - sleep(60) - logger.info('Retrying data insertion of {FILE}'.format(FILE=file_info['path'])) - import_collection.insert_many(data_blob, False) - data_blob = [] - - if data_blob: - try: - import_collection.insert_many(data_blob, False) - except: - sleep(60) - logger.info('Retrying data insertion of {FILE}'.format(FILE=file_info['path'])) - import_collection.insert_many(data_blob, False) - return 0 - except Exception as e: - logger.error(e) - logger.error('Unable to import {FILE}'.format(FILE=file_info['path'])) - return 1 - -# Rename columns to encode special characters -def sanitize_columns(columns): - new_columns = [] - for column in columns: - new_column = quote(str(column).encode('utf-8'), safe='~()*!.\'').replace('.', '%2E') - new_columns.append(new_column) - - return new_columns - -# Match the filename to distinguish data from metadata files -def match_file(file_name, sub_dir): - matched_file = FILE_REGEX.match(file_name) - if not matched_file: - logger.info('file did not match %s', file_name) - matched_metadata = METADATA_REGEX.match(file_name) - if not matched_metadata: - return None - else: - 
return scan_metadata(matched_metadata, file_name, sub_dir) - else: - logger.info('file matched %s', file_name) - return scan_data(matched_file, file_name, sub_dir) - -# Return file_info for the metadata -def scan_metadata(match, file_name, sub_dir): - file_info = match.groupdict() - - file_info.update({ - 'glob' : os.path.join(sub_dir, file_name), - 'role' : 'metadata' - }) - - return file_info - -# Return file_info for the data -def scan_data(match, file_name, sub_dir): - file_info = match.groupdict() - - m = hashlib.sha256('{STUDY}{SUBJECT}{ASSESSMENT}'.format( - STUDY= file_info['study'], - SUBJECT=file_info['subject'], - ASSESSMENT=file_info['assessment'] - )) - - file_info.update({ - 'collection': m.hexdigest(), - 'subject' : file_info['subject'], - 'assessment' : file_info['assessment'], - 'glob' : os.path.join(sub_dir, FILE_SUB.sub('\\1*\\2', file_name)), - 'time_units' : str(file_info['units']), - 'time_start' : int(file_info['start']), - 'time_end' : int(file_info['end']), - 'role' : 'data' - }) - - return file_info - -# get mime type and encoding -def guess_type(extension): - return mt.guess_type('file{}'.format(extension)) - -class StatError(Exception): - pass - -class ParserError(Exception): - pass diff --git a/examples/config.yml b/examples/config.yml index 5f55739..e020203 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -1,9 +1,7 @@ -ssl_keyfile: /path/to/ssl/client/key.pem -ssl_certfile: /path/to/ssl/client/cert.pem -ssl_ca_certs: /path/to/ssl/ca/cacert.pem -username: jharvard -password: ********* -port: 27017 -auth_source: admin -db: dpdata -hostname: dpdash.example.org +columns: 5 +max_days: 45 +participant_count: 5 +api_url: https://dpdash.local/api/v1/import/data/ +api_user: api-user-1 +api_key: api-key-1 +verify_ssl: True \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index d6e1198..f7a244e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ --e . 
+polars
+pyaml
+requests
diff --git a/tools/__init__.py b/scripts/__init__.py
similarity index 100%
rename from tools/__init__.py
rename to scripts/__init__.py
diff --git a/scripts/csv_data_service/__init__.py b/scripts/csv_data_service/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/csv_data_service/csv_data_service.py b/scripts/csv_data_service/csv_data_service.py
new file mode 100644
index 0000000..a94aea0
--- /dev/null
+++ b/scripts/csv_data_service/csv_data_service.py
@@ -0,0 +1,60 @@
+import csv
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class CSVDataService:
+    def __init__(self, path, max_columns, max_days):
+        self.Data = {}
+        self.max_columns = max_columns
+        self.max_days = max_days
+        self.columns = []
+        self.day_data = []
+        self.path = path
+
+    def _generate_columns(self):
+        var_prefix = "var_random_"
+
+        for x in range(0, self.max_columns):
+            if x == 2:
+                self.columns.append("day")
+            self.columns.append(var_prefix + "%d" % (x))
+
+    def _generate_column_data(self):
+        for x in range(0, self.max_columns):
+            if x == 2:
+                self.day_data.append(x)
+            if x % 2 == 0 and not x == 2:
+                self.day_data.append("value" + " %d" % (x))
+            elif x % 1 == 0:
+                self.day_data.append(x)
+
+    def prepare_csv_test_data(self):
+        self._generate_columns()
+        self._generate_column_data()
+
+    def write_test_csv_files(self, participant_num):
+        # Write file name to automatically be day1 to maxday
+
+        with open(self.build_file_path(participant_num), "w", newline="") as csvfile:
+            csv_writer = csv.writer(
+                csvfile, delimiter=",", quotechar="|", quoting=csv.QUOTE_MINIMAL
+            )
+
+            csv_writer.writerow(self.columns)
+
+            for x in range(1, self.max_days):
+                self.day_data[2] = x
+                csv_writer.writerow(self.day_data)
+
+        print(f"Finished writing file {participant_num}")
+
+    def build_file_path(self, participant):
+        return (
+            self.path
+            + "/"
+            + f"gnar-eng{participant}-superassessment-day1to"
+            + "%d" % (self.max_days)
+            + ".csv"
+        )
diff --git a/scripts/data_importer_service/__init__.py b/scripts/data_importer_service/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py
new file mode 100644
index 0000000..2d17165
--- /dev/null
+++ b/scripts/data_importer_service/data_import_service.py
@@ -0,0 +1,146 @@
+import math
+import re
+import os
+import json
+import polars as pl
+
+from urllib.parse import quote
+import mimetypes as mt
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class DataImporterService:
+    DATAFILE = re.compile(
+        r"(?P<study>\w+)\-(?P<participant>\w+)\-(?P<assessment>\w+)\-(?P<units>day)(?P<start>[+-]?\d+(?:\.\d+)?)to(?P<end>[+-]?\d+(?:\.\d+)?)(?P<extension>.csv)"
+    )
+    METADATA = re.compile(r"(?P<study>\w+)\_metadata(?P<extension>.csv)")
+    GLOB_SUB = re.compile(
+        r"(\w+\-\w+\-\w+\-day)[+-]?\d+(?:\.\d+)?to[+-]?\d+(?:\.\d+)?(.*)"
+    )
+    records = "records"
+    metadata_key = "metadata"
+
+    def __init__(self, data_file, metadata_file):
+        self.data_file = data_file
+        self.metadata_file = metadata_file
+
+    def process_file(self, path):
+        basename = os.path.basename(path)
+        is_data_file = self.DATAFILE.match(basename)
+        is_metadata_file = self.METADATA.match(basename)
+
+        if is_data_file:
+            file_extension_to_dict = self.DATAFILE.match(basename).groupdict()
+            return self.process_data_file(path, file_extension_to_dict)
+        if is_metadata_file:
+            metadata_file_extension = self.METADATA.match(basename).groupdict()
+
+            return self.process_metadata_file(path, metadata_file_extension)
+        else:
+            return None
+
+    def process_data_file(self, path, file_extension):
+        metadata = self._file_info(path)
+        assessment_variables = []
+
+        file_extension.update({"time_end": file_extension["end"]})
+        metadata.update({"role": "data", **file_extension})
+
+        del file_extension["extension"]
+
+        participant_assessments = self._read_csv(
+            path,
+        )
+
+        for assessment in participant_assessments:
+
+            for variable in assessment:
+                assessment_variables.append(variable)
+
+                isUnsupportedValue = (
+                    assessment[variable] == math.inf
+                    or assessment[variable] == -math.inf
+                    or assessment[variable] != assessment[variable]
+                )
+                if isUnsupportedValue:
+                    assessment[variable] = None
+
+        var_set = set(assessment_variables)
+        assessment_variables = [
+            dict(name=variable, assessment=metadata.get("assessment"))
+            for variable in sorted(var_set)
+        ]
+
+        self.data_file.update(
+            {
+                "metadata": metadata,
+                "participant_assessments": participant_assessments,
+                "assessment_variables": assessment_variables,
+            }
+        )
+
+        return
+
+    def process_metadata_file(self, path, file_extension):
+        metadata = self._file_info(path)
+        participants = self._read_csv(path)
+
+        for participant in participants:
+            participant["participant"] = participant.pop("Subject ID")
+            participant["study"] = participant.pop("Study")
+
+        metadata.update(
+            {
+                "role": "metadata",
+                **file_extension,
+            }
+        )
+        self.metadata_file.update(
+            {
+                "metadata": metadata,
+                "participants": participants,
+            }
+        )
+
+        return
+
+    def _read_csv(self, file_path):
+        try:
+            df = pl.read_csv(file_path, infer_schema_length=int(1e19))
+        except pl.ComputeError:
+            df = pl.read_csv(file_path, infer_schema_length=0)
+        return df.to_dicts()
+
+    def _file_info(self, path):
+        mimetype, encoding = mt.guess_type(path)
+        dirname = os.path.dirname(path)
+        basename = os.path.basename(path)
+        stat = os.stat(path)
+
+        return {
+            "path": path,
+            "filetype": mimetype,
+            "encoding": encoding,
+            "basename": basename,
+            "dirname": dirname,
+            "mtime": stat.st_mtime,
+            "size": stat.st_size,
+            "uid": stat.st_uid,
+            "gid": stat.st_gid,
+            "mode": stat.st_mode,
+        }
+
+    def processed_data_to_json(self):
+        processed_data = (
+            json.dumps(self.data_file)
+            if self.data_file and len(self.data_file["participant_assessments"]) > 0
+            else None
+        )
+        processed_metadata = (
+            json.dumps(self.metadata_file)
+            if self.metadata_file and len(self.metadata_file["participants"]) > 0
+            else None
+        )
+        return processed_data, processed_metadata
diff --git a/scripts/generate_test_data.py b/scripts/generate_test_data.py
new file mode 100644
index 0000000..9481f25
--- /dev/null
+++ b/scripts/generate_test_data.py
@@ -0,0 +1,34 @@
+import os
+import yaml
+import argparse as ap
+from csv_data_service import csv_data_service
+from concurrent.futures import ProcessPoolExecutor
+
+
+def main():
+    parser = ap.ArgumentParser()
+    parser.add_argument("-c", "--config")
+    args = parser.parse_args()
+
+    with open(os.path.expanduser(args.config), "r") as fo:
+        config = yaml.load(fo, Loader=yaml.SafeLoader)
+
+    dir_path = config["csv_directory"]
+    data_columns = config["columns"]
+    range_of_days = config["max_days"]
+    num_of_participants = config["participant_count"]
+
+    if not os.path.isdir(dir_path):
+        os.mkdir(dir_path)
+
+    csv_service = csv_data_service.CSVDataService(dir_path, data_columns, range_of_days)
+    csv_service.prepare_csv_test_data()
+
+    with ProcessPoolExecutor(4) as exe:
+        exe.map(csv_service.write_test_csv_files, range(num_of_participants))
+
+    print("Finished Writing files")
+
+
+if __name__ == "__main__":
+    main()
diff --git 
a/scripts/import.py b/scripts/import.py index 8c1b4a7..20e9688 100755 --- a/scripts/import.py +++ b/scripts/import.py @@ -1,228 +1,52 @@ #!/usr/bin/env python import os -import sys -import ssl import glob import yaml -import dppylib -import dpimport -import logging import argparse as ap -import collections as col -import dpimport.importer as importer -from dpimport.database import Database -from pymongo import DeleteMany, UpdateMany -from pymongo.errors import BulkWriteError - -from multiprocessing import Pool -import signal - -logger = logging.getLogger(__name__) - -def RAISE(err): - raise err - - -def _main(db1,f): - dirname = os.path.dirname(f) - basename = os.path.basename(f) - # probe for dpdash-compatibility and gather information - probe = dpimport.probe(f) - if not probe: - logger.debug('document is unknown %s', basename) - return - - db=db1.connect() - # nothing to be done - if db.exists(probe): - logger.info('document exists and is up to date %s', probe['path']) - return - logger.info('document does not exist or is out of date %s', probe['path']) - # import the file - logger.info('importing file %s', f) - dppylib.import_file(db.db, probe) - +from data_importer_service import data_import_service +from importer_service import api_importer_service def main(): parser = ap.ArgumentParser() - parser.add_argument('-c', '--config') - parser.add_argument('-d', '--dbname', default='dpdata') - parser.add_argument('-v', '--verbose', action='store_true') - parser.add_argument('-n','--ncpu', type= int, default= 1, - help='number of processes/threads to use') - parser.add_argument('expr') + parser.add_argument("-c", "--config") + parser.add_argument("expr") args = parser.parse_args() - level = logging.INFO - if args.verbose: - level = logging.DEBUG - logging.basicConfig(level=level) - - with open(os.path.expanduser(args.config), 'r') as fo: + with open(os.path.expanduser(args.config), "r") as fo: config = yaml.load(fo, Loader=yaml.SafeLoader) - - db1 = Database(config, args.dbname) - - files=glob.glob(args.expr, recursive=True) - - if args.ncpu==1: - - for f in files: - _main(db1,f) - - elif args.ncpu>1: - - sigint_handler= signal.signal(signal.SIGINT, signal.SIG_IGN) - pool= Pool(args.ncpu) - signal.signal(signal.SIGINT, sigint_handler) - - try: - for f in files: - pool.apply_async(_main, (db1,f), error_callback= RAISE) - except KeyboardInterrupt: - pool.terminate() - else: - pool.close() - pool.join() - - db=db1.connect() - logger.info('cleaning metadata') - lastday = get_lastday(db.db) - if lastday: - clean_metadata(db.db, lastday) - -def clean_metadata(db, max_days): - studies = col.defaultdict() - subjects = list() - - for subject in max_days: - if subject['_id']['study'] not in studies: - studies[subject['_id']['study']] = {} - studies[subject['_id']['study']]['subject'] = [] - studies[subject['_id']['study']]['max_day'] = 0 - - # if there are more than 2, drop unsynced - metadata = list(db.metadata.find( - { - 'study' : subject['_id']['study'] - }, - { - '_id' : True, - 'collection' : True, - 'synced' : True - } - )) - - if len(metadata) > 1: - for doc in metadata: - if doc['synced'] is False and 'collection' in doc: - db[doc['collection']].drop() - if doc['synced'] is False: - db.metadata.delete_many( - { - '_id': doc['_id'] - } - ) - - subject_metadata = col.defaultdict() - subject_metadata['subject'] = subject['_id']['subject'] - subject_metadata['synced'] = subject['synced'] - subject_metadata['days'] = subject['days'] - subject_metadata['study'] = subject['_id']['study'] - - 
studies[subject['_id']['study']]['max_day'] = studies[subject['_id']['study']]['max_day'] if (studies[subject['_id']['study']]['max_day'] >= subject['days'] ) else subject['days'] - - studies[subject['_id']['study']]['subject'].append(subject_metadata) - - for study, subject in iter(studies.items()): - bulk_metadata = [] - bulk_metadata = bulk_metadata + [UpdateMany({'study' : study}, {'$set' : - { - 'synced' : True, - 'subjects' : studies[study]['subject'], - 'days' : studies[study]['max_day'] - } - }, upsert=True)] - bulk_metadata = bulk_metadata + [DeleteMany({'study' : study, 'synced' : False})] - bulk_metadata = bulk_metadata + [UpdateMany({'study' : study }, {'$set' : {'synced' : False}})] - try: - db.metadata.bulk_write(bulk_metadata) - except BulkWriteError as e: - logger.error(e) - -def get_lastday(db): - return list(db.toc.aggregate([ - { - '$group' : { - '_id' : { - 'study': '$study', - 'subject' : '$subject' - }, - 'days' : { - '$max' : '$time_end' - }, - 'synced' : { - '$max' : '$updated' - } - } - } - ])) - -def clean_toc(db): - logger.info('cleaning table of contents') - out_of_sync_tocs = db.toc.find( - { - 'synced' : False - }, - { - '_id' : False, - 'collection' : True, - 'path' : True - } - ) - - for doc in out_of_sync_tocs: - db[doc['collection']].delete_many( - { - 'path' : doc['path'] - } + api_url = config["api_url"] + credentials = { + "x-api-user": config["api_user"], + "x-api-key": config["api_key"], + } + if("verify_ssl" in config and config["verify_ssl"] == False): + verify_ssl = False + else: + verify_ssl = True + + api = api_importer_service.ImporterApiService(credentials, verify_ssl) + + for file in glob.iglob(args.expr, recursive=True): + data_file = {} + metadata_file = {} + importer_service = data_import_service.DataImporterService( + data_file, metadata_file ) + importer_service.process_file(file) + data, meta = importer_service.processed_data_to_json() + if data: + # print("Day Data struct:") + # print(data) + api.upsert_file(api.routes(api_url, "day_data"), data) - bulk = [DeleteMany({ 'synced' : False })] - try: - db.toc.bulk_write(bulk) - except BulkWriteError as e: - logger.error(e) + if meta: + # print("Metadata struct:") + # print(meta) + api.upsert_file(api.routes(api_url, "metadata"), meta) -def clean_toc_study(db, study): - logger.info('cleaning table of contents for {0}'.format(study)) - out_of_sync_tocs = db.toc.find( - { - 'study' : study, - 'synced' : False - }, - { - '_id' : False, - 'collection' : True, - 'path' : True - } - ) - for doc in out_of_sync_tocs: - db[doc['collection']].delete_many( - { - 'path' : doc['path'] - } - ) - - bulk = [DeleteMany({ 'study' : study, 'synced' : False })] - try: - db.toc.bulk_write(bulk) - except BulkWriteError as e: - logger.error(e) -if __name__ == '__main__': +if __name__ == "__main__": main() - diff --git a/scripts/importer_service/__init__.py b/scripts/importer_service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/importer_service/api_importer_service.py b/scripts/importer_service/api_importer_service.py new file mode 100644 index 0000000..46a4ae6 --- /dev/null +++ b/scripts/importer_service/api_importer_service.py @@ -0,0 +1,44 @@ +import requests +import urllib3 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +class ImporterApiService: + def __init__( + self, + credentials, + verify_ssl=True, + ): + self.headers = {"content-type": "application/json", **credentials} + self.verify_ssl = verify_ssl + + def routes(self, url, path): + routes = 
{"metadata": url + "metadata", "day_data": url + "day"} + + return routes[path] + + def upsert_file(self, url, file_data): + r = requests.post( + url, + headers=self.headers, + data=file_data, + verify=self.verify_ssl, + ) + status = r.status_code + try: + response = r.json() + print(response) + except Exception as e: + print(status) + print(f"Error: {e}") + print(r) + pass + + def refresh_metadata_collection(self, url): + r = requests.delete(url, headers=self.headers, verify=self.verify_ssl) + status = r.status_code + + if status != 200: + print("There was an error.", r.json()["message"]) + else: + response = r.json()["data"] + print(response) diff --git a/scripts/refresh_metadata.py b/scripts/refresh_metadata.py new file mode 100644 index 0000000..83f0456 --- /dev/null +++ b/scripts/refresh_metadata.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python + +import os +import yaml +import argparse as ap +from importer_service import api_importer_service + + +def main(): + parser = ap.ArgumentParser() + parser.add_argument("-c", "--config") + args = parser.parse_args() + + with open(os.path.expanduser(args.config), "r") as fo: + config = yaml.load(fo, Loader=yaml.SafeLoader) + + api_url = config["api_url"] + credentials = { + "x-api-user": config["api_user"], + "x-api-key": config["api_key"], + } + api = api_importer_service.ImporterApiService(credentials) + api.refresh_metadata_collection(api.routes(api_url, "metadata")) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index a446c6d..caa0828 100644 --- a/setup.py +++ b/setup.py @@ -3,26 +3,20 @@ here = os.path.abspath(os.path.dirname(__file__)) -requires = [ - 'pyaml', - 'pymongo==3.11.4', - 'pandas' -] +requires = ["pyaml", "polars", "requests"] about = dict() -with open(os.path.join(here, 'dpimport', '__version__.py'), 'r') as f: +with open(os.path.join(here, "dpimport", "__version__.py"), "r") as f: exec(f.read(), about) setup( - name=about['__title__'], - version=about['__version__'], - description=about['__description__'], - author=about['__author__'], - author_email=about['__author_email__'], - url=about['__url__'], + name=about["__title__"], + version=about["__version__"], + description=about["__description__"], + author=about["__author__"], + author_email=about["__author_email__"], + url=about["__url__"], packages=find_packages(), - scripts=[ - 'scripts/import.py' - ], - install_requires=requires + scripts=["scripts/import.py", "scripts/generate_test_data"], + install_requires=requires, ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_csv_data_service.py b/tests/test_csv_data_service.py new file mode 100644 index 0000000..8860e78 --- /dev/null +++ b/tests/test_csv_data_service.py @@ -0,0 +1,21 @@ +from unittest import TestCase +from scripts.csv_data_service.csv_data_service import CSVDataService + + +class TestCsvData(TestCase): + @classmethod + def setUpClass(cls): + cls.max_columns = 6 + cls.max_days = 5 + cls.participants = 5 + + def setUp(self): + self.csv_generator = CSVDataService( + self.max_columns, self.max_days, self.participants + ) + + def test_generate_column_data(self): + self.csv_generator._generate_columns() + assert len(self.csv_generator.columns) == self.max_columns + self.csv_generator._generate_column_data() + assert len(self.csv_generator.day_data) == self.max_columns diff --git a/tests/test_importer_service.py b/tests/test_importer_service.py new file mode 100644 index 0000000..9442c4d --- /dev/null +++ 
b/tests/test_importer_service.py @@ -0,0 +1,129 @@ +from unittest import TestCase +from unittest.mock import patch +import pprint +from scripts.data_importer_service.data_import_service import DataImporterService + + +class TestImporter(TestCase): + @classmethod + def setUpClass(cls): + cls.data_file = {} + cls.metadata_file = {} + + def setUp(self): + self.importer = DataImporterService(self.data_file, self.metadata_file) + + def test_file_divergence(self): + path = "blah.csv" + + assert self.importer.process_file(path) == None + + with patch.object( + self.importer, "process_data_file" + ) as mocked_process_data_file: + data_file_path = "study-participant-assessment-day1to4.csv" + + self.importer.process_file(data_file_path) + mocked_process_data_file.assert_called_once() + + with patch.object( + self.importer, "process_metadata_file" + ) as mocked_process_metadata_file: + metadata_file_path = "site_metadata.csv" + + self.importer.process_file(metadata_file_path) + mocked_process_metadata_file.assert_called_once() + + def test_processed_data_to_json(self): + data_file_path = "study-participant-assessment-day1to4.csv" + data_file_extension_to_dict = self.importer.DATAFILE.match( + data_file_path + ).groupdict() + mock_data_file_info = { + "path": "study-participant-assessment-day1to4.csv", + "filetype": "text/csv", + "encoding": "utf-8", + "basename": "study-participant-assessment-day1to4.csv", + "dirname": "/path/to/files", + "mtime": 1234567890.0, + "size": 1024, + "uid": 1000, + "gid": 1000, + "mode": 0o644, + } + participant_assessments = [ + { + "var1": 1, + "var2": 2, + "var3": "str", + }, + { + "var1": 2, + "var2": 2, + "var3": "str", + "var4": 5, + "var6": 6, + "var7": "str2", + }, + ] + metadata_file_path = "site_metadata.csv" + metadata_file_extension_to_dict = self.importer.METADATA.match( + metadata_file_path + ).groupdict() + mock_file_info = { + "filetype": "text/csv", + "encoding": "utf-8", + "dirname": "/path/to/files", + "mtime": 1234567890.0, + "size": 1024, + "uid": 1000, + "gid": 1000, + "mode": 0o644, + } + participants = [ + { + "Subject ID": "YA1", + "Active": 1, + "Consent": "2022-06-02", + "Study": "YA", + }, + { + "Subject ID": "YA2", + "Active": 1, + "Consent": "2022-06-02", + "Study": "YA", + }, + ] + + with patch.object( + self.importer, "_file_info", return_value=mock_data_file_info + ), patch.object( + self.importer, "_read_csv", return_value=participant_assessments + ): + self.importer.process_data_file(data_file_path, data_file_extension_to_dict) + with patch.object( + self.importer, "_file_info", return_value=mock_file_info + ), patch.object(self.importer, "_read_csv", return_value=participants): + self.importer.process_metadata_file( + metadata_file_path, metadata_file_extension_to_dict + ) + + data_to_json = self.importer.processed_data_to_json() + + assert ( + data_to_json[0] + == '{"metadata": {"path": "study-participant-assessment-day1to4.csv", "filetype": "text/csv", "encoding": "utf-8", "basename": "study-participant-assessment-day1to4.csv", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "data", "study": "study", "participant": "participant", "assessment": "assessment", "units": "day", "start": "1", "end": "4", "extension": ".csv", "time_end": "4"}, "participant_assessments": [{"var1": 1, "var2": 2, "var3": "str"}, {"var1": 2, "var2": 2, "var3": "str", "var4": 5, "var6": 6, "var7": "str2"}], "assessment_variables": [{"name": "var1", "assessment": "assessment"}, {"name": "var2", 
"assessment": "assessment"}, {"name": "var3", "assessment": "assessment"}, {"name": "var4", "assessment": "assessment"}, {"name": "var6", "assessment": "assessment"}, {"name": "var7", "assessment": "assessment"}]}' + ) + assert ( + data_to_json[1] + == '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "2022-06-02", "participant": "YA1", "study": "YA"}, {"Active": 1, "Consent": "2022-06-02", "participant": "YA2", "study": "YA"}]}' + ) + + self.importer.data_file["participant_assessments"] = [] + + data_to_json = self.importer.processed_data_to_json() + + assert data_to_json == ( + None, + '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "2022-06-02", "participant": "YA1", "study": "YA"}, {"Active": 1, "Consent": "2022-06-02", "participant": "YA2", "study": "YA"}]}', + ) diff --git a/tools/database/__init__.py b/tools/database/__init__.py deleted file mode 100644 index 7f2fc74..0000000 --- a/tools/database/__init__.py +++ /dev/null @@ -1,36 +0,0 @@ -import logging - -logger = logging.getLogger(__name__) - -def sanitize(db): - dirty_files = db.toc.find({ - 'dirty' : True - }) - dirty_metadata = db.metadata.find({ - 'dirty' : True - }) - for doc in dirty_files: - logger.info('{FILE} is outdated. Deleting from the database.'.format(FILE=doc['path'])) - remove_doc(db, db['toc'], doc, 'data') - - for doc in dirty_metadata: - logger.info('{FILE} is outdated. Deleting from the database.'.format(FILE=doc['path'])) - remove_doc(db, db['metadata'], doc, 'metadata') - -def remove_doc(db, collection, doc, role): - try: - collection.delete_many({ - '_id' : doc['_id'] - }) - if role == 'metadata': - db[doc['collection']].drop() - else: - - db[doc['collection']].delete_many({ - 'path' : doc['path'] - }) - return 0 - except Exception as e: - logger.error(e) - logger.error('Could not remove {FILE} from the database.'.format(FILE=doc['path'])) - return 1 diff --git a/tools/reader/__init__.py b/tools/reader/__init__.py deleted file mode 100644 index 7ced633..0000000 --- a/tools/reader/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -import pandas as pd -import logging - -logger = logging.getLogger(__name__) - -# Read in the file and yield the dataframe chunk -def read_csv(file_path): - try: - tfr = pd.read_csv(file_path, memory_map=True, keep_default_na=False, chunksize=1, engine='c', skipinitialspace=True) - for df in tfr: - yield df - except pd.io.common.EmptyDataError as e: - logger.error(e) - except Exception as e: - logger.error(e)