Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: test
name: Tests and Lint

on:
# Trigger workflow on pull requests to the main branch
Expand All @@ -11,32 +11,46 @@ on:
- main

jobs:
test:
tests:
name: Run Unit Tests and Lint
runs-on: ubuntu-22.04

steps:
# Step 1: Checkout the repository
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4

# Step 2: Set up Python
- name: Set up Python
uses: actions/setup-python@v4
# Step 3: Install Poetry
- name: Install Poetry
uses: snok/install-poetry@v1
with:
python-version: "3.8.18" # Use your project’s Python version
version: 2.2.1
virtualenvs-create: true
virtualenvs-in-project: true

# Step 3: Install Poetry
- name: Install Poetry
- name: Verify Poetry install
run: |
curl -sSL https://install.python-poetry.org | POETRY_VERSION=1.8.4 python3 -
echo "$HOME/.local/bin" >> $GITHUB_PATH
echo "PATH is: $PATH"
which poetry
poetry --version

# Step 4: Install dependencies
# Step 2: Set up Python
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12" # Use project’s Python version
cache: "poetry"

# Step 5: Install dependencies
- name: Install dependencies
run: |
poetry install --with dev

# Step 5: Run Tests
# Step 6: Run Tests
- name: Run tests
run: |
poetry run pytest -s --cov=biasanalyzer --cov-config=.coveragerc

# Step 7: Run Ruff check
- name: Run ruff
run: poetry run ruff check .
2 changes: 1 addition & 1 deletion biasanalyzer/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.1.0'
__version__ = "0.1.0"
131 changes: 72 additions & 59 deletions biasanalyzer/api.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import time
from pydantic import ValidationError
from typing import List
from biasanalyzer.database import OMOPCDMDatabase, BiasDatabase

from IPython.display import display
from ipytree import Tree
from ipywidgets import Label, VBox
from pydantic import ValidationError

from biasanalyzer.cohort import CohortAction
from biasanalyzer.config import load_config
from ipywidgets import VBox, Label
from ipytree import Tree
from IPython.display import display
from biasanalyzer.utils import get_direction_arrow, notify_users, build_concept_tree
from biasanalyzer.database import BiasDatabase, OMOPCDMDatabase
from biasanalyzer.utils import build_concept_tree, get_direction_arrow, notify_users


class BIAS:
Expand All @@ -22,74 +24,87 @@ def __init__(self, config_file_path=None):

def set_config(self, config_file_path: str):
if not config_file_path:
notify_users('no configuration file specified. '
'Call set_config(config_file_path) next to specify configurations')
notify_users(
"no configuration file specified. Call set_config(config_file_path) next to specify configurations"
)
else:
try:
self.config = load_config(config_file_path)
notify_users(f'configuration specified in {config_file_path} loaded successfully')
notify_users(f"configuration specified in {config_file_path} loaded successfully")
except FileNotFoundError:
notify_users('specified configuration file does not exist. '
'Call set_config(config_file_path) next to specify a valid configuration file',
level='error')
notify_users(
"specified configuration file does not exist. "
"Call set_config(config_file_path) next to specify a valid configuration file",
level="error",
)
except ValidationError as ex:
notify_users(f'configuration yaml file is not valid with validation error: {ex}', level='error')
notify_users(f"configuration yaml file is not valid with validation error: {ex}", level="error")

def set_root_omop(self):
if not self.config:
notify_users('no valid configuration to set root OMOP CDM data. '
'Call set_config(config_file_path) to specify configurations first.')
notify_users(
"no valid configuration to set root OMOP CDM data. "
"Call set_config(config_file_path) to specify configurations first."
)
return

self.cleanup()

db_type = self.config['root_omop_cdm_database']['database_type']
if db_type == 'postgresql':
user = self.config['root_omop_cdm_database']['username']
password = self.config['root_omop_cdm_database']['password']
host = self.config['root_omop_cdm_database']['hostname']
port = self.config['root_omop_cdm_database']['port']
db = self.config['root_omop_cdm_database']['database']
db_type = self.config["root_omop_cdm_database"]["database_type"]
if db_type == "postgresql":
user = self.config["root_omop_cdm_database"]["username"]
password = self.config["root_omop_cdm_database"]["password"]
host = self.config["root_omop_cdm_database"]["hostname"]
port = self.config["root_omop_cdm_database"]["port"]
db = self.config["root_omop_cdm_database"]["database"]
db_url = f"postgresql://{user}:{password}@{host}:{port}/{db}"
self.omop_cdm_db = OMOPCDMDatabase(db_url)
self.bias_db = BiasDatabase(':memory:', omop_db_url=db_url)
elif db_type == 'duckdb':
db_path = self.config['root_omop_cdm_database'].get('database', ":memory:")
self.bias_db = BiasDatabase(":memory:", omop_db_url=db_url)
elif db_type == "duckdb":
db_path = self.config["root_omop_cdm_database"].get("database", ":memory:")
self.omop_cdm_db = OMOPCDMDatabase(db_path)
self.bias_db = BiasDatabase(':memory:', omop_db_url=db_path)
self.bias_db = BiasDatabase(":memory:", omop_db_url=db_path)
else:
notify_users(f"Unsupported database type: {db_type}")

def _set_cohort_action(self):
if self.omop_cdm_db is None:
notify_users('A valid OMOP CDM must be set before creating a cohort. '
'Call set_root_omop first to set a valid root OMOP CDM')
notify_users(
"A valid OMOP CDM must be set before creating a cohort. "
"Call set_root_omop first to set a valid root OMOP CDM"
)
return None
if self.cohort_action is None:
self.cohort_action = CohortAction(self.omop_cdm_db, self.bias_db)
return self.cohort_action

def get_domains_and_vocabularies(self):
if self.omop_cdm_db is None:
notify_users('A valid OMOP CDM must be set before getting domains. '
'Call set_root_omop first to set a valid root OMOP CDM')
notify_users(
"A valid OMOP CDM must be set before getting domains. "
"Call set_root_omop first to set a valid root OMOP CDM"
)
return None
return self.omop_cdm_db.get_domains_and_vocabularies()

def get_concepts(self, search_term, domain=None, vocabulary=None):
if self.omop_cdm_db is None:
notify_users('A valid OMOP CDM must be set before getting concepts. '
'Call set_root_omop first to set a valid root OMOP CDM')
notify_users(
"A valid OMOP CDM must be set before getting concepts. "
"Call set_root_omop first to set a valid root OMOP CDM"
)
return None
if domain is None and vocabulary is None:
notify_users('either domain or vocabulary must be set to constrain the number of returned concepts')
notify_users("either domain or vocabulary must be set to constrain the number of returned concepts")
return None
return self.omop_cdm_db.get_concepts(search_term, domain, vocabulary)

def get_concept_hierarchy(self, concept_id):
if self.omop_cdm_db is None:
notify_users('A valid OMOP CDM must be set before getting concepts. '
'Call set_root_omop first to set a valid root OMOP CDM')
notify_users(
"A valid OMOP CDM must be set before getting concepts. "
"Call set_root_omop first to set a valid root OMOP CDM"
)
return None
return self.omop_cdm_db.get_concept_hierarchy(concept_id)

Expand All @@ -98,20 +113,21 @@ def display_concept_tree(self, concept_tree: dict, level: int = 0, show_in_text_
Recursively prints the concept hierarchy tree in an indented format for display.
"""
details = concept_tree.get("details", {})
if 'parents' in concept_tree:
tree_type = 'parents'
elif 'children' in concept_tree:
tree_type = 'children'
if "parents" in concept_tree:
tree_type = "parents"
elif "children" in concept_tree:
tree_type = "children"
else:
notify_users('The input concept tree must contain parents or children key as the type of the tree.')
return ''
notify_users("The input concept tree must contain parents or children key as the type of the tree.")
return ""

if show_in_text_format:
if details:
direction_arrow = get_direction_arrow(tree_type)
print(
" " * level + f"{direction_arrow} {details['concept_name']} (ID: {details['concept_id']}, "
f"Code: {details['concept_code']})")
f"Code: {details['concept_code']})"
)

for child in concept_tree.get(tree_type, []):
if child:
Expand All @@ -128,9 +144,9 @@ def display_concept_tree(self, concept_tree: dict, level: int = 0, show_in_text_
display(VBox([Label("Concept Hierarchy"), tree]))
return root_node


def create_cohort(self, cohort_name: str, cohort_desc: str, query_or_yaml_file: str, created_by: str,
delay: float=0):
def create_cohort(
self, cohort_name: str, cohort_desc: str, query_or_yaml_file: str, created_by: str, delay: float = 0
):
"""
API method that allows to create a cohort
:param cohort_name: name of the cohort
Expand All @@ -149,17 +165,15 @@ def create_cohort(self, cohort_name: str, cohort_desc: str, query_or_yaml_file:
if delay > 0:
notify_users(f"[DEBUG] Simulating long-running task with {delay} seconds delay...")
time.sleep(delay)
notify_users('cohort created successfully')
notify_users("cohort created successfully")
return created_cohort
else:
notify_users('failed to create a valid cohort action object')
notify_users("failed to create a valid cohort action object")
return None


def get_cohorts_concept_stats(self, cohorts: List[int],
concept_type: str='condition_occurrence',
filter_count: int=0,
vocab=None):
def get_cohorts_concept_stats(
self, cohorts: List[int], concept_type: str = "condition_occurrence", filter_count: int = 0, vocab=None
):
"""
compute concept statistics such as concept prevalence in a union of multiple cohorts
:param cohorts: list of cohort ids
Expand All @@ -170,26 +184,25 @@ def get_cohorts_concept_stats(self, cohorts: List[int],
:return: ConceptHierarchy object
"""
if not cohorts:
notify_users('The input cohorts list is empty. At least one cohort id must be provided.')
notify_users("The input cohorts list is empty. At least one cohort id must be provided.")
return None
c_action = self._set_cohort_action()
if c_action:
return c_action.get_cohorts_concept_stats(cohorts, concept_type=concept_type, filter_count=filter_count,
vocab=vocab)
return c_action.get_cohorts_concept_stats(
cohorts, concept_type=concept_type, filter_count=filter_count, vocab=vocab
)
else:
notify_users('failed to get concept prevalence stats for the union of cohorts')
notify_users("failed to get concept prevalence stats for the union of cohorts")
return None


def compare_cohorts(self, cohort_id1, cohort_id2):
c_action = self._set_cohort_action()
if c_action:
return c_action.compare_cohorts(cohort_id1, cohort_id2)
else:
notify_users('failed to create a valid cohort action object')
notify_users("failed to create a valid cohort action object")
return None


def cleanup(self):
if self.bias_db:
self.bias_db.close()
Expand Down
3 changes: 3 additions & 0 deletions biasanalyzer/background/threading_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import threading
import traceback


class BackgroundResult:
def __init__(self):
self.value = None
Expand All @@ -12,6 +13,7 @@ def set(self, result, error=None):
self.error = error
self.ready = True


def run_in_background(func, *args, result_holder=None, on_complete=None, **kwargs):
"""
Run a time-consuming function in background
Expand All @@ -22,6 +24,7 @@ def run_in_background(func, *args, result_holder=None, on_complete=None, **kwarg
:param kwargs: any keyword arguments of the function to be passed in as a dict
:return: a background thread
"""

def wrapper():
try:
print("[*] Background task started...", flush=True)
Expand Down
Loading