Skip to content
This repository was archived by the owner on Jan 29, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
de25b3c
add ner k8s
Oct 26, 2022
10c5bb4
updated based on Emilie PR
Oct 26, 2022
d9acafc
add retrieve csv and initial test
Oct 26, 2022
55816fb
Create test for create_indices module
EmilieDel Oct 26, 2022
e1017aa
Add test for run_ner_model_remote
EmilieDel Oct 26, 2022
9362068
Fix linting
EmilieDel Oct 27, 2022
202f14c
Update docs
EmilieDel Oct 27, 2022
05cdf4c
Fix type
EmilieDel Oct 27, 2022
348c11b
Ignore UserWarning from spacy
EmilieDel Oct 27, 2022
7d433fe
add async option
Oct 31, 2022
8670419
Merge branch 'feature/k8s_ner' of https://github.com/BlueBrain/Search…
Oct 31, 2022
de0ac24
small function name fix
Oct 31, 2022
93b4016
fix test ner
Oct 31, 2022
9b81f2f
fix test ner
Oct 31, 2022
537c7fd
Start RE
EmilieDel Oct 31, 2022
ea7e9b2
Correct changes mistakes
EmilieDel Oct 31, 2022
540e3f2
add test run main ner
Oct 31, 2022
200ab44
Merge branch 'feature/k8s_ner' of https://github.com/BlueBrain/Search…
Oct 31, 2022
6f8f298
Remove RE part
EmilieDel Oct 31, 2022
47b9923
remove unused import
EmilieDel Oct 31, 2022
2d1fdaf
Merge branch 'feature/k8s_ner' of https://github.com/BlueBrain/Search…
Oct 31, 2022
9da3e4b
linter
Oct 31, 2022
be00832
update test
Oct 31, 2022
2ffff51
fix test run ner
Oct 31, 2022
271fa44
Add time.sleep to see if test is passing
EmilieDel Nov 1, 2022
365b950
Change the place of time.sleep
EmilieDel Nov 1, 2022
6ac6180
correct test index name
Nov 1, 2022
ff5d750
Merge branch 'feature/k8s_ner' of https://github.com/BlueBrain/Search…
Nov 1, 2022
812e765
Add refresh before running run
EmilieDel Nov 1, 2022
c60a9fe
Change the mapping
EmilieDel Nov 1, 2022
e340b36
Remove time.sleep in get_es_client
EmilieDel Nov 1, 2022
b4f6746
small fix handle conflicts ner
Nov 2, 2022
b99bfb0
add test handle conflicts ner
Nov 2, 2022
2fa94e1
small fix lint
Nov 2, 2022
7a1cc4b
fix isort
Nov 2, 2022
dc26ad6
update test
Nov 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/source/api/bluesearch.k8s.ner.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
bluesearch.k8s.ner module
=========================

.. automodule:: bluesearch.k8s.ner
:members:
:undoc-members:
:show-inheritance:
1 change: 1 addition & 0 deletions docs/source/api/bluesearch.k8s.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Submodules

bluesearch.k8s.connect
bluesearch.k8s.create_indices
bluesearch.k8s.ner

Module contents
---------------
Expand Down
28 changes: 27 additions & 1 deletion src/bluesearch/k8s/create_indices.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

SETTINGS = {"number_of_shards": 2, "number_of_replicas": 1}

MAPPINGS_ARTICLES = {
MAPPINGS_ARTICLES: dict[str, Any] = {
"dynamic": "strict",
"properties": {
"article_id": {"type": "keyword"},
Expand All @@ -52,6 +52,12 @@
"section_name": {"type": "keyword"},
"paragraph_id": {"type": "short"},
"text": {"type": "text"},
"ner_ml_json_v2": {"type": "flattened"},
"ner_ml_version": {"type": "keyword"},
"ner_ruler_json_v2": {"type": "flattened"},
"ner_ruler_version": {"type": "keyword"},
"re": {"type": "flattened"},
"re_version": {"type": "keyword"},
"is_bad": {"type": "boolean"},
"embedding": {
"type": "dense_vector",
Expand Down Expand Up @@ -119,3 +125,23 @@ def remove_index(client: Elasticsearch, index: str | list[str]) -> None:

except Exception as err:
print("Elasticsearch add_index ERROR:", err)


def update_index_mapping(
    client: Elasticsearch,
    index: str,
    settings: dict[str, Any] | None = None,
    properties: dict[str, Any] | None = None,
) -> None:
    """Update an existing index with new settings and/or mapping properties.

    Parameters
    ----------
    client
        Elasticsearch client.
    index
        Name of the ES index to update.
    settings
        Index settings to apply. Skipped if None.
    properties
        Mapping properties to add or update. Skipped if None.

    Raises
    ------
    RuntimeError
        If the index does not exist in Elasticsearch.
    """
    # `indices.exists` is a direct existence check, cheaper than listing
    # every alias and scanning the keys.
    if not client.indices.exists(index=index):
        raise RuntimeError("Index not in ES")

    try:
        if settings:
            client.indices.put_settings(index=index, settings=settings)
        if properties:
            client.indices.put_mapping(index=index, properties=properties)
        logger.info(f"Index {index} updated successfully")
    except Exception as err:
        # Fixed copy-paste from add_index: report the correct operation,
        # and log through the module logger instead of printing to stdout.
        logger.error("Elasticsearch update_index_mapping ERROR: %s", err)
333 changes: 333 additions & 0 deletions src/bluesearch/k8s/ner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
# Blue Brain Search is a text mining toolbox focused on scientific use cases.
#
# Copyright (C) 2020 Blue Brain Project, EPFL.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Perform Name Entity Recognition (NER) on paragraphs."""
from __future__ import annotations

import logging
import os
import time
from datetime import datetime
from multiprocessing import Pool
from typing import Any

import elasticsearch
import numpy as np
import pandas as pd
import requests
import tqdm
from dotenv import load_dotenv
from elasticsearch.helpers import scan

from bluesearch.k8s import connect

load_dotenv()

logger = logging.getLogger(__name__)


def run(
    client: elasticsearch.Elasticsearch,
    version: str,
    index: str = "paragraphs",
    ner_method: str = "ml",
    force: bool = False,
    n_threads: int = 4,
    run_async: bool = True,
) -> None:
    """Run the NER pipeline on the paragraphs in the database.

    Parameters
    ----------
    client
        Elasticsearch client.
    version
        Version of the NER pipeline; paragraphs already tagged with this
        version are skipped unless ``force`` is True.
    index
        Name of the ES index.
    ner_method
        Method to use to perform NER, either "ml" or "ruler".
    force
        If True, force the NER to be performed on all paragraphs, even
        those already tagged with this version.
    n_threads
        Number of worker processes used when ``run_async`` is True.
    run_async
        If True, run the NER requests in a multiprocessing pool.

    Raises
    ------
    ValueError
        If ``ner_method`` is neither "ml" nor "ruler".
    """
    # get NER method function and url
    # The BentoML endpoint host is taken from the environment (.env file
    # loaded at import time); raises KeyError if the variable is missing.
    if ner_method == "ml":
        url = os.environ["BENTOML_NER_ML_URL"]
    elif ner_method == "ruler":
        url = os.environ["BENTOML_NER_RULER_URL"]
    else:
        raise ValueError("The ner_method should be either 'ml' or 'ruler'.")

    # get paragraphs without NER unless force is True
    if force:
        query: dict[str, Any] = {"match_all": {}}
    else:
        query = {"bool": {"must_not": {"term": {f"ner_{ner_method}_version": version}}}}
    paragraph_count = client.options(request_timeout=30).count(
        index=index, query=query
    )["count"]
    logger.info(
        f"There are {paragraph_count} paragraphs without NER {ner_method} results."
    )

    # performs NER for all the documents
    progress = tqdm.tqdm(
        total=paragraph_count,
        position=0,
        unit=" Paragraphs",
        desc="Updating NER",
    )
    if run_async:
        # start a pool of workers
        # NOTE(review): the parent's ES client is not passed to the workers
        # (clients are not picklable / shareable across processes); each
        # worker opens its own connection inside run_ner_model_remote.
        pool = Pool(processes=n_threads)
        open_threads = []
        for hit in scan(client, query={"query": query}, index=index, scroll="12h"):
            # add a new thread to the pool
            res = pool.apply_async(
                run_ner_model_remote,
                args=(
                    hit,
                    url,
                    ner_method,
                    index,
                    version,
                ),
            )
            open_threads.append(res)
            # NOTE(review): progress counts submitted tasks, not completed
            # ones — the bar can run ahead of the actual ES updates.
            progress.update(1)
            # check if any thread is done
            open_threads = [thr for thr in open_threads if not thr.ready()]
            # wait if too many threads are running
            # Throttle submission so at most ~n_threads tasks are in flight.
            while len(open_threads) > n_threads:
                time.sleep(0.1)
                open_threads = [thr for thr in open_threads if not thr.ready()]
        # wait for all threads to finish
        pool.close()
        pool.join()
    else:
        # Synchronous path: reuse the existing client for the write-back.
        for hit in scan(client, query={"query": query}, index=index, scroll="12h"):
            run_ner_model_remote(
                hit=hit,
                url=url,
                ner_method=ner_method,
                index=index,
                version=version,
                client=client,
            )
            progress.update(1)

    progress.close()


def run_ner_model_remote(
    hit: dict[str, Any],
    url: str,
    ner_method: str,
    index: str | None = None,
    version: str | None = None,
    client: elasticsearch.Elasticsearch | None = None,
) -> list[dict[str, Any]] | None:
    """Perform NER on a paragraph using a remote server.

    Parameters
    ----------
    hit
        Elasticsearch hit holding the paragraph text under ``_source.text``.
    url
        Host (and port) of the NER server; the scheme and the ``/predict``
        route are prepended here.
    ner_method
        Method used to perform NER ("ml" or "ruler"); recorded as the
        entity source and used to build the ES field names.
    index
        Name of the ES index. If given together with ``version``, the
        results are written back to the document.
    version
        Version of the NER pipeline, stored alongside the results.
    client
        Elasticsearch client. If None while ``index`` and ``version`` are
        set, a new connection is opened (needed in worker processes).

    Returns
    -------
    list of dict or None
        The entity rows in inference-only mode, None after an ES update.

    Raises
    ------
    ValueError
        If only one of ``index``/``version`` is given without a client, or
        if the NER server does not answer with HTTP 200.
    """
    if client is None and index is None and version is None:
        logger.info("Running NER in inference mode only.")
    elif client is None and index is not None and version is not None:
        # Worker processes cannot share the parent's client: open a new one.
        client = connect.connect()
    elif client is None and (index is not None or version is not None):
        raise ValueError("Index and version should be both None or not None.")

    url = "http://" + url + "/predict"

    response = requests.post(
        url,
        headers={"accept": "application/json", "Content-Type": "text/plain"},
        data=hit["_source"]["text"].encode("utf-8"),
    )

    if response.status_code != 200:
        # Include the status code so failures are diagnosable from the logs.
        raise ValueError(
            f"Error in the request (status code {response.status_code})"
        )

    results = response.json()
    if results:
        out = [
            {
                "entity_type": res["entity_group"],
                "entity": res["word"],
                "start": res["start"],
                "end": res["end"],
                # The entity ruler does not produce a confidence score.
                "score": 0 if ner_method == "ruler" else res["score"],
                "source": ner_method,
            }
            for res in results
        ]
    else:
        # if no entity is found, return an empty row,
        # necessary for ES to find the field in the document
        out = [
            {
                "entity_type": "Empty",
                "entity": "",
                "start": 0,
                "end": 0,
                "score": 0,
                "source": ner_method,
            }
        ]

    if client is not None and index is not None and version is not None:
        # update the NER field in the document
        client.update(
            index=index, doc={f"ner_{ner_method}_json_v2": out}, id=hit["_id"]
        )
        # update the version of the NER
        client.update(
            index=index, doc={f"ner_{ner_method}_version": version}, id=hit["_id"]
        )
        return None
    else:
        return out


def handle_conflicts(results_paragraph: list[dict]) -> list[dict]:
    """Handle conflicts between the NER pipeline and the entity ruler.

    Longer entities win; ties are broken by the entity source name
    (alphabetical order, i.e. "ml" before "ruler"). A shorter entity is
    kept only if it does not overlap an already accepted one.

    Parameters
    ----------
    results_paragraph
        List of entities found by the NER pipelines. Each dict must have
        "entity", "start", "end" and "source" keys.

    Returns
    -------
    list of dict
        Non-overlapping entities, sorted by their start offset.
    """
    # if there is only one entity, it will be kept
    if len(results_paragraph) <= 1:
        return results_paragraph

    # Longest spans first so they claim their characters before shorter ones.
    temp = sorted(
        results_paragraph,
        key=lambda x: (-(x["end"] - x["start"]), x["source"]),
    )

    results_cleaned: list[dict] = []

    # Occupancy mask over the paragraph characters: 1 = already claimed.
    array = np.zeros(max(x["end"] for x in temp))
    for res in temp:
        # Ignore leading/trailing spaces of the matched text when checking
        # overlap. startswith/endswith are safe on empty entities (e.g. the
        # "Empty" placeholder row), unlike indexing with [0]/[-1] which
        # raised IndexError.
        add_one = 1 if res["entity"].startswith(" ") else 0
        sub_one = 1 if res["entity"].endswith(" ") else 0
        span = slice(res["start"] + add_one, res["end"] - sub_one)
        if not results_cleaned or array[span].sum() == 0:
            results_cleaned.append(res)
            array[span] = 1

    results_cleaned.sort(key=lambda x: x["start"])
    return results_cleaned


def retrieve_csv(
    client: elasticsearch.Elasticsearch,
    index: str = "paragraphs",
    ner_method: str = "both",
    output_path: str = "./",
) -> None:
    """Export NER results from Elasticsearch into a timestamped csv file.

    Parameters
    ----------
    client
        Elasticsearch client.
    index
        Name of the ES index.
    ner_method
        Which NER results to export: "ml", "ruler" or "both". With "both",
        overlapping entities from the two sources are de-duplicated.
    output_path
        Path where one wants to save the csv file.

    Raises
    ------
    ValueError
        If ``ner_method`` is not one of "ml", "ruler" or "both".
    """
    now = datetime.now().strftime("%d_%m_%Y_%H_%M")

    # Select only paragraphs that actually carry the requested NER field(s).
    if ner_method == "both":
        query: dict[str, dict[str, Any]] = {
            "bool": {
                "filter": [
                    {"exists": {"field": "ner_ml_json_v2"}},
                    {"exists": {"field": "ner_ruler_json_v2"}},
                ]
            }
        }
    elif ner_method in {"ml", "ruler"}:
        query = {"exists": {"field": f"ner_{ner_method}_json_v2"}}
    else:
        raise ValueError("The ner_method should be either 'ml', 'ruler' or 'both'.")

    paragraph_count = client.count(index=index, query=query)["count"]
    logger.info(
        f"There are {paragraph_count} paragraphs with NER {ner_method} results."
    )

    progress = tqdm.tqdm(
        total=paragraph_count,
        position=0,
        unit=" Paragraphs",
        desc="Retrieving NER",
    )
    rows = []
    for hit in scan(client, query={"query": query}, index=index, scroll="12h"):
        source = hit["_source"]
        if ner_method == "both":
            # Merge both result sets and drop overlapping entities.
            entities = handle_conflicts(
                [*source["ner_ml_json_v2"], *source["ner_ruler_json_v2"]]
            )
        else:
            entities = source[f"ner_{ner_method}_json_v2"]

        rows.extend(
            {
                "entity_type": entity["entity_type"],
                "entity": entity["entity"],
                "start": entity["start"],
                "end": entity["end"],
                "source": entity["source"],
                "paragraph_id": hit["_id"],
                "article_id": source["article_id"],
            }
            for entity in entities
        )

        progress.update(1)
        logger.info(f"Retrieved NER for paragraph {hit['_id']}, progress: {progress.n}")

    progress.close()

    pd.DataFrame(rows).to_csv(
        f"{output_path}/ner_es_results{ner_method}_{now}.csv", index=False
    )


if __name__ == "__main__":
    # Manual entry point: run the "ml" NER pipeline (version "v2") against
    # the default "paragraphs" index with the default async settings.
    logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.WARNING)
    client = connect.connect()
    run(client, version="v2")
Loading