From cba10f7b991214883e799fe86d70ed32ed4415ed Mon Sep 17 00:00:00 2001 From: mohsenht Date: Tue, 20 Jan 2026 10:16:04 -0500 Subject: [PATCH 01/18] Using Pathfinder package with local sqlite files --- compose.yml | 16 ++ workers/arax/worker.py | 62 +++++--- workers/arax_pathfinder/Dockerfile | 34 +++++ workers/arax_pathfinder/__init__.py | 0 workers/arax_pathfinder/requirements.txt | 2 + workers/arax_pathfinder/worker.py | 178 +++++++++++++++++++++++ 6 files changed, 269 insertions(+), 23 deletions(-) create mode 100644 workers/arax_pathfinder/Dockerfile create mode 100644 workers/arax_pathfinder/__init__.py create mode 100644 workers/arax_pathfinder/requirements.txt create mode 100644 workers/arax_pathfinder/worker.py diff --git a/compose.yml b/compose.yml index 35d2b24..b2fdeae 100644 --- a/compose.yml +++ b/compose.yml @@ -273,6 +273,22 @@ services: volumes: - ./logs:/app/logs - ./.env:/app/.env + arax_pathfinder: + container_name: arax_pathfinder + build: + context: . + dockerfile: workers/arax_pathfinder/Dockerfile + restart: unless-stopped + depends_on: + shepherd_db: + condition: service_healthy + shepherd_broker: + condition: service_healthy + volumes: + - ./logs:/app/logs + - ./.env:/app/.env + - /Users/facadmin/PycharmProjects/shepherd/curie_ngd_v1.0_KG2.10.2.sqlite:/data/curie_ngd.sqlite:ro + - /Users/facadmin/PycharmProjects/shepherd/kg2c_v1.0_KG2.10.2.sqlite:/data/kg2c.sqlite:ro ######### BTE bte: diff --git a/workers/arax/worker.py b/workers/arax/worker.py index f22c686..53510c3 100644 --- a/workers/arax/worker.py +++ b/workers/arax/worker.py @@ -18,33 +18,49 @@ tracer = setup_tracer(STREAM) -async def arax(task, logger: logging.Logger): +def is_pathfinder_query(message): try: - start = time.time() - query_id = task[1]["query_id"] - logger.info(f"Getting message from db for query id {query_id}") - message = await get_message(query_id, logger) - message["submitter"] = "Shepherd" - logger.info(f"Get the message from db {message}") - - headers = {"Content-Type": "application/json"} - response = requests.post(settings.arax_url, json=message, headers=headers) - - logger.info(f"Status Code from ARAX response: {response.status_code}") - result = response.json() - - except Exception as e: - logger.error(f"Error occurred in ARAX entry module: {e}") - result = {"status": "error", "error": str(e)} - - response_id = task[1]["response_id"] + # this can still fail if the input looks like e.g.: + # "query_graph": None + qedges = message.get("message", {}).get("query_graph", {}).get("edges", {}) + except: + qedges = {} + try: + # this can still fail if the input looks like e.g.: + # "query_graph": None + qpaths = message.get("message", {}).get("query_graph", {}).get("paths", {}) + except: + qpaths = {} + if len(qpaths) > 1: + raise Exception("Only a single path is supported", 400) + if (len(qpaths) > 0) and (len(qedges) > 0): + raise Exception("Mixed mode pathfinder queries are not supported", 400) + return len(qpaths) == 1 - await save_message(response_id, result, logger) - workflow = [{"id": "arax"}] +async def arax(task, logger: logging.Logger): + start = time.time() + query_id = task[1]["query_id"] + logger.info(f"Getting message from db for query id {query_id}") + message = await get_message(query_id, logger) + if is_pathfinder_query(message): + workflow = [{"id": "arax.pathfinder"}] + else: + try: + workflow = [{"id": "arax"}] + message["submitter"] = "Shepherd" + logger.info(f"Get the message from db {message}") + headers = {"Content-Type": "application/json"} + response = requests.post(settings.arax_url, json=message, headers=headers) + logger.info(f"Status Code from ARAX response: {response.status_code}") + result = response.json() + except Exception as e: + logger.error(f"Error occurred calling ARAX service: {e}") + result = {"status": "error", "error": str(e)} + response_id = task[1]["response_id"] + await save_message(response_id, result, logger) await wrap_up_task(STREAM, GROUP, task, workflow, logger) - logger.info(f"Finished task {task[0]} in {time.time() - start}") @@ -61,7 +77,7 @@ async def process_task(task, parent_ctx, logger, limiter): async def poll_for_tasks(): async for task, parent_ctx, logger, limiter in get_tasks( - STREAM, GROUP, CONSUMER, TASK_LIMIT + STREAM, GROUP, CONSUMER, TASK_LIMIT ): asyncio.create_task(process_task(task, parent_ctx, logger, limiter)) diff --git a/workers/arax_pathfinder/Dockerfile b/workers/arax_pathfinder/Dockerfile new file mode 100644 index 0000000..890204f --- /dev/null +++ b/workers/arax_pathfinder/Dockerfile @@ -0,0 +1,34 @@ +# Use RENCI python base image +FROM ghcr.io/translatorsri/renci-python-image:3.11.5 + +# Add image info +LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd + +ENV PYTHONHASHSEED=0 + +# set up requirements +WORKDIR /app + +# make sure all is writeable for the nru USER later on +RUN chmod -R 777 . + +# Install requirements +COPY ./shepherd_utils ./shepherd_utils +COPY ./pyproject.toml . +RUN pip install . + +COPY ./workers/arax_pathfinder/requirements.txt . +RUN pip install -r requirements.txt + +# switch to the non-root user (nru). defined in the base image +USER nru + +# Copy in files +COPY ./workers/arax_pathfinder ./ + +# Set up base for command and any variables +# that shouldn't be modified +# ENTRYPOINT ["uvicorn", "shepherd_server.server:APP"] + +# Variables that can be overriden +CMD ["python", "worker.py"] diff --git a/workers/arax_pathfinder/__init__.py b/workers/arax_pathfinder/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/workers/arax_pathfinder/requirements.txt b/workers/arax_pathfinder/requirements.txt new file mode 100644 index 0000000..0e22cce --- /dev/null +++ b/workers/arax_pathfinder/requirements.txt @@ -0,0 +1,2 @@ +catrax-pathfinder==1.0.2 +biolink-helper-pkg==1.0.0 \ No newline at end of file diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py new file mode 100644 index 0000000..b65a6f0 --- /dev/null +++ b/workers/arax_pathfinder/worker.py @@ -0,0 +1,178 @@ +"""Arax ARA Pathfinder module.""" + +import requests +import asyncio +import json +import logging +import time +import uuid +from pathlib import Path +from pathfinder.Pathfinder import Pathfinder +from biolink_helper_pkg import BiolinkHelper + +from shepherd_utils.config import settings +from shepherd_utils.db import ( + get_message, + save_message, +) +from shepherd_utils.otel import setup_tracer +from shepherd_utils.shared import ( + get_tasks, + wrap_up_task, +) + +# Queue name +STREAM = "arax.pathfinder" +# Consumer group, most likely you don't need to change this. +GROUP = "consumer" +CONSUMER = str(uuid.uuid4())[:8] +TASK_LIMIT = 100 +tracer = setup_tracer(STREAM) + +NUM_TOTAL_HOPS = 4 +MAX_PATHFINDER_PATHS = 500 +BIOLINK_VERSION = "4.2.5" + +RAW_URL = ( + "https://raw.githubusercontent.com/RTXteam/RTX/master/" + "code/ARAX/KnowledgeSources/general_concepts.json" +) +OUT_PATH = Path("general_concepts.json") + +def download_file(url: str, out_path: Path, overwrite: bool = False) -> Path: + out_path = Path(out_path) + + if out_path.exists() and not overwrite: + return out_path + + out_path.parent.mkdir(parents=True, exist_ok=True) + + r = requests.get(url, timeout=60) + r.raise_for_status() + + out_path.write_bytes(r.content) + return out_path + + +def get_blocked_list(): + download_file(RAW_URL, OUT_PATH, False) + + with open(OUT_PATH, 'r') as file: + json_block_list = json.load(file) + synonyms = set(s.lower() for s in json_block_list['synonyms']) + return set(json_block_list['curies']), synonyms + + +async def pathfinder(task, logger: logging.Logger): + start = time.time() + query_id = task[1]["query_id"] + workflow = json.loads(task[1]["workflow"]) + response_id = task[1]["response_id"] + message = await get_message(query_id, logger) + parameters = message.get("parameters") or {} + parameters["timeout"] = parameters.get("timeout", settings.lookup_timeout) + parameters["tiers"] = parameters.get("tiers") or [0] + message["parameters"] = parameters + + qgraph = message["message"]["query_graph"] + pinned_node_keys = [] + pinned_node_ids = [] + for node_key, node in qgraph["nodes"].items(): + pinned_node_keys.append(node_key) + if node.get("ids", None) is not None: + pinned_node_ids.append(node["ids"][0]) + if len(set(pinned_node_ids)) != 2: + logger.error("Pathfinder queries require two pinned nodes.") + return message, 500 + + intermediate_categories = [] + path_key = next(iter(qgraph["paths"].keys())) + qpath = qgraph["paths"][path_key] + if qpath.get("constraints", None) is not None: + constraints = qpath["constraints"] + if len(constraints) > 1: + logger.error("Pathfinder queries do not support multiple constraints.") + return message, 500 + if len(constraints) > 0: + intermediate_categories = ( + constraints[0].get("intermediate_categories", None) or [] + ) + if len(intermediate_categories) > 1: + logger.error( + "Pathfinder queries do not support multiple intermediate categories" + ) + return message, 500 + else: + intermediate_categories = ["biolink:NamedThing"] + + blocked_curies, blocked_synonyms = get_blocked_list() + pathfinder = Pathfinder( + "MLRepo", + "https://kg2cploverdb.test.transltr.io", + "sqlite:/data/curie_ngd.sqlite", + "sqlite:/data/kg2c.sqlite", + blocked_curies, + blocked_synonyms, + logger + ) + + biolink_dir = "/tmp/biolink" + Path(biolink_dir).mkdir(parents=True, exist_ok=True) + biolink_helper = BiolinkHelper(BIOLINK_VERSION, biolink_dir) + descendants = set(biolink_helper.get_descendants(intermediate_categories[0])) + + try: + result, aux_graphs, knowledge_graph = pathfinder.get_paths( + pinned_node_ids[0], + pinned_node_ids[1], + pinned_node_keys[0], + pinned_node_keys[1], + NUM_TOTAL_HOPS, + NUM_TOTAL_HOPS, + MAX_PATHFINDER_PATHS, + descendants, + ) + res = [] + if result is not None: + res.append({ + "id": result["id"], + "analyses": result['analyses'], + "node_bindings": result['node_bindings'], + "essence": "result" + }) + if aux_graphs is None: + aux_graphs = {} + if knowledge_graph is None: + knowledge_graph = {} + message["message"]["knowledge_graph"] = knowledge_graph + message["message"]["auxiliary_graphs"] = aux_graphs + message["message"]["results"] = res + await save_message(response_id, message, logger) + except Exception as e: + logger.error(f"PathFinder failed to find paths between {pinned_node_keys[0]} and {pinned_node_keys[1]}. " + f"Error message is: {e}") + message = {"status": "error", "error": str(e)} + await save_message(response_id, message, logger) + + await wrap_up_task(STREAM, GROUP, task, workflow, logger) + logger.info(f"Task took {time.time() - start}") + + +async def process_task(task, parent_ctx, logger, limiter): + span = tracer.start_span(STREAM, context=parent_ctx) + try: + await pathfinder(task, logger) + finally: + span.end() + limiter.release() + + +async def poll_for_tasks(): + async for task, parent_ctx, logger, limiter in get_tasks( + STREAM, GROUP, CONSUMER, TASK_LIMIT + ): + asyncio.create_task(process_task(task, parent_ctx, logger, limiter)) + + +if __name__ == "__main__": + asyncio.run(poll_for_tasks()) From 5539e3dee524821304216f930e86b2f1b6bb220d Mon Sep 17 00:00:00 2001 From: mohsenht Date: Wed, 21 Jan 2026 11:08:46 -0500 Subject: [PATCH 02/18] Using Pathfinder package with mysql server --- compose.yml | 2 -- workers/arax_pathfinder/worker.py | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/compose.yml b/compose.yml index b2fdeae..759a025 100644 --- a/compose.yml +++ b/compose.yml @@ -287,8 +287,6 @@ services: volumes: - ./logs:/app/logs - ./.env:/app/.env - - /Users/facadmin/PycharmProjects/shepherd/curie_ngd_v1.0_KG2.10.2.sqlite:/data/curie_ngd.sqlite:ro - - /Users/facadmin/PycharmProjects/shepherd/kg2c_v1.0_KG2.10.2.sqlite:/data/kg2c.sqlite:ro ######### BTE bte: diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py index b65a6f0..e228854 100644 --- a/workers/arax_pathfinder/worker.py +++ b/workers/arax_pathfinder/worker.py @@ -109,8 +109,8 @@ async def pathfinder(task, logger: logging.Logger): pathfinder = Pathfinder( "MLRepo", "https://kg2cploverdb.test.transltr.io", - "sqlite:/data/curie_ngd.sqlite", - "sqlite:/data/kg2c.sqlite", + "mysql:arax-databases-mysql.rtx.ai:public_ro:curie_ngd_v1_0_kg2_10_2", + "mysql:arax-databases-mysql.rtx.ai:public_ro:kg2c_v1_0_kg2_10_2", blocked_curies, blocked_synonyms, logger From cf94d6dd9ac6c9545fad3576999b2a22f4a1d14d Mon Sep 17 00:00:00 2001 From: mohsenht Date: Wed, 21 Jan 2026 11:16:26 -0500 Subject: [PATCH 03/18] Settings for arax pathfinder --- shepherd_utils/config.py | 12 ++++++++++++ workers/arax_pathfinder/worker.py | 21 +++++++++------------ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/shepherd_utils/config.py b/shepherd_utils/config.py index d9a7ade..4529ed5 100644 --- a/shepherd_utils/config.py +++ b/shepherd_utils/config.py @@ -22,7 +22,19 @@ class Settings(BaseSettings): kg_retrieval_url: str = "https://strider.renci.org/asyncquery" sync_kg_retrieval_url: str = "https://strider.renci.org/query" omnicorp_url: str = "https://aragorn-ranker.renci.org/omnicorp_overlay" + + # ARAX configs arax_url: str = "https://arax.ncats.io/shepherd/api/arax/v1.4/query" + plover_url: str = "https://kg2cploverdb.test.transltr.io" + curie_ngd_addr: str = "mysql:arax-databases-mysql.rtx.ai:public_ro:curie_ngd_v1_0_kg2_10_2" + node_degree_addr: str = "mysql:arax-databases-mysql.rtx.ai:public_ro:kg2c_v1_0_kg2_10_2" + arax_biolink_version: str = "4.2.5" + arax_blocked_list_url: str = ( + "https://raw.githubusercontent.com/RTXteam/RTX/master/" + "code/ARAX/KnowledgeSources/general_concepts.json" + ) + # End of ARAX configs + node_norm: str = "https://biothings.ci.transltr.io/nodenorm/api/" pathfinder_redis_host: str = "host.docker.internal" diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py index e228854..d44e655 100644 --- a/workers/arax_pathfinder/worker.py +++ b/workers/arax_pathfinder/worker.py @@ -31,12 +31,9 @@ NUM_TOTAL_HOPS = 4 MAX_PATHFINDER_PATHS = 500 -BIOLINK_VERSION = "4.2.5" -RAW_URL = ( - "https://raw.githubusercontent.com/RTXteam/RTX/master/" - "code/ARAX/KnowledgeSources/general_concepts.json" -) + + OUT_PATH = Path("general_concepts.json") def download_file(url: str, out_path: Path, overwrite: bool = False) -> Path: @@ -55,7 +52,7 @@ def download_file(url: str, out_path: Path, overwrite: bool = False) -> Path: def get_blocked_list(): - download_file(RAW_URL, OUT_PATH, False) + download_file(settings.arax_blocked_list_url, OUT_PATH, False) with open(OUT_PATH, 'r') as file: json_block_list = json.load(file) @@ -108,17 +105,17 @@ async def pathfinder(task, logger: logging.Logger): blocked_curies, blocked_synonyms = get_blocked_list() pathfinder = Pathfinder( "MLRepo", - "https://kg2cploverdb.test.transltr.io", - "mysql:arax-databases-mysql.rtx.ai:public_ro:curie_ngd_v1_0_kg2_10_2", - "mysql:arax-databases-mysql.rtx.ai:public_ro:kg2c_v1_0_kg2_10_2", + settings.plover_url, + settings.curie_ngd_addr, + settings.node_degree_addr, blocked_curies, blocked_synonyms, logger ) - biolink_dir = "/tmp/biolink" - Path(biolink_dir).mkdir(parents=True, exist_ok=True) - biolink_helper = BiolinkHelper(BIOLINK_VERSION, biolink_dir) + biolink_cache_dir = "/tmp/biolink" + Path(biolink_cache_dir).mkdir(parents=True, exist_ok=True) + biolink_helper = BiolinkHelper(settings.arax_biolink_version, biolink_cache_dir) descendants = set(biolink_helper.get_descendants(intermediate_categories[0])) try: From 4999dfff38fede49b42846bf1d8968b64fdbc627 Mon Sep 17 00:00:00 2001 From: mohsenht Date: Wed, 21 Jan 2026 13:55:49 -0500 Subject: [PATCH 04/18] Black style errors --- shepherd_utils/config.py | 8 +++++-- workers/arax/worker.py | 2 +- workers/arax_pathfinder/worker.py | 35 ++++++++++++++++--------------- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/shepherd_utils/config.py b/shepherd_utils/config.py index 4529ed5..341771e 100644 --- a/shepherd_utils/config.py +++ b/shepherd_utils/config.py @@ -26,8 +26,12 @@ class Settings(BaseSettings): # ARAX configs arax_url: str = "https://arax.ncats.io/shepherd/api/arax/v1.4/query" plover_url: str = "https://kg2cploverdb.test.transltr.io" - curie_ngd_addr: str = "mysql:arax-databases-mysql.rtx.ai:public_ro:curie_ngd_v1_0_kg2_10_2" - node_degree_addr: str = "mysql:arax-databases-mysql.rtx.ai:public_ro:kg2c_v1_0_kg2_10_2" + curie_ngd_addr: str = ( + "mysql:arax-databases-mysql.rtx.ai:public_ro:curie_ngd_v1_0_kg2_10_2" + ) + node_degree_addr: str = ( + "mysql:arax-databases-mysql.rtx.ai:public_ro:kg2c_v1_0_kg2_10_2" + ) arax_biolink_version: str = "4.2.5" arax_blocked_list_url: str = ( "https://raw.githubusercontent.com/RTXteam/RTX/master/" diff --git a/workers/arax/worker.py b/workers/arax/worker.py index 53510c3..d1eacda 100644 --- a/workers/arax/worker.py +++ b/workers/arax/worker.py @@ -77,7 +77,7 @@ async def process_task(task, parent_ctx, logger, limiter): async def poll_for_tasks(): async for task, parent_ctx, logger, limiter in get_tasks( - STREAM, GROUP, CONSUMER, TASK_LIMIT + STREAM, GROUP, CONSUMER, TASK_LIMIT ): asyncio.create_task(process_task(task, parent_ctx, logger, limiter)) diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py index d44e655..90c4d41 100644 --- a/workers/arax_pathfinder/worker.py +++ b/workers/arax_pathfinder/worker.py @@ -32,10 +32,9 @@ NUM_TOTAL_HOPS = 4 MAX_PATHFINDER_PATHS = 500 - - OUT_PATH = Path("general_concepts.json") + def download_file(url: str, out_path: Path, overwrite: bool = False) -> Path: out_path = Path(out_path) @@ -54,10 +53,10 @@ def download_file(url: str, out_path: Path, overwrite: bool = False) -> Path: def get_blocked_list(): download_file(settings.arax_blocked_list_url, OUT_PATH, False) - with open(OUT_PATH, 'r') as file: + with open(OUT_PATH, "r") as file: json_block_list = json.load(file) - synonyms = set(s.lower() for s in json_block_list['synonyms']) - return set(json_block_list['curies']), synonyms + synonyms = set(s.lower() for s in json_block_list["synonyms"]) + return set(json_block_list["curies"]), synonyms async def pathfinder(task, logger: logging.Logger): @@ -91,9 +90,7 @@ async def pathfinder(task, logger: logging.Logger): logger.error("Pathfinder queries do not support multiple constraints.") return message, 500 if len(constraints) > 0: - intermediate_categories = ( - constraints[0].get("intermediate_categories", None) or [] - ) + intermediate_categories = (constraints[0].get("intermediate_categories", None) or []) if len(intermediate_categories) > 1: logger.error( "Pathfinder queries do not support multiple intermediate categories" @@ -110,7 +107,7 @@ async def pathfinder(task, logger: logging.Logger): settings.node_degree_addr, blocked_curies, blocked_synonyms, - logger + logger, ) biolink_cache_dir = "/tmp/biolink" @@ -131,12 +128,14 @@ async def pathfinder(task, logger: logging.Logger): ) res = [] if result is not None: - res.append({ - "id": result["id"], - "analyses": result['analyses'], - "node_bindings": result['node_bindings'], - "essence": "result" - }) + res.append( + { + "id": result["id"], + "analyses": result['analyses'], + "node_bindings": result['node_bindings'], + "essence": "result" + } + ) if aux_graphs is None: aux_graphs = {} if knowledge_graph is None: @@ -146,8 +145,10 @@ async def pathfinder(task, logger: logging.Logger): message["message"]["results"] = res await save_message(response_id, message, logger) except Exception as e: - logger.error(f"PathFinder failed to find paths between {pinned_node_keys[0]} and {pinned_node_keys[1]}. " - f"Error message is: {e}") + logger.error( + f"PathFinder failed to find paths between {pinned_node_keys[0]} and {pinned_node_keys[1]}. " + f"Error message is: {e}" + ) message = {"status": "error", "error": str(e)} await save_message(response_id, message, logger) From 767e51b2896bfddd5dc4c5425edd82994c619209 Mon Sep 17 00:00:00 2001 From: mohsenht Date: Wed, 21 Jan 2026 13:59:22 -0500 Subject: [PATCH 05/18] Black style errors --- shepherd_server/main.py | 1 - workers/arax_pathfinder/worker.py | 12 +++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/shepherd_server/main.py b/shepherd_server/main.py index 6e4aac3..7b39542 100644 --- a/shepherd_server/main.py +++ b/shepherd_server/main.py @@ -1,6 +1,5 @@ import uvicorn - if __name__ == "__main__": uvicorn.run( "shepherd_server.server:APP", diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py index 90c4d41..997e324 100644 --- a/workers/arax_pathfinder/worker.py +++ b/workers/arax_pathfinder/worker.py @@ -90,7 +90,9 @@ async def pathfinder(task, logger: logging.Logger): logger.error("Pathfinder queries do not support multiple constraints.") return message, 500 if len(constraints) > 0: - intermediate_categories = (constraints[0].get("intermediate_categories", None) or []) + intermediate_categories = ( + constraints[0].get("intermediate_categories", None) or [] + ) if len(intermediate_categories) > 1: logger.error( "Pathfinder queries do not support multiple intermediate categories" @@ -131,9 +133,9 @@ async def pathfinder(task, logger: logging.Logger): res.append( { "id": result["id"], - "analyses": result['analyses'], - "node_bindings": result['node_bindings'], - "essence": "result" + "analyses": result["analyses"], + "node_bindings": result["node_bindings"], + "essence": "result", } ) if aux_graphs is None: @@ -167,7 +169,7 @@ async def process_task(task, parent_ctx, logger, limiter): async def poll_for_tasks(): async for task, parent_ctx, logger, limiter in get_tasks( - STREAM, GROUP, CONSUMER, TASK_LIMIT + STREAM, GROUP, CONSUMER, TASK_LIMIT ): asyncio.create_task(process_task(task, parent_ctx, logger, limiter)) From 540f220c95eaf80410498be3993e040ddf751a27 Mon Sep 17 00:00:00 2001 From: mohsenht Date: Fri, 23 Jan 2026 11:33:17 -0500 Subject: [PATCH 06/18] New pathfinder package release update. --- workers/arax_pathfinder/requirements.txt | 2 +- workers/arax_pathfinder/worker.py | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/workers/arax_pathfinder/requirements.txt b/workers/arax_pathfinder/requirements.txt index 0e22cce..e39dad0 100644 --- a/workers/arax_pathfinder/requirements.txt +++ b/workers/arax_pathfinder/requirements.txt @@ -1,2 +1,2 @@ -catrax-pathfinder==1.0.2 +catrax-pathfinder==1.1.1 biolink-helper-pkg==1.0.0 \ No newline at end of file diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py index 997e324..e64920a 100644 --- a/workers/arax_pathfinder/worker.py +++ b/workers/arax_pathfinder/worker.py @@ -30,7 +30,10 @@ tracer = setup_tracer(STREAM) NUM_TOTAL_HOPS = 4 +MAX_HOPS_TO_EXPLORE = 6 MAX_PATHFINDER_PATHS = 500 +PRUNE_TOP_K = 30 +NODE_DEGREE_THRESHOLD = 30000 OUT_PATH = Path("general_concepts.json") @@ -124,8 +127,10 @@ async def pathfinder(task, logger: logging.Logger): pinned_node_keys[0], pinned_node_keys[1], NUM_TOTAL_HOPS, - NUM_TOTAL_HOPS, + MAX_HOPS_TO_EXPLORE, MAX_PATHFINDER_PATHS, + PRUNE_TOP_K, + NODE_DEGREE_THRESHOLD, descendants, ) res = [] From ce1a5e6cf77d885ecf0cb13b50b2398cb4cfe6ac Mon Sep 17 00:00:00 2001 From: mohsenht Date: Wed, 4 Feb 2026 13:52:39 -0500 Subject: [PATCH 07/18] Temporary faster pathfinder by decreasing parameters --- workers/arax_pathfinder/worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py index eb8cd68..5226e2e 100644 --- a/workers/arax_pathfinder/worker.py +++ b/workers/arax_pathfinder/worker.py @@ -30,10 +30,10 @@ TASK_LIMIT = 100 tracer = setup_tracer(STREAM) -NUM_TOTAL_HOPS = 4 -MAX_HOPS_TO_EXPLORE = 6 +NUM_TOTAL_HOPS = 3 +MAX_HOPS_TO_EXPLORE = 3 MAX_PATHFINDER_PATHS = 500 -PRUNE_TOP_K = 30 +PRUNE_TOP_K = 50 NODE_DEGREE_THRESHOLD = 30000 OUT_PATH = Path("general_concepts.json") From 8ec57a9f3872c544bed51b9b3ab8f548ae9acd63 Mon Sep 17 00:00:00 2001 From: mohsenht Date: Wed, 11 Feb 2026 14:09:06 -0500 Subject: [PATCH 08/18] Arax Pathfinder tested with 4 hops --- shepherd_utils/config.py | 2 +- workers/arax_pathfinder/worker.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/shepherd_utils/config.py b/shepherd_utils/config.py index 845c99f..d98555c 100644 --- a/shepherd_utils/config.py +++ b/shepherd_utils/config.py @@ -26,7 +26,7 @@ class Settings(BaseSettings): # ARAX configs arax_url: str = "https://arax.ncats.io/shepherd/api/arax/v1.4/query" - plover_url: str = "https://kg2cploverdb.test.transltr.io" + plover_url: str = "https://kg2cplover3.rtx.ai:9990" curie_ngd_addr: str = ( "mysql:arax-databases-mysql.rtx.ai:public_ro:curie_ngd_v1_0_kg2_10_2" ) diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py index 5226e2e..2a16933 100644 --- a/workers/arax_pathfinder/worker.py +++ b/workers/arax_pathfinder/worker.py @@ -30,8 +30,8 @@ TASK_LIMIT = 100 tracer = setup_tracer(STREAM) -NUM_TOTAL_HOPS = 3 -MAX_HOPS_TO_EXPLORE = 3 +NUM_TOTAL_HOPS = 4 +MAX_HOPS_TO_EXPLORE = 4 MAX_PATHFINDER_PATHS = 500 PRUNE_TOP_K = 50 NODE_DEGREE_THRESHOLD = 30000 @@ -88,7 +88,7 @@ async def pathfinder(task, logger: logging.Logger): intermediate_categories = [] path_key = next(iter(qgraph["paths"].keys())) qpath = qgraph["paths"][path_key] - if qpath.get("constraints", None) is not None: + if qpath.get("constraints", None) is not None and len(qpath.get("constraints", [])) > 0: constraints = qpath["constraints"] if len(constraints) > 1: logger.error("Pathfinder queries do not support multiple constraints.") @@ -122,6 +122,8 @@ async def pathfinder(task, logger: logging.Logger): descendants = set(biolink_helper.get_descendants(intermediate_categories[0])) try: + start = time.perf_counter() + logger.info("Starting pathfinder.get_paths()") result, aux_graphs, knowledge_graph = pathfinder.get_paths( pinned_node_ids[0], pinned_node_ids[1], @@ -134,6 +136,8 @@ async def pathfinder(task, logger: logging.Logger): NODE_DEGREE_THRESHOLD, descendants, ) + elapsed = time.perf_counter() - start + logger.info(f"pathfinder.get_paths() finished in {elapsed:.3f} seconds") res = [] if result is not None: res.append( From 82852c474654fb4ef53d63844c8716234bdbbfdb Mon Sep 17 00:00:00 2001 From: mohsenht Date: Wed, 18 Feb 2026 11:48:34 -0500 Subject: [PATCH 09/18] Async Arax Pathfinder --- workers/arax_pathfinder/worker.py | 79 +++++++++++++++++++------------ 1 file changed, 48 insertions(+), 31 deletions(-) diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py index 2a16933..595fa37 100644 --- a/workers/arax_pathfinder/worker.py +++ b/workers/arax_pathfinder/worker.py @@ -63,6 +63,47 @@ def get_blocked_list(): return set(json_block_list["curies"]), synonyms +def execute_pathfinding_sync(pinned_node_ids, pinned_node_keys, intermediate_categories, logger): + + blocked_curies, blocked_synonyms = get_blocked_list() + + pathfinder_instance = Pathfinder( + "MLRepo", + settings.plover_url, + settings.curie_ngd_addr, + settings.node_degree_addr, + blocked_curies, + blocked_synonyms, + logger, + ) + + biolink_cache_dir = "/tmp/biolink" + Path(biolink_cache_dir).mkdir(parents=True, exist_ok=True) + biolink_helper = BiolinkHelper(settings.arax_biolink_version, biolink_cache_dir) + descendants = set(biolink_helper.get_descendants(intermediate_categories[0])) + + start = time.perf_counter() + logger.info("Starting pathfinder.get_paths() in worker thread") + + result, aux_graphs, knowledge_graph = pathfinder_instance.get_paths( + pinned_node_ids[0], + pinned_node_ids[1], + pinned_node_keys[0], + pinned_node_keys[1], + NUM_TOTAL_HOPS, + MAX_HOPS_TO_EXPLORE, + MAX_PATHFINDER_PATHS, + PRUNE_TOP_K, + NODE_DEGREE_THRESHOLD, + descendants, + ) + + elapsed = time.perf_counter() - start + logger.info(f"pathfinder.get_paths() finished in {elapsed:.3f} seconds") + + return result, aux_graphs, knowledge_graph + + async def pathfinder(task, logger: logging.Logger): start = time.time() query_id = task[1]["query_id"] @@ -105,39 +146,15 @@ async def pathfinder(task, logger: logging.Logger): else: intermediate_categories = ["biolink:NamedThing"] - blocked_curies, blocked_synonyms = get_blocked_list() - pathfinder = Pathfinder( - "MLRepo", - settings.plover_url, - settings.curie_ngd_addr, - settings.node_degree_addr, - blocked_curies, - blocked_synonyms, - logger, - ) - - biolink_cache_dir = "/tmp/biolink" - Path(biolink_cache_dir).mkdir(parents=True, exist_ok=True) - biolink_helper = BiolinkHelper(settings.arax_biolink_version, biolink_cache_dir) - descendants = set(biolink_helper.get_descendants(intermediate_categories[0])) - try: - start = time.perf_counter() - logger.info("Starting pathfinder.get_paths()") - result, aux_graphs, knowledge_graph = pathfinder.get_paths( - pinned_node_ids[0], - pinned_node_ids[1], - pinned_node_keys[0], - pinned_node_keys[1], - NUM_TOTAL_HOPS, - MAX_HOPS_TO_EXPLORE, - MAX_PATHFINDER_PATHS, - PRUNE_TOP_K, - NODE_DEGREE_THRESHOLD, - descendants, + result, aux_graphs, knowledge_graph = await asyncio.to_thread( + execute_pathfinding_sync, + pinned_node_ids, + pinned_node_keys, + intermediate_categories, + logger ) - elapsed = time.perf_counter() - start - logger.info(f"pathfinder.get_paths() finished in {elapsed:.3f} seconds") + res = [] if result is not None: res.append( From 6b537e51e4c6728598fd4324c61c0b816a4fcb48 Mon Sep 17 00:00:00 2001 From: mohsenht Date: Mon, 2 Mar 2026 22:47:23 -0500 Subject: [PATCH 10/18] Pathfinder package updated --- workers/arax_pathfinder/requirements.txt | 2 +- workers/arax_pathfinder/worker.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/workers/arax_pathfinder/requirements.txt b/workers/arax_pathfinder/requirements.txt index e39dad0..bcb1c83 100644 --- a/workers/arax_pathfinder/requirements.txt +++ b/workers/arax_pathfinder/requirements.txt @@ -1,2 +1,2 @@ -catrax-pathfinder==1.1.1 +catrax-pathfinder==1.2.1 biolink-helper-pkg==1.0.0 \ No newline at end of file diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py index 595fa37..c98ae82 100644 --- a/workers/arax_pathfinder/worker.py +++ b/workers/arax_pathfinder/worker.py @@ -33,8 +33,8 @@ NUM_TOTAL_HOPS = 4 MAX_HOPS_TO_EXPLORE = 4 MAX_PATHFINDER_PATHS = 500 -PRUNE_TOP_K = 50 -NODE_DEGREE_THRESHOLD = 30000 +PRUNE_TOP_K = 200 +NODE_DEGREE_THRESHOLD = 1000000 OUT_PATH = Path("general_concepts.json") From 991930674dec87606c18d71ec75e3eb9a4116f96 Mon Sep 17 00:00:00 2001 From: mohsenht Date: Wed, 4 Mar 2026 09:41:54 -0500 Subject: [PATCH 11/18] Pathfinder package updated --- workers/arax_pathfinder/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/arax_pathfinder/requirements.txt b/workers/arax_pathfinder/requirements.txt index bcb1c83..5f5123c 100644 --- a/workers/arax_pathfinder/requirements.txt +++ b/workers/arax_pathfinder/requirements.txt @@ -1,2 +1,2 @@ -catrax-pathfinder==1.2.1 +catrax-pathfinder==1.2.2 biolink-helper-pkg==1.0.0 \ No newline at end of file From 4771f9061964c437f9e30e9763669d603dfe5533 Mon Sep 17 00:00:00 2001 From: mohsenht Date: Tue, 10 Mar 2026 20:47:08 -0400 Subject: [PATCH 12/18] resolved conflicts --- workers/arax/worker.py | 61 +++++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/workers/arax/worker.py b/workers/arax/worker.py index 1cefad4..eebfb77 100644 --- a/workers/arax/worker.py +++ b/workers/arax/worker.py @@ -1,7 +1,6 @@ """ARAX entry module.""" import asyncio -import json import logging import requests import time @@ -20,30 +19,50 @@ tracer = setup_tracer(STREAM) -async def arax(task, logger: logging.Logger): +def is_pathfinder_query(message): try: - query_id = task[1]["query_id"] - logger.info(f"Getting message from db for query id {query_id}") - message = await get_message(query_id, logger) - message["submitter"] = "Shepherd" - logger.info(f"Get the message from db {message}") - - headers = {"Content-Type": "application/json"} - response = requests.post(settings.arax_url, json=message, headers=headers) - - logger.info(f"Status Code from ARAX response: {response.status_code}") - result = response.json() - result = add_shepherd_arax_to_edge_sources(result) - - except Exception as e: - logger.error(f"Error occurred in ARAX entry module: {e}") - result = {"status": "error", "error": str(e)} + # this can still fail if the input looks like e.g.: + # "query_graph": None + qedges = message.get("message", {}).get("query_graph", {}).get("edges", {}) + except: + qedges = {} + try: + # this can still fail if the input looks like e.g.: + # "query_graph": None + qpaths = message.get("message", {}).get("query_graph", {}).get("paths", {}) + except: + qpaths = {} + if len(qpaths) > 1: + raise Exception("Only a single path is supported", 400) + if (len(qpaths) > 0) and (len(qedges) > 0): + raise Exception("Mixed mode pathfinder queries are not supported", 400) + return len(qpaths) == 1 - response_id = task[1]["response_id"] - await save_message(response_id, result, logger) +async def arax(task, logger: logging.Logger): + start = time.time() + query_id = task[1]["query_id"] + logger.info(f"Getting message from db for query id {query_id}") + message = await get_message(query_id, logger) + if is_pathfinder_query(message): + workflow = [{"id": "arax.pathfinder"}] + else: + try: + workflow = [{"id": "arax"}] + message["submitter"] = "Shepherd" + logger.info(f"Get the message from db {message}") + headers = {"Content-Type": "application/json"} + response = requests.post(settings.arax_url, json=message, headers=headers) + logger.info(f"Status Code from ARAX response: {response.status_code}") + result = response.json() + result = add_shepherd_arax_to_edge_sources(result) + except Exception as e: + logger.error(f"Error occurred calling ARAX service: {e}") + result = {"status": "error", "error": str(e)} + response_id = task[1]["response_id"] + await save_message(response_id, result, logger) - task[1]["workflow"] = json.dumps([{"id": "arax"}]) + await wrap_up_task(STREAM, GROUP, task, workflow, logger) logger.info(f"Finished task {task[0]} in {time.time() - start}") From 107614b1ecad9eb43c6d55b461d90a8510a90ef8 Mon Sep 17 00:00:00 2001 From: mohsenht Date: Tue, 10 Mar 2026 20:55:02 -0400 Subject: [PATCH 13/18] resolved conflicts --- workers/arax/worker.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/workers/arax/worker.py b/workers/arax/worker.py index eebfb77..27b3f0b 100644 --- a/workers/arax/worker.py +++ b/workers/arax/worker.py @@ -1,6 +1,7 @@ """ARAX entry module.""" import asyncio +import json import logging import requests import time @@ -46,6 +47,7 @@ async def arax(task, logger: logging.Logger): message = await get_message(query_id, logger) if is_pathfinder_query(message): workflow = [{"id": "arax.pathfinder"}] + await wrap_up_task(STREAM, GROUP, task, workflow, logger) else: try: workflow = [{"id": "arax"}] @@ -61,8 +63,9 @@ async def arax(task, logger: logging.Logger): result = {"status": "error", "error": str(e)} response_id = task[1]["response_id"] await save_message(response_id, result, logger) + task[1]["workflow"] = json.dumps([{"id": "arax"}]) + - await wrap_up_task(STREAM, GROUP, task, workflow, logger) logger.info(f"Finished task {task[0]} in {time.time() - start}") From 268d2267e8c7362ead92196ce1eafeaeeb692f22 Mon Sep 17 00:00:00 2001 From: mohsenht Date: Tue, 10 Mar 2026 20:56:37 -0400 Subject: [PATCH 14/18] resolved conflicts --- workers/arax/worker.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/workers/arax/worker.py b/workers/arax/worker.py index 27b3f0b..ec356e6 100644 --- a/workers/arax/worker.py +++ b/workers/arax/worker.py @@ -50,7 +50,6 @@ async def arax(task, logger: logging.Logger): await wrap_up_task(STREAM, GROUP, task, workflow, logger) else: try: - workflow = [{"id": "arax"}] message["submitter"] = "Shepherd" logger.info(f"Get the message from db {message}") headers = {"Content-Type": "application/json"} @@ -65,7 +64,6 @@ async def arax(task, logger: logging.Logger): await save_message(response_id, result, logger) task[1]["workflow"] = json.dumps([{"id": "arax"}]) - logger.info(f"Finished task {task[0]} in {time.time() - start}") From c6b79c382f6eb96cb00cae8228e13e2e93ccdab7 Mon Sep 17 00:00:00 2001 From: Max Wang Date: Wed, 11 Mar 2026 12:24:07 -0400 Subject: [PATCH 15/18] Update to latest main code --- workers/arax/worker.py | 13 ++++++---- workers/arax_pathfinder/worker.py | 43 ++++++++++++++++++++++++------- 2 files changed, 41 insertions(+), 15 deletions(-) diff --git a/workers/arax/worker.py b/workers/arax/worker.py index ec356e6..1ff858d 100644 --- a/workers/arax/worker.py +++ b/workers/arax/worker.py @@ -3,14 +3,18 @@ import asyncio import json import logging -import requests import time import uuid + +import requests + from shepherd_utils.config import settings from shepherd_utils.db import get_message, save_message -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.inject_shepherd_arax_provenance import ( + add_shepherd_arax_to_edge_sources, +) from shepherd_utils.otel import setup_tracer -from inject_shepherd_arax_provenance import add_shepherd_arax_to_edge_sources +from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task # Queue name STREAM = "arax" @@ -46,8 +50,7 @@ async def arax(task, logger: logging.Logger): logger.info(f"Getting message from db for query id {query_id}") message = await get_message(query_id, logger) if is_pathfinder_query(message): - workflow = [{"id": "arax.pathfinder"}] - await wrap_up_task(STREAM, GROUP, task, workflow, logger) + task[1]["workflow"] = json.dumps([{"id": "arax.pathfinder"}]) else: try: message["submitter"] = "Shepherd" diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py index c98ae82..a40b5c2 100644 --- a/workers/arax_pathfinder/worker.py +++ b/workers/arax_pathfinder/worker.py @@ -1,24 +1,28 @@ """Arax ARA Pathfinder module.""" -import requests import asyncio import json import logging import time import uuid from pathlib import Path -from pathfinder.Pathfinder import Pathfinder + +import requests from biolink_helper_pkg import BiolinkHelper +from pathfinder.Pathfinder import Pathfinder -from shepherd_utils.inject_shepherd_arax_provenance import add_shepherd_arax_to_edge_sources from shepherd_utils.config import settings from shepherd_utils.db import ( get_message, save_message, ) +from shepherd_utils.inject_shepherd_arax_provenance import ( + add_shepherd_arax_to_edge_sources, +) from shepherd_utils.otel import setup_tracer from shepherd_utils.shared import ( get_tasks, + handle_task_failure, wrap_up_task, ) @@ -107,7 +111,6 @@ def execute_pathfinding_sync(pinned_node_ids, pinned_node_keys, intermediate_cat async def pathfinder(task, logger: logging.Logger): start = time.time() query_id = task[1]["query_id"] - workflow = json.loads(task[1]["workflow"]) response_id = task[1]["response_id"] message = await get_message(query_id, logger) parameters = message.get("parameters") or {} @@ -184,24 +187,44 @@ async def pathfinder(task, logger: logging.Logger): message = {"status": "error", "error": str(e)} await save_message(response_id, message, logger) - await wrap_up_task(STREAM, GROUP, task, workflow, logger) logger.info(f"Task took {time.time() - start}") -async def process_task(task, parent_ctx, logger, limiter): +async def process_task(task, parent_ctx, logger: logging.Logger, limiter): + """Process a given task and ACK in redis.""" + start = time.time() span = tracer.start_span(STREAM, context=parent_ctx) try: await pathfinder(task, logger) + # Always wrap up the task to ACK it in the broker + try: + await wrap_up_task(STREAM, GROUP, task, logger) + except Exception as e: + logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") + except asyncio.CancelledError: + logger.warning(f"Task {task[0]} was cancelled") + except Exception as e: + logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) + await handle_task_failure(STREAM, GROUP, task, logger) finally: span.end() limiter.release() + logger.info(f"Finished task {task[0]} in {time.time() - start}") async def poll_for_tasks(): - async for task, parent_ctx, logger, limiter in get_tasks( - STREAM, GROUP, CONSUMER, TASK_LIMIT - ): - asyncio.create_task(process_task(task, parent_ctx, logger, limiter)) + """On initialization, poll indefinitely for available tasks.""" + while True: + try: + async for task, parent_ctx, logger, limiter in get_tasks( + STREAM, GROUP, CONSUMER, TASK_LIMIT + ): + asyncio.create_task(process_task(task, parent_ctx, logger, limiter)) + except asyncio.CancelledError: + logging.info("Poll loop cancelled, shutting down.") + except Exception as e: + logging.error(f"Error in task polling loop: {e}", exc_info=True) + await asyncio.sleep(5) # back off before retrying if __name__ == "__main__": From f98c6e9730cf41aaebe56b2e8a1dce4c6b1e8b49 Mon Sep 17 00:00:00 2001 From: Max Wang Date: Wed, 11 Mar 2026 12:25:07 -0400 Subject: [PATCH 16/18] Run black --- workers/arax_pathfinder/worker.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py index a40b5c2..fe6867a 100644 --- a/workers/arax_pathfinder/worker.py +++ b/workers/arax_pathfinder/worker.py @@ -67,7 +67,9 @@ def get_blocked_list(): return set(json_block_list["curies"]), synonyms -def execute_pathfinding_sync(pinned_node_ids, pinned_node_keys, intermediate_categories, logger): +def execute_pathfinding_sync( + pinned_node_ids, pinned_node_keys, intermediate_categories, logger +): blocked_curies, blocked_synonyms = get_blocked_list() @@ -132,7 +134,10 @@ async def pathfinder(task, logger: logging.Logger): intermediate_categories = [] path_key = next(iter(qgraph["paths"].keys())) qpath = qgraph["paths"][path_key] - if qpath.get("constraints", None) is not None and len(qpath.get("constraints", [])) > 0: + if ( + qpath.get("constraints", None) is not None + and len(qpath.get("constraints", [])) > 0 + ): constraints = qpath["constraints"] if len(constraints) > 1: logger.error("Pathfinder queries do not support multiple constraints.") @@ -155,7 +160,7 @@ async def pathfinder(task, logger: logging.Logger): pinned_node_ids, pinned_node_keys, intermediate_categories, - logger + logger, ) res = [] From 298aee577efbefe50153c49d81915d8634b4837e Mon Sep 17 00:00:00 2001 From: mohsenht Date: Wed, 11 Mar 2026 13:24:11 -0400 Subject: [PATCH 17/18] PloverDB url updated to point to CI --- shepherd_utils/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shepherd_utils/config.py b/shepherd_utils/config.py index d98555c..1178cf5 100644 --- a/shepherd_utils/config.py +++ b/shepherd_utils/config.py @@ -26,7 +26,7 @@ class Settings(BaseSettings): # ARAX configs arax_url: str = "https://arax.ncats.io/shepherd/api/arax/v1.4/query" - plover_url: str = "https://kg2cplover3.rtx.ai:9990" + plover_url: str = "https://kg2cploverdb.ci.transltr.io" curie_ngd_addr: str = ( "mysql:arax-databases-mysql.rtx.ai:public_ro:curie_ngd_v1_0_kg2_10_2" ) From 70758cbd6e0643009d1f1ba88840d645d7fd17c7 Mon Sep 17 00:00:00 2001 From: mohsenht Date: Wed, 11 Mar 2026 14:02:52 -0400 Subject: [PATCH 18/18] PRUNE more --- workers/arax_pathfinder/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/arax_pathfinder/worker.py b/workers/arax_pathfinder/worker.py index fe6867a..2c1014d 100644 --- a/workers/arax_pathfinder/worker.py +++ b/workers/arax_pathfinder/worker.py @@ -37,7 +37,7 @@ NUM_TOTAL_HOPS = 4 MAX_HOPS_TO_EXPLORE = 4 MAX_PATHFINDER_PATHS = 500 -PRUNE_TOP_K = 200 +PRUNE_TOP_K = 100 NODE_DEGREE_THRESHOLD = 1000000 OUT_PATH = Path("general_concepts.json")