From 9fdfcc4744ea6088e7cc2a1c7401a77c05a7d6a0 Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Fri, 25 Aug 2023 14:16:54 -0500 Subject: [PATCH 01/21] Service * Added service that parses data csv and metadata csv * Using the new data structures the service parses the data and creats a json * Added tests to the service Updated import script for v2 * Removed all files that parse mongodb and removed dependency * Added service that handles incoming csv data for metadata and participant csv data * Service also has a method to convert data to json * Added api request with a configuration for url Add requests * Added requests dependency Updates to script to add hash collection * Added hash collection --- .gitignore | 1 + README.md | 33 +-- dpimport/__init__.py | 104 -------- dpimport/__version__.py | 6 - dpimport/database/__init__.py | 102 ------- dpimport/importer/__init__.py | 111 -------- dpimport/patterns/__init__.py | 7 - dppylib/__init__.py | 248 ------------------ examples/config.yml | 10 +- {tools => scripts}/__init__.py | 0 scripts/api/__init__.py | 0 scripts/api/api.py | 23 ++ scripts/data_importer_service/__init__.py | 0 .../data_import_service.py | 158 +++++++++++ scripts/import.py | 223 ++-------------- setup.py | 26 +- tests/__init__.py | 0 tests/test_importer_service.py | 133 ++++++++++ tools/database/__init__.py | 36 --- tools/reader/__init__.py | 15 -- 20 files changed, 357 insertions(+), 879 deletions(-) delete mode 100644 dpimport/__init__.py delete mode 100644 dpimport/__version__.py delete mode 100644 dpimport/database/__init__.py delete mode 100644 dpimport/importer/__init__.py delete mode 100644 dpimport/patterns/__init__.py delete mode 100644 dppylib/__init__.py rename {tools => scripts}/__init__.py (100%) create mode 100644 scripts/api/__init__.py create mode 100644 scripts/api/api.py create mode 100644 scripts/data_importer_service/__init__.py create mode 100644 scripts/data_importer_service/data_import_service.py create mode 100644 tests/__init__.py create mode 100644 tests/test_importer_service.py delete mode 100644 tools/database/__init__.py delete mode 100644 tools/reader/__init__.py diff --git a/.gitignore b/.gitignore index 6e2ca78..207f31f 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ *.pyc *.egg-info docs/build +.vscode diff --git a/README.md b/README.md index 667f107..b98c201 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,16 @@ -DPimport: A command line glob importer for DPdash -================================================= +# DPimport: A command line glob importer for DPdash + DPimport is a command line tool for importing files into DPdash using a -simple [`glob`](https://en.wikipedia.org/wiki/Glob_(programming)) expression. +simple [`glob`]() expression. ## Table of contents + 1. [Installation](#installation) 2. [Configuration](#configuration) 3. [Usage](#usage) -4. [MongoDB](#mongodb) ## Installation + Just use `pip` ```bash @@ -17,14 +18,16 @@ pip install https://github.com/AMP-SCZ/dpimport.git ``` ## Configuration + DPimport requires a configuration file in YAML format, passed as a command -line argument with `-c|--config`, for establishing a MongoDB database -connection. You will find an example configuration file in the `examples` +line argument with `-c|--config`, for establishing a MongoDB database +connection. You will find an example configuration file in the `examples` directory within this repository. ## Usage + The main command line tool is `import.py`. 
You can use this tool to import any -DPdash-compatible CSV files or metadata files using the direct path to a file +DPdash-compatible CSV files or metadata files using the direct path to a file or a glob expression (use single quotes to avoid shell expansion) ```bash @@ -32,9 +35,6 @@ import.py -c config.yml '/PHOENIX/GENERAL/STUDY_A/SUB_001/DATA_TYPE/processed/*. import.py -c config.yml '/PHOENIX/GENERAL/STUDY_A/SUB_001/DATA_TYPE/processed/*.csv' -n 8 ``` -`-n 8` is for parallelly importing 8 files. The default is `-n 1`. - - You may also now use the `**` recursive glob expression, for example: ```bash @@ -55,18 +55,9 @@ and so on. `directory/*/*.csv` matches only `directory/[subdirectory]/[filename].csv`. With a [recursive glob pattern](https://docs.python.org/3/library/glob.html#glob.glob), `directory/**/*.csv` will additionally match: -* `directory/[filename].csv` (no subdirectory) -* `directory/[subdirectory1]/[subdirectory2]/[filename].csv` (sub-subdirectory) +- `directory/[filename].csv` (no subdirectory) +- `directory/[subdirectory1]/[subdirectory2]/[filename].csv` (sub-subdirectory) and so on, for as many levels deep as exist in the directory tree. - - - -## MongoDB - -This tool requires MongoDB to be running and accessible with the credentials you -supply in the `config.yml` file. For tips on MongoDB as it is used in DPdash and DPimport, -see [the DPdash wiki](https://github.com/PREDICT-DPACC/dpdash/wiki/MongoDB-Tips). - diff --git a/dpimport/__init__.py b/dpimport/__init__.py deleted file mode 100644 index e530fc8..0000000 --- a/dpimport/__init__.py +++ /dev/null @@ -1,104 +0,0 @@ -import os -import re -import hashlib -import logging -import mimetypes as mt -from . import patterns -from dppylib import diff_files - -logger = logging.getLogger(__name__) - -class Role: - UNKNOWN = 'unknown' - METADATA = 'metadata' - DATAFILE = 'data' - -def probe(path): - ''' - Check file for DPdash compatibility and return a file - information object. 
- - :param path: File path - :type path: str - ''' - if not os.path.exists(path): - logger.debug('file not found %s', path) - return None - dirname = os.path.dirname(path) - basename = os.path.basename(path) - # match file and get re match object and file role - role,match = match_file(basename) - if role == Role.UNKNOWN: - return None - # initialize info object - info = match.groupdict() - info['glob'] = path - if role == Role.DATAFILE: - info.update(init_datafile(info)) - info['glob'] = get_glob(path) - # add other necessary information to info object - mimetype,encoding = mt.guess_type(path) - stat = os.stat(path) - - with open(path) as f: - content = f.read().strip() - content_hash = hashlib.sha256(content.encode()).hexdigest() - - info.update({ - 'path' : path, - 'filetype' : mimetype, - 'encoding' : encoding, - 'basename' : basename, - 'dirname' : dirname, - 'dirty' : True, - 'synced' : False, - 'mtime' : stat.st_mtime, - 'size' : stat.st_size, - 'uid' : stat.st_uid, - 'gid' : stat.st_gid, - 'mode' : stat.st_mode, - 'role': role, - 'content': content_hash - }) - return info - -def match_file(f): - match = patterns.DATAFILE.match(f) - if match: - return Role.DATAFILE, match - match = patterns.METADATA.match(f) - if match: - return Role.METADATA, match - return Role.UNKNOWN, None - -def init_datafile(info): - string = '{STUDY}{SUBJECT}{ASSESSMENT}'.format( - STUDY=info['study'], - SUBJECT=info['subject'], - ASSESSMENT=info['assessment'] - ) - hash = hashlib.sha256(string.encode('utf-8')) - return { - 'collection': hash.hexdigest(), - 'subject' : info['subject'], - 'assessment' : info['assessment'], - 'time_units' : str(info['units']), - 'time_start' : int(info['start']), - 'time_end' : int(info['end']) - } - -def get_glob(f): - basename = os.path.basename(f) - dirname = os.path.dirname(f) - glob = patterns.GLOB_SUB.sub('\\1*\\2', basename) - return os.path.join(dirname, glob) - -def import_file(db, file_info): - if file_info['role'] == 'data': - collection = db['toc'] - elif file_info['role'] == 'metadata': - collection = db['metadata'] - else: - logger.error('incompatible file %s', file_info['path']) - return - diff_files(db, collection, file_info) diff --git a/dpimport/__version__.py b/dpimport/__version__.py deleted file mode 100644 index def0ea4..0000000 --- a/dpimport/__version__.py +++ /dev/null @@ -1,6 +0,0 @@ -__title__ = 'dpimport' -__description__ = 'DPdash importer' -__url__ = 'https://github.com/harvard-nrg/dpimport' -__version__ = '0.0.1' -__author__ = 'Neuroinformatics Research Group' -__author_email__ = 'info@neuroinfo.org' diff --git a/dpimport/database/__init__.py b/dpimport/database/__init__.py deleted file mode 100644 index e2fc40a..0000000 --- a/dpimport/database/__init__.py +++ /dev/null @@ -1,102 +0,0 @@ -import ssl -import fnmatch -import logging -from pymongo import MongoClient -from bson.json_util import dumps - -logger = logging.getLogger(__name__) - -class Database(object): - def __init__(self, config, dbname): - self.config = config - self.dbname = dbname - self.client = None - self.db = None - - def connect(self): - uri = 'mongodb://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{AUTH_SOURCE}' - uri = uri.format( - USERNAME=self.config['username'], - PASSWORD=self.config['password'], - HOST=self.config['hostname'], - PORT=self.config['port'], - AUTH_SOURCE=self.config['auth_source'] - ) - self.client = MongoClient( - uri, - ssl=True, - ssl_cert_reqs=ssl.CERT_REQUIRED, - ssl_certfile=self.config['ssl_certfile'], - ssl_keyfile=self.config['ssl_keyfile'], - 
ssl_ca_certs=self.config['ssl_ca_certs'], - serverSelectionTimeoutMS=300000 - ) - self.db = self.client[self.dbname] - return self - - def remove_unsynced(self, expr): - ''' - Remove all documents with sync: false matching the - input shell-style expression. - - :param expr: shell-style expression - :type expr: str - ''' - regex = fnmatch.translate(expr) - cursor = self.db.toc.find({ - 'path': { - '$regex': regex, - }, - 'synced': False - }, { - 'collection': True - }) - for doc in cursor: - _id = doc['_id'] - collection = doc['collection'] - # todo: wrap in a transaction, requires MongoDB 4.x - logger.debug('dropping collection %s', collection) - self.db[collection].drop() - logger.debug('deleting toc document %s', _id) - self.db.toc.remove({ '_id': _id }) - - def exists(self, probe): - ''' - Check if file exists in the database - - :param probe: File probe - :type probe: dict - ''' - doc = self.db.toc.find_one({ - 'content': probe['content'] - }) - if not doc: - # since it's not in data, check if it's in metadata - doc = self.db.metadata.find_one({ - 'content': probe['content'] - }) - - if doc: - return True - return False - - def unsync(self, expr): - ''' - Convert shell-style expression to a regular expression and - use that to match TOC entries for files stored in the - database and mark them as un-synced. - - :param expr: shell-style regular expression - :type expr: str - ''' - regex = fnmatch.translate(expr) - docs = self.db.toc.update_many({ - 'path': { - '$regex': regex - } - }, { - '$set': { - 'synced': False - } - }) - diff --git a/dpimport/importer/__init__.py b/dpimport/importer/__init__.py deleted file mode 100644 index 5e99f8d..0000000 --- a/dpimport/importer/__init__.py +++ /dev/null @@ -1,111 +0,0 @@ -import uuid -import logging - -logger = logging.getLogger(__name__) - -def import_file(db, file_info): - if file_info['role'] == 'data': - collection = db['toc'] - elif file_info['role'] == 'metadata': - collection = db['metadata'] - else: - logger.error('{FILE} is not compatible with DPdash. Exiting import.'.format(FILE=file_info['path'])) - return - diff_files(db, collection, file_info) - -# Match the file info with the record stored in the database -def diff_files(db, collection, file_info): - file_path = file_info['path'] - db_data = collection.find_one({ 'path' : file_path }) - if not db_data: - logger.info('{FILE} does not exist in the database. Importing.'.format(FILE=file_path)) - import_data(db, collection, file_info) - else: - if db_data['mtime'] != file_info['mtime'] or db_data['size'] != file_info['size']: - logger.info('{FILE} has been modified. Re-importing.'.format(FILE=file_path)) - dbtools.remove_doc(db, collection, db_data, file_info['role']) - import_data(db, collection, file_info) - else: - logger.info('Database already has {FILE}. 
Skipping.'.format(FILE=file_path)) - logged = log_success(collection, db_data['_id']) - if logged == 0: - logger.info('Journaling complete for {FILE}'.format(FILE=file_info['path'])) - -# Import data into the database -def import_data(db, ref_collection, file_info): - if file_info['role'] == 'metadata': - file_info.update({'collection': str(uuid.uuid4())}) - ref_id = insert_reference(ref_collection, file_info) - if ref_id is None: - logger.error('Unable to import {FILE}'.format(FILE=file_info['path'])) - return - - inserted = insert_data(db, file_info) - if inserted == 0: - logger.info('Import success for {FILE}'.format(FILE=file_info['path'])) - - logged = log_success(ref_collection, ref_id) - if logged == 0: - logger.info('Journaling complete for {FILE}'.format(FILE=file_info['path'])) - -# Mark the sync as successful -def log_success(ref_collection, ref_id): - update_ref = { - '$set' : { - 'dirty': False, - 'synced' : True, - 'updated' : datetime.utcnow() - } - } - - try: - ref_collection.update({ - '_id' : ref_id - }, update_ref) - return 0 - except Exception as e: - logger.error(e) - return 1 - -# Insert the reference doc, returns the inserted id -def insert_reference(collection, reference): - try: - ref_id = collection.insert_one(reference).inserted_id - return ref_id - except Exception as e: - logger.error(e) - return None - -# Insert the data -def insert_data(db, file_info): - try: - # Import data - data_blob = [] - import_collection = db[file_info['collection']] - for chunk in reader.read_csv(file_info['path']): - if len(chunk) > 0: - if file_info['role'] != 'metadata': - chunk_columns = sanitize_columns(chunk.columns.values.tolist()) - chunk.columns = chunk_columns - chunk['path'] = file_info['path'] - data_blob.extend(chunk.round(4).to_dict('records')) - - if len(data_blob) >= 100000: - import_collection.insert_many(data_blob, False) - data_blob = [] - if data_blob: - import_collection.insert_many(data_blob, False) - return 0 - except Exception as e: - logger.error(e) - logger.error('Unable to import {FILE}'.format(FILE=file_info['path'])) - -# Rename columns to encode special characters -def sanitize_columns(columns): - new_columns = [] - for column in columns: - new_column = quote(unicode(column).encode('utf-8'), safe='~()*!.\'').replace('.', '%2E') - new_columns.append(new_column) - - return new_columns - diff --git a/dpimport/patterns/__init__.py b/dpimport/patterns/__init__.py deleted file mode 100644 index d14e73a..0000000 --- a/dpimport/patterns/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -import re - -DATAFILE = re.compile(r'(?P\w+)\-(?P\w+)\-(?P\w+)\-(?Pday)(?P[+-]?\d+(?:\.\d+)?)to(?P[+-]?\d+(?:\.\d+)?)(?P.csv)') - -METADATA = re.compile(r'(?P\w+)\_metadata(?P.csv)') - -GLOB_SUB = re.compile(r'(\w+\-\w+\-\w+\-day)[+-]?\d+(?:\.\d+)?to[+-]?\d+(?:\.\d+)?(.*)') diff --git a/dppylib/__init__.py b/dppylib/__init__.py deleted file mode 100644 index bcd053c..0000000 --- a/dppylib/__init__.py +++ /dev/null @@ -1,248 +0,0 @@ -import os -import re -import logging -import hashlib -import uuid -import mimetypes as mt -from datetime import datetime -from urllib.parse import quote - -from tools import database as dbtools -from tools import reader -from time import sleep - -TIME_UNITS = { - 'day' : 'days', - 'hr' : 'hours' -} - -_UNITS = '|'.join(TIME_UNITS.keys()) -_EXTENSION = '.csv' - -FILE_REGEX = re.compile(r'(?P\w+)\-(?P\w+)\-(?P\w+)\-(?P{UNITS})(?P[+-]?\d+(?:\.\d+)?)to(?P[+-]?\d+(?:\.\d+)?)(?P{EXTENSION})'.format(UNITS=_UNITS, EXTENSION=_EXTENSION)) -FILE_SUB = 
re.compile(r'(\w+\-\w+\-\w+\-{UNITS})[+-]?\d+(?:\.\d+)?to[+-]?\d+(?:\.\d+)?(.*)'.format(UNITS=_UNITS)) -METADATA_REGEX = re.compile(r'(?P\w+)\_metadata(?P{EXTENSION})'.format(EXTENSION='.csv')) - -logger = logging.getLogger(__name__) - -# Verify if a file is DPdash-compatible file, and return file info if so. -def stat_file(import_dir, file_name, file_path): - file_info = match_file(file_name, import_dir) - if not file_info: - return None - - filetype,encoding = guess_type(file_info['extension']) - if not os.path.exists(file_path): - return None - - with open(file_path) as f: - content = f.read().strip() - content_hash = hashlib.sha256(content.encode()).hexdigest() - - file_stat = os.stat(file_path) - file_info.update({ - 'path' : file_path, - 'filetype' : filetype, - 'encoding' : encoding, - 'basename' : file_name, - 'dirname' : import_dir, - 'dirty' : True, - 'synced' : False, - 'mtime' : file_stat.st_mtime, - 'size' : file_stat.st_size, - 'uid' : file_stat.st_uid, - 'gid' : file_stat.st_gid, - 'mode' : file_stat.st_mode, - 'content' : content_hash - }) - - return file_info - -def import_file(db, file_info): - if file_info['role'] == 'data': - collection = db['toc'] - elif file_info['role'] == 'metadata': - collection = db['metadata'] - else: - logger.error('{FILE} is not compatible with DPdash. Exiting import.'.format(FILE=file_info['path'])) - return - - diff_files(db, collection, file_info) - -# Match the file info with the record stored in the database -def diff_files(db, collection, file_info): - file_path = file_info['path'] - db_data = collection.find_one({ 'path' : file_path }) - if not db_data: - logger.info('{FILE} does not exist in the database. Importing.'.format(FILE=file_path)) - import_data(db, collection, file_info) - else: - if db_data['content'] != file_info['content']: - logger.info('{FILE} has been modified. Re-importing.'.format(FILE=file_path)) - dbtools.remove_doc(db, collection, db_data, file_info['role']) - import_data(db, collection, file_info) - else: - logger.info('Database already has {FILE}. 
Skipping.'.format(FILE=file_path)) - logged = log_success(collection, db_data['_id']) - if logged == 0: - logger.info('Journaling complete for {FILE}'.format(FILE=file_info['path'])) - -# Import data into the database -def import_data(db, ref_collection, file_info): - if file_info['role'] == 'metadata': - file_info.update({'collection': str(uuid.uuid4())}) - ref_id = insert_reference(ref_collection, file_info) - if ref_id is None: - logger.error('Unable to import {FILE}'.format(FILE=file_info['path'])) - return - - inserted = insert_data(db, file_info) - if inserted == 0: - logger.info('Import success for {FILE}'.format(FILE=file_info['path'])) - - logged = log_success(ref_collection, ref_id) - if logged == 0: - logger.info('Journaling complete for {FILE}'.format(FILE=file_info['path'])) - -# Mark the sync as successful -def log_success(ref_collection, ref_id): - update_ref = { - '$set' : { - 'dirty': False, - 'synced' : True, - 'updated' : datetime.utcnow() - } - } - - try: - ref_collection.update({ - '_id' : ref_id - }, update_ref) - return 0 - except Exception as e: - logger.error(e) - return 1 - -# Insert the reference doc, returns the inserted id -def insert_reference(collection, reference): - try: - try: - ref_id = collection.insert_one(reference).inserted_id - except: - sleep(60) - logger.info('Retrying reference insertion of {FILE}'.format(FILE=reference['path'])) - ref_id = collection.insert_one(reference).inserted_id - return ref_id - except Exception as e: - logger.error(e) - return None - -# Insert the data -def insert_data(db, file_info): - try: - # Import data - data_blob = [] - import_collection = db[file_info['collection']] - - # https://github.com/AMP-SCZ/dpimport/blob/52a0b80e704b20297e2239597d7145db1ae7c7f8/tools/reader/__init__.py#L9 - # reader.read_csv() has specified chunksize=1 - # so each chunk is one row of the csv file at a time - # https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-chunking - for chunk in reader.read_csv(file_info['path']): - if len(chunk) > 0: - if file_info['role'] != 'metadata': - chunk_columns = sanitize_columns(chunk.columns.values.tolist()) - chunk.columns = chunk_columns - chunk['path'] = file_info['path'] - - # each data_blob is json representation of one row of the csv file at a time - data_blob.extend(chunk.to_dict('records')) - - if len(data_blob) >= 10: - try: - import_collection.insert_many(data_blob, False) - except: - sleep(60) - logger.info('Retrying data insertion of {FILE}'.format(FILE=file_info['path'])) - import_collection.insert_many(data_blob, False) - data_blob = [] - - if data_blob: - try: - import_collection.insert_many(data_blob, False) - except: - sleep(60) - logger.info('Retrying data insertion of {FILE}'.format(FILE=file_info['path'])) - import_collection.insert_many(data_blob, False) - return 0 - except Exception as e: - logger.error(e) - logger.error('Unable to import {FILE}'.format(FILE=file_info['path'])) - return 1 - -# Rename columns to encode special characters -def sanitize_columns(columns): - new_columns = [] - for column in columns: - new_column = quote(str(column).encode('utf-8'), safe='~()*!.\'').replace('.', '%2E') - new_columns.append(new_column) - - return new_columns - -# Match the filename to distinguish data from metadata files -def match_file(file_name, sub_dir): - matched_file = FILE_REGEX.match(file_name) - if not matched_file: - logger.info('file did not match %s', file_name) - matched_metadata = METADATA_REGEX.match(file_name) - if not matched_metadata: - return None - else: - 
return scan_metadata(matched_metadata, file_name, sub_dir) - else: - logger.info('file matched %s', file_name) - return scan_data(matched_file, file_name, sub_dir) - -# Return file_info for the metadata -def scan_metadata(match, file_name, sub_dir): - file_info = match.groupdict() - - file_info.update({ - 'glob' : os.path.join(sub_dir, file_name), - 'role' : 'metadata' - }) - - return file_info - -# Return file_info for the data -def scan_data(match, file_name, sub_dir): - file_info = match.groupdict() - - m = hashlib.sha256('{STUDY}{SUBJECT}{ASSESSMENT}'.format( - STUDY= file_info['study'], - SUBJECT=file_info['subject'], - ASSESSMENT=file_info['assessment'] - )) - - file_info.update({ - 'collection': m.hexdigest(), - 'subject' : file_info['subject'], - 'assessment' : file_info['assessment'], - 'glob' : os.path.join(sub_dir, FILE_SUB.sub('\\1*\\2', file_name)), - 'time_units' : str(file_info['units']), - 'time_start' : int(file_info['start']), - 'time_end' : int(file_info['end']), - 'role' : 'data' - }) - - return file_info - -# get mime type and encoding -def guess_type(extension): - return mt.guess_type('file{}'.format(extension)) - -class StatError(Exception): - pass - -class ParserError(Exception): - pass diff --git a/examples/config.yml b/examples/config.yml index 5f55739..6c14134 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -1,9 +1 @@ -ssl_keyfile: /path/to/ssl/client/key.pem -ssl_certfile: /path/to/ssl/client/cert.pem -ssl_ca_certs: /path/to/ssl/ca/cacert.pem -username: jharvard -password: ********* -port: 27017 -auth_source: admin -db: dpdata -hostname: dpdash.example.org +api_url: http://localhost:8000/api/v1/import/data/ diff --git a/tools/__init__.py b/scripts/__init__.py similarity index 100% rename from tools/__init__.py rename to scripts/__init__.py diff --git a/scripts/api/__init__.py b/scripts/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/api/api.py b/scripts/api/api.py new file mode 100644 index 0000000..8020869 --- /dev/null +++ b/scripts/api/api.py @@ -0,0 +1,23 @@ +import requests + + +def routes(url, path): + routes = {"metadata": url + "metadata", "day_data": url + "day"} + + return routes[path] + + +def upsert_file(url, file_data): + headers = {"content-type": "application/json"} + r = requests.post( + url, + headers=headers, + data=file_data, + ) + status = r.status_code + if status != 200: + response = r.json()["message"] + print(response) + else: + response = r.json()["data"] + print(response) diff --git a/scripts/data_importer_service/__init__.py b/scripts/data_importer_service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py new file mode 100644 index 0000000..bc4306f --- /dev/null +++ b/scripts/data_importer_service/data_import_service.py @@ -0,0 +1,158 @@ +import re +import os +import hashlib +import json +import pandas as pd +from urllib.parse import quote +import mimetypes as mt +import logging + +logger = logging.getLogger(__name__) + + +class DataImporterService: + DATAFILE = re.compile( + r"(?P\w+)\-(?P\w+)\-(?P\w+)\-(?Pday)(?P[+-]?\d+(?:\.\d+)?)to(?P[+-]?\d+(?:\.\d+)?)(?P.csv)" + ) + METADATA = re.compile(r"(?P\w+)\_metadata(?P.csv)") + GLOB_SUB = re.compile( + r"(\w+\-\w+\-\w+\-day)[+-]?\d+(?:\.\d+)?to[+-]?\d+(?:\.\d+)?(.*)" + ) + records = "records" + metadata_key = "metadata" + + def __init__(self, data_file, metadata_file): + self.data_file = data_file + self.metadata_file 
= metadata_file + + def process_file(self, path): + basename = os.path.basename(path) + data_file = self.DATAFILE.match(basename) + metadata_file = self.METADATA.match(basename) + + if data_file: + data_file_info = data_file.groupdict() + + return self.process_data_file(path, data_file_info) + if metadata_file: + metadata_file_info = metadata_file.groupdict() + + return self.process_metadata_file(path, metadata_file_info) + else: + return None + + def process_data_file(self, path, file_extension): + file_extension.update({"time_end": file_extension["end"]}) + collection_base = "{study}{subject}{assessment}".format( + study=file_extension["study"], + subject=file_extension["subject"], + assessment=file_extension["assessment"], + ).encode("utf-8") + hash_collection = hashlib.sha256(collection_base).hexdigest() + metadata = self._file_info(path) + + metadata.update( + {"role": "data", "collection": hash_collection, **file_extension} + ) + + del file_extension["extension"] + + csv_data = self._read_csv( + path, + ) + subject_assessments = self.collect_csv_row(csv_data) + + self.data_file.update( + {"metadata": metadata, "subject_assessments": subject_assessments} + ) + return + + def process_metadata_file(self, path, file_extension): + metadata = self._file_info(path) + csv_data = self._read_csv(path) + participants = self.collect_csv_row(csv_data) + + metadata.update( + { + "role": "metadata", + **file_extension, + } + ) + self.metadata_file.update( + { + "metadata": metadata, + "participants": participants, + } + ) + + return + + def _read_csv(self, file_path): + try: + tfr = pd.read_csv( + file_path, + memory_map=True, + keep_default_na=False, + chunksize=1, + engine="c", + skipinitialspace=True, + ) + for data_frame in tfr: + yield data_frame + + except pd.io.common.EmptyDataError as e: + logger.error(e) + except Exception as e: + logger.error(e) + + def _sanitize_columns(self, columns): + new_columns = [] + for column in columns: + new_column = ( + quote(str(column).encode("utf-8"), safe="~()*!.'") + .replace(".", "%2E") + .replace(" ", "") + ) + new_columns.append(new_column) + + return new_columns + + def _file_info(self, path): + mimetype, encoding = mt.guess_type(path) + dirname = os.path.dirname(path) + basename = os.path.basename(path) + stat = os.stat(path) + + return { + "path": path, + "filetype": mimetype, + "encoding": encoding, + "basename": basename, + "dirname": dirname, + "mtime": stat.st_mtime, + "size": stat.st_size, + "uid": stat.st_uid, + "gid": stat.st_gid, + "mode": stat.st_mode, + } + + def collect_csv_row(self, data_frame): + list = [] + for data in data_frame: + data.columns = self._sanitize_columns(data.columns.values.tolist()) + list.extend(data.to_dict(self.records)) + return list + + def processed_data_to_json(self): + processed_data = ( + json.dumps(self.data_file) + if self.data_file and len(self.data_file["subject_assessments"]) > 0 + else None + ) + processed_metadata = ( + json.dumps(self.metadata_file) + if self.metadata_file and len(self.metadata_file["participants"]) > 0 + else None + ) + + return processed_data, processed_metadata diff --git a/scripts/import.py b/scripts/import.py index 8c1b4a7..64fe502 100755 --- a/scripts/import.py +++ b/scripts/import.py @@ -1,228 +1,43 @@ #!/usr/bin/env python import os -import sys -import ssl import glob import yaml -import dppylib -import dpimport import logging import argparse as ap -import collections as col -import dpimport.importer as importer -from dpimport.database import Database -from pymongo import 
DeleteMany, UpdateMany -from pymongo.errors import BulkWriteError - -from multiprocessing import Pool -import signal +from data_importer_service import data_import_service +from api import api logger = logging.getLogger(__name__) -def RAISE(err): - raise err - - -def _main(db1,f): - dirname = os.path.dirname(f) - basename = os.path.basename(f) - # probe for dpdash-compatibility and gather information - probe = dpimport.probe(f) - if not probe: - logger.debug('document is unknown %s', basename) - return - - db=db1.connect() - # nothing to be done - if db.exists(probe): - logger.info('document exists and is up to date %s', probe['path']) - return - logger.info('document does not exist or is out of date %s', probe['path']) - # import the file - logger.info('importing file %s', f) - dppylib.import_file(db.db, probe) - - def main(): parser = ap.ArgumentParser() - parser.add_argument('-c', '--config') - parser.add_argument('-d', '--dbname', default='dpdata') - parser.add_argument('-v', '--verbose', action='store_true') - parser.add_argument('-n','--ncpu', type= int, default= 1, - help='number of processes/threads to use') - parser.add_argument('expr') + parser.add_argument("-c", "--config") + parser.add_argument("expr") args = parser.parse_args() - level = logging.INFO - if args.verbose: - level = logging.DEBUG - logging.basicConfig(level=level) - - with open(os.path.expanduser(args.config), 'r') as fo: + with open(os.path.expanduser(args.config), "r") as fo: config = yaml.load(fo, Loader=yaml.SafeLoader) - - db1 = Database(config, args.dbname) - - files=glob.glob(args.expr, recursive=True) - - if args.ncpu==1: - - for f in files: - _main(db1,f) - - elif args.ncpu>1: - - sigint_handler= signal.signal(signal.SIGINT, signal.SIG_IGN) - pool= Pool(args.ncpu) - signal.signal(signal.SIGINT, sigint_handler) - - try: - for f in files: - pool.apply_async(_main, (db1,f), error_callback= RAISE) - except KeyboardInterrupt: - pool.terminate() - else: - pool.close() - pool.join() + api_url = config["api_url"] - db=db1.connect() - logger.info('cleaning metadata') - lastday = get_lastday(db.db) - if lastday: - clean_metadata(db.db, lastday) - -def clean_metadata(db, max_days): - studies = col.defaultdict() - subjects = list() - - for subject in max_days: - if subject['_id']['study'] not in studies: - studies[subject['_id']['study']] = {} - studies[subject['_id']['study']]['subject'] = [] - studies[subject['_id']['study']]['max_day'] = 0 - - # if there are more than 2, drop unsynced - metadata = list(db.metadata.find( - { - 'study' : subject['_id']['study'] - }, - { - '_id' : True, - 'collection' : True, - 'synced' : True - } - )) - - if len(metadata) > 1: - for doc in metadata: - if doc['synced'] is False and 'collection' in doc: - db[doc['collection']].drop() - if doc['synced'] is False: - db.metadata.delete_many( - { - '_id': doc['_id'] - } - ) - - subject_metadata = col.defaultdict() - subject_metadata['subject'] = subject['_id']['subject'] - subject_metadata['synced'] = subject['synced'] - subject_metadata['days'] = subject['days'] - subject_metadata['study'] = subject['_id']['study'] - - studies[subject['_id']['study']]['max_day'] = studies[subject['_id']['study']]['max_day'] if (studies[subject['_id']['study']]['max_day'] >= subject['days'] ) else subject['days'] - - studies[subject['_id']['study']]['subject'].append(subject_metadata) - - for study, subject in iter(studies.items()): - bulk_metadata = [] - bulk_metadata = bulk_metadata + [UpdateMany({'study' : study}, {'$set' : - { - 'synced' : True, - 
'subjects' : studies[study]['subject'], - 'days' : studies[study]['max_day'] - } - }, upsert=True)] - bulk_metadata = bulk_metadata + [DeleteMany({'study' : study, 'synced' : False})] - bulk_metadata = bulk_metadata + [UpdateMany({'study' : study }, {'$set' : {'synced' : False}})] - try: - db.metadata.bulk_write(bulk_metadata) - except BulkWriteError as e: - logger.error(e) - -def get_lastday(db): - return list(db.toc.aggregate([ - { - '$group' : { - '_id' : { - 'study': '$study', - 'subject' : '$subject' - }, - 'days' : { - '$max' : '$time_end' - }, - 'synced' : { - '$max' : '$updated' - } - } - } - ])) - -def clean_toc(db): - logger.info('cleaning table of contents') - out_of_sync_tocs = db.toc.find( - { - 'synced' : False - }, - { - '_id' : False, - 'collection' : True, - 'path' : True - } - ) - - for doc in out_of_sync_tocs: - db[doc['collection']].delete_many( - { - 'path' : doc['path'] - } + # iterate over matching files on the filesystem + for file in glob.iglob(args.expr, recursive=True): + data_file = {} + metadata_file = {} + importer_service = data_import_service.DataImporterService( + data_file, metadata_file ) + importer_service.process_file(file) + data, meta = importer_service.processed_data_to_json() - bulk = [DeleteMany({ 'synced' : False })] - try: - db.toc.bulk_write(bulk) - except BulkWriteError as e: - logger.error(e) + if data: + api.upsert_file(api.routes(api_url, "day_data"), data) -def clean_toc_study(db, study): - logger.info('cleaning table of contents for {0}'.format(study)) - out_of_sync_tocs = db.toc.find( - { - 'study' : study, - 'synced' : False - }, - { - '_id' : False, - 'collection' : True, - 'path' : True - } - ) - for doc in out_of_sync_tocs: - db[doc['collection']].delete_many( - { - 'path' : doc['path'] - } - ) + if meta: + api.upsert_file(api.routes(api_url, "metadata"), meta) - bulk = [DeleteMany({ 'study' : study, 'synced' : False })] - try: - db.toc.bulk_write(bulk) - except BulkWriteError as e: - logger.error(e) -if __name__ == '__main__': +if __name__ == "__main__": main() - diff --git a/setup.py b/setup.py index a446c6d..f8976c3 100644 --- a/setup.py +++ b/setup.py @@ -3,26 +3,20 @@ here = os.path.abspath(os.path.dirname(__file__)) -requires = [ - 'pyaml', - 'pymongo==3.11.4', - 'pandas' -] +requires = ["pyaml", "pandas", "requests"] about = dict() -with open(os.path.join(here, 'dpimport', '__version__.py'), 'r') as f: +with open(os.path.join(here, "dpimport", "__version__.py"), "r") as f: exec(f.read(), about) setup( - name=about['__title__'], - version=about['__version__'], - description=about['__description__'], - author=about['__author__'], - author_email=about['__author_email__'], - url=about['__url__'], + name=about["__title__"], + version=about["__version__"], + description=about["__description__"], + author=about["__author__"], + author_email=about["__author_email__"], + url=about["__url__"], packages=find_packages(), - scripts=[ - 'scripts/import.py' - ], - install_requires=requires + scripts=["scripts/import.py"], + install_requires=requires, ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_importer_service.py b/tests/test_importer_service.py new file mode 100644 index 0000000..97b4f70 --- /dev/null +++ b/tests/test_importer_service.py @@ -0,0 +1,133 @@ +from unittest import TestCase +from unittest.mock import patch +from scripts.data_importer_service.data_import_service import DataImporterService + + +class TestImporter(TestCase): + @classmethod + def 
setUpClass(cls): + cls.data_file = {} + cls.metadata_file = {} + + def setUp(self): + self.importer = DataImporterService(self.data_file, self.metadata_file) + + def test_file_divergence(self): + path = "blah.csv" + + assert self.importer.diverge_files(path) == None + + with patch.object( + self.importer, "process_data_file" + ) as mocked_process_data_file: + data_file_path = "study-subject-assessment-day1to4.csv" + + self.importer.diverge_files(data_file_path) + mocked_process_data_file.assert_called_once() + + with patch.object( + self.importer, "process_metadata_file" + ) as mocked_process_metadata_file: + metadata_file_path = "site_metadata.csv" + + self.importer.diverge_files(metadata_file_path) + mocked_process_metadata_file.assert_called_once() + + def test_processed_data_to_json(self): + data_file_path = "study-subject-assessment-day1to4.csv" + data_file_extension_to_dict = self.importer.DATAFILE.match( + data_file_path + ).groupdict() + mock_data_file_info = { + "path": "study-subject-assessment-day1to4.csv", + "filetype": "text/csv", + "encoding": "utf-8", + "basename": "study-subject-assessment-day1to4.csv", + "dirname": "/path/to/files", + "mtime": 1234567890.0, + "size": 1024, + "uid": 1000, + "gid": 1000, + "mode": 0o644, + } + subject_assessments = [ + { + "var1": 1, + "var2": 2, + "var3": "str", + }, + { + "var1": 2, + "var2": 2, + "var3": "str", + "var4": 5, + "var6": 6, + "var7": "str2", + }, + ] + metadata_file_path = "site_metadata.csv" + metadata_file_extension_to_dict = self.importer.METADATA.match( + metadata_file_path + ).groupdict() + mock_file_info = { + "filetype": "text/csv", + "encoding": "utf-8", + "dirname": "/path/to/files", + "mtime": 1234567890.0, + "size": 1024, + "uid": 1000, + "gid": 1000, + "mode": 0o644, + } + participants = [ + {"Subject ID": "YA1", "Active": 1, "Consent": "-", "Study": "YA"}, + {"Subject ID": "YA2", "Active": 1, "Consent": "-", "Study": "YA"}, + ] + + with patch.object( + self.importer, "_file_info", return_value=mock_data_file_info + ), patch.object( + self.importer, "_read_csv", return_value={"columns": []} + ), patch.object( + self.importer, "collect_csv_row", return_value=subject_assessments + ): + self.importer.process_data_file(data_file_path, data_file_extension_to_dict) + with patch.object( + self.importer, "_file_info", return_value=mock_file_info + ), patch.object( + self.importer, "_read_csv", return_value={"columns": []} + ), patch.object( + self.importer, "collect_csv_row", return_value=participants + ): + self.importer.process_metadata_file( + metadata_file_path, metadata_file_extension_to_dict + ) + + data_to_json = self.importer.processed_data_to_json() + + assert ( + data_to_json[0] + == '{"metadata": {"path": "study-subject-assessment-day1to4.csv", "filetype": ' + '"text/csv", "encoding": "utf-8", "basename": ' + '"study-subject-assessment-day1to4.csv", "dirname": "/path/to/files", ' + '"mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, ' + '"role": "data", "collection": ' + '"5e74265e4a4d3760737bbf39a513a938b5bd333a958c699d5006ae026ae0017a", "study": ' + '"study", "subject": "subject", "assessment": "assessment", "units": "day", ' + '"start": "1", "end": "4", "extension": ".csv", "time_end": "4"}, ' + '"subject_assessments": [{"var1": 1, "var2": 2, "var3": "str"}, {"var1": 2, ' + '"var2": 2, "var3": "str", "var4": 5, "var6": 6, "var7": "str2"}]}' + ) + assert ( + data_to_json[1] + == '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 
1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Subject ID": "YA1", "Active": 1, "Consent": "-", "Study": "YA"}, {"Subject ID": "YA2", "Active": 1, "Consent": "-", "Study": "YA"}]}' + ) + + self.importer.data_file["subject_assessments"] = [] + + data_to_json = self.importer.processed_data_to_json() + + assert data_to_json == ( + None, + '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Subject ID": "YA1", "Active": 1, "Consent": "-", "Study": "YA"}, {"Subject ID": "YA2", "Active": 1, "Consent": "-", "Study": "YA"}]}', + ) diff --git a/tools/database/__init__.py b/tools/database/__init__.py deleted file mode 100644 index 7f2fc74..0000000 --- a/tools/database/__init__.py +++ /dev/null @@ -1,36 +0,0 @@ -import logging - -logger = logging.getLogger(__name__) - -def sanitize(db): - dirty_files = db.toc.find({ - 'dirty' : True - }) - dirty_metadata = db.metadata.find({ - 'dirty' : True - }) - for doc in dirty_files: - logger.info('{FILE} is outdated. Deleting from the database.'.format(FILE=doc['path'])) - remove_doc(db, db['toc'], doc, 'data') - - for doc in dirty_metadata: - logger.info('{FILE} is outdated. Deleting from the database.'.format(FILE=doc['path'])) - remove_doc(db, db['metadata'], doc, 'metadata') - -def remove_doc(db, collection, doc, role): - try: - collection.delete_many({ - '_id' : doc['_id'] - }) - if role == 'metadata': - db[doc['collection']].drop() - else: - - db[doc['collection']].delete_many({ - 'path' : doc['path'] - }) - return 0 - except Exception as e: - logger.error(e) - logger.error('Could not remove {FILE} from the database.'.format(FILE=doc['path'])) - return 1 diff --git a/tools/reader/__init__.py b/tools/reader/__init__.py deleted file mode 100644 index 7ced633..0000000 --- a/tools/reader/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -import pandas as pd -import logging - -logger = logging.getLogger(__name__) - -# Read in the file and yield the dataframe chunk -def read_csv(file_path): - try: - tfr = pd.read_csv(file_path, memory_map=True, keep_default_na=False, chunksize=1, engine='c', skipinitialspace=True) - for df in tfr: - yield df - except pd.io.common.EmptyDataError as e: - logger.error(e) - except Exception as e: - logger.error(e) From 56038b83f4c7ee28762ff539f688f1f3e7d6a7e6 Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Fri, 25 Aug 2023 14:16:54 -0500 Subject: [PATCH 02/21] u Load Test CSV * This pr adds a script generate_test_csv to generate import files that stress test the api endpoint * It uses parallelism to write the files and it is configurable using the yaml file # json * Added hash collection --- examples/config.yml | 4 ++ scripts/api/api.py | 1 + scripts/csv_data_service/__init__.py | 0 scripts/csv_data_service/csv_data_service.py | 61 +++++++++++++++++++ .../data_import_service.py | 52 +++------------- scripts/generate_test_data.py | 34 +++++++++++ scripts/import.py | 1 - setup.py | 4 +- tests/test_csv_data_service.py | 21 +++++++ tests/test_importer_service.py | 18 ++---- 10 files changed, 136 insertions(+), 60 deletions(-) create mode 100644 scripts/csv_data_service/__init__.py create mode 100644 scripts/csv_data_service/csv_data_service.py create mode 100644 scripts/generate_test_data.py create mode 100644 tests/test_csv_data_service.py diff 
--git a/examples/config.yml b/examples/config.yml index 6c14134..d6e6f19 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -1 +1,5 @@ +csv_directory: './test_data' +columns: 5 +max_days: 45 +participant_count: 5 api_url: http://localhost:8000/api/v1/import/data/ diff --git a/scripts/api/api.py b/scripts/api/api.py index 8020869..7f58ec7 100644 --- a/scripts/api/api.py +++ b/scripts/api/api.py @@ -1,4 +1,5 @@ import requests +import logging def routes(url, path): diff --git a/scripts/csv_data_service/__init__.py b/scripts/csv_data_service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/csv_data_service/csv_data_service.py b/scripts/csv_data_service/csv_data_service.py new file mode 100644 index 0000000..e591a4b --- /dev/null +++ b/scripts/csv_data_service/csv_data_service.py @@ -0,0 +1,61 @@ +import csv +import logging + +logger = logging.getLogger(__name__) + + +class CSVDataService: + def __init__(self, path, max_columns, max_days): + self.Data = {} + self.max_columns = max_columns + self.max_days = max_days + self.columns = [] + self.day_data = [] + self.assessment_data = [] + self.path = path + + def _generate_columns(self): + var_prefix = "var_random_" + + for x in range(0, self.max_columns): + if x == 2: + self.columns.append("day") + self.columns.append(var_prefix + "%d" % (x)) + + def _generate_column_data(self): + for x in range(0, self.max_columns): + if x == 2: + self.day_data.append(x) + if x % 2 == 0 and not x == 2: + self.day_data.append("value" + " %d" % (x)) + elif x % 1 == 0: + self.day_data.append(x) + + def prepare_csv_test_data(self): + self._generate_columns() + self._generate_column_data() + + def write_test_csv_files(self, participant_num): + # Write file name to automatically be day1 to maxday + + with open(self.build_file_path(participant_num), "w", newline="") as csvfile: + csv_writer = csv.writer( + csvfile, delimiter=",", quotechar="|", quoting=csv.QUOTE_MINIMAL + ) + + csv_writer.writerow(self.columns) + + for x in range(1, self.max_days): + self.day_data[2] = x + csv_writer.writerow(self.day_data) + + print(f"Finished writing file {participant_num}") + + def build_file_path(self, participant): + return ( + self.path + + "/" + + f"gnar-eng{participant}-superassessment-day1to" + + "%d" % (self.max_days) + + ".csv" + ) diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index bc4306f..fb9b5f5 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -2,7 +2,7 @@ import os import hashlib import json -import pandas as pd +import polars as pl from urllib.parse import quote import mimetypes as mt import logging @@ -50,18 +50,14 @@ def process_data_file(self, path, file_extension): ).encode("utf-8") hash_collection = hashlib.sha256(collection_base).hexdigest() metadata = self._file_info(path) - metadata.update( {"role": "data", "collection": hash_collection, **file_extension} ) - del file_extension["extension"] - csv_data = self._read_csv( + subject_assessments = self._read_csv( path, ) - subject_assessments = self.collect_csv_row(csv_data) - self.data_file.update( {"metadata": metadata, "subject_assessments": subject_assessments} ) @@ -69,8 +65,7 @@ def process_data_file(self, path, file_extension): def process_metadata_file(self, path, file_extension): metadata = self._file_info(path) - csv_data = self._read_csv(path) - participants = self.collect_csv_row(csv_data) + participants = 
self._read_csv(path) metadata.update( { @@ -88,34 +83,8 @@ def process_metadata_file(self, path, file_extension): return def _read_csv(self, file_path): - try: - tfr = pd.read_csv( - file_path, - memory_map=True, - keep_default_na=False, - chunksize=1, - engine="c", - skipinitialspace=True, - ) - for data_frame in tfr: - yield data_frame - - except pd.io.common.EmptyDataError as e: - logger.error(e) - except Exception as e: - logger.error(e) - - def _sanitize_columns(self, columns): - new_columns = [] - for column in columns: - new_column = ( - quote(str(column).encode("utf-8"), safe="~()*!.'") - .replace(".", "%2E") - .replace(" ", "") - ) - new_columns.append(new_column) - - return new_columns + df = pl.read_csv(file_path) + return df.to_dicts() def _file_info(self, path): mimetype, encoding = mt.guess_type(path) @@ -136,23 +105,18 @@ def _file_info(self, path): "mode": stat.st_mode, } - def collect_csv_row(self, data_frame): - list = [] - for data in data_frame: - data.columns = self._sanitize_columns(data.columns.values.tolist()) - list.extend(data.to_dict(self.records)) - return list - def processed_data_to_json(self): + print("converting data to json") processed_data = ( json.dumps(self.data_file) if self.data_file and len(self.data_file["subject_assessments"]) > 0 else None ) + print("processing meatada to json") processed_metadata = ( json.dumps(self.metadata_file) if self.metadata_file and len(self.metadata_file["participants"]) > 0 else None ) - + print("processed json") return processed_data, processed_metadata diff --git a/scripts/generate_test_data.py b/scripts/generate_test_data.py new file mode 100644 index 0000000..9481f25 --- /dev/null +++ b/scripts/generate_test_data.py @@ -0,0 +1,34 @@ +import os +import yaml +import argparse as ap +from csv_data_service import csv_data_service +from concurrent.futures import ProcessPoolExecutor + + +def main(): + parser = ap.ArgumentParser() + parser.add_argument("-c", "--config") + args = parser.parse_args() + + with open(os.path.expanduser(args.config), "r") as fo: + config = yaml.load(fo, Loader=yaml.SafeLoader) + + dir_path = config["csv_directory"] + data_columns = config["columns"] + range_of_days = config["max_days"] + num_of_participants = config["participant_count"] + + if not os.path.isdir(dir_path): + os.mkdir(dir_path) + + csv_service = csv_data_service.CSVDataService(dir_path, data_columns, range_of_days) + csv_service.prepare_csv_test_data() + + with ProcessPoolExecutor(4) as exe: + exe.map(csv_service.write_test_csv_files, range(num_of_participants)) + + print("Finished Writing files") + + +if __name__ == "__main__": + main() diff --git a/scripts/import.py b/scripts/import.py index 64fe502..366d2f4 100755 --- a/scripts/import.py +++ b/scripts/import.py @@ -21,7 +21,6 @@ def main(): config = yaml.load(fo, Loader=yaml.SafeLoader) api_url = config["api_url"] - # iterate over matching files on the filesystem for file in glob.iglob(args.expr, recursive=True): data_file = {} diff --git a/setup.py b/setup.py index f8976c3..caa0828 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ here = os.path.abspath(os.path.dirname(__file__)) -requires = ["pyaml", "pandas", "requests"] +requires = ["pyaml", "polars", "requests"] about = dict() with open(os.path.join(here, "dpimport", "__version__.py"), "r") as f: @@ -17,6 +17,6 @@ author_email=about["__author_email__"], url=about["__url__"], packages=find_packages(), - scripts=["scripts/import.py"], + scripts=["scripts/import.py", "scripts/generate_test_data"], install_requires=requires, ) 
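
For reference, a minimal sketch of how the pieces introduced in this patch are meant to fit together: the `CSVDataService` stress-test generator writes one participant CSV per process, and the polars-based `_read_csv` replacement turns a generated file into a list of plain dicts instead of pandas chunks. The directory, worker count, and column/day/participant values below are illustrative assumptions inlined from the keys in `examples/config.yml`, not values taken from the patch itself.

```python
# Sketch only: exercises the CSVDataService generator and the polars read path
# added in this patch. Paths and counts are assumptions, not project defaults.
import os
from concurrent.futures import ProcessPoolExecutor

import polars as pl  # the dependency that replaces pandas in setup.py

from scripts.csv_data_service.csv_data_service import CSVDataService


def main():
    # Assumed settings, mirroring the keys in examples/config.yml.
    csv_dir, columns, max_days, participants = "./test_data", 5, 45, 5
    os.makedirs(csv_dir, exist_ok=True)

    service = CSVDataService(csv_dir, columns, max_days)
    service.prepare_csv_test_data()  # builds the shared column names and row template

    # Write one stress-test CSV per participant in parallel, as generate_test_data.py does.
    with ProcessPoolExecutor(4) as pool:
        pool.map(service.write_test_csv_files, range(participants))

    # Read a generated file back the way DataImporterService._read_csv now does:
    # each row becomes a plain dict, ready to embed in the JSON payload sent to the API.
    rows = pl.read_csv(service.build_file_path(0)).to_dicts()
    print(len(rows), rows[0])


if __name__ == "__main__":  # guard needed for ProcessPoolExecutor on spawn-based platforms
    main()
```

With this flow, the old chunked `reader.read_csv` generator and the `collect_csv_row` accumulation step are no longer needed: the whole file is materialized as dicts in one call and serialized with `json.dumps` before being posted to the import API.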
diff --git a/tests/test_csv_data_service.py b/tests/test_csv_data_service.py new file mode 100644 index 0000000..8860e78 --- /dev/null +++ b/tests/test_csv_data_service.py @@ -0,0 +1,21 @@ +from unittest import TestCase +from scripts.csv_data_service.csv_data_service import CSVDataService + + +class TestCsvData(TestCase): + @classmethod + def setUpClass(cls): + cls.max_columns = 6 + cls.max_days = 5 + cls.participants = 5 + + def setUp(self): + self.csv_generator = CSVDataService( + self.max_columns, self.max_days, self.participants + ) + + def test_generate_column_data(self): + self.csv_generator._generate_columns() + assert len(self.csv_generator.columns) == self.max_columns + self.csv_generator._generate_column_data() + assert len(self.csv_generator.day_data) == self.max_columns diff --git a/tests/test_importer_service.py b/tests/test_importer_service.py index 97b4f70..d85bad4 100644 --- a/tests/test_importer_service.py +++ b/tests/test_importer_service.py @@ -15,14 +15,14 @@ def setUp(self): def test_file_divergence(self): path = "blah.csv" - assert self.importer.diverge_files(path) == None + assert self.importer.process_file(path) == None with patch.object( self.importer, "process_data_file" ) as mocked_process_data_file: data_file_path = "study-subject-assessment-day1to4.csv" - self.importer.diverge_files(data_file_path) + self.importer.process_file(data_file_path) mocked_process_data_file.assert_called_once() with patch.object( @@ -30,7 +30,7 @@ def test_file_divergence(self): ) as mocked_process_metadata_file: metadata_file_path = "site_metadata.csv" - self.importer.diverge_files(metadata_file_path) + self.importer.process_file(metadata_file_path) mocked_process_metadata_file.assert_called_once() def test_processed_data_to_json(self): @@ -86,19 +86,11 @@ def test_processed_data_to_json(self): with patch.object( self.importer, "_file_info", return_value=mock_data_file_info - ), patch.object( - self.importer, "_read_csv", return_value={"columns": []} - ), patch.object( - self.importer, "collect_csv_row", return_value=subject_assessments - ): + ), patch.object(self.importer, "_read_csv", return_value=subject_assessments): self.importer.process_data_file(data_file_path, data_file_extension_to_dict) with patch.object( self.importer, "_file_info", return_value=mock_file_info - ), patch.object( - self.importer, "_read_csv", return_value={"columns": []} - ), patch.object( - self.importer, "collect_csv_row", return_value=participants - ): + ), patch.object(self.importer, "_read_csv", return_value=participants): self.importer.process_metadata_file( metadata_file_path, metadata_file_extension_to_dict ) From ca9246c4cce417689f4a08a3823f846352ca53ae Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Fri, 25 Aug 2023 14:16:54 -0500 Subject: [PATCH 03/21] Service * Converted api functions to import service * Added api auth keys to service # participant csv data # json # participant csv data --- examples/config.yml | 2 ++ scripts/api/api.py | 24 ---------------- scripts/csv_data_service/csv_data_service.py | 1 - .../data_import_service.py | 21 ++++++-------- scripts/import.py | 10 ++++--- scripts/{api => importer_service}/__init__.py | 0 .../importer_service/api_importer_service.py | 28 +++++++++++++++++++ 7 files changed, 45 insertions(+), 41 deletions(-) delete mode 100644 scripts/api/api.py rename scripts/{api => importer_service}/__init__.py (100%) create mode 100644 scripts/importer_service/api_importer_service.py diff --git a/examples/config.yml b/examples/config.yml index 
d6e6f19..3084b06 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -3,3 +3,5 @@ columns: 5 max_days: 45 participant_count: 5 api_url: http://localhost:8000/api/v1/import/data/ +api_user: importer +api_key: super_secret diff --git a/scripts/api/api.py b/scripts/api/api.py deleted file mode 100644 index 7f58ec7..0000000 --- a/scripts/api/api.py +++ /dev/null @@ -1,24 +0,0 @@ -import requests -import logging - - -def routes(url, path): - routes = {"metadata": url + "metadata", "day_data": url + "day"} - - return routes[path] - - -def upsert_file(url, file_data): - headers = {"content-type": "application/json"} - r = requests.post( - url, - headers=headers, - data=file_data, - ) - status = r.status_code - if status != 200: - response = r.json()["message"] - print(response) - else: - response = r.json()["data"] - print(response) diff --git a/scripts/csv_data_service/csv_data_service.py b/scripts/csv_data_service/csv_data_service.py index e591a4b..a94aea0 100644 --- a/scripts/csv_data_service/csv_data_service.py +++ b/scripts/csv_data_service/csv_data_service.py @@ -11,7 +11,6 @@ def __init__(self, path, max_columns, max_days): self.max_days = max_days self.columns = [] self.day_data = [] - self.assessment_data = [] self.path = path def _generate_columns(self): diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index fb9b5f5..c0bbe9b 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -27,17 +27,16 @@ def __init__(self, data_file, metadata_file): def process_file(self, path): basename = os.path.basename(path) - data_file = self.DATAFILE.match(basename) - metadata_file = self.METADATA.match(basename) + is_data_file = self.DATAFILE.match(basename) + is_metadata_file = self.METADATA.match(basename) - if data_file: - data_file_info = data_file.groupdict() + if is_data_file: + file_extension_to_dict = self.DATAFILE.match(basename).groupdict() + return self.process_data_file(path, file_extension_to_dict) + if is_metadata_file: + metadata_file_extension = self.METADATA.match(basename).groupdict() - return self.process_data_file(path, data_file_info) - if metadata_file: - metadata_file_info = metadata_file.groupdict() - - return self.process_metadata_file(path, metadata_file_info) + return self.process_metadata_file(path, metadata_file_extension) else: return None @@ -106,17 +105,15 @@ def _file_info(self, path): } def processed_data_to_json(self): - print("converting data to json") processed_data = ( json.dumps(self.data_file) if self.data_file and len(self.data_file["subject_assessments"]) > 0 else None ) - print("processing meatada to json") processed_metadata = ( json.dumps(self.metadata_file) if self.metadata_file and len(self.metadata_file["participants"]) > 0 else None ) - print("processed json") + print("processed file data to json") return processed_data, processed_metadata diff --git a/scripts/import.py b/scripts/import.py index 366d2f4..db821b8 100755 --- a/scripts/import.py +++ b/scripts/import.py @@ -3,12 +3,9 @@ import os import glob import yaml -import logging import argparse as ap from data_importer_service import data_import_service -from api import api - -logger = logging.getLogger(__name__) +from importer_service import api_importer_service def main(): @@ -21,6 +18,11 @@ def main(): config = yaml.load(fo, Loader=yaml.SafeLoader) api_url = config["api_url"] + credentials = { + "x-api-user": config["api_user"], + "x-api-key": 
config["api_key"], + } + api = api_importer_service.ImporterApiService(credentials) # iterate over matching files on the filesystem for file in glob.iglob(args.expr, recursive=True): data_file = {} diff --git a/scripts/api/__init__.py b/scripts/importer_service/__init__.py similarity index 100% rename from scripts/api/__init__.py rename to scripts/importer_service/__init__.py diff --git a/scripts/importer_service/api_importer_service.py b/scripts/importer_service/api_importer_service.py new file mode 100644 index 0000000..75ab675 --- /dev/null +++ b/scripts/importer_service/api_importer_service.py @@ -0,0 +1,28 @@ +import requests + + +class ImporterApiService: + def __init__( + self, + credentials, + ): + self.headers = {"content-type": "application/json", **credentials} + + def routes(self, url, path): + routes = {"metadata": url + "metadata", "day_data": url + "day"} + + return routes[path] + + def upsert_file(self, url, file_data): + r = requests.post( + url, + headers=self.headers, + data=file_data, + ) + status = r.status_code + if status != 200: + response = r.json()["message"] + print(response) + else: + response = r.json()["data"] + print(response) From c704d467f5076d91dbca0627a30ab4b0e8f0d6c9 Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Sun, 15 Oct 2023 22:48:26 -0500 Subject: [PATCH 04/21] Change keys for columns * Subject ID and uppercase Study keys needed to be updated for import --- scripts/data_importer_service/data_import_service.py | 4 ++++ tests/test_importer_service.py | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index c0bbe9b..3a1d4f0 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -66,6 +66,10 @@ def process_metadata_file(self, path, file_extension): metadata = self._file_info(path) participants = self._read_csv(path) + for participant in participants: + participant["study"] = participant.pop("Study") + participant["subject"] = participant.pop("Subject ID") + metadata.update( { "role": "metadata", diff --git a/tests/test_importer_service.py b/tests/test_importer_service.py index d85bad4..e490ccd 100644 --- a/tests/test_importer_service.py +++ b/tests/test_importer_service.py @@ -112,7 +112,7 @@ def test_processed_data_to_json(self): ) assert ( data_to_json[1] - == '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Subject ID": "YA1", "Active": 1, "Consent": "-", "Study": "YA"}, {"Subject ID": "YA2", "Active": 1, "Consent": "-", "Study": "YA"}]}' + == '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "-", "study": "YA", "subject": "YA1"}, {"Active": 1, "Consent": "-", "study": "YA", "subject": "YA2"}]}' ) self.importer.data_file["subject_assessments"] = [] @@ -121,5 +121,5 @@ def test_processed_data_to_json(self): assert data_to_json == ( None, - '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", 
"extension": ".csv"}, "participants": [{"Subject ID": "YA1", "Active": 1, "Consent": "-", "Study": "YA"}, {"Subject ID": "YA2", "Active": 1, "Consent": "-", "Study": "YA"}]}', + '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "-", "study": "YA", "subject": "YA1"}, {"Active": 1, "Consent": "-", "study": "YA", "subject": "YA2"}]}', ) From 8a787223003a66ef29cc75ddfc36a68bc98fc86e Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Mon, 18 Dec 2023 15:10:30 -0500 Subject: [PATCH 05/21] Key change * updated key in metadata * Updated tests --- scripts/data_importer_service/data_import_service.py | 2 +- scripts/import.py | 3 +-- tests/test_importer_service.py | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index 3a1d4f0..93cdb07 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -67,8 +67,8 @@ def process_metadata_file(self, path, file_extension): participants = self._read_csv(path) for participant in participants: - participant["study"] = participant.pop("Study") participant["subject"] = participant.pop("Subject ID") + participant["study"] = participant.pop("Study") metadata.update( { diff --git a/scripts/import.py b/scripts/import.py index db821b8..fa73cd4 100755 --- a/scripts/import.py +++ b/scripts/import.py @@ -23,7 +23,7 @@ def main(): "x-api-key": config["api_key"], } api = api_importer_service.ImporterApiService(credentials) - # iterate over matching files on the filesystem + for file in glob.iglob(args.expr, recursive=True): data_file = {} metadata_file = {} @@ -32,7 +32,6 @@ def main(): ) importer_service.process_file(file) data, meta = importer_service.processed_data_to_json() - if data: api.upsert_file(api.routes(api_url, "day_data"), data) diff --git a/tests/test_importer_service.py b/tests/test_importer_service.py index e490ccd..46c2614 100644 --- a/tests/test_importer_service.py +++ b/tests/test_importer_service.py @@ -112,7 +112,7 @@ def test_processed_data_to_json(self): ) assert ( data_to_json[1] - == '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "-", "study": "YA", "subject": "YA1"}, {"Active": 1, "Consent": "-", "study": "YA", "subject": "YA2"}]}' + == '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "-", "subject": "YA1", "study": "YA"}, {"Active": 1, "Consent": "-", "subject": "YA2", "study": "YA"}]}' ) self.importer.data_file["subject_assessments"] = [] @@ -121,5 +121,5 @@ def test_processed_data_to_json(self): assert data_to_json == ( None, - '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "-", "study": 
"YA", "subject": "YA1"}, {"Active": 1, "Consent": "-", "study": "YA", "subject": "YA2"}]}', + '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "-", "subject": "YA1", "study": "YA"}, {"Active": 1, "Consent": "-", "subject": "YA2", "study": "YA"}]}', ) From e07ffd7e5debfb38980c2f4b381d15ecaeb5e915 Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Mon, 15 Jan 2024 09:01:54 -0600 Subject: [PATCH 06/21] Handle Values * Handle infinity and nan values * If compute error, import as string values --- .../data_import_service.py | 17 ++++++++++++++++- .../importer_service/api_importer_service.py | 2 +- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index 93cdb07..fdf4650 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -1,8 +1,10 @@ +import math import re import os import hashlib import json import polars as pl + from urllib.parse import quote import mimetypes as mt import logging @@ -57,6 +59,16 @@ def process_data_file(self, path, file_extension): subject_assessments = self._read_csv( path, ) + for assessment in subject_assessments: + for variable in assessment: + isUnsupportedValue = ( + assessment[variable] == math.inf + or assessment[variable] == -math.inf + or math.isnan(assessment[variable]) + ) + if isUnsupportedValue: + assessment[variable] = None + self.data_file.update( {"metadata": metadata, "subject_assessments": subject_assessments} ) @@ -86,7 +98,10 @@ def process_metadata_file(self, path, file_extension): return def _read_csv(self, file_path): - df = pl.read_csv(file_path) + try: + df = pl.read_csv(file_path, infer_schema_length=int(1e19)) + except pl.ComputeError: + df = pl.read_csv(file_path, infer_schema_length=0) return df.to_dicts() def _file_info(self, path): diff --git a/scripts/importer_service/api_importer_service.py b/scripts/importer_service/api_importer_service.py index 75ab675..610fb38 100644 --- a/scripts/importer_service/api_importer_service.py +++ b/scripts/importer_service/api_importer_service.py @@ -21,7 +21,7 @@ def upsert_file(self, url, file_data): ) status = r.status_code if status != 200: - response = r.json()["message"] + response = r.json() print(response) else: response = r.json()["data"] From 4ef295a89614cb494e5361241883b1f7cb523804 Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Tue, 16 Jan 2024 09:38:14 -0600 Subject: [PATCH 07/21] String * String values --- scripts/data_importer_service/data_import_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index fdf4650..a8a3413 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -67,7 +67,7 @@ def process_data_file(self, path, file_extension): or math.isnan(assessment[variable]) ) if isUnsupportedValue: - assessment[variable] = None + assessment[variable] = "{var}".format(var=assessment[variable]) self.data_file.update( {"metadata": metadata, "subject_assessments": subject_assessments} From c756537fb8620b16cd51073efd082c15a763be0e Mon Sep 17 00:00:00 2001 From: Ivan Trujillo 
Ayala Date: Tue, 16 Jan 2024 09:42:01 -0600 Subject: [PATCH 08/21] Nan * Fix nan --- scripts/data_importer_service/data_import_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index a8a3413..185ae2b 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -64,10 +64,10 @@ def process_data_file(self, path, file_extension): isUnsupportedValue = ( assessment[variable] == math.inf or assessment[variable] == -math.inf - or math.isnan(assessment[variable]) + or assessment[variable] != assessment[variable] ) if isUnsupportedValue: - assessment[variable] = "{var}".format(var=assessment[variable]) + assessment[variable] = None self.data_file.update( {"metadata": metadata, "subject_assessments": subject_assessments} From b013a59a037824e5d41c1ceb63ca0aa61795a476 Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Mon, 5 Feb 2024 17:31:57 -0600 Subject: [PATCH 09/21] Updates * Updates to import script * Add assessment, study and subject properties to data * Update Tests * Added route to restarte metadata collection --- .../data_import_service.py | 28 +++++++++++++------ .../importer_service/api_importer_service.py | 10 +++++++ scripts/refresh_metadata.py | 28 +++++++++++++++++++ tests/test_importer_service.py | 19 ++++--------- 4 files changed, 62 insertions(+), 23 deletions(-) create mode 100644 scripts/refresh_metadata.py diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index 185ae2b..be4a496 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -44,22 +44,30 @@ def process_file(self, path): def process_data_file(self, path, file_extension): file_extension.update({"time_end": file_extension["end"]}) - collection_base = "{study}{subject}{assessment}".format( - study=file_extension["study"], - subject=file_extension["subject"], - assessment=file_extension["assessment"], - ).encode("utf-8") - hash_collection = hashlib.sha256(collection_base).hexdigest() + study = file_extension["study"] + subject = file_extension["subject"] + assessmentName = file_extension["assessment"] + metadata = self._file_info(path) - metadata.update( - {"role": "data", "collection": hash_collection, **file_extension} - ) + metadata.update({"role": "data", **file_extension}) + del file_extension["extension"] subject_assessments = self._read_csv( path, ) for assessment in subject_assessments: + assessment["subject"] = subject + assessment["site"] = study + assessment["assessment"] = assessmentName + + if hasattr(assessment, "subject_id"): + assessment.pop("subject_id") + if hasattr(assessment, "subjectid"): + assessment.pop("subjectid") + if hasattr(assessment, "study"): + assessment.pop("study") + for variable in assessment: isUnsupportedValue = ( assessment[variable] == math.inf @@ -81,6 +89,8 @@ def process_metadata_file(self, path, file_extension): for participant in participants: participant["subject"] = participant.pop("Subject ID") participant["study"] = participant.pop("Study") + # participant["Consent"] = "2022-06-02" + # participant["synced"] = "2024-02-02" metadata.update( { diff --git a/scripts/importer_service/api_importer_service.py b/scripts/importer_service/api_importer_service.py index 610fb38..9a6ff1b 100644 --- 
a/scripts/importer_service/api_importer_service.py +++ b/scripts/importer_service/api_importer_service.py @@ -26,3 +26,13 @@ def upsert_file(self, url, file_data): else: response = r.json()["data"] print(response) + + def refresh_metadata_collection(self, url): + r = requests.delete(url, headers=self.headers) + status = r.status_code + + if status != 200: + print("There was an error.", r.json()["message"]) + else: + response = r.json()["data"] + print(response) diff --git a/scripts/refresh_metadata.py b/scripts/refresh_metadata.py new file mode 100644 index 0000000..d08fa58 --- /dev/null +++ b/scripts/refresh_metadata.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python + +import os +import yaml +import argparse as ap +from importer_service import api_importer_service + + +def main(): + parser = ap.ArgumentParser() + parser.add_argument("-c", "--config") + args = parser.parse_args() + + with open(os.path.expanduser(args.config), "r") as fo: + config = yaml.load(fo, Loader=yaml.SafeLoader) + + api_url = config["api_url"] + credentials = { + "x-api-user": config["api_user"], + "x-api-key": config["api_key"], + } + api = api_importer_service.ImporterApiService(credentials) + print(api.routes(api_url, "metadata"), "THE ROUTE") + api.refresh_metadata_collection(api.routes(api_url, "metadata")) + + +if __name__ == "__main__": + main() diff --git a/tests/test_importer_service.py b/tests/test_importer_service.py index 46c2614..4c40531 100644 --- a/tests/test_importer_service.py +++ b/tests/test_importer_service.py @@ -80,8 +80,8 @@ def test_processed_data_to_json(self): "mode": 0o644, } participants = [ - {"Subject ID": "YA1", "Active": 1, "Consent": "-", "Study": "YA"}, - {"Subject ID": "YA2", "Active": 1, "Consent": "-", "Study": "YA"}, + {"Subject ID": "YA1", "Active": 1, "Consent": "2022-06-02", "Study": "YA"}, + {"Subject ID": "YA2", "Active": 1, "Consent": "2022-06-02", "Study": "YA"}, ] with patch.object( @@ -99,20 +99,11 @@ def test_processed_data_to_json(self): assert ( data_to_json[0] - == '{"metadata": {"path": "study-subject-assessment-day1to4.csv", "filetype": ' - '"text/csv", "encoding": "utf-8", "basename": ' - '"study-subject-assessment-day1to4.csv", "dirname": "/path/to/files", ' - '"mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, ' - '"role": "data", "collection": ' - '"5e74265e4a4d3760737bbf39a513a938b5bd333a958c699d5006ae026ae0017a", "study": ' - '"study", "subject": "subject", "assessment": "assessment", "units": "day", ' - '"start": "1", "end": "4", "extension": ".csv", "time_end": "4"}, ' - '"subject_assessments": [{"var1": 1, "var2": 2, "var3": "str"}, {"var1": 2, ' - '"var2": 2, "var3": "str", "var4": 5, "var6": 6, "var7": "str2"}]}' + == '{"metadata": {"path": "study-subject-assessment-day1to4.csv", "filetype": "text/csv", "encoding": "utf-8", "basename": "study-subject-assessment-day1to4.csv", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "data", "study": "study", "subject": "subject", "assessment": "assessment", "units": "day", "start": "1", "end": "4", "extension": ".csv", "time_end": "4"}, "subject_assessments": [{"var1": 1, "var2": 2, "var3": "str", "subject": "subject", "site": "study", "assessment": "assessment"}, {"var1": 2, "var2": 2, "var3": "str", "var4": 5, "var6": 6, "var7": "str2", "subject": "subject", "site": "study", "assessment": "assessment"}]}' ) assert ( data_to_json[1] - == '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", 
"mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "-", "subject": "YA1", "study": "YA"}, {"Active": 1, "Consent": "-", "subject": "YA2", "study": "YA"}]}' + == '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "2022-06-02", "subject": "YA1", "study": "YA"}, {"Active": 1, "Consent": "2022-06-02", "subject": "YA2", "study": "YA"}]}' ) self.importer.data_file["subject_assessments"] = [] @@ -121,5 +112,5 @@ def test_processed_data_to_json(self): assert data_to_json == ( None, - '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "-", "subject": "YA1", "study": "YA"}, {"Active": 1, "Consent": "-", "subject": "YA2", "study": "YA"}]}', + '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "2022-06-02", "subject": "YA1", "study": "YA"}, {"Active": 1, "Consent": "2022-06-02", "subject": "YA2", "study": "YA"}]}', ) From adfdd5d4b8976f8b0ccfab275bb51cd12c776448 Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Tue, 6 Feb 2024 10:26:52 -0600 Subject: [PATCH 10/21] Add updates * Updates to import --- .../data_import_service.py | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index be4a496..c88496e 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -44,30 +44,22 @@ def process_file(self, path): def process_data_file(self, path, file_extension): file_extension.update({"time_end": file_extension["end"]}) - study = file_extension["study"] - subject = file_extension["subject"] - assessmentName = file_extension["assessment"] - + collection_base = "{study}{subject}{assessment}".format( + study=file_extension["study"], + subject=file_extension["subject"], + assessment=file_extension["assessment"], + ).encode("utf-8") + hash_collection = hashlib.sha256(collection_base).hexdigest() metadata = self._file_info(path) - metadata.update({"role": "data", **file_extension}) - + metadata.update( + {"role": "data", "collection": hash_collection, **file_extension} + ) del file_extension["extension"] subject_assessments = self._read_csv( path, ) for assessment in subject_assessments: - assessment["subject"] = subject - assessment["site"] = study - assessment["assessment"] = assessmentName - - if hasattr(assessment, "subject_id"): - assessment.pop("subject_id") - if hasattr(assessment, "subjectid"): - assessment.pop("subjectid") - if hasattr(assessment, "study"): - assessment.pop("study") - for variable in assessment: isUnsupportedValue = ( assessment[variable] == math.inf @@ -89,8 +81,8 @@ def process_metadata_file(self, path, file_extension): for participant in participants: participant["subject"] = participant.pop("Subject ID") 
participant["study"] = participant.pop("Study") - # participant["Consent"] = "2022-06-02" - # participant["synced"] = "2024-02-02" + participant["Consent"] = "2022-06-02" + participant["synced"] = "2022-06-02" metadata.update( { From 4ebafb3c2a4a5d9671635e78729e4663f5ea2d72 Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Tue, 6 Feb 2024 13:27:14 -0600 Subject: [PATCH 11/21] Support for new data structure * Updates for new data migration structure --- .../data_import_service.py | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index c88496e..d06e79e 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -14,7 +14,7 @@ class DataImporterService: DATAFILE = re.compile( - r"(?P\w+)\-(?P\w+)\-(?P\w+)\-(?Pday)(?P[+-]?\d+(?:\.\d+)?)to(?P[+-]?\d+(?:\.\d+)?)(?P.csv)" + r"(?P\w+)\-(?P\w+)\-(?P\w+)\-(?Pday)(?P[+-]?\d+(?:\.\d+)?)to(?P[+-]?\d+(?:\.\d+)?)(?P.csv)" ) METADATA = re.compile(r"(?P\w+)\_metadata(?P.csv)") GLOB_SUB = re.compile( @@ -44,22 +44,16 @@ def process_file(self, path): def process_data_file(self, path, file_extension): file_extension.update({"time_end": file_extension["end"]}) - collection_base = "{study}{subject}{assessment}".format( - study=file_extension["study"], - subject=file_extension["subject"], - assessment=file_extension["assessment"], - ).encode("utf-8") - hash_collection = hashlib.sha256(collection_base).hexdigest() metadata = self._file_info(path) - metadata.update( - {"role": "data", "collection": hash_collection, **file_extension} - ) + metadata.update({"role": "data", **file_extension}) + del file_extension["extension"] - subject_assessments = self._read_csv( + participant_assessments = self._read_csv( path, ) - for assessment in subject_assessments: + for assessment in participant_assessments: + for variable in assessment: isUnsupportedValue = ( assessment[variable] == math.inf @@ -70,7 +64,7 @@ def process_data_file(self, path, file_extension): assessment[variable] = None self.data_file.update( - {"metadata": metadata, "subject_assessments": subject_assessments} + {"metadata": metadata, "participant_assessments": participant_assessments} ) return @@ -79,10 +73,10 @@ def process_metadata_file(self, path, file_extension): participants = self._read_csv(path) for participant in participants: - participant["subject"] = participant.pop("Subject ID") + participant["participant"] = participant.pop("Subject ID") participant["study"] = participant.pop("Study") - participant["Consent"] = "2022-06-02" - participant["synced"] = "2022-06-02" + # participant["Consent"] = "2022-06-02" + # participant["synced"] = "2024-02-02" metadata.update( { @@ -128,7 +122,7 @@ def _file_info(self, path): def processed_data_to_json(self): processed_data = ( json.dumps(self.data_file) - if self.data_file and len(self.data_file["subject_assessments"]) > 0 + if self.data_file and len(self.data_file["participant_assessments"]) > 0 else None ) processed_metadata = ( From 3483283206cff574381b5b40bc4886fdb297cd8c Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Tue, 6 Feb 2024 13:29:10 -0600 Subject: [PATCH 12/21] Updated tests * Updated tests --- tests/test_importer_service.py | 36 ++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/tests/test_importer_service.py b/tests/test_importer_service.py index 4c40531..7264c9b 100644 
--- a/tests/test_importer_service.py +++ b/tests/test_importer_service.py @@ -20,7 +20,7 @@ def test_file_divergence(self): with patch.object( self.importer, "process_data_file" ) as mocked_process_data_file: - data_file_path = "study-subject-assessment-day1to4.csv" + data_file_path = "study-participant-assessment-day1to4.csv" self.importer.process_file(data_file_path) mocked_process_data_file.assert_called_once() @@ -34,15 +34,15 @@ def test_file_divergence(self): mocked_process_metadata_file.assert_called_once() def test_processed_data_to_json(self): - data_file_path = "study-subject-assessment-day1to4.csv" + data_file_path = "study-participant-assessment-day1to4.csv" data_file_extension_to_dict = self.importer.DATAFILE.match( data_file_path ).groupdict() mock_data_file_info = { - "path": "study-subject-assessment-day1to4.csv", + "path": "study-participant-assessment-day1to4.csv", "filetype": "text/csv", "encoding": "utf-8", - "basename": "study-subject-assessment-day1to4.csv", + "basename": "study-participant-assessment-day1to4.csv", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, @@ -50,7 +50,7 @@ def test_processed_data_to_json(self): "gid": 1000, "mode": 0o644, } - subject_assessments = [ + participant_assessments = [ { "var1": 1, "var2": 2, @@ -80,13 +80,25 @@ def test_processed_data_to_json(self): "mode": 0o644, } participants = [ - {"Subject ID": "YA1", "Active": 1, "Consent": "2022-06-02", "Study": "YA"}, - {"Subject ID": "YA2", "Active": 1, "Consent": "2022-06-02", "Study": "YA"}, + { + "Subject ID": "YA1", + "Active": 1, + "Consent": "2022-06-02", + "Study": "YA", + }, + { + "Subject ID": "YA2", + "Active": 1, + "Consent": "2022-06-02", + "Study": "YA", + }, ] with patch.object( self.importer, "_file_info", return_value=mock_data_file_info - ), patch.object(self.importer, "_read_csv", return_value=subject_assessments): + ), patch.object( + self.importer, "_read_csv", return_value=participant_assessments + ): self.importer.process_data_file(data_file_path, data_file_extension_to_dict) with patch.object( self.importer, "_file_info", return_value=mock_file_info @@ -99,18 +111,18 @@ def test_processed_data_to_json(self): assert ( data_to_json[0] - == '{"metadata": {"path": "study-subject-assessment-day1to4.csv", "filetype": "text/csv", "encoding": "utf-8", "basename": "study-subject-assessment-day1to4.csv", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "data", "study": "study", "subject": "subject", "assessment": "assessment", "units": "day", "start": "1", "end": "4", "extension": ".csv", "time_end": "4"}, "subject_assessments": [{"var1": 1, "var2": 2, "var3": "str", "subject": "subject", "site": "study", "assessment": "assessment"}, {"var1": 2, "var2": 2, "var3": "str", "var4": 5, "var6": 6, "var7": "str2", "subject": "subject", "site": "study", "assessment": "assessment"}]}' + == '{"metadata": {"path": "study-participant-assessment-day1to4.csv", "filetype": "text/csv", "encoding": "utf-8", "basename": "study-participant-assessment-day1to4.csv", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "data", "study": "study", "participant": "participant", "assessment": "assessment", "units": "day", "start": "1", "end": "4", "extension": ".csv", "time_end": "4"}, "participant_assessments": [{"var1": 1, "var2": 2, "var3": "str"}, {"var1": 2, "var2": 2, "var3": "str", "var4": 5, "var6": 6, "var7": "str2"}]}' ) assert ( data_to_json[1] - == 
'{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "2022-06-02", "subject": "YA1", "study": "YA"}, {"Active": 1, "Consent": "2022-06-02", "subject": "YA2", "study": "YA"}]}' + == '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "2022-06-02", "participant": "YA1", "study": "YA"}, {"Active": 1, "Consent": "2022-06-02", "participant": "YA2", "study": "YA"}]}' ) - self.importer.data_file["subject_assessments"] = [] + self.importer.data_file["participant_assessments"] = [] data_to_json = self.importer.processed_data_to_json() assert data_to_json == ( None, - '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "2022-06-02", "subject": "YA1", "study": "YA"}, {"Active": 1, "Consent": "2022-06-02", "subject": "YA2", "study": "YA"}]}', + '{"metadata": {"filetype": "text/csv", "encoding": "utf-8", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "metadata", "study": "site", "extension": ".csv"}, "participants": [{"Active": 1, "Consent": "2022-06-02", "participant": "YA1", "study": "YA"}, {"Active": 1, "Consent": "2022-06-02", "participant": "YA2", "study": "YA"}]}', ) From f08527f792543474202e28525321bca445d4935c Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Tue, 20 Feb 2024 17:36:56 -0600 Subject: [PATCH 13/21] Test constn * Test consent and synced --- scripts/data_importer_service/data_import_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index d06e79e..9b2e200 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -75,8 +75,8 @@ def process_metadata_file(self, path, file_extension): for participant in participants: participant["participant"] = participant.pop("Subject ID") participant["study"] = participant.pop("Study") - # participant["Consent"] = "2022-06-02" - # participant["synced"] = "2024-02-02" + participant["Consent"] = "2022-06-02" + participant["synced"] = "2024-02-02" metadata.update( { From 2dc1a0488427e7eef4f10a2cc2b59a09b43644f0 Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Tue, 9 Apr 2024 08:09:17 -0600 Subject: [PATCH 14/21] Extract variables * extract variables from assessment --- .../data_import_service.py | 21 +++++++++++++------ tests/test_importer_service.py | 4 +++- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index 9b2e200..4f67fe7 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -1,7 +1,6 @@ import math import re import os -import hashlib import json import polars as pl @@ -43,8 +42,10 @@ def process_file(self, path): return None def 
process_data_file(self, path, file_extension): - file_extension.update({"time_end": file_extension["end"]}) metadata = self._file_info(path) + assessment_variables = [] + + file_extension.update({"time_end": file_extension["end"]}) metadata.update({"role": "data", **file_extension}) del file_extension["extension"] @@ -55,6 +56,8 @@ def process_data_file(self, path, file_extension): for assessment in participant_assessments: for variable in assessment: + assessment_variables.append(variable) + isUnsupportedValue = ( assessment[variable] == math.inf or assessment[variable] == -math.inf @@ -63,9 +66,18 @@ def process_data_file(self, path, file_extension): if isUnsupportedValue: assessment[variable] = None + var_set = set(assessment_variables) + assessment_variables = list(var_set) + assessment_variables.sort() + self.data_file.update( - {"metadata": metadata, "participant_assessments": participant_assessments} + { + "metadata": metadata, + "participant_assessments": participant_assessments, + "assessment_variables": assessment_variables, + } ) + return def process_metadata_file(self, path, file_extension): @@ -75,8 +87,6 @@ def process_metadata_file(self, path, file_extension): for participant in participants: participant["participant"] = participant.pop("Subject ID") participant["study"] = participant.pop("Study") - participant["Consent"] = "2022-06-02" - participant["synced"] = "2024-02-02" metadata.update( { @@ -130,5 +140,4 @@ def processed_data_to_json(self): if self.metadata_file and len(self.metadata_file["participants"]) > 0 else None ) - print("processed file data to json") return processed_data, processed_metadata diff --git a/tests/test_importer_service.py b/tests/test_importer_service.py index 7264c9b..9fad22b 100644 --- a/tests/test_importer_service.py +++ b/tests/test_importer_service.py @@ -1,5 +1,6 @@ from unittest import TestCase from unittest.mock import patch +import pprint from scripts.data_importer_service.data_import_service import DataImporterService @@ -108,10 +109,11 @@ def test_processed_data_to_json(self): ) data_to_json = self.importer.processed_data_to_json() + pprint.pprint(data_to_json) assert ( data_to_json[0] - == '{"metadata": {"path": "study-participant-assessment-day1to4.csv", "filetype": "text/csv", "encoding": "utf-8", "basename": "study-participant-assessment-day1to4.csv", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "data", "study": "study", "participant": "participant", "assessment": "assessment", "units": "day", "start": "1", "end": "4", "extension": ".csv", "time_end": "4"}, "participant_assessments": [{"var1": 1, "var2": 2, "var3": "str"}, {"var1": 2, "var2": 2, "var3": "str", "var4": 5, "var6": 6, "var7": "str2"}]}' + == '{"metadata": {"path": "study-participant-assessment-day1to4.csv", "filetype": "text/csv", "encoding": "utf-8", "basename": "study-participant-assessment-day1to4.csv", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "data", "study": "study", "participant": "participant", "assessment": "assessment", "units": "day", "start": "1", "end": "4", "extension": ".csv", "time_end": "4"}, "participant_assessments": [{"var1": 1, "var2": 2, "var3": "str"}, {"var1": 2, "var2": 2, "var3": "str", "var4": 5, "var6": 6, "var7": "str2"}], "assessment_variables": ["var1", "var2", "var3", "var4", "var6", "var7"]}' ) assert ( data_to_json[1] From 4bbacf45887105fbc8059a71db488c2fe1c53900 Mon Sep 17 00:00:00 2001 From: Ivan 
Trujillo Ayala Date: Tue, 9 Apr 2024 08:22:57 -0600 Subject: [PATCH 15/21] remove print log * removed print --- tests/test_importer_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_importer_service.py b/tests/test_importer_service.py index 9fad22b..cba0e9e 100644 --- a/tests/test_importer_service.py +++ b/tests/test_importer_service.py @@ -109,7 +109,6 @@ def test_processed_data_to_json(self): ) data_to_json = self.importer.processed_data_to_json() - pprint.pprint(data_to_json) assert ( data_to_json[0] From dae624fbb44975fa66addf7cf794ed44a65e79cb Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Tue, 9 Apr 2024 08:51:52 -0600 Subject: [PATCH 16/21] variables dictionary * added variables as dictionaries --- scripts/data_importer_service/data_import_service.py | 4 ++++ tests/test_importer_service.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index 4f67fe7..a7ba56d 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -69,6 +69,10 @@ def process_data_file(self, path, file_extension): var_set = set(assessment_variables) assessment_variables = list(var_set) assessment_variables.sort() + assessment_variables = [ + dict(name=variable, assessment=metadata.get("assessment")) + for variable in assessment_variables + ] self.data_file.update( { diff --git a/tests/test_importer_service.py b/tests/test_importer_service.py index cba0e9e..9442c4d 100644 --- a/tests/test_importer_service.py +++ b/tests/test_importer_service.py @@ -112,7 +112,7 @@ def test_processed_data_to_json(self): assert ( data_to_json[0] - == '{"metadata": {"path": "study-participant-assessment-day1to4.csv", "filetype": "text/csv", "encoding": "utf-8", "basename": "study-participant-assessment-day1to4.csv", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "data", "study": "study", "participant": "participant", "assessment": "assessment", "units": "day", "start": "1", "end": "4", "extension": ".csv", "time_end": "4"}, "participant_assessments": [{"var1": 1, "var2": 2, "var3": "str"}, {"var1": 2, "var2": 2, "var3": "str", "var4": 5, "var6": 6, "var7": "str2"}], "assessment_variables": ["var1", "var2", "var3", "var4", "var6", "var7"]}' + == '{"metadata": {"path": "study-participant-assessment-day1to4.csv", "filetype": "text/csv", "encoding": "utf-8", "basename": "study-participant-assessment-day1to4.csv", "dirname": "/path/to/files", "mtime": 1234567890.0, "size": 1024, "uid": 1000, "gid": 1000, "mode": 420, "role": "data", "study": "study", "participant": "participant", "assessment": "assessment", "units": "day", "start": "1", "end": "4", "extension": ".csv", "time_end": "4"}, "participant_assessments": [{"var1": 1, "var2": 2, "var3": "str"}, {"var1": 2, "var2": 2, "var3": "str", "var4": 5, "var6": 6, "var7": "str2"}], "assessment_variables": [{"name": "var1", "assessment": "assessment"}, {"name": "var2", "assessment": "assessment"}, {"name": "var3", "assessment": "assessment"}, {"name": "var4", "assessment": "assessment"}, {"name": "var6", "assessment": "assessment"}, {"name": "var7", "assessment": "assessment"}]}' ) assert ( data_to_json[1] From f03fc80c1d6019fe2275c11f57b7d4a9c72d3f97 Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Tue, 9 Apr 2024 09:34:28 -0600 Subject: [PATCH 17/21] PR comments * Pr comments --- 
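A quick illustration of the variable-extraction step this patch tidies up — a minimal sketch only, using placeholder rows shaped like the fixtures in tests/test_importer_service.py, with the literal string "assessment" standing in for metadata.get("assessment"):

```python
# Placeholder assessment rows (same shape as the test fixtures; not real data).
participant_assessments = [
    {"var1": 1, "var2": 2, "var3": "str"},
    {"var1": 2, "var2": 2, "var3": "str", "var4": 5},
]

# Collect every column name seen across the rows...
assessment_variables = []
for assessment in participant_assessments:
    for variable in assessment:
        assessment_variables.append(variable)

# ...then dedupe, sort, and wrap each name in a dict, matching the
# sorted(var_set) comprehension this patch settles on.
var_set = set(assessment_variables)
assessment_variables = [
    dict(name=variable, assessment="assessment")
    for variable in sorted(var_set)
]

print(assessment_variables)
# [{'name': 'var1', 'assessment': 'assessment'}, {'name': 'var2', ...},
#  {'name': 'var3', ...}, {'name': 'var4', 'assessment': 'assessment'}]
```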
scripts/data_importer_service/data_import_service.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/scripts/data_importer_service/data_import_service.py b/scripts/data_importer_service/data_import_service.py index a7ba56d..2d17165 100644 --- a/scripts/data_importer_service/data_import_service.py +++ b/scripts/data_importer_service/data_import_service.py @@ -53,6 +53,7 @@ def process_data_file(self, path, file_extension): participant_assessments = self._read_csv( path, ) + for assessment in participant_assessments: for variable in assessment: @@ -67,11 +68,9 @@ def process_data_file(self, path, file_extension): assessment[variable] = None var_set = set(assessment_variables) - assessment_variables = list(var_set) - assessment_variables.sort() assessment_variables = [ dict(name=variable, assessment=metadata.get("assessment")) - for variable in assessment_variables + for variable in sorted(var_set) ] self.data_file.update( From cf69b8455b077045cef3becbddeeccd33abcb1d1 Mon Sep 17 00:00:00 2001 From: Ivan Trujillo Ayala Date: Tue, 28 May 2024 10:19:49 -0400 Subject: [PATCH 18/21] print struct * print structure --- scripts/import.py | 5 +++++ scripts/importer_service/api_importer_service.py | 10 ++++++---- scripts/refresh_metadata.py | 1 - 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/scripts/import.py b/scripts/import.py index fa73cd4..747c740 100755 --- a/scripts/import.py +++ b/scripts/import.py @@ -33,9 +33,14 @@ def main(): importer_service.process_file(file) data, meta = importer_service.processed_data_to_json() if data: + print("Day Data struct:") + print(data) api.upsert_file(api.routes(api_url, "day_data"), data) if meta: + print("Metadata struct:") + print(meta) + api.upsert_file(api.routes(api_url, "metadata"), meta) diff --git a/scripts/importer_service/api_importer_service.py b/scripts/importer_service/api_importer_service.py index 9a6ff1b..a35c5f0 100644 --- a/scripts/importer_service/api_importer_service.py +++ b/scripts/importer_service/api_importer_service.py @@ -20,12 +20,14 @@ def upsert_file(self, url, file_data): data=file_data, ) status = r.status_code - if status != 200: + try: response = r.json() print(response) - else: - response = r.json()["data"] - print(response) + except Exception as e: + print(status) + print(f"Error: {e}") + print(r) + pass def refresh_metadata_collection(self, url): r = requests.delete(url, headers=self.headers) diff --git a/scripts/refresh_metadata.py b/scripts/refresh_metadata.py index d08fa58..83f0456 100644 --- a/scripts/refresh_metadata.py +++ b/scripts/refresh_metadata.py @@ -20,7 +20,6 @@ def main(): "x-api-key": config["api_key"], } api = api_importer_service.ImporterApiService(credentials) - print(api.routes(api_url, "metadata"), "THE ROUTE") api.refresh_metadata_collection(api.routes(api_url, "metadata")) From bb1ac638efdbd4e32c7bb85dc69a74da346b6503 Mon Sep 17 00:00:00 2001 From: Nicholas Maloney Date: Thu, 5 Jun 2025 09:38:33 -0400 Subject: [PATCH 19/21] Allow SSL verification to be bypassed When connecting with local/self-signed/dev environments it is sometimes necessary to bypass SSL verification. This commit adds a config option to do so but it is not the default nor is it considered secure/best-practice. 
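For clarity, a minimal sketch of how the new option is meant to flow from the YAML config into the requests calls. The config keys and header names below match examples/config.yml and the importer service; the URL, credentials, and payload are placeholders, not values from this repository:

```python
import yaml
import requests

# Load the importer configuration (same keys documented in the README).
with open("config.yml") as fo:
    config = yaml.load(fo, Loader=yaml.SafeLoader)

# Verification stays on unless the config explicitly sets `verify_ssl: False`.
verify_ssl = False if config.get("verify_ssl") is False else True

headers = {
    "content-type": "application/json",
    "x-api-user": config["api_user"],
    "x-api-key": config["api_key"],
}

# Each API call passes the flag straight through to requests, so a
# self-signed certificate on a dev instance no longer aborts the import.
r = requests.post(
    config["api_url"] + "day",  # the "day_data" route
    headers=headers,
    data="{}",                  # placeholder payload
    verify=verify_ssl,
)
print(r.status_code)
```

Because the flag defaults to True, skipping verification is always an explicit per-environment opt-in.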
--- README.md | 24 +++++++++++++++---- examples/config.yml | 8 +++---- requirements.txt | 4 +++- scripts/import.py | 7 +++++- .../importer_service/api_importer_service.py | 5 +++- 5 files changed, 37 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index b98c201..c20605f 100644 --- a/README.md +++ b/README.md @@ -11,18 +11,34 @@ simple [`glob`]() expression. ## Installation -Just use `pip` +Option 1: Install via `pip` + ```bash pip install https://github.com/AMP-SCZ/dpimport.git ``` +Option 2: Clone the repository and run manually + +```bash +git clone https://github.com/AMP-SCZ/dpimport.git +cd dpimport +python -m venv venv +source venv/bin/activate +pip install -r requirements.txt +python scripts/import.py -c config.yml '/PHOENIX/GENERAL/STUDY_A/SUB_001/DATA_TYPE/processed/*.csv' +``` + ## Configuration DPimport requires a configuration file in YAML format, passed as a command -line argument with `-c|--config`, for establishing a MongoDB database -connection. You will find an example configuration file in the `examples` -directory within this repository. +line argument with `-c|--config`, for connecting with an instance of the DPdash +application API. The configuration file should contain the following fields: + +api_url - Endpoint for the DPdash API +api_user - Username for the DPdash API +api_key - API key for the DPdash API +verify_ssl - Whether to verify SSL certificates (default: True) ## Usage diff --git a/examples/config.yml b/examples/config.yml index 3084b06..e020203 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -1,7 +1,7 @@ -csv_directory: './test_data' columns: 5 max_days: 45 participant_count: 5 -api_url: http://localhost:8000/api/v1/import/data/ -api_user: importer -api_key: super_secret +api_url: https://dpdash.local/api/v1/import/data/ +api_user: api-user-1 +api_key: api-key-1 +verify_ssl: True \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index d6e1198..f7a244e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ --e . 
+polars +pyaml +requests diff --git a/scripts/import.py b/scripts/import.py index 747c740..fb9a75f 100755 --- a/scripts/import.py +++ b/scripts/import.py @@ -22,7 +22,12 @@ def main(): "x-api-user": config["api_user"], "x-api-key": config["api_key"], } - api = api_importer_service.ImporterApiService(credentials) + if("verify_ssl" in config and config["verify_ssl"] == False): + verify_ssl = False + else: + verify_ssl = True + + api = api_importer_service.ImporterApiService(credentials, verify_ssl) for file in glob.iglob(args.expr, recursive=True): data_file = {} diff --git a/scripts/importer_service/api_importer_service.py b/scripts/importer_service/api_importer_service.py index a35c5f0..be5a20d 100644 --- a/scripts/importer_service/api_importer_service.py +++ b/scripts/importer_service/api_importer_service.py @@ -5,8 +5,10 @@ class ImporterApiService: def __init__( self, credentials, + verify_ssl=True, ): self.headers = {"content-type": "application/json", **credentials} + self.verify_ssl = verify_ssl def routes(self, url, path): routes = {"metadata": url + "metadata", "day_data": url + "day"} @@ -18,6 +20,7 @@ def upsert_file(self, url, file_data): url, headers=self.headers, data=file_data, + verify=self.verify_ssl, ) status = r.status_code try: @@ -30,7 +33,7 @@ def upsert_file(self, url, file_data): pass def refresh_metadata_collection(self, url): - r = requests.delete(url, headers=self.headers) + r = requests.delete(url, headers=self.headers, verify=self.verify_ssl) status = r.status_code if status != 200: From bb14b7967e56ce60b361c27ef7647c0799543c1d Mon Sep 17 00:00:00 2001 From: "Billah, Tashrif" <35086881+tashrifbillah@users.noreply.github.com> Date: Wed, 1 Oct 2025 09:54:21 -0400 Subject: [PATCH 20/21] omit printing of imported data --- scripts/import.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/scripts/import.py b/scripts/import.py index fb9a75f..20e9688 100755 --- a/scripts/import.py +++ b/scripts/import.py @@ -38,14 +38,13 @@ def main(): importer_service.process_file(file) data, meta = importer_service.processed_data_to_json() if data: - print("Day Data struct:") - print(data) + # print("Day Data struct:") + # print(data) api.upsert_file(api.routes(api_url, "day_data"), data) if meta: - print("Metadata struct:") - print(meta) - + # print("Metadata struct:") + # print(meta) api.upsert_file(api.routes(api_url, "metadata"), meta) From dbf699fefdfc24f14b495ba3964d39374f8b3111 Mon Sep 17 00:00:00 2001 From: "Billah, Tashrif" <35086881+tashrifbillah@users.noreply.github.com> Date: Wed, 1 Oct 2025 09:55:17 -0400 Subject: [PATCH 21/21] omit unhelpful ssl_verify=False warning This should also speed up the import API. --- scripts/importer_service/api_importer_service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/importer_service/api_importer_service.py b/scripts/importer_service/api_importer_service.py index be5a20d..46a4ae6 100644 --- a/scripts/importer_service/api_importer_service.py +++ b/scripts/importer_service/api_importer_service.py @@ -1,5 +1,6 @@ import requests - +import urllib3 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class ImporterApiService: def __init__(