diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..c1fb6ad --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +.venv +# secrets +config diff --git a/.gitignore b/.gitignore index 505a3b1..0635af1 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ wheels/ # Virtual environments .venv + +# IDEs +.idea/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ecae0ad --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.14-slim-bookworm +LABEL authors="Stephen Thompson, Jeremy Stein" +COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ +WORKDIR /app +ARG UVCACHE=/root/.cache/uv +COPY pyproject.toml uv.lock* /app/ +RUN --mount=type=cache,target=${UVCACHE} uv pip install --system . +COPY . /app/ +RUN --mount=type=cache,target=${UVCACHE} uv pip install --system . +CMD ["emap-extract-waveform"] diff --git a/README.md b/README.md index 2d63f1e..7de37b2 100644 --- a/README.md +++ b/README.md @@ -1,52 +1,71 @@ -A controller for reading waveform data from a rabbitmq queue and processing it. +A controller for reading waveform data from a rabbitmq queue and processing it. # Running the Code ## 1 Install and deploy EMAP -Follow the emap development [instructions](https://github.com/SAFEHR-data/emap/blob/main/docs/dev/core.md#deploying-a-live-version "Instructions for deploying a live version of EMAP") configure and deploy a version of EMAP. To run a local version you'll need to set +Follow the emap development [instructions](https://github.com/SAFEHR-data/emap/blob/main/docs/dev/core.md#deploying-a-live-version "Instructions for deploying a live version of EMAP") configure and deploy a version of EMAP. 
To run a local version you'll need to set ``` fake_uds: - enable_fake_uds: true + enable_fake_uds: true uds: - UDS_JDBC_URL: jdbc:postgresql://fakeuds:5432/fakeuds + UDS_JDBC_URL: jdbc:postgresql://fakeuds:5432/fakeuds ``` and configure and synthetic waveform generator ``` -waveform: - enable_waveform: true - enable_waveform_generator: true - CORE_WAVEFORM_RETENTION_HOURS: 24 +waveform: + enable_waveform: true + enable_waveform_generator: true + CORE_WAVEFORM_RETENTION_HOURS: 24 WAVEFORM_HL7_SOURCE_ADDRESS_ALLOW_LIST: ALL - WAVEFORM_HL7_TEST_DUMP_FILE: "" + WAVEFORM_HL7_TEST_DUMP_FILE: "" WAVEFORM_HL7_SAVE_DIRECTORY: "/waveform-saved-messages" - WAVEFORM_SYNTHETIC_NUM_PATIENTS: 2 - WAVEFORM_SYNTHETIC_WARP_FACTOR:1 - WAVEFORM_SYNTHETIC_START_DATETIME: "2024-01-02T12:00:00Z" - WAVEFORM_SYNTHETIC_END_DATETIME: "2024-01-03T12:00:00Z" + WAVEFORM_SYNTHETIC_NUM_PATIENTS: 2 + WAVEFORM_SYNTHETIC_WARP_FACTOR: 1 + WAVEFORM_SYNTHETIC_START_DATETIME: "2024-01-02T12:00:00Z" + WAVEFORM_SYNTHETIC_END_DATETIME: "2024-01-03T12:00:00Z" ``` -Once configured you can start it with +Once configured you can start it with ``` emap docker up -d ``` -## 2 Install and deploy waveform reader using uv +## 2 Install and deploy waveform controller using docker + +Configuration: copy the example settings file to the config directory and edit it +as necessary. Then remove the comment telling you not to put secrets in it. ``` -uv venv .waveform-controller -source .waveform-controller/bin/activate -uv pip install . --active +cp settings.env.EXAMPLE config/settings.env ``` +If it doesn't already exist you should create a directory named +`waveform-export` in the parent directory to store the saved waveform +messages. -## 3 Check if it's working +``` +mkdir ../waveform-export +``` -If successful you should be able to run the demo script and see waveform messaged dumped to the terminal. 
+Build and start the controller with docker ``` -python waveform_controller.py +cd ../waveform-controller +docker compose build +docker compose up -d ``` +## 3 Check if it's working + +Running the controller will save (to `../waveform-export`) waveform messages +matched to Contact Serial Number (CSN) as csv files, each containing data for +one calendar day, as +`YYYY-MM-DD.CSN.sourceName.units.csv` + +Each row of the csv will contain + +`csn, mrn, units, samplingRate, observationTime, waveformData` + # Developing See [developing docs](docs/develop.md) diff --git a/config/.gitignore b/config/.gitignore new file mode 100644 index 0000000..a5f977a --- /dev/null +++ b/config/.gitignore @@ -0,0 +1,2 @@ +# secrets go here, do not put in git +* diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..a49869c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,15 @@ +services: + waveform-controller: + build: + context: . + dockerfile: Dockerfile + args: + HTTP_PROXY: ${HTTP_PROXY} + http_proxy: ${http_proxy} + HTTPS_PROXY: ${HTTPS_PROXY} + https_proxy: ${https_proxy} + # ideally we'd use docker secrets but it's not enabled currently + env_file: + - ./config/settings.env + volumes: + - ../waveform-export:/app/waveform-export diff --git a/pyproject.toml b/pyproject.toml index f013aad..fc9c7bb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,3 +9,6 @@ dependencies = [ "pre-commit>=4.5.0", "psycopg2-binary>=2.9.11", ] + +[project.scripts] +emap-extract-waveform = "waveform_controller.controller:receiver" diff --git a/settings.env.EXAMPLE b/settings.env.EXAMPLE new file mode 100644 index 0000000..35027b6 --- /dev/null +++ b/settings.env.EXAMPLE @@ -0,0 +1,13 @@ +# This is an EXAMPLE file, do not put real secrets in here. +# Copy it to ./config/settings.env and then DELETE THIS COMMENT. 
def create_file_name(
    sourceSystem: str, observationTime: datetime, csn: str, units: str
) -> str:
    """Build the csv file name for one day of one patient's waveform data.

    :param sourceSystem: name of the system that produced the waveform.
    :param observationTime: time of the observation; only its date is used.
    :param csn: patient contact serial number.
    :param units: measurement units of the waveform.
    :return: a name of the form ``YYYY-MM-DD.csn.sourceSystem.units.csv``.
    """
    datestring = observationTime.strftime("%Y-%m-%d")
    return f"{datestring}.{csn}.{sourceSystem}.{units}.csv"


def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool:
    """Append a frame of waveform data to a csv file.

    The file lives in the ``waveform-export`` directory (relative to the
    current working directory) and is named by :func:`create_file_name`.
    Both the directory and the file are created if they don't exist.

    :param waveform_message: decoded waveform message. ``observationTime``
        is required; ``sourceSystem``, ``unit``, ``samplingRate`` and
        ``numericValues`` are read if present.
    :param csn: patient contact serial number, written to the row and used
        in the file name.
    :param mrn: patient medical record number, written to the row.
    :return: True if write was successful.
    :raises ValueError: if the message has no (or an empty) observationTime.
    """
    sourceSystem = waveform_message.get("sourceSystem", None)
    observationTime = waveform_message.get("observationTime", False)

    # Validate before touching the file system so a bad message leaves no
    # stray directory or empty file behind.
    if not observationTime:
        raise ValueError("waveform_message is missing observationTime")

    observation_datetime = datetime.fromtimestamp(observationTime)
    units = waveform_message.get("unit", "")

    out_dir = Path("waveform-export")
    out_dir.mkdir(exist_ok=True)
    file_path = out_dir / create_file_name(
        sourceSystem, observation_datetime, csn, units
    )

    # A missing, empty or null "numericValues" entry yields an empty cell
    # rather than an AttributeError.
    numeric_values = waveform_message.get("numericValues") or {}
    waveform_data = numeric_values.get("value", "")

    # newline="" is required by the csv module so that it controls the line
    # endings itself; utf-8 keeps non-ASCII units (e.g. "µV") portable.
    with open(file_path, "a", newline="", encoding="utf-8") as fileout:
        wv_writer = csv.writer(fileout, delimiter=",")
        wv_writer.writerow(
            [
                csn,
                mrn,
                units,
                waveform_message.get("samplingRate", ""),
                observationTime,
                waveform_data,
            ]
        )

    return True
def get_from_env(env_var, setting_name=None):
    """Copy one environment variable into a module-level settings variable.

    :param env_var: name of the environment variable to read.
    :param setting_name: module-level name to bind the value to; when
        omitted the environment variable's own name is used.

    The setting is bound to ``None`` if the environment variable is unset.
    """
    target_name = env_var if setting_name is None else setting_name
    globals()[target_name] = os.environ.get(env_var)


# Expose each required environment variable as a setting of the same name.
for _env_var in (
    "UDS_DBNAME",
    "UDS_USERNAME",
    "UDS_PASSWORD",
    "UDS_HOST",
    "UDS_PORT",
    "SCHEMA_NAME",
    "RABBITMQ_USERNAME",
    "RABBITMQ_PASSWORD",
    "RABBITMQ_HOST",
    "RABBITMQ_PORT",
    "RABBITMQ_QUEUE",
):
    get_from_env(_env_var)
del _env_var  # keep the loop variable out of the settings namespace