diff --git a/.github/workflows/build-push-dev-image.yml b/.github/workflows/build-push-dev-image.yml index 13f8cfb7..b029d568 100644 --- a/.github/workflows/build-push-dev-image.yml +++ b/.github/workflows/build-push-dev-image.yml @@ -6,6 +6,7 @@ on: push: branches: - develop + - package-refactor paths-ignore: - README.md - .old_cicd/* @@ -83,4 +84,4 @@ jobs: containers.renci.org/${{ github.repository }}:develop containers.renci.org/${{ github.repository }}:${{ steps.vars.outputs.short_sha }} cache-from: type=registry,ref=${{ github.repository }}:buildcache-dev - cache-to: type=registry,ref=${{ github.repository }}:buildcache-dev,mode=max \ No newline at end of file + cache-to: type=registry,ref=${{ github.repository }}:buildcache-dev,mode=max diff --git a/Dockerfile b/Dockerfile index 47f32824..54db03be 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ # Use a Debian-based image for better compatibility -FROM python:3.11.14-slim-trixie +FROM python:3.12-slim-trixie # Set Airflow version and home directory -ARG AIRFLOW_VERSION=3.1.5 +ARG AIRFLOW_VERSION=3.1.7 ARG AIRFLOW_HOME=/opt/airflow # Environment variables @@ -37,6 +37,10 @@ RUN pip install --no-cache-dir \ "apache-airflow-providers-cncf-kubernetes" \ - --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-3.11.txt" + --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-3.12.txt" +# Fix auth rollback bug. +RUN pip install --no-cache-dir \ + "apache-airflow-providers-fab==3.3.0rc1" + # Optional: install extra packages RUN pip install --no-cache-dir psycopg2-binary redis @@ -46,7 +50,8 @@ RUN pip install -r /tmp/requirements.txt RUN rm /tmp/requirements.txt - +# COPY . 
/opt/roger +# RUN pip install /opt/roger RUN apt-get purge -y --auto-remove \ build-essential \ @@ -57,6 +62,8 @@ RUN apt-get purge -y --auto-remove \ git && \ apt-get clean +RUN if [ -n "$ROGER_SOURCE" ]; then pip install -e "$ROGER_SOURCE"; fi + # Set ownership RUN chown -R airflow:airflow ${AIRFLOW_HOME} @@ -64,6 +71,8 @@ RUN chown -R airflow:airflow ${AIRFLOW_HOME} USER airflow WORKDIR ${AIRFLOW_HOME} +ENV PYTHONPATH=/opt/airflow/dags/repo/src/ + # Expose Airflow webserver port EXPOSE 8080 diff --git a/bin/dug_annotate/Makefile b/bin/dug_annotate/Makefile index 34d350ce..745c01de 100644 --- a/bin/dug_annotate/Makefile +++ b/bin/dug_annotate/Makefile @@ -33,12 +33,12 @@ clean: $(RM) -rf ${KGX_DIR} get_input_files: - $(TIME) python ${CLI_WRAPPER} -gd + $(TIME) roger -gd annotate_and_normalize: - $(TIME) python ${CLI_WRAPPER} -l + $(TIME) roger -l create_kgx_files: - $(TIME) python ${CLI_WRAPPER} -t + $(TIME) roger -t all: get_input_files annotate_and_normalize create_kgx_files diff --git a/bin/dug_indexing/Makefile b/bin/dug_indexing/Makefile index 5d015089..479f2b2c 100644 --- a/bin/dug_indexing/Makefile +++ b/bin/dug_indexing/Makefile @@ -19,7 +19,6 @@ THIS_MAKEFILE_PATH:=$(word $(words $(MAKEFILE_LIST)),$(MAKEFILE_LIST)) THIS_DIR:=$(shell cd $(dir $(THIS_MAKEFILE_PATH));pwd) ROGER_HOME=${THIS_DIR}/../.. 
-CLI_WRAPPER=${ROGER_HOME}/cli.py # Override Roger data dir ENV INDEXING_DIR=${ROGERENV_DATA__ROOT}/dug/expanded_concepts @@ -34,19 +33,19 @@ clean: $(RM) -rf ${CRAWL_DIR} crawl_concepts: - $(TIME) python ${CLI_WRAPPER} -C + $(TIME) roger -C index_concepts: crawl_concepts - $(TIME) python ${CLI_WRAPPER} -ic + $(TIME) roger -ic index_variables: - $(TIME) python ${CLI_WRAPPER} -iv + $(TIME) roger -iv validate_indexed_concepts: index_concepts - $(TIME) python ${CLI_WRAPPER} -vc + $(TIME) roger -vc validate_indexed_variables: index_variables - $(TIME) python ${CLI_WRAPPER} -vv + $(TIME) roger -vv all: validate_indexed_concepts validate_indexed_variables diff --git a/cli.py b/cli.py deleted file mode 100644 index be77525a..00000000 --- a/cli.py +++ /dev/null @@ -1,112 +0,0 @@ -import roger.core.base as RogerUtil -from roger.config import config -from roger.logger import get_logger -from dug_helpers.dug_utils import DugUtil, get_topmed_files, get_dbgap_files, get_sparc_files, get_anvil_files, get_nida_files -import sys -import argparse -import os -import time - - -log = get_logger() - -if __name__ == "__main__": - start = time.time() - log.info(f"Start TIME:{start}") - parser = argparse.ArgumentParser(description='Roger common cli tool.') - """ Common CLI. """ - parser.add_argument('-d', '--data-root', help="Root of data hierarchy", default=None) - - """ Roger CLI. 
""" - parser.add_argument('-v', '--dataset-version', help="Dataset version.", default="v1.0") - parser.add_argument('-g', '--get-kgx', help="Get KGX objects", action='store_true') - parser.add_argument('-s', '--create-schema', help="Infer schema", action='store_true') - parser.add_argument('-m', '--merge-kgx', help="Merge KGX nodes", action='store_true') - parser.add_argument('-b', '--create-bulk', help="Create bulk load", action='store_true') - parser.add_argument('-i', '--insert', help="Do the bulk insert", action='store_true') - parser.add_argument('-a', '--validate', help="Validate the insert", action='store_true') - - """ Dug Annotation CLI. """ - parser.add_argument('-gd', '--get_dug_input_files', help="Gets input files for annotation", - action="store_true") - parser.add_argument('-l', '--load-and-annotate',help="Annotates and normalizes datasets of varaibles.", - action="store_true") - parser.add_argument('-t', '--make-tagged-kg', help="Creates KGX files from annotated variable datesets.", - action="store_true") - - """ Dug indexing CLI . """ - parser.add_argument('-iv', '--index-variables', help="Index annotated variables to elastic search.", - action="store_true") - parser.add_argument('-C', '--crawl-concepts', help="Crawl tranql and index concepts", - action="store_true") - - parser.add_argument('-ic', '--index-concepts', help="Index expanded concepts to elastic search.", - action="store_true") - - parser.add_argument('-vc', '--validate-concepts', help="Validates indexing of concepts", - action="store_true") - - parser.add_argument('-vv', '--validate-variables', help="Validates indexing of variables", - action="store_true") - - args = parser.parse_args () - - if args.data_root is not None: - data_root = args.data_root - config.data_root = data_root - log.info (f"data root:{data_root}") - - # When all lights are on... 
- - # Annotation comes first - if args.get_dug_input_files: - get_topmed_files(config) - get_dbgap_files(config) - get_anvil_files(config) - # get_sparc_files(config) - # get_nida_files(config) - - if args.load_and_annotate: - DugUtil.clear_annotation_cached(config=config) - DugUtil.annotate_db_gap_files(config=config) - DugUtil.annotate_topmed_files(config=config) - DugUtil.annotate_anvil_files(config=config) - if args.make_tagged_kg: - DugUtil.make_kg_tagged(config=config) - - # Roger things - if args.get_kgx: - RogerUtil.get_kgx(config=config) - if args.merge_kgx: - RogerUtil.merge_nodes(config=config) - if args.create_schema: - RogerUtil.create_schema(config=config) - if args.create_bulk: - RogerUtil.create_bulk_load(config=config) - if args.insert: - RogerUtil.bulk_load(config=config) - if args.validate: - RogerUtil.validate(config=config) - RogerUtil.check_tranql(config=config) - - # Back to dug indexing - if args.index_variables: - DugUtil.index_variables(config=config) - - if args.validate_variables: - DugUtil.validate_indexed_variables(config=config) - - if args.crawl_concepts: - DugUtil.crawl_tranql(config=config) - - if args.index_concepts: - DugUtil.index_concepts(config=config) - - if args.validate_concepts: - DugUtil.validate_indexed_concepts(config=config) - - end = time.time() - time_elapsed = end - start - log.info(f"Completion TIME:{time_elapsed}") - - sys.exit (0) diff --git a/dags/annotate_and_index.py b/dags/annotate_and_index.py index e4cdfd98..4729e90c 100644 --- a/dags/annotate_and_index.py +++ b/dags/annotate_and_index.py @@ -9,8 +9,8 @@ import os from airflow.models import DAG -from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.empty import EmptyOperator +from airflow.providers.standard.operators.python import PythonOperator from roger.tasks import default_args, create_pipeline_taskgroup, logger, create_python_task env_enabled_datasets = 
os.getenv( @@ -52,35 +52,5 @@ init >> create_pipeline_taskgroup(dag, pipeline_class, config) >> finish - - - -with DAG( - dag_id='dag_test', - default_args=default_args, - params= - { - "repository_id": None, - "branch_name": None, - "commitid_from": None, - "commitid_to": None - }, - # schedule_interval=None -) as dag: - - init = EmptyOperator(task_id="init", dag=dag) - finish = EmptyOperator(task_id="finish", dag=dag) - - def print_context(ds=None, **kwargs): - print(">>>All kwargs") - print(kwargs) - print(">>>All ds") - print(ds) - - - init >> create_python_task(dag, "get_from_lakefs", print_context) >> finish - - #run_this = PythonOperator(task_id="print_the_context", python_callable=print_context) - if __name__ == "__main__": - dag.test() \ No newline at end of file + dag.test() diff --git a/dags/dug_helpers/__init__.py b/dags/dug_helpers/__init__.py deleted file mode 100644 index 9e28ad28..00000000 --- a/dags/dug_helpers/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from pathlib import Path - -DUG_DATA_DIR = Path(__file__).parent.resolve() / 'dug_data' diff --git a/dags/dug_helpers/dug_utils.py b/dags/dug_helpers/dug_utils.py deleted file mode 100644 index 52db9624..00000000 --- a/dags/dug_helpers/dug_utils.py +++ /dev/null @@ -1,1019 +0,0 @@ -import asyncio -import hashlib -import logging -import os -import re -import tarfile -import traceback -from functools import reduce -from io import StringIO -from pathlib import Path -from typing import Union, List - -import requests -from dug.core import get_parser, get_annotator, get_plugin_manager, DugConcept -from dug.core.annotators._base import Annotator -from dug.core.concept_expander import ConceptExpander -from dug.core.crawler import Crawler -from dug.core.factory import DugFactory -from dug.core.parsers import Parser, DugElement -from dug.core.async_search import Search -from dug.core.index import Index - -from roger.config import RogerConfig -from roger.core import storage -from roger.models.biolink import 
BiolinkModel -from roger.logger import get_logger -from utils.s3_utils import S3Utils - -log = get_logger() - - - -class Dug: - - def __init__(self, config: RogerConfig, to_string=True): - self.config = config - self.bl_toolkit = BiolinkModel() - dug_conf = config.to_dug_conf() - self.element_mapping = config.indexing.element_mapping - self.factory = DugFactory(dug_conf) - self.cached_session = self.factory.build_http_session() - self.event_loop = asyncio.new_event_loop() - if to_string: - self.log_stream = StringIO() - self.string_handler = logging.StreamHandler(self.log_stream) - log.addHandler(self.string_handler) - - self.annotator_name: str = config.annotation.annotator_type - - self.tranqlizer: ConceptExpander = self.factory.build_tranqlizer() - - graph_name = self.config["redisgraph"]["graph"] - source = f"redis:{graph_name}" - self.tranql_queries: dict = self.factory.build_tranql_queries(source) - self.node_to_element_queries: list = self.factory.build_element_extraction_parameters(source) - - indexing_config = config.indexing - self.variables_index = indexing_config.get('variables_index') - self.concepts_index = indexing_config.get('concepts_index') - self.kg_index = indexing_config.get('kg_index') - - self.search_obj: Search = self.factory.build_search_obj([ - self.variables_index, - self.concepts_index, - self.kg_index, - ]) - self.index_obj: Index = self.factory.build_indexer_obj([ - self.variables_index, - self.concepts_index, - self.kg_index, - - ]) - - def __enter__(self): - self.event_loop = asyncio.new_event_loop() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - # close elastic search connection - self.event_loop.run_until_complete(self.search_obj.es.close()) - # close async loop - if self.event_loop.is_running() and not self.event_loop.is_closed(): - self.event_loop.close() - if exc_type or exc_val or exc_tb: - traceback.print_exc() - log.error(f"{exc_val} {exc_val} {exc_tb}") - log.exception("Got an exception") - - def 
annotate_files(self, parser_name, parsable_files, - output_data_path=None): - """ - Annotates a Data element file using a Dug parser. - :param parser_name: Name of Dug parser to use. - :param parsable_files: Files to parse. - :return: None. - """ - dug_plugin_manager = get_plugin_manager() - parser: Parser = get_parser(dug_plugin_manager.hook, parser_name) - annotator: Annotator = get_annotator(dug_plugin_manager.hook, annotator_name=self.annotator_name, config=self.config.to_dug_conf()) - if not output_data_path: - output_data_path = storage.dug_annotation_path('') - log.info("Parsing files") - for parse_file in parsable_files: - log.debug("Creating Dug Crawler object") - crawler = Crawler( - crawl_file=parse_file, - parser=parser, - annotator=annotator, - tranqlizer='', - tranql_queries=[], - http_session=self.cached_session - ) - - # configure output space. - current_file_name = '.'.join(os.path.basename(parse_file).split('.')[:-1]) - elements_file_path = os.path.join(output_data_path, current_file_name) - elements_file_name = 'elements.pickle' - concepts_file_name = 'concepts.pickle' - - # create an empty elements file. This also creates output dir if it doesn't exist. - log.debug(f"Creating empty file: {elements_file_path}/element_file.json") - storage.write_object({}, os.path.join(elements_file_path, 'element_file.json')) - log.debug(parse_file) - log.debug(parser) - elements = parser(parse_file) - log.debug(elements) - crawler.elements = elements - - # @TODO propose for Dug to make this a crawler class init parameter(??) 
- crawler.crawlspace = elements_file_path - log.debug(f"Crawler annotator: {crawler.annotator}") - crawler.annotate_elements() - - # Extract out the concepts gotten out of annotation - # Extract out the elements - non_expanded_concepts = crawler.concepts - elements = crawler.elements - - # Write pickles of objects to file - log.info(f"Parsed and annotated: {parse_file}") - elements_out_file = os.path.join(elements_file_path, elements_file_name) - storage.write_object(elements, elements_out_file) - log.info(f"Pickled annotated elements to : {elements_file_path}/{elements_file_name}") - concepts_out_file = os.path.join(elements_file_path, concepts_file_name) - storage.write_object(non_expanded_concepts, concepts_out_file) - log.info(f"Pickled annotated concepts to : {elements_file_path}/{concepts_file_name}") - - def make_edge(self, - subj, - obj, - predicate='biolink:related_to', - predicate_label='related to', - relation='biolink:related_to', - relation_label='related to' - ): - """ - Create an edge between two nodes. - - :param subj: The identifier of the subject. - :param pred: The predicate linking the subject and object. - :param obj: The object of the relation. - :param predicate: Biolink compatible edge type. - :param predicate_label: Edge label. - :param relation: Ontological edge type. - :param relation_label: Ontological edge type label. - :returns: Returns and edge. - """ - edge_id = hashlib.md5(f'{subj}{predicate}{obj}'.encode('utf-8')).hexdigest() - return { - "subject": subj, - "predicate": predicate, - "predicate_label": predicate_label, - "id": edge_id, - "relation": relation, - "relation_label": relation_label, - "object": obj, - "provided_by": "renci.bdc.semanticsearch.annotator" - } - - def convert_to_kgx_json(self, elements, written_nodes=set()): - """ - Given an annotated and normalized set of study variables, - generate a KGX compliant graph given the normalized annotations. - Write that grpah to a graph database. 
- See BioLink Model for category descriptions. https://biolink.github.io/biolink-model/notes.html - """ - graph = { - "nodes": [], - "edges": [] - } - edges = graph['edges'] - nodes = graph['nodes'] - - for index, element in enumerate(elements): - # DugElement means a variable (Study variable...) - if not isinstance(element, DugElement): - continue - study_id = element.collection_id - if study_id not in written_nodes: - nodes.append({ - "id": study_id, - "category": ["biolink:Study"], - "name": study_id - }) - written_nodes.add(study_id) - """ connect the study and the variable. """ - edges.append(self.make_edge( - subj=element.id, - relation_label='part of', - relation='BFO:0000050', - obj=study_id, - predicate='biolink:part_of', - predicate_label='part of')) - edges.append(self.make_edge( - subj=study_id, - relation_label='has part', - relation="BFO:0000051", - obj=element.id, - predicate='biolink:has_part', - predicate_label='has part')) - - """ a node for the variable. Should be BL compatible """ - variable_node = { - "id": element.id, - "name": element.name, - "category": ["biolink:StudyVariable"], - "description": element.description.replace("'", '`').replace('\n', ' ') # bulk loader parsing issue - } - if element.id not in written_nodes: - nodes.append(variable_node) - written_nodes.add(element.id) - - for identifier, metadata in element.concepts.items(): - identifier_object = metadata.identifiers.get(identifier) - # This logic is treating DBGap files. - # First item in current DBGap xml files is a topmed tag, - # This is treated as a DugConcept Object. But since its not - # a concept we get from annotation (?) its never added to - # variable.concepts.items (Where variable is a DugElement object) - # The following logic is trying to extract types, and for the - # aformentioned topmed tag it adds `biolink:InfomrmationContentEntity` - # Maybe a better solution could be adding types on DugConcept objects - # More specifically Biolink compatible types (?) 
- # - if identifier_object: - category = identifier_object.types - elif identifier.startswith("TOPMED.TAG:"): - category = ["biolink:InformationContentEntity"] - else: - continue - if identifier not in written_nodes: - if isinstance(category, str): - bl_element = self.bl_toolkit.toolkit.get_element(category) - category = [bl_element.class_uri or bl_element.slot_uri] - nodes.append({ - "id": identifier, - "category": category, - "name": metadata.name - }) - written_nodes.add(identifier) - # related to edge - edges.append(self.make_edge( - subj=element.id, - obj=identifier - )) - # related to edge - edges.append(self.make_edge( - subj=identifier, - obj=element.id)) - return graph - - def make_tagged_kg(self, elements): - """ Make a Translator standard knowledge graph representing - tagged study variables. - :param variables: The variables to model. - :param tags: The tags characterizing the variables. - :returns: Returns dictionary with nodes and edges modeling a Translator/Biolink KG. - """ - graph = { - "nodes": [], - "edges": [] - } - edges = graph['edges'] - nodes = graph['nodes'] - studies = {} - - """ Create graph elements to model tags and their - links to identifiers gathered by semantic tagging. """ - tag_map = {} - # @TODO extract this into config or maybe dug ?? - topmed_tag_concept_type = "TOPMed Phenotype Concept" - nodes_written = set() - for tag in elements: - if not (isinstance(tag, DugConcept) and tag.type == topmed_tag_concept_type): - continue - tag_id = tag.id - tag_map[tag_id] = tag - nodes.append({ - "id": tag_id, - "name": tag.name, - "description": tag.description.replace("'", "`"), - "category": ["biolink:InformationContentEntity"] - }) - """ Link ontology identifiers we've found for this tag via nlp. 
""" - for identifier, metadata in tag.identifiers.items(): - if isinstance(metadata.types, str): - bl_element = self.bl_toolkit.toolkit.get_element(metadata.types) - category = [bl_element.class_uri or bl_element.slot_uri] - else: - category = metadata.types - synonyms = metadata.synonyms if metadata.synonyms else [] - nodes.append({ - "id": identifier, - "name": metadata.label, - "category": category, - "synonyms": synonyms - }) - nodes_written.add(identifier) - edges.append(self.make_edge( - subj=tag_id, - obj=identifier)) - edges.append(self.make_edge( - subj=identifier, - obj=tag_id)) - - concepts_graph = self.convert_to_kgx_json(elements, written_nodes=nodes_written) - graph['nodes'] += concepts_graph['nodes'] - graph['edges'] += concepts_graph['edges'] - - return graph - - def index_elements(self, elements_file): - log.info(f"Indexing {elements_file}...") - elements = storage.read_object(elements_file) - count = 0 - total = len(elements) - # Index Annotated Elements - log.info(f"found {len(elements)} from elements files.") - for element in elements: - count += 1 - # Only index DugElements as concepts will be indexed differently in next step - if not isinstance(element, DugConcept): - # override data-type with mapping values - if element.type.lower() in self.element_mapping: - element.type = self.element_mapping[element.type.lower()] - self.index_obj.index_element(element, index=self.variables_index) - percent_complete = (count / total) * 100 - if percent_complete % 10 == 0: - log.info(f"{percent_complete} %") - log.info(f"Done indexing {elements_file}.") - - def validate_indexed_elements(self, elements_file): - elements = [x for x in storage.read_object(elements_file) if not isinstance(x, DugConcept)] - # Pick ~ 10 % - sample_size = int(len(elements) * 0.1) - test_elements = elements[:sample_size] # random.choices(elements, k=sample_size) - log.info(f"Picked {len(test_elements)} from {elements_file} for validation.") - for element in test_elements: - # Pick a 
concept - concepts = [element.concepts[curie] for curie in element.concepts if element.concepts[curie].name] - - if len(concepts): - # Pick the first concept - concept = concepts[0] - curie = concept.id - search_term = re.sub(r'[^a-zA-Z0-9_\ ]+', '', concept.name) - log.debug(f"Searching for Concept: {curie} and Search term: {search_term}") - all_elements_ids = self._search_elements(curie, search_term) - present = element.id in all_elements_ids - if not present: - log.error(f"Did not find expected variable {element.id} in search result.") - log.error(f"Concept id : {concept.id}, Search term: {search_term}") - raise Exception(f"Validation exception - did not find variable {element.id} " - f"from {str(elements_file)}" - f"when searching variable index with" - f" Concept ID : {concept.id} using Search Term : {search_term} ") - else: - log.info( - f"{element.id} has no concepts annotated. Skipping validation for it." - ) - - def _search_elements(self, curie, search_term): - response = self.event_loop.run_until_complete(self.search_obj.search_vars_unscored( - concept=curie, - query=search_term - )) - ids_dict = [] - if 'total_items' in response: - if response['total_items'] == 0: - log.error(f"No search elements returned for variable search: {self.variables_index}.") - log.error(f"Concept id : {curie}, Search term: {search_term}") - raise Exception(f"Validation error - Did not find {curie} for" - f"Search term: {search_term}") - else: - del response['total_items'] - for element_type in response: - all_elements_ids = [e['id'] for e in - reduce(lambda x, y: x + y['elements'], response[element_type], [])] - ids_dict += all_elements_ids - return ids_dict - - def crawl_concepts(self, concepts, data_set_name): - """ - Adds tranql KG to Concepts, terms grabbed from KG are also added as search terms - :param concepts: - :param data_set_name: - :return: - """ - crawl_dir = storage.dug_crawl_path('crawl_output') - output_file_name = os.path.join(data_set_name, 
'expanded_concepts.pickle') - extracted_dug_elements_file_name = os.path.join(data_set_name, 'extracted_graph_elements.pickle') - output_file = storage.dug_expanded_concepts_path(output_file_name) - extracted_output_file = storage.dug_expanded_concepts_path(extracted_dug_elements_file_name) - Path(crawl_dir).mkdir(parents=True, exist_ok=True) - extracted_dug_elements = [] - log.debug("Creating Dug Crawler object") - crawler = Crawler( - crawl_file="", - parser=None, - annotator=None, - tranqlizer=self.tranqlizer, - tranql_queries=self.tranql_queries, - http_session=self.cached_session, - ) - crawler.crawlspace = crawl_dir - counter = 0 - total = len(concepts) - for concept_id, concept in concepts.items(): - counter += 1 - try: - crawler.expand_concept(concept) - concept.set_search_terms() - concept.set_optional_terms() - except Exception as e: - log.error(concept) - raise e - for query in self.node_to_element_queries: - log.info(query) - casting_config = query['casting_config'] - tranql_source = query['tranql_source'] - dug_element_type = query['output_dug_type'] - new_elements = crawler.expand_to_dug_element( - concept=concept, - casting_config=casting_config, - dug_element_type=dug_element_type, - tranql_source=tranql_source - ) - log.debug("extracted:") - log.debug(str(list([el.get_searchable_dict() for el in new_elements]))) - extracted_dug_elements += new_elements - concept.clean() - percent_complete = int((counter / total) * 100) - if percent_complete % 10 == 0: - log.info(f"{percent_complete}%") - storage.write_object(obj=concepts, path=output_file) - storage.write_object(obj=extracted_dug_elements, path=extracted_output_file) - - def index_concepts(self, concepts): - log.info("Indexing Concepts") - total = len(concepts) - count = 0 - for concept_id, concept in concepts.items(): - count += 1 - self.index_obj.index_concept(concept, index=self.concepts_index) - # Index knowledge graph answers for each concept - for kg_answer_id, kg_answer in 
concept.kg_answers.items(): - self.index_obj.index_kg_answer( - concept_id=concept_id, - kg_answer=kg_answer, - index=self.kg_index, - id_suffix=kg_answer_id - ) - percent_complete = int((count / total) * 100) - if percent_complete % 10 == 0: - log.info(f"{percent_complete} %") - log.info("Done Indexing concepts") - - def validate_indexed_concepts(self, elements, concepts): - """ - Validates linked concepts are searchable - :param elements: Annotated dug elements - :param concepts: Crawled (expanded) concepts - :return: - """ - # 1 . Find concepts with KG <= 10% of all concepts, - # <= because we might have no results for some concepts from tranql - sample_concepts = {key: value for key, value in concepts.items() if value.kg_answers} - if len(concepts) == 0: - log.info(f"No Concepts found.") - return - log.info( - f"Found only {len(sample_concepts)} Concepts with Knowledge graph out of {len(concepts)}. {(len(sample_concepts) / len(concepts)) * 100} %") - # 2. pick elements that have concepts in the sample concepts set - sample_elements = {} - for element in elements: - if isinstance(element, DugConcept): - continue - for concept in element.concepts: - # add elements that have kg - if concept in sample_concepts: - sample_elements[concept] = sample_elements.get(concept, set()) - sample_elements[concept].add(element.id) - - # Time for some validation - for curie in concepts: - concept = concepts[curie] - if not len(concept.kg_answers): - continue - search_terms = [] - for key in concept.kg_answers: - kg_object = concept.kg_answers[key] - search_terms += kg_object.get_node_names() - search_terms += kg_object.get_node_synonyms() - # reduce(lambda x,y: x + y, [[node.get("name")] + node.get("synonyms", []) - # for node in concept.kg_answers["knowledge_graph"]["nodes"]], []) - # validation here is that for any of these nodes we should get back - # the variable. 
- # make unique - search_terms_cap = 10 - search_terms = list(set(search_terms))[:search_terms_cap] - log.debug(f"Using {len(search_terms)} Search terms for concept {curie}") - for search_term in search_terms: - # avoids elastic failure due to some reserved characters - # 'search_phase_execution_exception', 'token_mgr_error: Lexical error ... - search_term = re.sub(r'[^a-zA-Z0-9_\ ]+', '', search_term) - - searched_element_ids = self._search_elements(curie, search_term) - - if curie not in sample_elements: - log.error(f"Did not find Curie id {curie} in Elements.") - log.error(f"Concept id : {concept.id}, Search term: {search_term}") - raise Exception(f"Validation error - Did not find {element.id} for" - f" Concept id : {concept.id}, Search term: {search_term}") - else: - present = bool(len([x for x in sample_elements[curie] if x in searched_element_ids])) - if not present: - log.error(f"Did not find expected variable {element.id} in search result.") - log.error(f"Concept id : {concept.id}, Search term: {search_term}") - raise Exception(f"Validation error - Did not find {element.id} for" - f" Concept id : {concept.id}, Search term: {search_term}") - - def clear_index(self, index_id): - exists = self.search_obj.es.indices.exists(index=index_id) - if exists: - log.info(f"Deleting index {index_id}") - response = self.event_loop.run_until_complete(self.search_obj.es.indices.delete(index=index_id)) - log.info(f"Cleared Elastic : {response}") - log.info("Re-initializing the indicies") - self.index_obj.init_indices() - - def clear_variables_index(self): - self.clear_index(self.variables_index) - - def clear_kg_index(self): - self.clear_index(self.kg_index) - - def clear_concepts_index(self): - self.clear_index(self.concepts_index) - - -class DugUtil(): - - @staticmethod - def clear_annotation_cached(config=None, to_string=False): - with Dug(config, to_string=to_string) as dug: - annotation_path = storage.dug_annotation_path("") - storage.clear_dir(annotation_path) - # 
Clear http session cache - if config.annotation.clear_http_cache: - dug.cached_session.cache.clear() - - @staticmethod - def annotate_db_gap_files(config=None, to_string=False, input_data_path=None, output_data_path=None): - with Dug(config, to_string=to_string) as dug: - if not input_data_path: - files = storage.dug_dd_xml_objects( - input_data_path=input_data_path) - else: - files = storage.get_files_recursive( - lambda x: True, input_data_path - ) - parser_name = "DbGaP" - dug.annotate_files(parser_name=parser_name, - parsable_files=files, - output_data_path=output_data_path) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def annotate_anvil_files(config=None, to_string=False, - input_data_path=None, output_data_path=None): - with Dug(config, to_string=to_string) as dug: - if not input_data_path: - files = storage.dug_anvil_objects( - input_data_path=input_data_path) - else: - files = storage.get_files_recursive( - lambda x: True, input_data_path - ) - parser_name = "Anvil" - dug.annotate_files(parser_name=parser_name, - parsable_files=files, - output_data_path=output_data_path) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def annotate_cancer_commons_files(config=None, to_string=False, - input_data_path=None, - output_data_path=None): - with Dug(config, to_string=to_string) as dug: - if not input_data_path: - files = storage.dug_crdc_objects( - input_data_path=input_data_path) - else: - files = storage.get_files_recursive( - lambda x: True, input_data_path - ) - parser_name = "crdc" - dug.annotate_files(parser_name=parser_name, - parsable_files=files, - output_data_path=output_data_path) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def annotate_kids_first_files(config=None, to_string=False, - input_data_path=None, output_data_path=None): - with Dug(config, to_string=to_string) as dug: - if not 
input_data_path: - files = storage.dug_kfdrc_objects( - input_data_path=input_data_path) - else: - files = storage.get_files_recursive( - lambda x: True, input_data_path - ) - parser_name = "kfdrc" - dug.annotate_files(parser_name=parser_name, - parsable_files=files, - output_data_path=output_data_path) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def annotate_nida_files(config=None, to_string=False, - input_data_path=None, output_data_path=None): - with Dug(config, to_string=to_string) as dug: - if not input_data_path: - files = storage.dug_nida_objects( - input_data_path=input_data_path) - else: - files = storage.get_files_recursive( - lambda x: True, input_data_path - ) - parser_name = "NIDA" - dug.annotate_files(parser_name=parser_name, - parsable_files=files, - output_data_path=output_data_path) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def annotate_sparc_files(config=None, to_string=False, - input_data_path=None, output_data_path=None): - with Dug(config, to_string=to_string) as dug: - if not input_data_path: - files = storage.dug_sparc_objects( - input_data_path=input_data_path) - else: - files = storage.get_files_recursive( - lambda x: True, input_data_path - ) - parser_name = "SciCrunch" - dug.annotate_files(parser_name=parser_name, - parsable_files=files, - output_data_path=output_data_path) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def annotate_sprint_files(config=None, to_string=False, - input_data_path=None, output_data_path=None): - with Dug(config, to_string=to_string) as dug: - if not input_data_path: - files = storage.dug_sprint_objects( - input_data_path=input_data_path) - else: - files = storage.get_files_recursive( - lambda x: True, input_data_path - ) - parser_name = "SPRINT" - dug.annotate_files(parser_name=parser_name, - parsable_files=files, - 
output_data_path=output_data_path) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def annotate_topmed_files(config=None, to_string=False, - input_data_path=None, output_data_path=None): - with Dug(config, to_string=to_string) as dug: - if not input_data_path: - files = storage.dug_topmed_objects( - input_data_path=None) - else: - files = storage.get_files_recursive( - lambda x: True, input_data_path - ) - parser_name = "TOPMedTag" - log.info(files) - dug.annotate_files(parser_name=parser_name, - parsable_files=files, - output_data_path=output_data_path) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def annotate_bacpac_files(config=None, to_string=False, - input_data_path=None, output_data_path=None): - - log.info(f"Input data path is: {input_data_path}") - with Dug(config, to_string=to_string) as dug: - files = storage.dug_bacpac_objects( - input_data_path=input_data_path) - - parser_name = "BACPAC" - log.info(files) - dug.annotate_files(parser_name=parser_name, - parsable_files=files, - output_data_path=output_data_path) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - - @staticmethod - def annotate_heal_study_files(config=None, to_string=False, - input_data_path=None, output_data_path=None): - with Dug(config, to_string=to_string) as dug: - if not input_data_path: - files = storage.dug_heal_study_objects( - input_data_path=None) - else: - files = storage.get_files_recursive( - lambda x: True, input_data_path - ) - - parser_name = "heal-studies" - log.info(files) - dug.annotate_files(parser_name=parser_name, - parsable_files=files, - output_data_path=output_data_path) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - - @staticmethod - def annotate_heal_research_program_files(config=None, to_string=False, - input_data_path=None, - output_data_path=None): - with Dug(config, 
to_string=to_string) as dug: - if not input_data_path: - files = storage.dug_heal_research_program_objects( - input_data_path=None) - else: - files = storage.get_files_recursive( - lambda x: True, input_data_path - ) - parser_name = "heal-research" - log.info(files) - dug.annotate_files(parser_name=parser_name, - parsable_files=files, - output_data_path=output_data_path) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def make_kg_tagged(config=None, to_string=False, input_data_path=None, output_data_path=None): - with Dug(config, to_string=to_string) as dug: - output_base_path = output_data_path - if not output_data_path: - output_base_path = storage.dug_kgx_path("") - storage.clear_dir(output_base_path) - log.info("Starting building KGX files") - if not input_data_path: - elements_files = storage.dug_elements_objects() - else: - import glob - glob_pattern = str(input_data_path / "**" / 'elements.pickle') - elements_files = glob.glob(glob_pattern, recursive=True) - log.info(f"making kgx files for the following pickles: {elements_files}") - for file in elements_files: - elements = storage.read_object(file) - if "topmed_" in file: - kg = dug.make_tagged_kg(elements) - else: - kg = dug.convert_to_kgx_json(elements) - dug_base_file_name = file.split(os.path.sep)[-2] - output_file_path = os.path.join(output_base_path, dug_base_file_name + '_kgx.json') - storage.write_object(kg, output_file_path) - log.info(f"Wrote {len(kg['nodes'])} nodes and {len(kg['edges'])} edges, to {output_file_path}.") - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def index_variables(config=None, to_string=False): - with Dug(config, to_string=to_string) as dug: - dug.clear_variables_index() - elements_object_files = storage.dug_elements_objects() - for file in elements_object_files: - dug.index_elements(file) - output_log = dug.log_stream.getvalue() if to_string else '' - return 
output_log - - @staticmethod - def index_extracted_elements(config=None, to_string=False): - with Dug(config, to_string=to_string) as dug: - elements_object_files = storage.dug_extracted_elements_objects() - for file in elements_object_files: - dug.index_elements(file) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def index_concepts(config=None, to_string=False): - with Dug(config=config, to_string=to_string) as dug: - # These are concepts that have knowledge graphs from tranql - # clear out concepts and kg indicies from previous runs - dug.clear_concepts_index() - dug.clear_kg_index() - expanded_concepts_files = storage.dug_expanded_concept_objects() - for file in expanded_concepts_files: - concepts = storage.read_object(file) - dug.index_concepts(concepts=concepts) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def validate_indexed_variables(config=None, to_string=False): - with Dug(config, to_string=to_string) as dug: - elements_object_files = storage.dug_elements_objects() - for elements_object_file in elements_object_files: - log.info(f"Validating {elements_object_file}") - dug.validate_indexed_elements(elements_object_file) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def crawl_tranql(config=None, to_string=False): - log.info(config.dict) - with Dug(config, to_string=to_string) as dug: - concepts_files = storage.dug_concepts_objects() - crawl_dir = storage.dug_crawl_path('crawl_output') - log.info(f'Clearing crawl output dir {crawl_dir}') - storage.clear_dir(crawl_dir) - expanded_concepts_dir = storage.dug_expanded_concepts_path("") - log.info(f'Clearing expanded concepts dir: {expanded_concepts_dir}') - storage.clear_dir(expanded_concepts_dir) - log.info(f'Crawling Dug Concepts, found {len(concepts_files)} file(s).') - for file in concepts_files: - data_set = storage.read_object(file) - 
original_variables_dataset_name = os.path.split(os.path.dirname(file))[-1] - dug.crawl_concepts(concepts=data_set, - data_set_name=original_variables_dataset_name) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - @staticmethod - def validate_indexed_concepts(config=None, to_string=False): - with Dug(config, to_string=to_string) as dug: - get_data_set_name = lambda file: os.path.split(os.path.dirname(file))[-1] - expanded_concepts_files_dict = { - get_data_set_name(file): file for file in storage.dug_expanded_concept_objects() - } - annotated_elements_files_dict = { - get_data_set_name(file): file for file in storage.dug_elements_objects() - } - try: - assert len(expanded_concepts_files_dict) == len(annotated_elements_files_dict) - except: - log.error("Files Annotated Elements files and Expanded concepts files, should be pairs") - if len(expanded_concepts_files_dict) > len(annotated_elements_files_dict): - log.error("Some Annotated Elements files (from load_and_annotate task) are missing") - else: - log.error("Some Expanded Concepts files (from crawl task) are missing") - log.error(f"Annotated Datasets : {list(annotated_elements_files_dict.keys())}") - log.error(f"Expanded Concepts Datasets: {list(expanded_concepts_files_dict.keys())}") - exit(-1) - for data_set_name in annotated_elements_files_dict: - log.debug(f"Reading concepts and elements for dataset {data_set_name}") - elements_file_path = annotated_elements_files_dict[data_set_name] - concepts_file_path = expanded_concepts_files_dict[data_set_name] - dug_elements = storage.read_object(elements_file_path) - dug_concepts = storage.read_object(concepts_file_path) - log.debug(f"Read {len(dug_elements)} elements, and {len(dug_concepts)} Concepts") - log.info(f"Validating {data_set_name}") - dug.validate_indexed_concepts(elements=dug_elements, concepts=dug_concepts) - output_log = dug.log_stream.getvalue() if to_string else '' - return output_log - - -class FileFetcher: - - def 
__init__( - self, - remote_host: str, - remote_dir: Union[str, Path], - local_dir: Union[str, Path] = "." - ): - self.remote_host = remote_host - self.remote_dir = remote_dir.rstrip("/") if isinstance(remote_dir, str) else str(remote_dir.as_posix()) - self.local_dir = Path(local_dir).resolve() - - def __call__(self, remote_file_path: Union[str, Path]) -> Path: - remote_path = self.remote_dir + "/" + remote_file_path - local_path = self.local_dir / remote_file_path - url = f"{self.remote_host}{remote_path}" - log.debug(f"Fetching {url}") - try: - response = requests.get(url, allow_redirects=True) - except Exception as e: - log.error(f"Unexpected {e.__class__.__name__}: {e}") - raise RuntimeError(f"Unable to fetch {url}") - else: - log.debug(f"Response: {response.status_code}") - if response.status_code == 200: - with local_path.open('wb') as file_obj: - file_obj.write(response.content) - return local_path - else: - log.debug(f"Unable to fetch {url}: {response.status_code}") - raise RuntimeError(f"Unable to fetch {url}") - - -def get_versioned_files(config: RogerConfig, data_format, output_file_path, data_store="s3", unzip=False): - """ - Fetches a dug inpu data files to input file directory - """ - meta_data = storage.read_relative_object("../../metadata.yaml") - output_dir: Path = storage.dug_input_files_path(output_file_path) - # clear dir - storage.clear_dir(output_dir) - data_sets = config.dug_inputs.data_sets - log.info(f"dataset: {data_sets}") - pulled_files = [] - s3_utils = S3Utils(config.s3_config) - for data_set in data_sets: - data_set_name, current_version = data_set.split(':') - for item in meta_data["dug_inputs"]["versions"]: - if item["version"] == current_version and item["name"] == data_set_name and item["format"] == data_format: - if data_store == "s3": - for filename in item["files"]["s3"]: - log.info(f"Fetching {filename}") - output_name = filename.split('/')[-1] - output_path = output_dir / output_name - s3_utils.get( - str(filename), - 
str(output_path), - ) - if unzip: - log.info(f"Unzipping {output_path}") - tar = tarfile.open(str(output_path)) - tar.extractall(path=output_dir) - pulled_files.append(output_path) - else: - for filename in item["files"]["stars"]: - log.info(f"Fetching {filename}") - # fetch from stars - remote_host = config.annotation_base_data_uri - fetch = FileFetcher( - remote_host=remote_host, - remote_dir=current_version, - local_dir=output_dir) - output_path = fetch(filename) - if unzip: - log.info(f"Unzipping {output_path}") - tar = tarfile.open(str(output_path)) - tar.extractall(path=output_dir) - pulled_files.append(output_path) - return [str(filename) for filename in pulled_files] - - -def get_dbgap_files(config: RogerConfig, to_string=False) -> List[str]: - return get_versioned_files(config, 'dbGaP', 'db_gap', data_store=config.dug_inputs.data_source, unzip=True) - - -def get_nida_files(config: RogerConfig, to_string=False) -> List[str]: - return get_versioned_files(config, "nida", "nida", data_store=config.dug_inputs.data_source, unzip=True) - - -def get_sparc_files(config: RogerConfig, to_string=False) -> List[str]: - return get_versioned_files(config, "sparc", "sparc", data_store=config.dug_inputs.data_source, unzip=True) - - -def get_anvil_files(config: RogerConfig, to_string=False) -> List[str]: - return get_versioned_files(config, "anvil", "anvil", data_store=config.dug_inputs.data_source, unzip=True) - - -def get_kids_first_files(config: RogerConfig, to_string=False) -> List[str]: - return get_versioned_files(config, "kfdrc", "kfdrc", data_store=config.dug_inputs.data_source, unzip=True) - - -def get_cancer_data_commons_files(config: RogerConfig, to_string=False) -> List[str]: - return get_versioned_files(config, "crdc", "crdc", data_store=config.dug_inputs.data_source, unzip=True) - - -def get_sprint_files(config: RogerConfig, to_string=False) -> List[str]: - return get_versioned_files(config, "sprint", "sprint", data_store=config.dug_inputs.data_source, 
unzip=True) - -def get_bacpac_files(config: RogerConfig, to_string=False) -> List[str]: - return get_versioned_files(config, "bacpac", "bacpac", data_store=config.dug_inputs.data_source, unzip=True) - -def get_topmed_files(config: RogerConfig, to_string=False) -> List[str]: - return get_versioned_files(config, "topmed", "topmed", data_store=config.dug_inputs.data_source, unzip=False) - -def get_heal_study_files(config: RogerConfig, to_string=False) -> List[str]: - return get_versioned_files(config, "heal-studies", "heal-study-imports", data_store=config.dug_inputs.data_source, unzip=True) - -def get_heal_research_program_files(config: RogerConfig, to_string=False) -> List[str]: - return get_versioned_files(config, "heal-research", "heal-research-programs", data_store=config.dug_inputs.data_source, unzip=True) diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py index d94d2262..9fb62a8e 100644 --- a/dags/knowledge_graph_build.py +++ b/dags/knowledge_graph_build.py @@ -6,7 +6,7 @@ """ from airflow.models import DAG -from airflow.operators.empty import EmptyOperator +from airflow.providers.standard.operators.empty import EmptyOperator import roger from roger.tasks import default_args, create_python_task from roger.config import config diff --git a/dags/roger/pipelines/__init__.py b/dags/roger/pipelines/__init__.py deleted file mode 100644 index d6664b8e..00000000 --- a/dags/roger/pipelines/__init__.py +++ /dev/null @@ -1,28 +0,0 @@ -"Modules for individual datasets" - -import pkgutil -from pathlib import Path -import importlib - -from .base import DugPipeline - -def get_pipeline_classes(pipeline_names_dict): - """Return a list of all defined pipeline classes - """ - - base_path = Path(__file__).resolve().parent - - for (_, mod_name, _) in pkgutil.iter_modules([base_path]): - if mod_name == 'base': - continue - - # No need to actuall get the module symbol, once it's imported, it will - # show up below in __subclasses__. 
- importlib.import_module(f"{__name__}.{mod_name}") - pipeline_list = [] - - for subclass in DugPipeline.__subclasses__(): - if getattr(subclass, 'pipeline_name') and getattr(subclass, 'pipeline_name') in pipeline_names_dict.keys(): - subclass.input_version = pipeline_names_dict[getattr(subclass, 'pipeline_name')] - pipeline_list.append(subclass) - return pipeline_list diff --git a/dags/roger/pipelines/heal_studies.py b/dags/roger/pipelines/heal_studies.py deleted file mode 100644 index a08e8115..00000000 --- a/dags/roger/pipelines/heal_studies.py +++ /dev/null @@ -1,16 +0,0 @@ -"Pipeline for Heal-studies data" - -from roger.pipelines import DugPipeline -from roger.core import storage - -class HealStudiesPipeline(DugPipeline): - "Pipeline for Heal-studies data set" - pipeline_name = "heal-mds-studies" - parser_name = "heal-studies" - - def get_objects(self, input_data_path=None): - if not input_data_path: - input_data_path = storage.dug_heal_study_path() - files = storage.get_files_recursive(lambda file_name: file_name.endswith('.xml'), - input_data_path) - return sorted([str(f) for f in files]) diff --git a/dags/test_dag.py b/dags/test_dag.py new file mode 100644 index 00000000..3073b30e --- /dev/null +++ b/dags/test_dag.py @@ -0,0 +1,37 @@ +"""Just a test dag to see if all the wrappers are working correctly. 
+""" + +from airflow.models import DAG +from airflow.providers.standard.operators.empty import EmptyOperator +from roger.tasks import default_args, create_python_task + +with DAG( + dag_id='test_dag', + default_args=default_args, + params= + { + "repository_id": None, + "branch_name": None, + "commitid_from": None, + "commitid_to": None + }, + # schedule_interval=None +) as dag: + + init = EmptyOperator(task_id="init", dag=dag) + finish = EmptyOperator(task_id="finish", dag=dag) + + def print_context(ds=None, **kwargs): + print(">>>All kwargs") + print(kwargs) + print(">>>All ds") + print(ds) + + (init >> + create_python_task(dag, "print_context", print_context) >> + finish) + + #run_this = PythonOperator(task_id="print_the_context", python_callable=print_context) + +if __name__ == "__main__": + dag.test() diff --git a/docker-compose.yaml b/docker-compose.yaml index fd81f5f5..9a52c670 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,4 +1,3 @@ -version: '3.8' x-airflow-common: &airflow-common build: . 
@@ -36,6 +35,7 @@ x-airflow-common: &airflow-common - ./logs:/opt/airflow/logs - ./plugins:/opt/airflow/plugins - ./config:/opt/airflow/config + - .:/opt/roger depends_on: postgres: condition: service_healthy @@ -110,7 +110,7 @@ services: ports: - "8080:8080" healthcheck: - test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] + test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/monitor/health"] interval: 30s timeout: 10s retries: 5 @@ -164,4 +164,4 @@ volumes: redis-stack-data: networks: airflow-network: - driver: bridge \ No newline at end of file + driver: bridge diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..862ab8f0 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,11 @@ +[build-system] +requires = [ + "setuptools>=42", + "wheel" +] +build-backend = "setuptools.build_meta" + +[tool.pytest.ini_options] +testpaths = [ + "tests", +] diff --git a/requirements.txt b/requirements.txt index def8e43b..2ae08886 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ git+https://github.com/falkordb/falkordb-bulk-loader.git@v1.0.6 setuptools>=66 pytest PyYAML -git+https://github.com/helxplatform/dug@v2.13.12 +git+https://github.com/helxplatform/dug@DugModel2.0 orjson==3.9.15 git+https://github.com/helxplatform/kg_utils.git@v0.0.10 git+https://github.com/helxplatform/python-stringcase@1.2.1 @@ -14,9 +14,11 @@ git+https://github.com/helxplatform/avalon.git@lakefs-1.71.0 h11>=0.16.0 starlette>=0.49.1 datetime +redis +falkordb #--- patch aiohttp>=3.13.3 werkzeug==3.1.4 cryptography>=44.0.1 urllib3>=2.6.2 -marshmallow==3.26.2 # upgrade from 3.26.1 specified in airflow constraints file \ No newline at end of file +marshmallow==3.26.2 # upgrade from 3.26.1 specified in airflow constraints file diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 00000000..4c257339 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,40 @@ +[metadata] +name = roger +version = 0.10.4.1 +author = Renaissance Computing Institute + RTI 
+description = Data pipeline automation for dug +long_description = file: README.md +long_description_content_type = text/markdown +url = https://github.com/helxplatform/roger +project_urls = + Bug Tracker = https://github.com/helxplatform/roger/issues +classifiers = + Programming Language :: Python :: 3 + License :: OSI Approved :: MIT License + Operating System :: OS Independent + +[options] +package_dir = + = src +packages = find: +python_requires = >=3.10 +include_package_data = true +install_requires = + dug + orjson + requests + requests_cache + redis + +[options.entry_points] +console_scripts = + dug = dug.cli:main + roger = roger.cli:main + +[options.extras_require] +rest = + jsonschema + apache-airflow + +[options.packages.find] +where = src diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..45f160da --- /dev/null +++ b/setup.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +import setuptools + +if __name__ == "__main__": + setuptools.setup() \ No newline at end of file diff --git a/dags/roger/__init__.py b/src/roger/__init__.py similarity index 100% rename from dags/roger/__init__.py rename to src/roger/__init__.py diff --git a/dags/_version.py b/src/roger/_version.py similarity index 100% rename from dags/_version.py rename to src/roger/_version.py diff --git a/src/roger/cli.py b/src/roger/cli.py new file mode 100644 index 00000000..4c0b3f02 --- /dev/null +++ b/src/roger/cli.py @@ -0,0 +1,155 @@ +"""CLI interface for roger +""" + +import sys +import argparse +import os +import time +import pathlib + +import roger +from roger.config import config +from roger.logger import get_logger +from roger.pipelines import get_pipeline_classes + +log = get_logger() + +def get_arguments(): + "Parse argv" + + parser = argparse.ArgumentParser(description='Roger common cli tool.') + """ Common CLI. 
""" + parser.add_argument('-d', '--data-root', default=None, + help="Root of data hierarchy") + parser.add_argument('-ip', '--input-path', default=None, type=pathlib.Path, + help="Input files path") + parser.add_argument('-op', '--output-path', default=None, type=pathlib.Path, + help="Output files path") + + """ Roger CLI. """ + parser.add_argument('-v', '--dataset-version', help="Dataset version.", + default="v1.0") + parser.add_argument('-g', '--get-kgx', help="Get KGX objects", + action='store_true') + parser.add_argument('-s', '--create-schema', help="Infer schema", + action='store_true') + parser.add_argument('-m', '--merge-kgx', help="Merge KGX nodes", + action='store_true') + parser.add_argument('-b', '--create-bulk', help="Create bulk load", + action='store_true') + parser.add_argument('-i', '--insert', help="Do the bulk insert", + action='store_true') + parser.add_argument('-a', '--validate', help="Validate the insert", + action='store_true') + + dataset_envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS", + "topmed:v2.0,dbGaP:v1.0,anvil:v1.0") + data_sets = dataset_envspec.split(",") + parser.add_argument('-D', '--datasets', action="append", + default=None, + help="Dataset pipelines name:vers to run. " + f"(default: f{str(data_sets)}") + + """ Dug Annotation CLI. """ + parser.add_argument('-gd', '--get_dug_input_files', action="store_true", + help="Gets input files for annotation") + parser.add_argument('-l', '--load-and-annotate', action="store_true", + help="Annotates and normalizes datasets of varaibles.") + parser.add_argument('-t', '--make-tagged-kg', action="store_true", + help="Creates KGX files from annotated variable " + "datasets.") + + """ Dug indexing CLI . 
""" + parser.add_argument('-iv', '--index-variables', action="store_true", + help="Index annotated variables to elastic search.") + parser.add_argument('-C', '--crawl-concepts', action="store_true", + help="Crawl tranql and index concepts") + parser.add_argument('-ic', '--index-concepts', action="store_true", + help="Index expanded concepts to elastic search.") + + parser.add_argument('-vc', '--validate-concepts', action="store_true", + help="Validates indexing of concepts") + + parser.add_argument('-vv', '--validate-variables', action="store_true", + help="Validates indexing of variables") + + args = parser.parse_args () + + if args.data_root is not None: + data_root = args.data_root + config.data_root = data_root + log.info (f"data root:{data_root}") + + if not args.datasets: + args.datasets = data_sets + + return args + +def main(): + start = time.time() + log.info(f"Start TIME:{start}") + + args = get_arguments() + # When all lights are on... + + # Instantiate the pipeline classes + pipeline_names = {x.split(':')[0]: x.split(':')[1] for x in args.datasets} + log.info("Working on dataset list %s", str(pipeline_names)) + pipeline_classes = get_pipeline_classes(pipeline_names) + pipelines = [pipeclass(config) for pipeclass in pipeline_classes] + + for pipe in pipelines: + # Do all actions for one pipeline first, then move on to the next: + log.info("Running pipeline %s", pipe.pipeline_name) + + # Annotation comes first + if args.get_dug_input_files: + pipe.get_versioned_files() + + if args.load_and_annotate: + pipe.clear_annotation_cached(output_data_path=args.output_path) + pipe.annotate(input_data_path=args.input_path, + output_data_path=args.output_path) + + if args.make_tagged_kg: + pipe.make_kg_tagged() + + # Roger things + if args.get_kgx: + roger.get_kgx(config=config) + if args.merge_kgx: + roger.merge_nodes(config=config) + if args.create_schema: + roger.create_schema(config=config) + if args.create_bulk: + roger.create_bulk_load(config=config) + if 
args.insert: + roger.bulk_load(config=config) + if args.validate: + roger.validate(config=config) + roger.check_tranql(config=config) + + # Back to dug indexing + if args.index_variables: + pipe.index_variables() + + if args.validate_variables: + pipe.validate_indexed_variables() + + if args.crawl_concepts: + pipe.crawl_tranql() + + if args.index_concepts: + pipe.index_concepts() + + if args.validate_concepts: + pipe.validate_indexed_concepts() + + end = time.time() + time_elapsed = end - start + log.info(f"Completion TIME:{time_elapsed}") + + sys.exit (0) + +if __name__ == "__main__": + main() diff --git a/dags/roger/components/__init__.py b/src/roger/components/__init__.py similarity index 100% rename from dags/roger/components/__init__.py rename to src/roger/components/__init__.py diff --git a/dags/roger/components/data_conversion.py b/src/roger/components/data_conversion.py similarity index 100% rename from dags/roger/components/data_conversion.py rename to src/roger/components/data_conversion.py diff --git a/dags/roger/components/data_conversion_utils.py b/src/roger/components/data_conversion_utils.py similarity index 100% rename from dags/roger/components/data_conversion_utils.py rename to src/roger/components/data_conversion_utils.py diff --git a/dags/roger/config/__init__.py b/src/roger/config/__init__.py similarity index 95% rename from dags/roger/config/__init__.py rename to src/roger/config/__init__.py index cc017fca..fd4330d0 100644 --- a/dags/roger/config/__init__.py +++ b/src/roger/config/__init__.py @@ -12,7 +12,10 @@ from ._base import DictLike from .s3_config import S3Config -CONFIG_FILENAME = Path(__file__).parent.resolve() / "config.yaml" +if os.environ.get('ROGER_CONFIG_FILE', None): + CONFIG_FILENAME = Path(os.environ.get('ROGER_CONFIG_FILE')) +else: + CONFIG_FILENAME = Path(__file__).parent.resolve() / "config.yaml" @dataclass class RedisConfig(DictLike): @@ -21,6 +24,7 @@ class RedisConfig(DictLike): host: str = "redis" graph: str = "test" 
port: int = 6379 + use_redis_cache: bool = True def __post_init__(self): self.port = int(self.port) @@ -144,6 +148,8 @@ class IndexingConfig(DictLike): variables_index: str = "variables_index" concepts_index: str = "concepts_index" kg_index: str = "kg_index" + studies_index: str = "studies_index" + sections_index: str = "sections_index" tranql_min_score: float = 0.2 excluded_identifiers: List[str] = field(default_factory=lambda: [ "CHEBI:17336" @@ -224,10 +230,16 @@ def to_dug_conf(self) -> DugConfig: redis_host=self.redisgraph.host, redis_password=self.redisgraph.password, redis_port=self.redisgraph.port, + use_redis_cache=self.redisgraph.use_redis_cache, nboost_host=self.elasticsearch.nboost_host, preprocessor=self.annotation.preprocessor, annotator_type=self.annotation.annotator_type, annotator_args=self.annotation.annotator_args, + concepts_index_name=self.indexing.get('concepts_index'), + variables_index_name=self.indexing.get('variables_index'), + studies_index_name=self.indexing.get('studies_index'), + sections_index_name=self.indexing.get('sections_index'), + kg_index_name=self.indexing.get('kg_index'), normalizer={ 'url': self.annotation.normalizer, }, diff --git a/dags/roger/config/_base.py b/src/roger/config/_base.py similarity index 100% rename from dags/roger/config/_base.py rename to src/roger/config/_base.py diff --git a/dags/roger/config/config.yaml b/src/roger/config/config.yaml similarity index 98% rename from dags/roger/config/config.yaml rename to src/roger/config/config.yaml index c407555f..6844fd2a 100644 --- a/dags/roger/config/config.yaml +++ b/src/roger/config/config.yaml @@ -76,6 +76,8 @@ indexing: # eg : dbgap:Non-HEAL Studies,bacpac:HEAL Research Programs element_mapping: "" variables_index: "variables_index" + studies_index: "studies_index" + sections_index: "sections_index" concepts_index: "concepts_index" kg_index: "kg_index" tranql_min_score: 0.2 diff --git a/dags/roger/config/dev-config.yaml b/src/roger/config/dev-config.yaml 
similarity index 100% rename from dags/roger/config/dev-config.yaml rename to src/roger/config/dev-config.yaml diff --git a/dags/roger/config/s3_config.py b/src/roger/config/s3_config.py similarity index 100% rename from dags/roger/config/s3_config.py rename to src/roger/config/s3_config.py diff --git a/src/roger/config/test-config.yaml b/src/roger/config/test-config.yaml new file mode 100644 index 00000000..3149ba15 --- /dev/null +++ b/src/roger/config/test-config.yaml @@ -0,0 +1,179 @@ +redisgraph: + username: "" + password: "weak" + host: localhost + graph: test + port: 6379 + use_redis_cache: False + +logging: + level: DEBUG + format: '[%(name)s][%(filename)s][%(lineno)d][%(funcName)20s] %(levelname)s: %(message)s' + +data_root: roger/data + +kgx_base_data_uri: https://stars.renci.org/var/kgx_data/ +annotation_base_data_uri: https://stars.renci.org/var/dug/ + +kgx: + biolink_model_version: v3.1.2 + merge_db_temp_dir: workspace + data_sets: + - baseline-graph:v5.0 + +dug_inputs: + data_source: s3 + data_sets: + - topmed:v1.0 + - bdc:v1.0 + - anvil:v1.0 + +#https://github.com/RedisGraph/redisgraph-bulk-loader/blob/master/redisgraph_bulk_loader/bulk_insert.py#L43 +bulk_loader: + separator: 0x1E + enforce_schema: False + skip_invalid_nodes: False + skip_invalid_edges: False + quote: 0 + max_token_count: 1024 + max_buffer_size: 2048 + max_token_size: 500 + index: [] + full_text_index: [] + +annotation: + clear_http_cache: false + annotator_type: sapbert + annotator_args: + monarch: + url: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + sapbert: + classification_url: "https://med-nemo.apps.renci.org/annotate/" + annotator_url: "https://sap-qdrant.apps.renci.org/annotate/" + score_threshold: 0.8 + bagel: + enabled: false + url: "http://localhost:9099/group_synonyms_openai" + prompt: "bagel/ask_classes" + llm_args: + llm_model_name: 
"gpt-4o-2024-05-13" + organization: + access_key: + llm_model_args: + top_p: 0 + temperature: 0.1 + normalizer: "https://nodenormalization-exp.apps.renci.org/get_normalized_nodes?conflate=false&description=true&curie=" + synonym_service: "https://name-resolution-sri.renci.org/reverse_lookup" + ontology_metadata: "https://api.monarchinitiative.org/api/bioentity/" + + preprocessor: + debreviator: + BMI: "body mass index" + stopwords: "the" + ontology_greenlist: ["PATO", "CHEBI", "MONDO", "UBERON", "HP", "MESH", "UMLS"] + +indexing: + # colon seperated mappings list by comma + # eg : dbgap:Non-HEAL Studies,bacpac:HEAL Research Programs + element_mapping: "" + variables_index: "variables_index" + concepts_index: "concepts_index" + kg_index: "kg_index" + tranql_min_score: 0.2 + excluded_identifiers: + - "CHEBI:17336" + queries: + "disease": ["disease", "phenotypic_feature"] + "pheno": ["phenotypic_feature", "disease"] + "anat": ["disease", "anatomical_entity"] + "chem_to_disease": ["chemical_entity", "disease"] + "small_molecule_to_disease": ["small_molecule", "disease"] + "chemical_mixture_to_disease": ["chemical_mixture", "disease"] + "phen_to_anat": ["phenotypic_feature", "anatomical_entity"] + tranql_endpoint: "http://tranql-service/tranql/query?dynamic_id_resolution=true&asynchronous=false" + node_to_element_queries: + enabled: false + cde: + node_type: biolink:Publication + curie_prefix: "HEALCDE" + list_field_choose_first: + - "files" + attribute_mapping: + name: "name" + desc: "summary" + collection_name: "cde_category" + collection_id: "cde_category" + action: "files" + +elasticsearch: + host: localhost + username: elastic + password: "12345" + nboost_host: "" + scheme: "http" + ca_path: "" + +validation: + queries: + count_nodes: + name: "Count Nodes" + query: "MATCH (a) RETURN COUNT(a)" + count_edges: + name: "Count Edges" + query: "MATCH (a)-[e]-(b) RETURN COUNT(e)" + connectivity: + name: TOPMED Connectivity + query: "MATCH (a { id : '$var' })--(b) RETURN 
a.category, b.id" + args: + - var: TOPMED.TAG:8 + - var: TOPMED.VAR:phv00000484.v1.p10 + - var: TOPMED.VAR:phv00000487.v1.p10 + - var: TOPMED.VAR:phv00000496.v1.p10 + - var: TOPMED.VAR:phv00000517.v1.p10 + - var: TOPMED.VAR:phv00000518.v1.p10 + - var: TOPMED.VAR:phv00000528.v1.p10 + - var: TOPMED.VAR:phv00000529.v1.p10 + - var: TOPMED.VAR:phv00000530.v1.p10 + - var: TOPMED.VAR:phv00000531.v1.p10 + count_connected_nodes: + name: Count Connected Nodes + query: "MATCH (a)-[e]-(b) RETURN count(a), count(b)" + query_by_type: + name: Query by Type + query: "MATCH (a:gene)-[e]-(b) WHERE 'chemical_substance' IN b.category RETURN count(distinct(a)), count(distinct(b))" + smiles_values: + name: Query Chemicals with smiles that look like arrays + query: "Match (a: chemical_substance { simple_smiles: '$var' }) RETURN a.id" + args: + - var: "[Os+6]" + - var: "[SiH2]" + - var: "[CH]" + - var: "[S-2]" + - var: "[Ti+4]" + - var: "[P-3]" + - var: "[Ca+2]" + - var: "[Au+3]" + - var: "[TeH2]" + - var: "[Pb]" + - var: "[B+]" + - var: "[AsH]" + - var: "[O-][I+2]([O-])[O-]" + - var: "[He+]" + - var: "[Mo+6]" + - var: "[N-]=[N+]=[N-]" + - var: "[Ag+]" + - var: "[Zn+2]" + - var: "[C-]#[O+]" +s3: + host: "" + bucket: "" + access_key: "" + secret_key: "" + +lakefs_config: + enabled: false + access_key_id: "" + secret_access_key: "" + host: "" + branch: "" + repo: "" diff --git a/dags/roger/core/__init__.py b/src/roger/core/__init__.py similarity index 100% rename from dags/roger/core/__init__.py rename to src/roger/core/__init__.py diff --git a/dags/roger/core/base.py b/src/roger/core/base.py similarity index 100% rename from dags/roger/core/base.py rename to src/roger/core/base.py diff --git a/dags/roger/core/bulkload.py b/src/roger/core/bulkload.py similarity index 100% rename from dags/roger/core/bulkload.py rename to src/roger/core/bulkload.py diff --git a/dags/roger/core/enums.py b/src/roger/core/enums.py similarity index 100% rename from dags/roger/core/enums.py rename to 
src/roger/core/enums.py diff --git a/dags/roger/core/redis_graph.py b/src/roger/core/redis_graph.py similarity index 97% rename from dags/roger/core/redis_graph.py rename to src/roger/core/redis_graph.py index ca65ddce..5d89ef74 100644 --- a/dags/roger/core/redis_graph.py +++ b/src/roger/core/redis_graph.py @@ -3,8 +3,8 @@ import redis # from redisgraph import Node, Edge, Graph # https://redis-py.readthedocs.io/en/v4.5.1/redismodules.html#redisgraph-commands -from redis.commands.graph.node import Node -from redis.commands.graph.edge import Edge +from falkordb.node import Node +from falkordb.edge import Edge from roger.logger import get_logger @@ -88,4 +88,4 @@ def test (): rg.delete () # rg.query ("""MATCH (a { id : 'chemical_substance' }) RETURN a""") -#test () \ No newline at end of file +#test () diff --git a/dags/roger/core/storage.py b/src/roger/core/storage.py similarity index 100% rename from dags/roger/core/storage.py rename to src/roger/core/storage.py diff --git a/dags/roger/logger.py b/src/roger/logger.py similarity index 100% rename from dags/roger/logger.py rename to src/roger/logger.py diff --git a/dags/roger/models/__init__.py b/src/roger/models/__init__.py similarity index 100% rename from dags/roger/models/__init__.py rename to src/roger/models/__init__.py diff --git a/dags/roger/models/biolink.py b/src/roger/models/biolink.py similarity index 100% rename from dags/roger/models/biolink.py rename to src/roger/models/biolink.py diff --git a/dags/roger/models/kgx.py b/src/roger/models/kgx.py similarity index 100% rename from dags/roger/models/kgx.py rename to src/roger/models/kgx.py diff --git a/dags/roger/pipelines/README.md b/src/roger/pipelines/README.md similarity index 100% rename from dags/roger/pipelines/README.md rename to src/roger/pipelines/README.md diff --git a/src/roger/pipelines/__init__.py b/src/roger/pipelines/__init__.py new file mode 100644 index 00000000..ff75b18d --- /dev/null +++ b/src/roger/pipelines/__init__.py @@ -0,0 +1,41 @@ 
+"Modules for individual datasets" + +import pkgutil +from pathlib import Path +import importlib + +from .base import DugPipeline, DDM2Pipeline + +def get_all_subclasses(cls): + """Recurse to get all subsubclasses, etc.""" + rval = [cls] + for sc in cls.__subclasses__(): + rval.extend(get_all_subclasses(sc)) + return rval + +def get_pipeline_classes(pipeline_names): + """Return a list of all defined pipeline classes + """ + + base_path = Path(__file__).resolve().parent + + for (_, mod_name, _) in pkgutil.iter_modules([base_path]): + if mod_name == 'base': + continue + + # No need to actually get the module symbol, once it's imported, it will + # show up below in __subclasses__. + importlib.import_module(f"{__name__}.{mod_name}") + pipeline_list = [] + + for subclass in get_all_subclasses(DugPipeline): + if (getattr(subclass, 'pipeline_name') and + getattr(subclass, 'pipeline_name') in pipeline_names): + try: + subclass.input_version = pipeline_names[ + getattr(subclass, 'pipeline_name')] + except TypeError: + # If someone passed in the list, don't bother with the version.
+ pass + pipeline_list.append(subclass) + return pipeline_list diff --git a/dags/roger/pipelines/anvil.py b/src/roger/pipelines/anvil.py similarity index 96% rename from dags/roger/pipelines/anvil.py rename to src/roger/pipelines/anvil.py index baa82c05..ec6d06f4 100644 --- a/dags/roger/pipelines/anvil.py +++ b/src/roger/pipelines/anvil.py @@ -7,6 +7,7 @@ class AnvilPipeline(DugPipeline): "Pipeline for Anvil data set" pipeline_name = 'anvil' parser_name = 'Anvil' + files_dir = 'anvil' def get_objects(self, input_data_path=None): """Retrieve anvil objects diff --git a/dags/roger/pipelines/bacpac.py b/src/roger/pipelines/bacpac.py similarity index 100% rename from dags/roger/pipelines/bacpac.py rename to src/roger/pipelines/bacpac.py diff --git a/dags/roger/pipelines/base.py b/src/roger/pipelines/base.py similarity index 84% rename from dags/roger/pipelines/base.py rename to src/roger/pipelines/base.py index 5c1df4cc..66886f73 100644 --- a/dags/roger/pipelines/base.py +++ b/src/roger/pipelines/base.py @@ -15,7 +15,7 @@ import requests -from dug.core import get_parser, get_annotator, get_plugin_manager, DugConcept +from dug.core import get_parser, get_annotator, get_plugin_manager, DugConcept, DugVariable, DugStudy, DugSection from dug.core.concept_expander import ConceptExpander from dug.core.crawler import Crawler from dug.core.factory import DugFactory @@ -29,7 +29,7 @@ from roger.models.biolink import BiolinkModel from roger.logger import get_logger -from utils.s3_utils import S3Utils +from roger.utils.s3_utils import S3Utils log = get_logger() @@ -115,7 +115,7 @@ def __init__(self, config: RogerConfig, to_string=False): "Set instance variables and check to make sure we're overriden" if not self.pipeline_name: raise PipelineException( - "Subclass must at least define pipeline_name as class var") + "Subclass must at least define pipeline_name as class var") self.config = config self.bl_toolkit = BiolinkModel() dug_conf = config.to_dug_conf() @@ -139,20 +139,13 @@ 
def __init__(self, config: RogerConfig, to_string=False): indexing_config = config.indexing self.variables_index = indexing_config.get('variables_index') + self.studies_index = indexing_config.get('studies_index') self.concepts_index = indexing_config.get('concepts_index') + self.sections_index = indexing_config.get('sections_index') self.kg_index = indexing_config.get('kg_index') - self.search_obj: Search = self.factory.build_search_obj([ - self.variables_index, - self.concepts_index, - self.kg_index, - ]) - self.index_obj: Index = self.factory.build_indexer_obj([ - self.variables_index, - self.concepts_index, - self.kg_index, - - ]) + self.search_obj: Search = self.factory.build_search_obj() + self.index_obj: Index = self.factory.build_indexer_obj() def __enter__(self): self.event_loop = asyncio.new_event_loop() @@ -192,21 +185,21 @@ def get_parser_name(self): can also be overriden. """ return getattr(self, 'parser_name', self.pipeline_name) - + def get_annotator_name(self): - """ - Access method for annotator_name - Defaults to annotator_monarch unless specified using annotation.annotator_type in the configuration file. + """ Access method for annotator_name + + Defaults to annotator_monarch unless specified using + annotation.annotator_type in the configuration file. 
""" return self.config.annotation.annotator_type - def get_parser(self): dug_plugin_manager = get_plugin_manager() parser: Parser = get_parser(dug_plugin_manager.hook, self.get_parser_name()) return parser - + def get_annotator(self): dug_plugin_manager = get_plugin_manager() annotator: Annotator = get_annotator( @@ -216,24 +209,33 @@ def get_annotator(self): ) return annotator + def clear_annotation_cached(self, to_string=False, output_data_path=None): + if not output_data_path: + output_data_path = storage.dug_annotation_path("") + storage.clear_dir(output_data_path) + # Clear http session cache + if self.config.annotation.clear_http_cache: + self.cached_session.cache.clear() + def init_annotator(self, max_retries=5, base_delay=1, max_delay=10): attempt = 0 while attempt < max_retries: - try: + try: log.info("Initializing annotator") - annotator = self.get_annotator() + annotator = self.get_annotator() return annotator # success except Exception as e: attempt += 1 if attempt == max_retries: - log.error("Max retries reached when creating annotator. Failing with error: %s", e) + log.error("Max retries reached when creating annotator. " + "Failing with error: %s", e) raise delay = min(base_delay * (2 ** (attempt - 1)), max_delay) delay += random.uniform(0, 1) # add jitter - log.warning("Error occurred: %s. Retrying in %.2f seconds...", e, delay) + log.warning("Error occurred: %s. Retrying in %.2f seconds...", + e, delay) time.sleep(delay) - def annotate_files(self, parsable_files, output_data_path=None): """ Annotates a Data element file using a Dug parser. 
@@ -249,9 +251,9 @@ def annotate_files(self, parsable_files, output_data_path=None): log.info("Done intializing parser") annotator = self.init_annotator() log.info("Done intializing annotator") - for _, parse_file in enumerate(parsable_files): - log.debug("Creating Dug Crawler object on parse_file %s at %d of %d", - parse_file, _ , len(parsable_files)) + for ct, parse_file in enumerate(parsable_files): + log.debug("Creating Dug Crawler object on parse_file %s " + "at %d of %d", parse_file, ct , len(parsable_files)) crawler = Crawler( crawl_file=parse_file, parser=parser, @@ -267,8 +269,8 @@ def annotate_files(self, parsable_files, output_data_path=None): elements_file_path = os.path.join( output_data_path, current_file_name) elements_file = os.path.join(elements_file_path, 'elements.txt') - concepts_file = os.path.join(elements_file_path, 'concepts.txt') - + concepts_file = os.path.join(elements_file_path, 'concepts.txt') + # Use the specified parser to parse the parse_file into elements. log.debug("Parser is %s", str(parser)) elements = parser(parse_file) @@ -292,12 +294,15 @@ def annotate_files(self, parsable_files, output_data_path=None): elements = crawler.elements # Write pickles of objects to file - log.info("Parsed and annotated: %s", parse_file) - - storage.write_object(jsonpickle.encode(elements, indent=2), elements_file) + log.info("Parsed and annotated: %s", parse_file) + + storage.write_object(jsonpickle.encode(elements, indent=2), + elements_file) log.info("Serialized annotated elements to : %s", elements_file) - storage.write_object(jsonpickle.encode(non_expanded_concepts, indent=2), concepts_file) + storage.write_object( + jsonpickle.encode(non_expanded_concepts, indent=2), + concepts_file) log.info("Serialized annotated concepts to : %s", concepts_file) def convert_to_kgx_json(self, elements, written_nodes=None): @@ -321,10 +326,10 @@ def convert_to_kgx_json(self, elements, written_nodes=None): # DugElement means a variable (Study variable...) 
if not isinstance(element, DugElement): continue - study_id = element.collection_id - study_link = element.collection_action - study_desc = element.collection_desc - study_name = element.collection_name or element.collection_id + study_id = element.id + study_link = element.action + study_desc = element.description + study_name = element.name or element.id if study_id not in written_nodes: @@ -428,7 +433,7 @@ def make_tagged_kg(self, elements): # @TODO extract this into config or maybe dug ?? topmed_tag_concept_type = "TOPMed Phenotype Concept" nodes_written = set() - for tag in elements: + for tag in elements: if not (isinstance(tag, DugConcept) and tag.type == topmed_tag_concept_type): continue @@ -491,8 +496,12 @@ def index_elements(self, elements_file): # no id no indexing continue # Use the Dug Index object to submit the element to ES - self.index_obj.index_element( - element, index=self.variables_index) + if isinstance(element, DugVariable): + self.index_obj.index_element(element, index=self.variables_index) + elif isinstance(element, DugStudy): + self.index_obj.index_element(element, index=self.studies_index) + elif isinstance(element, DugSection): + self.index_obj.index_element(element, index=self.sections_index) percent_complete = (count / total) * 100 if percent_complete % 10 == 0: log.info("%d %%", percent_complete) @@ -500,7 +509,8 @@ def index_elements(self, elements_file): def validate_indexed_element_file(self, elements_file): "After submitting elements for indexing, verify that they're available" - elements = [x for x in jsonpickle.decode(storage.read_object(elements_file)) + elements = [x for x in jsonpickle.decode( + storage.read_object(elements_file)) if not isinstance(x, DugConcept)] # Pick ~ 10 % sample_size = int(len(elements) * 0.1) @@ -519,6 +529,9 @@ def validate_indexed_element_file(self, elements_file): concept = concepts[0] curie = concept.id search_term = re.sub(r'[^a-zA-Z0-9_\ ]+', '', concept.name) + if len(search_term) < 3: + # 
anything less than 3 chars won't return hits + continue log.debug("Searching for Concept: %s and Search term: %s", str(curie), search_term) all_elements_ids = self._search_elements(curie, search_term) @@ -540,24 +553,36 @@ def validate_indexed_element_file(self, elements_file): def _search_elements(self, curie, search_term): "Asynchronously call a search on the curie and search term" - response = self.event_loop.run_until_complete(self.search_obj.search_vars_unscored( - concept=curie, - query=search_term + page_size = 1000 + offset = 0 + hits, total_items, _ = self.event_loop.run_until_complete( + self.search_obj.search_elements( + self.variables_index, + concept=curie, + query=search_term, + size=page_size, + offset=offset )) - ids_dict = [] - if 'total_items' in response: - if response['total_items'] == 0: - log.error(f"No search elements returned for variable search: {self.variables_index}.") - log.error(f"Concept id : {curie}, Search term: {search_term}") - raise Exception(f"Validation error - Did not find {curie} for" - f"Search term: {search_term}") - else: - del response['total_items'] - for element_type in response: - all_elements_ids = [e['id'] for e in - reduce(lambda x, y: x + y['elements'], response[element_type], [])] - ids_dict += all_elements_ids - return ids_dict + if total_items == 0: + log.error(f"No search elements returned for variable search: " + f"{self.variables_index}.") + log.error(f"Concept id : {curie}, Search term: {search_term}") + raise Exception(f"Validation error - Did not find {curie} for " + f"Search term: {search_term}") + + while len(hits) < total_items: + offset += page_size + new_hits, _, _ = self.event_loop.run_until_complete( + self.search_obj.search_elements( + self.variables_index, + concept=curie, + query=search_term, + size=page_size, + offset=offset + )) + hits += new_hits + id_list = [e['_source']['id'] for e in hits] + return id_list def crawl_concepts(self, concepts, data_set_name, output_path=None): """Adds tranql KG
to Concepts @@ -567,13 +592,14 @@ def crawl_concepts(self, concepts, data_set_name, output_path=None): :param data_set_name: :return: """ - # TODO crawl dir seems to be storaing crawling info to avoid re-crawling, but is that consting us much? , it was when tranql was slow, but - # might right to consider getting rid of it. + # TODO crawl dir seems to be storaing crawling info to avoid + # re-crawling, but is that consting us much? , it was when tranql was + # slow, but might right to consider getting rid of it. crawl_dir = storage.dug_crawl_path('crawl_output') output_file_name = os.path.join(data_set_name, 'expanded_concepts.txt') - extracted_dug_elements_file_name = os.path.join(data_set_name, - 'extracted_graph_elements.txt') + extracted_dug_elements_file_name = os.path.join( + data_set_name, 'extracted_graph_elements.txt') if not output_path: output_file = storage.dug_expanded_concepts_path(output_file_name) extracted_output_file = storage.dug_expanded_concepts_path( @@ -581,8 +607,9 @@ def crawl_concepts(self, concepts, data_set_name, output_path=None): ) else: output_file = os.path.join(output_path, output_file_name) - extracted_output_file = os.path.join( output_path, extracted_dug_elements_file_name) - + extracted_output_file = os.path.join( + output_path, extracted_dug_elements_file_name) + Path(crawl_dir).mkdir(parents=True, exist_ok=True) extracted_dug_elements = [] log.debug("Creating Dug Crawler object") @@ -606,27 +633,29 @@ def crawl_concepts(self, concepts, data_set_name, output_path=None): except Exception as e: log.error(concept) raise e - for query in self.node_to_element_queries: - log.info(query) - casting_config = query['casting_config'] - tranql_source = query['tranql_source'] - dug_element_type = query['output_dug_type'] - extracted_dug_elements += crawler.expand_to_dug_element( - concept=concept, - casting_config=casting_config, - dug_element_type=dug_element_type, - tranql_source=tranql_source - ) + # for query in 
self.node_to_element_queries: + # log.info(query) + # casting_config = query['casting_config'] + # tranql_source = query['tranql_source'] + # dug_element_type = query['output_dug_type'] + # extracted_dug_elements += crawler.expand_to_dug_element( + # concept=concept, + # casting_config=casting_config, + # dug_element_type=dug_element_type, + # tranql_source=tranql_source + # ) concept.clean() percent_complete = int((counter / total) * 100) if percent_complete % 10 == 0: log.info("%d%%", percent_complete) log.info("Crawling %s done", data_set_name) - storage.write_object(obj=jsonpickle.encode(concepts, indent=2), path=output_file) + storage.write_object(obj=jsonpickle.encode(concepts, indent=2), + path=output_file) log.info ("Concepts serialized to %s", output_file) - storage.write_object(obj=jsonpickle.encode(extracted_dug_elements, indent=2), - path=extracted_output_file) - log.info("Extracted elements serialized to %s", extracted_output_file) + # storage.write_object(obj=jsonpickle.encode(extracted_dug_elements, + # indent=2), + # path=extracted_output_file) + # log.info("Extracted elements serialized to %s", extracted_output_file) def _index_concepts(self, concepts): "Submit concepts to ElasticSearch for indexing" @@ -732,7 +761,8 @@ def _validate_indexed_concepts(self, elements, concepts): def clear_index(self, index_id): "Delete the index specified by index_id from ES" - exists = self.event_loop.run_until_complete(self.search_obj.es.indices.exists(index=index_id)) + exists = self.event_loop.run_until_complete( + self.search_obj.es.indices.exists(index=index_id)) if exists: log.info("Deleting index %s", str(index_id)) response = self.event_loop.run_until_complete( @@ -821,6 +851,11 @@ def get_versioned_files(self): current_version)) return [str(filename) for filename in pulled_files] + @staticmethod + def input_file_filter(file_name): + """Default filter for input files: accept names ending in .xml""" + return file_name.endswith('.xml') + def get_objects(self, input_data_path=None): """Retrieve
initial source objects for parsing @@ -831,16 +866,20 @@ def get_objects(self, input_data_path=None): input_data_path = storage.dug_input_files_path( self.get_files_dir()) files = storage.get_files_recursive( - lambda file_name: file_name.endswith('.xml'), + self.input_file_filter, input_data_path) return sorted([str(f) for f in files]) def annotate(self, to_string=False, files=None, input_data_path=None, output_data_path=None): "Annotate files with the appropriate parsers and crawlers" + log.debug("annotate called with files %s, input path %s, " + "output path %s", files, str(input_data_path), + str(output_data_path)) if files is None: files = self.get_objects(input_data_path=input_data_path) - self.annotate_files(parsable_files=files, output_data_path=output_data_path) + self.annotate_files(parsable_files=files, + output_data_path=output_data_path) output_log = self.log_stream.getvalue() if to_string else '' return output_log @@ -855,8 +894,9 @@ def index_variables(self, to_string=False, element_object_files=None, """ # self.clear_variables_index() if element_object_files is None: - element_object_files = storage.dug_elements_objects(input_data_path,format='txt') - for file_ in element_object_files: + element_object_files = storage.dug_elements_objects( + input_data_path,format='txt') + for file_ in element_object_files: self.index_elements(file_) output_log = self.log_stream.getvalue() if to_string else '' return output_log @@ -867,30 +907,40 @@ def validate_indexed_variables(self, to_string=None, output_data_path=None): "Validate output from index variables task for pipeline" if not element_object_files: - element_object_files = storage.dug_elements_objects(input_data_path, format='txt') + element_object_files = storage.dug_elements_objects( + input_data_path, format='txt') for file_ in element_object_files: log.info("Validating %s", str(file_)) self.validate_indexed_element_file(file_) output_log = self.log_stream.getvalue() if to_string else '' return
output_log - def validate_indexed_concepts(self, config=None, to_string=None, input_data_path=None, output_data_path=None): + def validate_indexed_concepts(self, config=None, to_string=None, + input_data_path=None, output_data_path=None): """ Entry for validate concepts """ - get_data_set_name = lambda file: os.path.split(os.path.dirname(file))[-1] + get_data_set_name = lambda file: ( + os.path.split(os.path.dirname(file))[-1]) expanded_concepts_files_dict = { - get_data_set_name(file): file for file in storage.dug_expanded_concept_objects(data_path=input_data_path, format='txt') + get_data_set_name(file): file for file in + storage.dug_expanded_concept_objects(data_path=input_data_path, + format='txt') } annotated_elements_files_dict = { - get_data_set_name(file): file for file in storage.dug_elements_objects(data_path=input_data_path, format='txt') + get_data_set_name(file): file for file in + storage.dug_elements_objects(data_path=input_data_path, + format='txt') } - try: - assert len(expanded_concepts_files_dict) == len(annotated_elements_files_dict) + try: + assert (len(expanded_concepts_files_dict) == + len(annotated_elements_files_dict)) except: - log.error("Files Annotated Elements files and Expanded concepts files, should be pairs") + log.error("Files Annotated Elements files and " + "expanded concepts files, should be pairs") if len(expanded_concepts_files_dict) > len(annotated_elements_files_dict): - log.error("Some Annotated Elements files (from load_and_annotate task) are missing") + log.error("Some Annotated Elements files " + "(from load_and_annotate task) are missing") else: log.error("Some Expanded Concepts files (from crawl task) are missing") log.error(f"Annotated Datasets : {list(annotated_elements_files_dict.keys())}") @@ -936,9 +986,10 @@ def make_kg_tagged(self, to_string=False, elements_files=None, def crawl_tranql(self, to_string=False, concept_files=None, input_data_path=None, output_data_path=None): - "Perform the tranql crawl" + "Perform 
the tranql crawl" if not concept_files: - concept_files = storage.dug_concepts_objects(input_data_path, format='txt') + concept_files = storage.dug_concepts_objects( + input_data_path, format='txt') if output_data_path: crawl_dir = os.path.join(output_data_path, 'crawl_output') @@ -956,15 +1007,16 @@ def crawl_tranql(self, to_string=False, concept_files=None, log.info("Crawling Dug Concepts, found %d file(s).", len(concept_files)) for file_ in concept_files: - objects = storage.read_object(file_) - objects = objects or {} + objects = storage.read_object(file_) + objects = objects or {} if not objects: log.info(f'no concepts in {file_}') data_set = jsonpickle.decode(objects) original_variables_dataset_name = os.path.split( os.path.dirname(file_))[-1] self.crawl_concepts(concepts=data_set, - data_set_name=original_variables_dataset_name, output_path= output_data_path) + data_set_name=original_variables_dataset_name, + output_path= output_data_path) output_log = self.log_stream.getvalue() if to_string else '' return output_log @@ -984,10 +1036,23 @@ def index_concepts(self, to_string=False, if self.config.indexing.node_to_element_queries: log.info("*******************") - extracted_elements_files = storage.dug_extracted_elements_objects(data_path=input_data_path) + extracted_elements_files = storage.dug_extracted_elements_objects( + data_path=input_data_path) log.info(f"{extracted_elements_files}") for file_ in extracted_elements_files: log.info(f"reading file {file_}") self.index_elements(file_) output_log = self.log_stream.getvalue() if to_string else '' return output_log + +class DDM2Pipeline(DugPipeline): + """Base class for pipelines working on Dug Data Model v2""" + + @staticmethod + def input_file_filter(file_name): + """Retrieve initial source objects for parsing in .dug.json format + + This ideally removes the need for specialized get_objects methods in + DDM2 pipelines. 
+ """ + return file_name.endswith('.dug.json') diff --git a/dags/roger/pipelines/bdc.py b/src/roger/pipelines/bdc.py similarity index 100% rename from dags/roger/pipelines/bdc.py rename to src/roger/pipelines/bdc.py diff --git a/dags/roger/pipelines/bdc_pipelines.py b/src/roger/pipelines/bdc_pipelines.py similarity index 100% rename from dags/roger/pipelines/bdc_pipelines.py rename to src/roger/pipelines/bdc_pipelines.py diff --git a/dags/roger/pipelines/crdc.py b/src/roger/pipelines/crdc.py similarity index 100% rename from dags/roger/pipelines/crdc.py rename to src/roger/pipelines/crdc.py diff --git a/dags/roger/pipelines/ctn.py b/src/roger/pipelines/ctn.py similarity index 100% rename from dags/roger/pipelines/ctn.py rename to src/roger/pipelines/ctn.py diff --git a/dags/roger/pipelines/db_gap.py b/src/roger/pipelines/db_gap.py similarity index 100% rename from dags/roger/pipelines/db_gap.py rename to src/roger/pipelines/db_gap.py diff --git a/src/roger/pipelines/heal_cdes.py b/src/roger/pipelines/heal_cdes.py new file mode 100644 index 00000000..275dff59 --- /dev/null +++ b/src/roger/pipelines/heal_cdes.py @@ -0,0 +1,17 @@ +"Pipeline to ingest HEAL in new data model" + +from roger.pipelines import DDM2Pipeline +from roger.core import storage + +class HealStudiesDDM2Pipeline(DDM2Pipeline): + "Pipeline for HEAL data using dug data model v2" + pipeline_name = "heal-cdes" + parser_name = "heal-ddm2" + + def get_objects(self, input_data_path=None): + if not input_data_path: + input_data_path = storage.dug_heal_study_path() + files = storage.get_files_recursive( + lambda file_name: file_name.endswith('.dug.json'), + input_data_path) + return sorted([str(f) for f in files]) diff --git a/dags/roger/pipelines/heal_research_programs.py b/src/roger/pipelines/heal_research_programs.py similarity index 100% rename from dags/roger/pipelines/heal_research_programs.py rename to src/roger/pipelines/heal_research_programs.py diff --git a/src/roger/pipelines/heal_studies.py
b/src/roger/pipelines/heal_studies.py new file mode 100644 index 00000000..3f9c7f81 --- /dev/null +++ b/src/roger/pipelines/heal_studies.py @@ -0,0 +1,17 @@ +"Pipeline for Heal-studies data" + +from roger.pipelines import DDM2Pipeline +from roger.core import storage + +class HealStudiesPipeline(DDM2Pipeline): + "Pipeline for Heal-studies data set" + pipeline_name = "heal-mds-studies" + parser_name = "heal-ddm2" + + def get_objects(self, input_data_path=None): + if not input_data_path: + input_data_path = storage.dug_heal_study_path() + files = storage.get_files_recursive( + lambda file_name: file_name.endswith('.dug.json'), + input_data_path) + return sorted([str(f) for f in files]) diff --git a/dags/roger/pipelines/kfdrc.py b/src/roger/pipelines/kfdrc.py similarity index 100% rename from dags/roger/pipelines/kfdrc.py rename to src/roger/pipelines/kfdrc.py diff --git a/dags/roger/pipelines/nida.py b/src/roger/pipelines/nida.py similarity index 100% rename from dags/roger/pipelines/nida.py rename to src/roger/pipelines/nida.py diff --git a/dags/roger/pipelines/picsure_test.py b/src/roger/pipelines/picsure_test.py similarity index 91% rename from dags/roger/pipelines/picsure_test.py rename to src/roger/pipelines/picsure_test.py index bea4469f..21e04b64 100644 --- a/dags/roger/pipelines/picsure_test.py +++ b/src/roger/pipelines/picsure_test.py @@ -7,6 +7,7 @@ class PicSure(DugPipeline): "Pipeline for BACPAC data set" pipeline_name = "bdc-test6" #lakefs parser_name = "dbgap" + files_dir = "anvil" def get_objects(self, input_data_path=None): """Retrieve anvil objects @@ -23,4 +24,4 @@ def get_objects(self, input_data_path=None): input_data_path) logger.info("**********") logger.info(files) - return sorted([str(f) for f in files]) \ No newline at end of file + return sorted([str(f) for f in files]) diff --git a/dags/roger/pipelines/radx.py b/src/roger/pipelines/radx.py similarity index 100% rename from dags/roger/pipelines/radx.py rename to src/roger/pipelines/radx.py 
diff --git a/dags/roger/pipelines/sparc.py b/src/roger/pipelines/sparc.py similarity index 100% rename from dags/roger/pipelines/sparc.py rename to src/roger/pipelines/sparc.py diff --git a/dags/roger/pipelines/topmed.py b/src/roger/pipelines/topmed.py similarity index 100% rename from dags/roger/pipelines/topmed.py rename to src/roger/pipelines/topmed.py diff --git a/dags/roger/pvc.yaml b/src/roger/pvc.yaml similarity index 100% rename from dags/roger/pvc.yaml rename to src/roger/pvc.yaml diff --git a/dags/roger/tasks.py b/src/roger/tasks.py similarity index 97% rename from dags/roger/tasks.py rename to src/roger/tasks.py index cd828383..bcca2e77 100755 --- a/dags/roger/tasks.py +++ b/src/roger/tasks.py @@ -10,8 +10,8 @@ # Airflow 3.x - prefer provider imports and new public types from airflow.providers.standard.operators.python import PythonOperator -from airflow.operators.empty import EmptyOperator -from airflow.utils.task_group import TaskGroup +from airflow.providers.standard.operators.empty import EmptyOperator +from airflow.sdk import TaskGroup from airflow.models import DAG from airflow.models.taskinstance import TaskInstance from airflow.providers.standard.operators.bash import BashOperator @@ -219,7 +219,7 @@ def generate_dir_name_from_task_instance(task_instance: TaskInstance, # local dir structure. 
if not roger_config.lakefs_config.enabled: return None - root_data_dir = os.getenv("ROGER_DATA_DIR").rstrip('/') + root_data_dir = os.getenv("ROGER_DATA_DIR", "/tmp/roger/data").rstrip('/') task_id = task_instance.task_id dag_id = task_instance.dag_id run_id = task_instance.run_id @@ -295,7 +295,9 @@ def setup_input_data(context: Context, exec_conf): logger.info(">>> end of downloading data") -def create_python_task(dag, name, a_callable, func_kwargs=None, external_repos=None, pass_conf=True, no_output_files=False): +def create_python_task(dag, name, a_callable, func_kwargs=None, + external_repos=None, pass_conf=True, + no_output_files=False): """ Create a python task. :param func_kwargs: additional arguments for callable. :param dag: dag to add task to. @@ -349,7 +351,6 @@ def create_python_task(dag, name, a_callable, func_kwargs=None, external_repos=N return PythonOperator(**python_operator_args) - def create_pipeline_taskgroup( dag, pipeline_class: type, diff --git a/dags/utils/__init__.py b/src/roger/utils/__init__.py similarity index 100% rename from dags/utils/__init__.py rename to src/roger/utils/__init__.py diff --git a/dags/utils/s3_utils.py b/src/roger/utils/s3_utils.py similarity index 100% rename from dags/utils/s3_utils.py rename to src/roger/utils/s3_utils.py diff --git a/tests/conftest.py b/tests/conftest.py index e69de29b..bdc954cb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -0,0 +1 @@ +pythonpath = "dags" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index d0a68d1c..edb68d21 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,7 +1,8 @@ import os +import glob +import json from roger.core.enums import SchemaType -import json class BiolinkMock: def __init__(self): @@ -39,9 +40,18 @@ def kgx_objects(): return [os.path.join(*os.path.split(__file__)[:-1], 'data', file) for file in kgx_files] -def merged_objects(): - return [os.path.join(*os.path.split(__file__)[:-1], 'data', 
file) - for file in merged_files] +def merged_objects(file_type, path=None): + """ A list of merged KGX objects. """ + if not path: + merged_pattern = merge_path(f"**/{file_type}.jsonl") + else: + merged_pattern = merge_path(f"**/{file_type}.jsonl", path=path) + # this thing should always return one edges or nodes file (based on file_type) + try: + return sorted(glob.glob(merged_pattern, recursive=True))[0] + except IndexError: + raise ValueError(f"Could not find merged KGX of type {file_type} " + f"in {merged_pattern}") def bulk_path(*args, **kwargs): return os.path.join(*os.path.split(__file__)[:-1], 'data', 'bulk') diff --git a/tests/integration/test_dug_utils.py b/tests/integration/test_dug_utils.py deleted file mode 100644 index 4e31f820..00000000 --- a/tests/integration/test_dug_utils.py +++ /dev/null @@ -1,62 +0,0 @@ -import tempfile - -from pathlib import Path - -import pytest - -from dug_helpers.dug_utils import FileFetcher, get_topmed_files, get_dbgap_files -from roger.config import config - - -def test_fetch_network_file(): - filename = "README.md" - with tempfile.TemporaryDirectory() as tmp_dir: - fetch1 = FileFetcher( - "https://github.com", - "/helxplatform/roger/blob/main/", - tmp_dir, - ) - expected_path = Path(tmp_dir) / filename - assert not expected_path.exists() - fetch1(filename) - assert expected_path.exists() - - with tempfile.TemporaryDirectory() as tmp_dir: - fetch2 = FileFetcher( - "https://github.com", - Path("/helxplatform/roger/blob/main/"), - Path(tmp_dir), - ) - - expected_path = Path(tmp_dir) / filename - assert not expected_path.exists() - fetch2(filename) - assert expected_path.exists() - - -def test_fetcher_errors(): - - filename = "DOES NOT EXIST.md" - - with tempfile.TemporaryDirectory() as tmp_dir: - fetch = FileFetcher( - "https://github.com", - Path("/helxplatform/roger/blob/main/"), - Path(tmp_dir), - ) - with pytest.raises(RuntimeError): - fetch(filename) - - -@pytest.mark.skip() -def test_get_topmed_files(): - file_names = 
get_topmed_files(config=config) - for file_name in file_names: - assert Path(file_name).exists() - - -@pytest.mark.skip() -def test_get_dbgap_files(): - file_names = get_dbgap_files(config=config) - for file_name in file_names: - assert Path(file_name).exists() \ No newline at end of file