diff --git a/workers/aragorn_lookup/worker.py b/workers/aragorn_lookup/worker.py index 4b1492b..56e7c38 100644 --- a/workers/aragorn_lookup/worker.py +++ b/workers/aragorn_lookup/worker.py @@ -120,6 +120,14 @@ async def aragorn_lookup(task, logger: logging.Logger): parameters["tiers"] = parameters.get("tiers") or [settings.default_data_tier] use_gandalf = parameters.get("gandalf", False) message["parameters"] = parameters + if "submitter" not in message: + message["submitter"] = ( + "infores:shepherd-aragorn:{maturity}@{location}@{url}".format( + maturity=settings.server_maturity, + location=settings.server_location, + url=settings.server_url, + ) + ) try: infer, question_qnode, answer_qnode, pathfinder = examine_query(message) except Exception as e: @@ -166,6 +174,7 @@ async def aragorn_lookup(task, logger: logging.Logger): if use_gandalf: for expanded_message in expanded_messages: callback_id = str(uuid.uuid4())[:8] + # Put callback UID and query ID in postgres await add_callback_id(query_id, callback_id, logger) logger.debug("""Sending lookup query to gandalf.""") @@ -345,6 +354,7 @@ def expand_aragorn_query(input_message, logger: logging.Logger): { "message": {"query_graph": qg}, "parameters": input_message["parameters"], + "submitter": input_message["submitter"], } ] # If we don't have any AMIE expansions, this will just generate the direct query @@ -371,6 +381,7 @@ def expand_aragorn_query(input_message, logger: logging.Logger): if "log_level" in input_message: message["log_level"] = input_message["log_level"] message["parameters"] = input_message["parameters"] + message["submitter"] = input_message["submitter"] messages.append(message) return messages diff --git a/workers/aragorn_pathfinder/worker.py b/workers/aragorn_pathfinder/worker.py index 361a5b7..9afe88c 100644 --- a/workers/aragorn_pathfinder/worker.py +++ b/workers/aragorn_pathfinder/worker.py @@ -119,7 +119,7 @@ async def shadowfax(task, logger: logging.Logger): "biolink:translates_to", "biolink:translation_of", "biolink:has_gene_product", - "biolink:gene_product_of" + "biolink:gene_product_of", ], }, "e1": { @@ -146,7 +146,7 @@ async def shadowfax(task, logger: logging.Logger): "biolink:translates_to", "biolink:translation_of", "biolink:has_gene_product", - "biolink:gene_product_of" + "biolink:gene_product_of", ], }, "e2": { @@ -173,7 +173,7 @@ async def shadowfax(task, logger: logging.Logger): "biolink:translates_to", "biolink:translation_of", "biolink:has_gene_product", - "biolink:gene_product_of" + "biolink:gene_product_of", ], }, }, diff --git a/workers/arax/inject_shepherd_arax_provenance.py b/workers/arax/inject_shepherd_arax_provenance.py index 62e5077..9a2a3ed 100644 --- a/workers/arax/inject_shepherd_arax_provenance.py +++ b/workers/arax/inject_shepherd_arax_provenance.py @@ -2,7 +2,6 @@ from typing import Any, Dict - SHEPHERD_ARAX_SOURCE = { "resource_id": "infores:shepherd-arax", "resource_role": "aggregator_knowledge_source", diff --git a/workers/bte_lookup/worker.py b/workers/bte_lookup/worker.py index 84cabf0..7cceab6 100644 --- a/workers/bte_lookup/worker.py +++ b/workers/bte_lookup/worker.py @@ -118,6 +118,14 @@ async def bte_lookup(task, logger: logging.Logger): parameters["timeout"] = parameters.get("timeout", settings.lookup_timeout) parameters["tiers"] = parameters.get("tiers") or [settings.default_data_tier] message["parameters"] = parameters + if "submitter" not in message: + message["submitter"] = ( + "infores:shepherd-bte:{maturity}@{location}@{url}".format( + maturity=settings.server_maturity, + location=settings.server_location, + url=settings.server_url, + ) + ) try: infer, question_qnode, answer_qnode, pathfinder = examine_query(message) except Exception as e: @@ -354,6 +362,7 @@ def fill_templates( del message["message"]["knowledge_graph"] message["parameters"] = query_body["parameters"] message["workflow"] = [{"id": "lookup"}] + message["submitter"] = query_body["submitter"] filled_templates.append(message) return filled_templates @@ -404,6 +413,7 @@ def expand_bte_query(query_dict: dict[str, Any], logger: logging.Logger) -> list { "message": {"query_graph": query_graph}, "parameters": query_dict["parameters"], + "submitter": query_dict["submitter"], } ] expanded_queries.extend(filled_templates) diff --git a/workers/merge_message/worker.py b/workers/merge_message/worker.py index d4e63ff..9c551b1 100644 --- a/workers/merge_message/worker.py +++ b/workers/merge_message/worker.py @@ -327,7 +327,9 @@ def filter_repeated_nodes(response, logger: logging.Logger): original_result_count = len(response["message"].get("results", [])) if original_result_count == 0: return - results = list(filter(lambda x: has_unique_nodes(x), response["message"]["results"])) + results = list( + filter(lambda x: has_unique_nodes(x), response["message"]["results"]) + ) response["message"]["results"] = results if len(results) != original_result_count: filter_kgraph_orphans(response, logger)