From d003e77f8e2a1130dd9b5f0140e90491de72eb3d Mon Sep 17 00:00:00 2001 From: hyi Date: Mon, 12 Jan 2026 22:14:51 -0500 Subject: [PATCH 1/9] refactored the code to enable adding memgraph graph backend --- .env-template | 8 +- .gitignore | 4 +- PLATER/services/app_trapi.py | 2 +- PLATER/services/util/api_utils.py | 12 +- .../base.py} | 293 ++++-------------- .../util/graph_backends/memgraph_adapter.py | 0 .../util/graph_backends/neo4j_adapter.py | 222 +++++++++++++ PLATER/services/util/overlay.py | 2 +- PLATER/services/util/question.py | 2 +- PLATER/tests/test_endpoint_factory.py | 2 +- PLATER/tests/test_graph_adapter.py | 6 +- README.md | 6 +- 12 files changed, 298 insertions(+), 261 deletions(-) rename PLATER/services/util/{graph_adapter.py => graph_backends/base.py} (61%) create mode 100644 PLATER/services/util/graph_backends/memgraph_adapter.py create mode 100644 PLATER/services/util/graph_backends/neo4j_adapter.py diff --git a/.env-template b/.env-template index 292a7bd..0b2e5b9 100644 --- a/.env-template +++ b/.env-template @@ -1,9 +1,13 @@ WEB_HOST=0.0.0.0 WEB_PORT=8080 +GRAPH_DB=neo4j NEO4J_HOST=localhost +NEO4J_BOLT_PORT=7687 NEO4J_USERNAME=neo4j -NEO4J_QUERY_TIMEOUT=600 -NEO4J_HTTP_PORT=7474 +MEMGRAPH_HOST=localhost +MEMGRAPH_BOLT_PORT=7687 +MEMGRAPH_USERNAME=memgraph +GRAPH_QUERY_TIMEOUT=600 PLATER_SERVICE_ADDRESS=localhost PLATER_TITLE=Plater PLATER_VERSION=1.5.1 diff --git a/.gitignore b/.gitignore index 87a2363..31e77b4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ *env __pycache__ .pytest_cache -.log +*.log kubernets *coverage -.idea \ No newline at end of file +.idea diff --git a/PLATER/services/app_trapi.py b/PLATER/services/app_trapi.py index 6be10a1..2dc9071 100644 --- a/PLATER/services/app_trapi.py +++ b/PLATER/services/app_trapi.py @@ -19,7 +19,7 @@ ) from PLATER.services.util.attribute_mapping import ATTRIBUTE_SKIP_LIST, ATTRIBUTE_TYPES from PLATER.services.util.bl_helper import BLHelper, get_bl_helper -from 
PLATER.services.util.graph_adapter import GraphInterface +from PLATER.services.util.graph_backends.base import GraphInterface from PLATER.services.util.metadata import get_graph_metadata, GraphMetadata from PLATER.services.util.overlay import Overlay from PLATER.services.util.question import Question diff --git a/PLATER/services/util/api_utils.py b/PLATER/services/util/api_utils.py index 8d173cb..41d0e5f 100644 --- a/PLATER/services/util/api_utils.py +++ b/PLATER/services/util/api_utils.py @@ -6,21 +6,23 @@ from fastapi import Response from fastapi.openapi.utils import get_openapi -from PLATER.services.util.graph_adapter import GraphInterface +from PLATER.services.util.graph_backends.base import GraphInterface +from PLATER.services.util.graph_backends.neo4j_adapter import Neo4jBackend from PLATER.services.config import config async def get_graph_interface(): """Get graph interface.""" - graph_interface = GraphInterface( + graph_backend = Neo4jBackend( host=config.get('NEO4J_HOST', 'localhost'), port=config.get('NEO4J_BOLT_PORT', '7687'), auth=( config.get('NEO4J_USERNAME'), config.get('NEO4J_PASSWORD') - ) - ) - await graph_interface.connect_to_neo4j() + )) + graph_interface = GraphInterface(graph_backend) + + await graph_interface.connect() return graph_interface diff --git a/PLATER/services/util/graph_adapter.py b/PLATER/services/util/graph_backends/base.py similarity index 61% rename from PLATER/services/util/graph_adapter.py rename to PLATER/services/util/graph_backends/base.py index 3eb044c..308ea1c 100644 --- a/PLATER/services/util/graph_adapter.py +++ b/PLATER/services/util/graph_backends/base.py @@ -1,13 +1,10 @@ import time -import neo4j -import asyncio - -import neo4j.exceptions +from abc import ABC, abstractmethod +from collections import defaultdict from functools import cache -from neo4j import unit_of_work from opentelemetry import trace -from collections import defaultdict -from reasoner_transpiler.cypher import transform_result, transform_edges_list 
+ +from reasoner_transpiler.cypher import transform_edges_list from PLATER.services.config import config from PLATER.services.util.logutil import LoggingUtil from PLATER.services.util.bl_helper import get_biolink_model_toolkit @@ -16,229 +13,48 @@ config.get('logging_level'), config.get('logging_format')) -NEO4J_QUERY_TIMEOUT = int(config.get('NEO4J_QUERY_TIMEOUT', 1600)) - - -class Neo4jBoltDriver: - - def __init__(self, - host: str, - port: str, - auth: tuple, - database_name: str = 'neo4j'): - self.database_name = database_name - self.database_auth = auth - self.graph_db_uri = f'bolt://{host}:{port}' - self.neo4j_driver = None - self.sync_neo4j_driver = None - self._supports_apoc = None - - async def connect_to_neo4j(self, retries=0): - if not self.neo4j_driver: - self.neo4j_driver = neo4j.AsyncGraphDatabase.driver(self.graph_db_uri, - auth=self.database_auth, - **{'telemetry_disabled': True, - 'max_connection_pool_size': 1000}) - try: - await self.neo4j_driver.verify_connectivity() - except Exception as e: # currently the driver says it raises Exception, not something more specific - await self.neo4j_driver.close() - if retries <= 25: - await asyncio.sleep(8) - logger.error(f'Could not establish connection to neo4j, trying again... 
retry {retries + 1}') - await self.connect_to_neo4j(retries + 1) - else: - logger.error(f'Could not establish connection to neo4j, error: {e}') - raise e - - @staticmethod - @unit_of_work(timeout=NEO4J_QUERY_TIMEOUT) - async def _async_cypher_tx_function(tx, - cypher, - query_parameters=None, - convert_to_dict=False, - convert_to_trapi=False, - qgraph=None): - if not query_parameters: - query_parameters = {} - - neo4j_result: neo4j.AsyncResult = await tx.run(cypher, parameters=query_parameters) - if convert_to_trapi: - neo4j_record = await neo4j_result.single() - return transform_result(neo4j_record, qgraph) - elif convert_to_dict: - results = [] - async for record in neo4j_result: - results.append({key: value for key, value in record.items()}) - return results - return await convert_bolt_results_to_cypher_result(neo4j_result) +class GraphBackend(ABC): + """ + Abstract graph backend interface to support concrete implementations + such as Neo4jBackend and MemgraphBackend. + """ - @staticmethod - def _sync_cypher_tx_function(tx, - cypher, - query_parameters=None, - convert_to_dict=False): - if not query_parameters: - query_parameters = {} - neo4j_result: neo4j.Result = tx.run(cypher, parameters=query_parameters) - if convert_to_dict: - results = [] - for record in neo4j_result: - results.append({key: value for key, value in record.items()}) - return results - else: - return neo4j_result - - async def run(self, - query, - query_parameters=None, - return_errors=False, - convert_to_dict=False, - convert_to_trapi=False, - qgraph=None): - try: - async with self.neo4j_driver.session(database=self.database_name, - default_access_mode=neo4j.READ_ACCESS) as session: - run_async_result = await session.execute_read(self._async_cypher_tx_function, - query, - query_parameters=query_parameters, - convert_to_dict=convert_to_dict, - convert_to_trapi=convert_to_trapi, - qgraph=qgraph) - except neo4j.exceptions.ServiceUnavailable as e: - logger.error(f'Session could not establish 
connection to neo4j ({e}).. trying to connect again') - await self.connect_to_neo4j() - return await self.run(query, - query_parameters=query_parameters, - return_errors=return_errors, - convert_to_dict=convert_to_dict, - convert_to_trapi=convert_to_trapi, - qgraph=qgraph) - except neo4j.exceptions.Neo4jError as e: - logger.error(e) - if return_errors: - return {"results": [], - "errors": [{"code": e.code, - "message": e.message}]} - raise e - except neo4j.exceptions.DriverError as e: - logger.error(e) - if return_errors: - return {"results": [], - "errors": [{"message": f'A driver error occurred: {e}'}]} - raise e - return run_async_result - - def run_sync(self, - query, - query_parameters=None, - return_errors=False, - convert_to_dict=False): - if not self.sync_neo4j_driver: - self.sync_neo4j_driver = neo4j.GraphDatabase.driver(self.graph_db_uri, auth=self.database_auth) - try: - with self.sync_neo4j_driver.session(database=self.database_name, default_access_mode=neo4j.READ_ACCESS) as session: - - run_sync_result = session.execute_read(self._sync_cypher_tx_function, - query, - query_parameters=query_parameters, - convert_to_dict=convert_to_dict) - return run_sync_result - - except neo4j.exceptions.Neo4jError as e: - if return_errors: - logger.error(e) - return {"results": [], - "errors": [{"code": e.code, - "message": e.message}]} - raise e - except (neo4j.exceptions.DriverError, neo4j.exceptions.ServiceUnavailable) as e: - if return_errors: - logger.error(e) - return {"results": [], - "errors": [{"code": "", - "message": f'A driver error occurred: {e}'}]} - raise e - finally: - if self.sync_neo4j_driver: - self.sync_neo4j_driver.close() - self.sync_neo4j_driver = None - - def check_apoc_support(self): - apoc_version_query = 'call apoc.version()' - if self._supports_apoc is None: - try: - self.run_sync(apoc_version_query) - self._supports_apoc = True - except neo4j.exceptions.ClientError: - self._supports_apoc = False - return self._supports_apoc + 
@abstractmethod + async def connect(self): + pass + @abstractmethod async def close(self): - await self.neo4j_driver.close() - - -# this is kind of hacky but in order to return the same pydantic model result for both drivers -# convert the raw bolt cypher response to something that's formatted like the http json response -async def convert_bolt_results_to_cypher_result(result: neo4j.AsyncResult): - cypher_result = { - "results": [ - { - "columns": result.keys(), - "data": [{"row": [values for values in list(data.values())], "meta": []} - for data in await result.data()] - } - ], - "errors": [] - } - return cypher_result + pass + @abstractmethod + async def run(self, *args, **kwargs): + pass -def convert_http_response_to_dict(response: dict) -> list: - """ - Converts a neo4j result to a structured result. - :param response: neo4j http raw result. - :type response: dict - :return: reformatted dict - :rtype: dict - """ - results = response.get('results') - array = [] - if results: - for result in results: - cols = result.get('columns') - if cols: - data_items = result.get('data') - for item in data_items: - new_row = {} - row = item.get('row') - for col_name, col_value in zip(cols, row): - new_row[col_name] = col_value - array.append(new_row) - return array + @abstractmethod + def run_sync(self, *args, **kwargs): + pass + + @abstractmethod + def supports_apoc(self) -> bool: + pass class GraphInterface: """ - Singleton class for interfacing with the graph. + Singleton class for graph interfacing and access via its GraphBackend instance. 
""" class _GraphInterface: - def __init__(self, host, port, auth, protocol='bolt'): - self.protocol = protocol - if protocol == 'bolt': - self.driver = Neo4jBoltDriver(host=host, port=port, auth=auth) - else: - raise Exception(f'Unsupported graph interface protocol: {protocol}') + def __init__(self, backend: GraphBackend): + self.backend = backend self.schema = None - # used to keep track of derived inverted predicates self.inverted_predicates = defaultdict(lambda: defaultdict(set)) - # self.summary = None self.toolkit = get_biolink_model_toolkit() self.bl_version = config.get('BL_VERSION', '4.2.1') - async def connect_to_neo4j(self): - await self.driver.connect_to_neo4j() + async def connect(self): + await self.backend.connect() @cache def find_biolink_leaves(self, biolink_concepts: frozenset): @@ -265,13 +81,11 @@ def invert_predicate(self, biolink_predicate): element = self.toolkit.get_element(biolink_predicate) if element is None: return None - # If its symmetric if element.symmetric: return biolink_predicate - # if neither symmetric nor an inverse is found if not element.inverse: + # if neither symmetric nor an inverse is found return None - # if an inverse is found return self.toolkit.get_element(element['inverse']).slot_uri def get_schema(self): @@ -288,7 +102,7 @@ def get_schema(self): """ logger.info(f"Starting schema query {query} on graph... this might take a few.") before_time = time.time() - schema_query_results = self.driver.run_sync(query, convert_to_dict=True) + schema_query_results = self.backend.run_sync(query, convert_to_dict=True) after_time = time.time() logger.info(f"Completed schema query ({after_time - before_time} seconds). 
Preparing initial schema.") # iterate through results (multiple sets of source label, predicate, target label arrays) @@ -302,8 +116,8 @@ def get_schema(self): filter_named_thing = lambda x: list(filter(lambda y: y != 'biolink:NamedThing', x)) source_labels, predicate, target_labels = \ self.find_biolink_leaves(frozenset(filter_named_thing(schema_result['source_labels']))), \ - schema_result['predicate'], \ - self.find_biolink_leaves(frozenset(filter_named_thing(schema_result['target_labels']))) + schema_result['predicate'], \ + self.find_biolink_leaves(frozenset(filter_named_thing(schema_result['target_labels']))) for source_label in source_labels: for target_label in target_labels: self.schema[source_label][target_label].add(predicate) @@ -341,7 +155,7 @@ async def get_mini_schema(self, source_id, target_id): type(x) as predicate RETURN DISTINCT source_label, predicate, target_label """ - response = await self.driver.run(query, convert_to_dict=True) + response = await self.backend.run(query, convert_to_dict=True) return response async def get_node(self, curie: str) -> dict: @@ -353,9 +167,9 @@ async def get_node(self, curie: str) -> dict: :rtype: list """ query = f"MATCH (n:`biolink:NamedThing`{{id: $node_id}}) return n" - response = await self.driver.run(query, convert_to_dict=True, query_parameters={'node_id': curie}) + response = await self.backend.run(query, convert_to_dict=True, query_parameters={'node_id': curie}) if response and 'n' in response[0]: - node_object: neo4j.graph.Node = response[0]['n'] + node_object = response[0]['n'] node_properties = dict(node_object.items()) return { 'id': node_properties.pop('id'), @@ -378,7 +192,7 @@ async def get_single_hop_summary(self, """ query = f'MATCH (n:`biolink:NamedThing`{{id: $node_id}})-[r]-(m) ' \ f'RETURN type(r) as predicate, labels(m) as node_labels, count(r) as edge_count' - response = await self.driver.run(query, convert_to_dict=True, query_parameters={'node_id': curie}) + response = await 
self.backend.run(query, convert_to_dict=True, query_parameters={'node_id': curie}) summary_edges = defaultdict(dict) for record in response: @@ -439,7 +253,7 @@ async def get_single_hops(self, if limit is not None: query += f' LIMIT {limit}' - response = await self.driver.run(query, convert_to_dict=True, query_parameters={'node_id': curie, + response = await self.backend.run(query, convert_to_dict=True, query_parameters={'node_id': curie, 'predicate': predicate}) if count_only: @@ -492,7 +306,6 @@ async def run_cypher(self, :return: unprocessed neo4j response. :rtype: list """ - # get a reference to the current opentelemetry span otel_span = trace.get_current_span() if not otel_span or not otel_span.is_recording(): otel_span = None @@ -501,7 +314,8 @@ async def run_cypher(self, attributes={ 'cypher_query': cypher }) - cypher_results = await self.driver.run(cypher, + + cypher_results = await self.backend.run(cypher, convert_to_dict=convert_to_dict, convert_to_trapi=convert_to_trapi, qgraph=qgraph, @@ -534,25 +348,25 @@ def get_examples(self, if object_node_type and predicate: query = f"MATCH (subject:`{subject_node_type}`)-[edge:`{predicate}`]->(object:`{object_node_type}`) " \ f"{qualifiers_check} return subject, edge, object limit {num_examples}" - response = self.driver.run_sync(query, convert_to_dict=True) + response = self.backend.run_sync(query, convert_to_dict=True) return response elif object_node_type: query = f"MATCH (subject:`{subject_node_type}`)-[edge]->(object:`{object_node_type}`) " \ f"{qualifiers_check} return subject, edge, object limit {num_examples}" - response = self.driver.run_sync(query, convert_to_dict=True) + response = self.backend.run_sync(query, convert_to_dict=True) return response else: query = f"MATCH (subject:`{subject_node_type}`) " \ f"return subject limit {num_examples}" - response = self.driver.run_sync(query, convert_to_dict=True) + response = self.backend.run_sync(query, convert_to_dict=True) return response - def 
supports_apoc(self): + def supports_apoc(self) -> bool: """ Returns true if apoc is supported by backend database. :return: bool true if neo4j supports apoc. """ - return self.driver.check_apoc_support() + return self.backend.supports_apoc() async def run_apoc_cover(self, ids: list): """ @@ -569,7 +383,7 @@ async def run_apoc_cover(self, ids: list): WITH [elementId(rel), startNode(rel).id, type(rel), endNode(rel).id, properties(rel)] as row return collect(row) as apoc_cover_edges """ - result = await self.driver.run(query, convert_to_dict=True) + result = await self.backend.run(query, convert_to_dict=True) result_edges = result[0]['apoc_cover_edges'] # utilize the transpiler function to transform the list of edges into a map TRAPI edges # apoc_cover_kg_edges is a dict like {edge_id: trapi_edge} @@ -599,25 +413,22 @@ async def get_nodes(self, node_ids, core_attributes, attr_types, **kwargs): }} ]) as result """ - return await self.driver.run(query, **kwargs) + return await self.backend.run(query, **kwargs) async def close(self): - await self.driver.close() + await self.backend.close() instance = None - def __init__(self, host, port, auth, protocol='bolt'): + def __init__(self, backend: GraphBackend): # create a new instance if not already created. if not GraphInterface.instance: - GraphInterface.instance = GraphInterface._GraphInterface(host=host, - port=port, - auth=auth, - protocol=protocol) + GraphInterface.instance = GraphInterface._GraphInterface(backend) def __getattr__(self, item): # proxy function calls to the inner object. 
return getattr(self.instance, item) @staticmethod - async def connect_to_neo4j(): - await GraphInterface.instance.connect_to_neo4j() + async def connect(): + await GraphInterface.instance.connect() diff --git a/PLATER/services/util/graph_backends/memgraph_adapter.py b/PLATER/services/util/graph_backends/memgraph_adapter.py new file mode 100644 index 0000000..e69de29 diff --git a/PLATER/services/util/graph_backends/neo4j_adapter.py b/PLATER/services/util/graph_backends/neo4j_adapter.py new file mode 100644 index 0000000..1864094 --- /dev/null +++ b/PLATER/services/util/graph_backends/neo4j_adapter.py @@ -0,0 +1,222 @@ +import asyncio +import neo4j + +import neo4j.exceptions +from neo4j import unit_of_work + +from PLATER.services.config import config +from PLATER.services.util.logutil import LoggingUtil +from PLATER.services.util.graph_backends.base import GraphBackend +from reasoner_transpiler.cypher import transform_result + +logger = LoggingUtil.init_logging(__name__, + config.get('logging_level'), + config.get('logging_format')) + +NEO4J_QUERY_TIMEOUT = int(config.get('GRAPH_QUERY_TIMEOUT', config.get('NEO4J_QUERY_TIMEOUT', 1600))) + + +class Neo4jBackend(GraphBackend): + + def __init__(self, host: str, port: str, auth: tuple, database_name='neo4j'): + self.database_name = database_name + self.database_auth = auth + self.graph_db_uri = f'bolt://{host}:{port}' + self.neo4j_driver = None + self.sync_neo4j_driver = None + self._supports_apoc = None + + async def connect(self, retries=0): + if not self.neo4j_driver: + self.neo4j_driver = neo4j.AsyncGraphDatabase.driver(self.graph_db_uri, + auth=self.database_auth, + **{'telemetry_disabled': True, + 'max_connection_pool_size': 1000}) + try: + await self.neo4j_driver.verify_connectivity() + except Exception as e: # currently the driver says it raises Exception, not something more specific + await self.neo4j_driver.close() + if retries <= 25: + await asyncio.sleep(8) + logger.error(f'Could not establish connection to neo4j, trying again... 
retry {retries + 1}') + await self.connect(retries + 1) + else: + logger.error(f'Could not establish connection to neo4j, error: {e}') + raise e + + @staticmethod + @unit_of_work(timeout=NEO4J_QUERY_TIMEOUT) + async def _async_cypher_tx_function(tx, + cypher, + query_parameters=None, + convert_to_dict=False, + convert_to_trapi=False, + qgraph=None): + if not query_parameters: + query_parameters = {} + + neo4j_result: neo4j.AsyncResult = await tx.run(cypher, parameters=query_parameters) + + if convert_to_trapi: + record = await neo4j_result.single() + return transform_result(record, qgraph) + + if convert_to_dict: + results = [] + async for record in neo4j_result: + results.append({k: v for k, v in record.items()}) + return results + + return await _convert_bolt_results_to_cypher_result(neo4j_result) + + @staticmethod + def _sync_cypher_tx_function(tx, + cypher, + query_parameters=None, + convert_to_dict=False): + if not query_parameters: + query_parameters = {} + neo4j_result: neo4j.Result = tx.run(cypher, parameters=query_parameters) + if convert_to_dict: + results = [] + for record in neo4j_result: + results.append({key: value for key, value in record.items()}) + return results + else: + return neo4j_result + + async def run(self, + query, + query_parameters=None, + return_errors=False, + convert_to_dict=False, + convert_to_trapi=False, + qgraph=None): + try: + async with self.neo4j_driver.session(database=self.database_name, + default_access_mode=neo4j.READ_ACCESS) as session: + return await session.execute_read(self._async_cypher_tx_function, + query, + query_parameters=query_parameters, + convert_to_dict=convert_to_dict, + convert_to_trapi=convert_to_trapi, + qgraph=qgraph) + except neo4j.exceptions.ServiceUnavailable as e: + logger.error(f'Session could not establish connection to neo4j ({e}).. 
trying to connect again') + await self.connect() + return await self.run(query, + query_parameters=query_parameters, + return_errors=return_errors, + convert_to_dict=convert_to_dict, + convert_to_trapi=convert_to_trapi, + qgraph=qgraph) + except neo4j.exceptions.Neo4jError as e: + logger.error(e) + if return_errors: + return {"results": [], + "errors": [{"code": e.code, + "message": e.message}]} + raise e + except neo4j.exceptions.DriverError as e: + logger.error(e) + if return_errors: + return {"results": [], + "errors": [{"message": f'A driver error occurred: {e}'}]} + raise e + + def run_sync(self, + query, + query_parameters=None, + return_errors=False, + convert_to_dict=False): + if not self.sync_neo4j_driver: + self.sync_neo4j_driver = neo4j.GraphDatabase.driver(self.graph_db_uri, auth=self.database_auth) + try: + with self.sync_neo4j_driver.session(database=self.database_name, default_access_mode=neo4j.READ_ACCESS) as session: + + run_sync_result = session.execute_read(self._sync_cypher_tx_function, + query, + query_parameters=query_parameters, + convert_to_dict=convert_to_dict) + return run_sync_result + + except neo4j.exceptions.Neo4jError as e: + if return_errors: + logger.error(e) + return {"results": [], + "errors": [{"code": e.code, + "message": e.message}]} + raise e + except (neo4j.exceptions.DriverError, neo4j.exceptions.ServiceUnavailable) as e: + if return_errors: + logger.error(e) + return {"results": [], + "errors": [{"code": "", + "message": f'A driver error occurred: {e}'}]} + raise e + finally: + if self.sync_neo4j_driver: + self.sync_neo4j_driver.close() + self.sync_neo4j_driver = None + + def check_apoc_support(self): + apoc_version_query = 'call apoc.version()' + if self._supports_apoc is None: + try: + self.run_sync(apoc_version_query) + self._supports_apoc = True + except neo4j.exceptions.ClientError: + self._supports_apoc = False + return self._supports_apoc + + def supports_apoc(self) -> bool: + if self._supports_apoc is None: + try: + 
self.run_sync("CALL apoc.version()") + self._supports_apoc = True + except neo4j.exceptions.ClientError: + self._supports_apoc = False + return self._supports_apoc + + async def close(self): + if self.neo4j_driver: + await self.neo4j_driver.close() + + +# this is kind of hacky but in order to return the same pydantic model result for both drivers +# convert the raw bolt cypher response to something that's formatted like the http json response +async def _convert_bolt_results_to_cypher_result(result: neo4j.AsyncResult): + cypher_result = { + "results": [ + { + "columns": result.keys(), + "data": [{"row": [values for values in list(data.values())], "meta": []} + for data in await result.data()] + } + ], + "errors": [] + } + return cypher_result + +def convert_http_response_to_dict(response: dict) -> list: + """ + Converts a neo4j result to a structured result. + :param response: neo4j http raw result. + :type response: dict + :return: reformatted dict + :rtype: dict + """ + results = response.get('results') + array = [] + if results: + for result in results: + cols = result.get('columns') + if cols: + data_items = result.get('data') + for item in data_items: + new_row = {} + row = item.get('row') + for col_name, col_value in zip(cols, row): + new_row[col_name] = col_value + array.append(new_row) + return array \ No newline at end of file diff --git a/PLATER/services/util/overlay.py b/PLATER/services/util/overlay.py index 584e7f4..e3e6571 100644 --- a/PLATER/services/util/overlay.py +++ b/PLATER/services/util/overlay.py @@ -1,4 +1,4 @@ -from PLATER.services.util.graph_adapter import GraphInterface +from PLATER.services.util.graph_backends.base import GraphInterface from PLATER.services.util.question import Question from reasoner_transpiler.attributes import ATTRIBUTE_TYPES from reasoner_transpiler.cypher import transform_attributes diff --git a/PLATER/services/util/question.py b/PLATER/services/util/question.py index 69cc2c5..c3c2b05 100644 --- 
a/PLATER/services/util/question.py +++ b/PLATER/services/util/question.py @@ -4,7 +4,7 @@ from secrets import token_hex from opentelemetry import trace -from PLATER.services.util.graph_adapter import GraphInterface +from PLATER.services.util.graph_backends.base import GraphInterface from reasoner_transpiler.cypher import get_query from PLATER.services.config import config, get_positive_int_from_config from PLATER.services.util.logutil import LoggingUtil diff --git a/PLATER/tests/test_endpoint_factory.py b/PLATER/tests/test_endpoint_factory.py index 24213a6..3ea5a99 100644 --- a/PLATER/tests/test_endpoint_factory.py +++ b/PLATER/tests/test_endpoint_factory.py @@ -3,7 +3,7 @@ import pytest import json from functools import reduce -from PLATER.services.util.graph_adapter import GraphInterface +from PLATER.services.util.graph_backends.base import GraphInterface from PLATER.services.util.metadata import GraphMetadata import os diff --git a/PLATER/tests/test_graph_adapter.py b/PLATER/tests/test_graph_adapter.py index 009f57b..4b00161 100644 --- a/PLATER/tests/test_graph_adapter.py +++ b/PLATER/tests/test_graph_adapter.py @@ -1,9 +1,7 @@ -import json -from collections import defaultdict -from PLATER.services.util.graph_adapter import GraphInterface, convert_http_response_to_dict +from PLATER.services.util.graph_backends.base import GraphInterface +from PLATER.services.util.graph_backends.neo4j_adapter import convert_http_response_to_dict from pytest_httpx import HTTPXMock import pytest -import os # TODO - implement these tests for Bolt diff --git a/README.md b/README.md index 6a56b6d..d67f150 100644 --- a/README.md +++ b/README.md @@ -144,10 +144,10 @@ To run the web server directly: WEB_HOST=0.0.0.0 WEB_PORT=8080 NEO4J_HOST=neo4j + NEO4J_BOLT_PORT=7687 NEO4J_USERNAME=neo4j NEO4J_PASSWORD= - NEO4J_HTTP_PORT=7474 - NEO4J_QUERY_TIMEOUT=600 + GRAPH_QUERY_TIMEOUT=600 PLATER_TITLE='Plater' PLATER_VERSION='1.5.1' BL_VERSION='4.1.6' @@ -187,7 +187,7 @@ To run the web server 
directly: ### Miscellaneous ###### `/about` Endpoint The `/about` endpoint can be used to present meta-data about the current PLATER instance. - This meta-data is served from `/PLATER/about.json` file. One can edit the contents of + This meta-data is served from `/PLATER/metadata/about.json` file. One can edit the contents of this file to suite needs. In containerized environment we recommend mounting this file as a volume. Eg: From d71a15604214b77a244e8a03237ccbdeab0dbad1 Mon Sep 17 00:00:00 2001 From: hyi Date: Tue, 13 Jan 2026 13:29:57 -0500 Subject: [PATCH 2/9] fixing tests --- PLATER/tests/test_graph_adapter.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/PLATER/tests/test_graph_adapter.py b/PLATER/tests/test_graph_adapter.py index 4b00161..f251d67 100644 --- a/PLATER/tests/test_graph_adapter.py +++ b/PLATER/tests/test_graph_adapter.py @@ -1,5 +1,5 @@ from PLATER.services.util.graph_backends.base import GraphInterface -from PLATER.services.util.graph_backends.neo4j_adapter import convert_http_response_to_dict +from PLATER.services.util.graph_backends.neo4j_adapter import Neo4jBackend, convert_http_response_to_dict from pytest_httpx import HTTPXMock import pytest @@ -162,7 +162,8 @@ async def test_driver_convert_to_dict(): @pytest.mark.asyncio async def test_graph_interface_biolink_leaves(httpx_mock: HTTPXMock): - gi = GraphInterface('localhost', '7474', auth=('neo4j', ''), protocol='bolt') + graph_backend = Neo4jBackend(host='localhost', port='7474', auth=('neo4j', '')) + gi = GraphInterface(graph_backend) set_1 = frozenset([ "biolink:SmallMolecule", "biolink:MolecularEntity", @@ -181,7 +182,8 @@ async def test_graph_interface_biolink_leaves(httpx_mock: HTTPXMock): @pytest.mark.asyncio async def test_graph_interface_predicate_inverse(httpx_mock: HTTPXMock): - gi = GraphInterface('localhost', '7474', auth=('neo4j', ''), protocol='bolt') + graph_backend = Neo4jBackend(host='localhost', port='7474', auth=('neo4j', '')) + gi = 
GraphInterface(graph_backend) non_exist_predicate = "biolink:some_predicate" assert gi.invert_predicate(non_exist_predicate) is None symmetric_predicate = "biolink:related_to" From 66cce61062b4742eac2ba15a6d6644d9e3cd75ff Mon Sep 17 00:00:00 2001 From: hyi Date: Tue, 13 Jan 2026 22:21:44 -0500 Subject: [PATCH 3/9] added first pass memgraph support --- PLATER/services/util/api_utils.py | 33 ++++- PLATER/services/util/graph_backends/base.py | 33 +++-- .../util/graph_backends/graph_utils.py | 17 +++ .../util/graph_backends/memgraph_adapter.py | 140 ++++++++++++++++++ .../util/graph_backends/neo4j_adapter.py | 21 +-- 5 files changed, 207 insertions(+), 37 deletions(-) create mode 100644 PLATER/services/util/graph_backends/graph_utils.py diff --git a/PLATER/services/util/api_utils.py b/PLATER/services/util/api_utils.py index 41d0e5f..f9dca7a 100644 --- a/PLATER/services/util/api_utils.py +++ b/PLATER/services/util/api_utils.py @@ -6,20 +6,39 @@ from fastapi import Response from fastapi.openapi.utils import get_openapi +from PLATER.services.util.logutil import LoggingUtil from PLATER.services.util.graph_backends.base import GraphInterface from PLATER.services.util.graph_backends.neo4j_adapter import Neo4jBackend +from PLATER.services.util.graph_backends.memgraph_adapter import MemgraphBackend from PLATER.services.config import config +logger = LoggingUtil.init_logging(__name__, + config.get('logging_level'), + config.get('logging_format')) async def get_graph_interface(): """Get graph interface.""" - graph_backend = Neo4jBackend( - host=config.get('NEO4J_HOST', 'localhost'), - port=config.get('NEO4J_BOLT_PORT', '7687'), - auth=( - config.get('NEO4J_USERNAME'), - config.get('NEO4J_PASSWORD') - )) + graph_db = config.get('GRAPH_DB', 'neo4j') + logger.info(f"config db: {config.get('GRAPH_DB')}, graph_db: {graph_db}") + if graph_db == 'memgraph': + mg_username = config.get('MEMGRAPH_USERNAME', None) + mg_password = config.get('MEMGRAPH_PASSWORD', None) + if mg_username and 
mg_password: + auth = (mg_username, mg_password) + else: + auth = None + graph_backend = MemgraphBackend( + host=config.get('MEMGRAPH_HOST', 'localhost'), + port=config.get('MEMGRAPH_BOLT_PORT', '7687'), + auth=auth) + else: + graph_backend = Neo4jBackend( + host=config.get('NEO4J_HOST', 'localhost'), + port=config.get('NEO4J_BOLT_PORT', '7687'), + auth=( + config.get('NEO4J_USERNAME'), + config.get('NEO4J_PASSWORD') + )) graph_interface = GraphInterface(graph_backend) await graph_interface.connect() diff --git a/PLATER/services/util/graph_backends/base.py b/PLATER/services/util/graph_backends/base.py index 308ea1c..70b6afa 100644 --- a/PLATER/services/util/graph_backends/base.py +++ b/PLATER/services/util/graph_backends/base.py @@ -18,7 +18,7 @@ class GraphBackend(ABC): Abstract graph backend interface to support concrete implementations such as Neo4jBackend and MemgraphBackend. """ - + supports_element_id: bool = False @abstractmethod async def connect(self): pass @@ -31,10 +31,6 @@ async def close(self): async def run(self, *args, **kwargs): pass - @abstractmethod - def run_sync(self, *args, **kwargs): - pass - @abstractmethod def supports_apoc(self) -> bool: pass @@ -88,7 +84,7 @@ def invert_predicate(self, biolink_predicate): return None return self.toolkit.get_element(element['inverse']).slot_uri - def get_schema(self): + async def get_schema(self): """ Gets the schema of the graph. To be used by. Also generates graph summary :return: Dict of structure source label as outer most keys, target labels as inner keys and list of predicates @@ -102,7 +98,7 @@ def get_schema(self): """ logger.info(f"Starting schema query {query} on graph... this might take a few.") before_time = time.time() - schema_query_results = self.backend.run_sync(query, convert_to_dict=True) + schema_query_results = await self.backend.run(query, convert_to_dict=True) after_time = time.time() logger.info(f"Completed schema query ({after_time - before_time} seconds). 
Preparing initial schema.") # iterate through results (multiple sets of source label, predicate, target label arrays) @@ -243,8 +239,19 @@ async def get_single_hops(self, if count_only: query += 'return count(r) as edge_count' else: - query += 'return distinct type(r) as predicate, properties(r) as edge_properties, ' \ - 'CASE WHEN elementId(m) = elementId(startNode(r)) THEN "<" ELSE ">" END AS edge_direction, ' \ + if self.backend.supports_element_id: + direction_expr = ( + 'CASE WHEN elementId(m) = elementId(startNode(r)) ' + 'THEN "<" ELSE ">" END AS edge_direction' + ) + else: + # Memgraph-safe fallback using node IDs since memgraph does not support elementId() + direction_expr = ( + 'CASE WHEN m.id = startNode(r).id ' + 'THEN "<" ELSE ">" END AS edge_direction' + ) + query += f'return distinct type(r) as predicate, properties(r) as edge_properties, ' \ + f'{direction_expr}, ' \ 'm.id as m_id, m.name as m_name, labels(m) as m_labels ORDER BY m_id' if offset is not None: @@ -324,7 +331,7 @@ async def run_cypher(self, otel_span.add_event("neo4j_query_end") return cypher_results - def get_examples(self, + async def get_examples(self, subject_node_type, object_node_type=None, predicate=None, @@ -348,17 +355,17 @@ def get_examples(self, if object_node_type and predicate: query = f"MATCH (subject:`{subject_node_type}`)-[edge:`{predicate}`]->(object:`{object_node_type}`) " \ f"{qualifiers_check} return subject, edge, object limit {num_examples}" - response = self.backend.run_sync(query, convert_to_dict=True) + response = await self.backend.run(query, convert_to_dict=True) return response elif object_node_type: query = f"MATCH (subject:`{subject_node_type}`)-[edge]->(object:`{object_node_type}`) " \ f"{qualifiers_check} return subject, edge, object limit {num_examples}" - response = self.backend.run_sync(query, convert_to_dict=True) + response = await self.backend.run(query, convert_to_dict=True) return response else: query = f"MATCH (subject:`{subject_node_type}`) " \ 
f"return subject limit {num_examples}" - response = self.backend.run_sync(query, convert_to_dict=True) + response = await self.backend.run(query, convert_to_dict=True) return response def supports_apoc(self) -> bool: diff --git a/PLATER/services/util/graph_backends/graph_utils.py b/PLATER/services/util/graph_backends/graph_utils.py new file mode 100644 index 0000000..33b9757 --- /dev/null +++ b/PLATER/services/util/graph_backends/graph_utils.py @@ -0,0 +1,17 @@ +from neo4j import AsyncResult + + +# this is kind of hacky but in order to return the same pydantic model result for both drivers +# convert the raw bolt cypher response to something that's formatted like the http json response +async def convert_bolt_results_to_cypher_result(result: AsyncResult): + cypher_result = { + "results": [ + { + "columns": result.keys(), + "data": [{"row": [values for values in list(data.values())], "meta": []} + for data in await result.data()] + } + ], + "errors": [] + } + return cypher_result diff --git a/PLATER/services/util/graph_backends/memgraph_adapter.py b/PLATER/services/util/graph_backends/memgraph_adapter.py index e69de29..4d84317 100644 --- a/PLATER/services/util/graph_backends/memgraph_adapter.py +++ b/PLATER/services/util/graph_backends/memgraph_adapter.py @@ -0,0 +1,140 @@ +import asyncio +import neo4j + +from PLATER.services.config import config +from PLATER.services.util.logutil import LoggingUtil +from PLATER.services.util.graph_backends.base import GraphBackend +from PLATER.services.util.graph_backends.graph_utils import convert_bolt_results_to_cypher_result +from reasoner_transpiler.cypher import transform_result + +logger = LoggingUtil.init_logging(__name__, + config.get('logging_level'), + config.get('logging_format')) + +MEMGRAPH_QUERY_TIMEOUT = int(config.get('GRAPH_QUERY_TIMEOUT', 1600)) + + +class MemgraphBackend(GraphBackend): + """ + Memgraph backend using the Neo4j Bolt driver. APOC is not supported. 
+ """ + supports_element_id = False + + def __init__(self, host: str, port: str, auth: tuple | None = None): + self.database_auth = auth + self.graph_db_uri = f'bolt://{host}:{port}' + self.driver = None + self._supports_apoc = False + + async def connect(self, retries=0): + if not self.driver: + kwargs = { + "telemetry_disabled": True, + "max_connection_pool_size": 1000 + } + if self.database_auth: + kwargs["auth"] = self.database_auth + + self.driver = neo4j.AsyncGraphDatabase.driver(self.graph_db_uri, + **kwargs) + try: + await self.driver.verify_connectivity() + except Exception as e: + await self.driver.close() + if retries <= 25: + await asyncio.sleep(8) + logger.error(f'Could not establish connection to memgraph, trying again... retry {retries + 1}') + await self.connect(retries + 1) + else: + logger.error(f'Could not establish connection to memgraph, error: {e}') + raise e + + @staticmethod + async def _async_cypher_tx_function(tx, + cypher, + query_parameters=None, + convert_to_dict=False, + convert_to_trapi=False, + qgraph=None): + if not query_parameters: + query_parameters = {} + + result: neo4j.AsyncResult = await tx.run(cypher, parameters=query_parameters) + + if convert_to_trapi: + record = await result.single() + return transform_result(record, qgraph) + + if convert_to_dict: + rows = [] + async for record in result: + rows.append({k: v for k, v in record.items()}) + return rows + + return await convert_bolt_results_to_cypher_result(result) + + async def run(self, + query, + query_parameters=None, + return_errors=False, + convert_to_dict=False, + convert_to_trapi=False, + qgraph=None): + try: + async with self.driver.session(default_access_mode=neo4j.READ_ACCESS) as session: + task = asyncio.create_task( + session.execute_read( + self._async_cypher_tx_function, + query, + query_parameters=query_parameters, + convert_to_dict=convert_to_dict, + convert_to_trapi=convert_to_trapi, + qgraph=qgraph + ) + ) + + try: + return await asyncio.wait_for(task, 
timeout=MEMGRAPH_QUERY_TIMEOUT) + except asyncio.TimeoutError: + task.cancel() + logger.error( + f"Memgraph query timed out after {MEMGRAPH_QUERY_TIMEOUT}s" + ) + raise TimeoutError( + f"Memgraph query exceeded timeout of {MEMGRAPH_QUERY_TIMEOUT} seconds" + ) + except neo4j.exceptions.ServiceUnavailable as e: + logger.error(f'Session could not establish connection to Memgraph ({e}).. trying to connect again') + await self.connect() + return await self.run(query, + query_parameters=query_parameters, + return_errors=return_errors, + convert_to_dict=convert_to_dict, + convert_to_trapi=convert_to_trapi, + qgraph=qgraph) + except neo4j.exceptions.Neo4jError as e: + logger.error(e) + if return_errors: + return { + "results": [], + "errors": [{"code": e.code, "message": e.message}] + } + raise e + except neo4j.exceptions.DriverError as e: + logger.error(e) + if return_errors: + return { + "results": [], + "errors": [{"message": f'A driver error occurred: {e}'}] + } + raise e + + def supports_apoc(self) -> bool: + return False + + def check_apoc_support(self): + return False + + async def close(self): + if self.driver: + await self.driver.close() diff --git a/PLATER/services/util/graph_backends/neo4j_adapter.py b/PLATER/services/util/graph_backends/neo4j_adapter.py index 1864094..5f8c14f 100644 --- a/PLATER/services/util/graph_backends/neo4j_adapter.py +++ b/PLATER/services/util/graph_backends/neo4j_adapter.py @@ -7,16 +7,18 @@ from PLATER.services.config import config from PLATER.services.util.logutil import LoggingUtil from PLATER.services.util.graph_backends.base import GraphBackend +from PLATER.services.util.graph_backends.graph_utils import convert_bolt_results_to_cypher_result from reasoner_transpiler.cypher import transform_result logger = LoggingUtil.init_logging(__name__, config.get('logging_level'), config.get('logging_format')) -NEO4J_QUERY_TIMEOUT = int(config.get('NEO4J_QUERY_TIMEOUT', 1600)) +NEO4J_QUERY_TIMEOUT = int(config.get('GRAPH_QUERY_TIMEOUT', 1600)) 
class Neo4jBackend(GraphBackend): + supports_element_id = True def __init__(self, host: str, port: str, auth: tuple, database_name='neo4j'): self.database_name = database_name @@ -67,7 +69,7 @@ async def _async_cypher_tx_function(tx, results.append({k: v for k, v in record.items()}) return results - return await _convert_bolt_results_to_cypher_result(neo4j_result) + return await convert_bolt_results_to_cypher_result(neo4j_result) @staticmethod def _sync_cypher_tx_function(tx, @@ -183,21 +185,6 @@ async def close(self): await self.neo4j_driver.close() -# this is kind of hacky but in order to return the same pydantic model result for both drivers -# convert the raw bolt cypher response to something that's formatted like the http json response -async def _convert_bolt_results_to_cypher_result(result: neo4j.AsyncResult): - cypher_result = { - "results": [ - { - "columns": result.keys(), - "data": [{"row": [values for values in list(data.values())], "meta": []} - for data in await result.data()] - } - ], - "errors": [] - } - return cypher_result - def convert_http_response_to_dict(response: dict) -> list: """ Converts a neo4j result to a structured result. 
From bae7cdfc2dd5ace76bf47f6efe7827eea349e076 Mon Sep 17 00:00:00 2001 From: hyi Date: Wed, 14 Jan 2026 11:09:35 -0500 Subject: [PATCH 4/9] fix test --- PLATER/services/util/api_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/PLATER/services/util/api_utils.py b/PLATER/services/util/api_utils.py index f9dca7a..550e03d 100644 --- a/PLATER/services/util/api_utils.py +++ b/PLATER/services/util/api_utils.py @@ -19,7 +19,6 @@ async def get_graph_interface(): """Get graph interface.""" graph_db = config.get('GRAPH_DB', 'neo4j') - logger.info(f'config db: {config.get('GRAPH_DB')}, graph_db: {graph_db}') if graph_db == 'memgraph': mg_username = config.get('MEMGRAPH_USERNAME', None) mg_password = config.get('MEMGRAPH_PASSWORD', None) From dfa1c841abdcfdf316bb175233dbde6b60cc67ca Mon Sep 17 00:00:00 2001 From: hyi Date: Wed, 14 Jan 2026 14:47:03 -0500 Subject: [PATCH 5/9] added a small memgraph test --- PLATER/tests/test_graph_adapter.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/PLATER/tests/test_graph_adapter.py b/PLATER/tests/test_graph_adapter.py index f251d67..454ba66 100644 --- a/PLATER/tests/test_graph_adapter.py +++ b/PLATER/tests/test_graph_adapter.py @@ -1,5 +1,6 @@ from PLATER.services.util.graph_backends.base import GraphInterface from PLATER.services.util.graph_backends.neo4j_adapter import Neo4jBackend, convert_http_response_to_dict +from PLATER.services.util.graph_backends.memgraph_adapter import MemgraphBackend from pytest_httpx import HTTPXMock import pytest @@ -193,3 +194,8 @@ async def test_graph_interface_predicate_inverse(httpx_mock: HTTPXMock): predicate_no_inverse_and_not_symmetric = "biolink:has_part" assert gi.invert_predicate(predicate_no_inverse_and_not_symmetric) is None GraphInterface.instance = None + +def test_memgraph_backend_capabilities(): + backend = MemgraphBackend(host="localhost", port="7687") + assert backend.supports_element_id is False + assert backend.supports_apoc() is False From 
4c2429671e8b5ac7badd73da466ebe5ebad7cc91 Mon Sep 17 00:00:00 2001 From: hyi Date: Wed, 14 Jan 2026 15:09:37 -0500 Subject: [PATCH 6/9] updated env-template --- .env-template | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.env-template b/.env-template index 0b2e5b9..4972ce7 100644 --- a/.env-template +++ b/.env-template @@ -1,12 +1,19 @@ WEB_HOST=0.0.0.0 WEB_PORT=8080 +# supported GRAPH_DB values are neo4j or memgraph GRAPH_DB=neo4j NEO4J_HOST=localhost NEO4J_BOLT_PORT=7687 NEO4J_USERNAME=neo4j +NEO4J_PASSWORD=neo4j_password MEMGRAPH_HOST=localhost MEMGRAPH_BOLT_PORT=7687 +# authentication is disabled by default in memgraph database. +# Remove the two lines below for setting MEMGRAPH_USERNAME and +# MEMGRAPH_PASSWORD if the default memgraph database behavior +# without user authentication is configured MEMGRAPH_USERNAME=memgraph +MEMGRAPH_PASSWORD=memgraph_password GRAPH_QUERY_TIMEOUT=600 PLATER_SERVICE_ADDRESS=localhost PLATER_TITLE=Plater From 3ab10eaee6a96da8cc702ae7639e4e23f0358fd1 Mon Sep 17 00:00:00 2001 From: hyi Date: Wed, 14 Jan 2026 16:03:16 -0500 Subject: [PATCH 7/9] updated readme --- README.md | 42 +++++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index d67f150..de7cd35 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,14 @@ ## About -Suppose you have constructed a biolink-compliant knowledge graph, and want to deploy it as a TRAPI endpoint with limited fuss. Plater is a web server that automatically exposes a Neo4j instance through [TRAPI](https://github.com/NCATSTranslator/ReasonerAPI) compliant endpoints. Plater brings several tools together in a web server to achieve this. It Uses [Reasoner Pydantic models](https://github.com/TranslatorSRI/reasoner-pydantic) for frontend validation and [Reasoner transpiler](https://github.com/ranking-agent/reasoner-transpiler) for transforming TRAPI to and from cypher and querying the Neo4j backend. 
The Neo4j database can be populated by using [KGX](https://github.com/biolink/kgx) upload, which is able to consume numerous graph input formats. By pointing Plater to Neo4j we can easily stand up a Knowledge Provider that provides the “lookup” operation and meta_knowledge_graph, as well as providing a platform to distribute common code implementing future operations across any endpoint built using Plater. In addition, with some configuration (x-trapi parameters etc...) options we can easily register our new instance to [Smart api](https://smart-api.info/). +Suppose you have constructed a biolink-compliant knowledge graph, and want to deploy it as a TRAPI endpoint with limited fuss. +Plater is a web server that automatically exposes a Neo4j or Memgraph instance through [TRAPI](https://github.com/NCATSTranslator/ReasonerAPI) compliant endpoints. +Plater brings several tools together in a web server to achieve this. It Uses [Reasoner Pydantic models](https://github.com/TranslatorSRI/reasoner-pydantic) for frontend validation +and [Reasoner transpiler](https://github.com/ranking-agent/reasoner-transpiler) for transforming TRAPI to and from cypher and querying the Neo4j or Memgraph backend. The Neo4j or Memgraph database +can be populated by using [KGX](https://github.com/biolink/kgx) upload, which is able to consume numerous graph input formats. By pointing Plater to Neo4j or Memgraph +we can easily stand up a Knowledge Provider that provides the “lookup” operation and meta_knowledge_graph, as well as providing a platform to +distribute common code implementing future operations across any endpoint built using Plater. In addition, with some configuration +(x-trapi parameters etc...) options we can easily register our new instance to [Smart api](https://smart-api.info/). 
Another tool that comes in handy with Plater is [Automat](https://github.com/RENCI-AUTOMAT/Automat-server), which helps expose multiple Plater servers at a single public url and proxies queries towards them. [Here](https://automat.renci.org) is an example of running Automat instance. @@ -15,43 +22,47 @@ Another tool that comes in handy with Plater is [Automat](https://github.com/REN ### Node and Edge lookup -------------------- -#### Neo4j Data Structure +#### Neo4j or Memgraph Data Structure ##### Nodes Nodes are expected to have the following core structure: -1. id : as neo4j node property with label `id` +1. id : as neo4j or memgraph node property with label `id` 2. category : Array of biolink types as neo4j node labels, it is required for every node to have at least the node label "biolink:NamedThing". 3. Additional attributes can be added and will be exposed. (more details on "Matching a TRAPI query" section) ##### Edges Edges need to have the following properties structure: -1. subject: as a neo4j edge property with label `subject` -2. object: as neo4j edge property with label `object` -3. predicate: as neo4j edge type -5. id: as neo4j edge property with label `id` +1. subject: as a neo4j or memgraph edge property with label `subject` +2. object: as a neo4j or memgraph edge property with label `object` +3. predicate: as a neo4j or memgraph edge type +5. id: as a neo4j or memgraph edge property with label `id` 6. Additional attributes will be returned in the TRAPI response attributes section. (more details on "Matching a TRAPI query" section) #### Matching a TRAPI query -PLATER matches nodes in neo4j using node labels. It expects nodes in neo4j to be labeled using [biolink types](https://biolink.github.io/biolink-model/docs/). Nodes in neo4j can have multiple labels. 
When looking a node from an incoming TRAPI query graph, the node type(s) are extracted for a node, and by traversing the biolink model, all subtypes and mixins that go with the query node type(s) will be used to lookup nodes. +PLATER matches nodes in neo4j or memgraph using node labels. It expects nodes in neo4j or memgraph to be labeled using [biolink types](https://biolink.github.io/biolink-model/docs/). +Nodes in neo4j or memgraph can have multiple labels. When looking a node from an incoming TRAPI query graph, the node type(s) are extracted +for a node, and by traversing the biolink model, all subtypes and mixins that go with the query node type(s) will be used to lookup nodes. -It's recommended that when encoding nodes labels in neo4j that we use the biolink class genealogy. For instance a node that is known to be a `biolink:SmallMolecule` can be assigned all of these classes ` ["biolink:SmallMolecule", "biolink:MolecularEntity", "biolink:ChemicalEntity", +It's recommended that when encoding nodes labels in neo4j that we use the biolink class genealogy. For instance a node that is known to be +a `biolink:SmallMolecule` can be assigned all of these classes ` ["biolink:SmallMolecule", "biolink:MolecularEntity", "biolink:ChemicalEntity", "biolink:PhysicalEssence", "biolink:NamedThing", "biolink:Entity", "biolink:PhysicalEssenceOrOccurrent"]` . -By doing such encoding, during lookup the incoming query is can be more laxed (ask for `biolink:NamedThing`) or more specific (ask for `biolink:SmallMolecule ` etc...), and PLATER would be able to use the encoded label information to find matching node(s). +By doing such encoding, during lookup the incoming query can be more laxed (ask for `biolink:NamedThing`) or more specific (ask for `biolink:SmallMolecule ` etc...), and PLATER would be able to use the encoded label information to find matching node(s). -Similarly, for edges, edge labels in neo4j are used to perform edge lookup. 
Predicate hierarchy in biolink would be consulted to find subclasses of the query predicate type(s) and those would be used in an `OR` combinatorial fashion to find results. +Similarly, for edges, edge labels in neo4j or memgraph are used to perform edge lookup. Predicate hierarchy in biolink would be consulted to find +subclasses of the query predicate type(s) and those would be used in an `OR` combinatorial fashion to find results. #### Subclass Inference -Plater does subclass inference if subclass edges are encoded into neo4j graph. For eg , let A be a super class of B and C. And let B, C are related to D and E respectively : +Plater does subclass inference if subclass edges are encoded into the graph. For eg , let A be a super class of B and C. And let B, C are related to D and E respectively : ``` (A) <- biolink:subclass_of - (B) - biolink:decreases_activity_of -> (D) @@ -78,7 +89,7 @@ Plater tries to resolve attibute types and value types for edges and nodes in th } ``` - To explain this a little further, suppose we have an attribute called "equivalent_identifiers" stored in neo4j. Our attr_val_map.json would be : + To explain this a little further, suppose we have an attribute called "equivalent_identifiers" stored in the graph. Our attr_val_map.json would be: ``` { @@ -112,9 +123,9 @@ Plater tries to resolve attibute types and value types for edges and nodes in th } ``` -2. In cases where there are attributes in neo4j that are not specified in attr_val_map.json, PLATER will try to resolve a biolink class by using the original attribute name using Biolink model toolkit. +2. In cases where there are attributes in the graph that are not specified in attr_val_map.json, PLATER will try to resolve a biolink class by using the original attribute name using Biolink model toolkit. 3. If the above steps fail the attribute will be presented having `"attribute_type_id": "biolink:Attribute"` and `"value_type_id": "EDAM:data_0006"` -4. 
If there are attributes that is not needed for presentation through TRAPI [Skip_attr.json](https://github.com/TranslatorSRI/Plater/blob/master/skip_attr.json) can be used to specify attribute names in neo4j to skip. +4. If there are attributes that is not needed for presentation through TRAPI [Skip_attr.json](https://github.com/TranslatorSRI/Plater/blob/master/skip_attr.json) can be used to specify attribute names in neo4j or memgraph to skip. KGX loading adds a new attributes `provided_by` and `knowledge_source` to nodes and edges respectively, which are the file name used to load the graph. By default, we have included these to the skip list. ### Provenance @@ -143,6 +154,7 @@ To run the web server directly: ```bash WEB_HOST=0.0.0.0 WEB_PORT=8080 + GRAPH_DB=neo4j NEO4J_HOST=neo4j NEO4J_BOLT_PORT=7687 NEO4J_USERNAME=neo4j From f938448782cf09f43fede505b08f2763e7030953 Mon Sep 17 00:00:00 2001 From: hyi Date: Thu, 22 Jan 2026 16:19:37 -0500 Subject: [PATCH 8/9] updated transpiler to support memgraph --- PLATER/requirements.txt | 2 +- PLATER/services/util/question.py | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/PLATER/requirements.txt b/PLATER/requirements.txt index 5e28e5f..e2a9028 100644 --- a/PLATER/requirements.txt +++ b/PLATER/requirements.txt @@ -3,7 +3,7 @@ pyaml==20.4.0 pytest==8.3.3 pytest-asyncio==0.24.0 uvicorn==0.24.0 -reasoner-transpiler==2.3.5 +reasoner-transpiler==2.4.0 reasoner-pydantic==5.0.6 httpx==0.27.2 pytest-httpx==0.32.0 diff --git a/PLATER/services/util/question.py b/PLATER/services/util/question.py index c3c2b05..bb6d642 100644 --- a/PLATER/services/util/question.py +++ b/PLATER/services/util/question.py @@ -8,6 +8,7 @@ from reasoner_transpiler.cypher import get_query from PLATER.services.config import config, get_positive_int_from_config from PLATER.services.util.logutil import LoggingUtil +from PLATER.services.config import config logger = LoggingUtil.init_logging( __name__, @@ -55,16 +56,25 @@ 
async def answer(self, graph_interface: GraphInterface, subclass_inference=True) if not otel_span or not otel_span.is_recording(): otel_span = None - # compile a cypher query and return a string - cypher = self.compile_cypher(**{"use_hints": True, - "subclass": subclass_inference, - "subclass_depth": SUBCLASS_DEPTH}) + graph_db = config.get('GRAPH_DB', 'neo4j') + if graph_db == 'memgraph': + # compile a cypher query and return a string + cypher = self.compile_cypher(**{"use_hints": False, + "dialect": "memgraph", + "subclass": subclass_inference, + "subclass_depth": SUBCLASS_DEPTH}) + else: + # compile a cypher query and return a string + cypher = self.compile_cypher(**{"use_hints": True, + "subclass": subclass_inference, + "subclass_depth": SUBCLASS_DEPTH}) # convert the incoming TRAPI query into a string for logging and tracing trapi_query = str(orjson.dumps(self._question_json), "utf-8") # create a probably-unique id to be associated with this query in the logs query_logging_id = token_hex(10) logger.info(f"querying neo4j for query {query_logging_id}, trapi: {trapi_query}") + logger.info(f"cypher query: {cypher}") start_time = time.time() result_qgraph = await graph_interface.run_cypher(cypher, convert_to_trapi=True, From 3d3b1160d4f8fe981ad88867260f89c7e96e34fb Mon Sep 17 00:00:00 2001 From: hyi Date: Thu, 22 Jan 2026 16:22:36 -0500 Subject: [PATCH 9/9] minor logging cleanup --- PLATER/services/util/question.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/PLATER/services/util/question.py b/PLATER/services/util/question.py index bb6d642..ae4b62f 100644 --- a/PLATER/services/util/question.py +++ b/PLATER/services/util/question.py @@ -73,14 +73,14 @@ async def answer(self, graph_interface: GraphInterface, subclass_inference=True) trapi_query = str(orjson.dumps(self._question_json), "utf-8") # create a probably-unique id to be associated with this query in the logs query_logging_id = token_hex(10) - logger.info(f"querying neo4j for query 
{query_logging_id}, trapi: {trapi_query}") - logger.info(f"cypher query: {cypher}") + logger.info(f"querying {graph_db} for query {query_logging_id}, trapi: {trapi_query}") + start_time = time.time() result_qgraph = await graph_interface.run_cypher(cypher, convert_to_trapi=True, qgraph=self._transpiler_qgraph) neo4j_duration = time.time() - start_time - logger.info(f"returned results from neo4j for {query_logging_id}, neo4j_duration: {neo4j_duration}") + logger.info(f"returned results from {graph_db} for {query_logging_id}, neo4j_duration: {neo4j_duration}") if otel_span is not None: otel_span.set_attributes( {