Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.venv
# secrets
config
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ wheels/

# Virtual environments
.venv

# IDEs
.idea/
10 changes: 10 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM python:3.14-slim-bookworm
LABEL authors="Stephen Thompson, Jeremy Stein"
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
WORKDIR /app
ARG UVCACHE=/root/.cache/uv
COPY pyproject.toml uv.lock* /app/
RUN --mount=type=cache,target=${UVCACHE} uv pip install --system .
COPY . /app/
RUN --mount=type=cache,target=${UVCACHE} uv pip install --system .
CMD ["emap-extract-waveform"]
61 changes: 40 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,52 +1,71 @@
A controller for reading waveform data from a rabbitmq queue and processing it.
A controller for reading waveform data from a rabbitmq queue and processing it.

# Running the Code
## 1 Install and deploy EMAP
Follow the emap development [instructions](https://github.com/SAFEHR-data/emap/blob/main/docs/dev/core.md#deploying-a-live-version "Instructions for deploying a live version of EMAP") configure and deploy a version of EMAP. To run a local version you'll need to set
Follow the emap development [instructions](https://github.com/SAFEHR-data/emap/blob/main/docs/dev/core.md#deploying-a-live-version "Instructions for deploying a live version of EMAP") configure and deploy a version of EMAP. To run a local version you'll need to set

```
fake_uds:
enable_fake_uds: true
enable_fake_uds: true
uds:
UDS_JDBC_URL: jdbc:postgresql://fakeuds:5432/fakeuds
UDS_JDBC_URL: jdbc:postgresql://fakeuds:5432/fakeuds
```

and configure and synthetic waveform generator

```
waveform:
enable_waveform: true
enable_waveform_generator: true
CORE_WAVEFORM_RETENTION_HOURS: 24
waveform:
enable_waveform: true
enable_waveform_generator: true
CORE_WAVEFORM_RETENTION_HOURS: 24
WAVEFORM_HL7_SOURCE_ADDRESS_ALLOW_LIST: ALL
WAVEFORM_HL7_TEST_DUMP_FILE: ""
WAVEFORM_HL7_TEST_DUMP_FILE: ""
WAVEFORM_HL7_SAVE_DIRECTORY: "/waveform-saved-messages"
WAVEFORM_SYNTHETIC_NUM_PATIENTS: 2
WAVEFORM_SYNTHETIC_WARP_FACTOR:1
WAVEFORM_SYNTHETIC_START_DATETIME: "2024-01-02T12:00:00Z"
WAVEFORM_SYNTHETIC_END_DATETIME: "2024-01-03T12:00:00Z"
WAVEFORM_SYNTHETIC_NUM_PATIENTS: 2
WAVEFORM_SYNTHETIC_WARP_FACTOR:1
WAVEFORM_SYNTHETIC_START_DATETIME: "2024-01-02T12:00:00Z"
WAVEFORM_SYNTHETIC_END_DATETIME: "2024-01-03T12:00:00Z"
```

Once configured you can start it with
Once configured you can start it with

```
emap docker up -d
```

## 2 Install and deploy waveform reader using uv
## 2 Install and deploy waveform controller using docker

Configuration, copy the configuration file to the config directory and edit
as necessary. Remove the comment telling you not to put secrets in it.

```
uv venv .waveform-controller
source .waveform-controller/bin/activate
uv pip install . --active
cp settings.env.EXAMPLE config/settings.env
```
If it doesn't already exist you should create a directory named
`waveform-export` in the parent directory to store the saved waveform
messages.

## 3 Check if it's working
```
mkdir ../waveform-export
```

If successful you should be able to run the demo script and see waveform messaged dumped to the terminal.
Build and start the controller with docker
```
python waveform_controller.py
cd ../waveform-controller
docker compose build
docker compose up -d
```

## 3 Check if it's working

Running the controller will save (to `../waveform-export`) waveform messages
matched to Contact Serial Number (CSN) as csv files, each containing data for
one calender day, as
`YYYY-MM-DD.CSN.sourceName.units.csv`

Each row of the csv will contain

`csn, mrn, units, samplingRate, observationTime, waveformData`

# Developing
See [developing docs](docs/develop.md)
2 changes: 2 additions & 0 deletions config/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# secrets go here, do not put in git
*
15 changes: 15 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
services:
waveform-controller:
build:
context: .
dockerfile: Dockerfile
args:
HTTP_PROXY: ${HTTP_PROXY}
http_proxy: ${http_proxy}
HTTPS_PROXY: ${HTTPS_PROXY}
https_proxy: ${https_proxy}
# ideally we'd use docker secrets but it's not enabled currently
env_file:
- ./config/settings.env
volumes:
- ../waveform-export:/app/waveform-export
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ dependencies = [
"pre-commit>=4.5.0",
"psycopg2-binary>=2.9.11",
]

[project.scripts]
emap-extract-waveform = "waveform_controller.controller:receiver"
13 changes: 13 additions & 0 deletions settings.env.EXAMPLE
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# This is an EXAMPLE file, do not put real secrets in here.
# Copy it to ./config/settings.env and then DELETE THIS COMMENT.
UDS_DBNAME="fakeuds"
UDS_USERNAME="inform_user"
UDS_PASSWORD="inform"
UDS_HOST="localhost"
UDS_PORT="5433"
SCHEMA_NAME="star_dev"
RABBITMQ_USERNAME="my_name"
RABBITMQ_PASSWORD="my_pw"
RABBITMQ_HOST="localhost"
RABBITMQ_PORT=5672
RABBITMQ_QUEUE="waveform"
55 changes: 55 additions & 0 deletions waveform_controller/csv_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Writes a frame of waveform data to a csv file."""

import csv
from datetime import datetime
from pathlib import Path


def create_file_name(
sourceSystem: str, observationTime: datetime, csn: str, units: str
) -> str:
"""Create a unique file name based on the patient contact serial number
(csn) the date, and the source system."""
datestring = observationTime.strftime("%Y-%m-%d")
return f"{datestring}.{csn}.{sourceSystem}.{units}.csv"


def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool:
"""Appends a frame of waveform data to a csv file (creates file if it
doesn't exist.

:return: True if write was successful.
"""
sourceSystem = waveform_message.get("sourceSystem", None)
observationTime = waveform_message.get("observationTime", False)

if not observationTime:
raise ValueError("waveform_message is missing observationTime")

observation_datetime = datetime.fromtimestamp(observationTime)
units = waveform_message.get("unit", "")

out_path = "waveform-export/"
Path(out_path).mkdir(exist_ok=True)

filename = out_path + create_file_name(
sourceSystem, observation_datetime, csn, units
)
with open(filename, "a") as fileout:
wv_writer = csv.writer(fileout, delimiter=",")
waveform_data = waveform_message.get("numericValues", "")
if waveform_data != "":
waveform_data = waveform_data.get("value", "")

wv_writer.writerow(
[
csn,
mrn,
units,
waveform_message.get("samplingRate", ""),
observationTime,
waveform_data,
]
)

return True
29 changes: 16 additions & 13 deletions waveform_controller/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
from datetime import datetime, timedelta

import waveform_controller.settings as settings
import waveform_controller.csv_writer as writer


class starDB:
sql_query: str = ""
connection_string: str = "dbname={} user={} password={} host={} port={}".format(
settings.UDS_DBNAME,
settings.UDS_USERNAME,
settings.UDS_PASSWORD,
settings.UDS_HOST,
settings.UDS_PORT,
settings.UDS_DBNAME, # type:ignore
settings.UDS_USERNAME, # type:ignore
settings.UDS_PASSWORD, # type:ignore
settings.UDS_HOST, # type:ignore
settings.UDS_PORT, # type:ignore
)

def init_query(self):
Expand All @@ -31,10 +32,13 @@ def get_row(self, location_string: str, start_datetime: str, end_datetime: str):
"start_datetime": start_datetime,
"end_datetime": end_datetime,
}
with psycopg2.connect(self.connection_string) as db_connection:
with db_connection.cursor() as curs:
curs.execute(self.sql_query, parameters)
single_row = curs.fetchone()
try:
with psycopg2.connect(self.connection_string) as db_connection:
with db_connection.cursor() as curs:
curs.execute(self.sql_query, parameters)
single_row = curs.fetchone()
except psycopg2.errors.UndefinedTable:
raise ConnectionError("There is no table in your data base")

return single_row

Expand All @@ -50,7 +54,6 @@ def waveform_callback(self, ch, method, properties, body):
obs_time_str = observation_time.strftime("%Y-%m-%d:%H:%M:%S")
start_time_str = start_time.strftime("%Y-%m-%d:%H:%M:%S")
matched_mrn = self.get_row(location_string, start_time_str, obs_time_str)
# print(f"Received a waveform message {data.get('observationTime', 'NAT')}")
print(
f"Received a waveform message from {location_string} at {obs_time_str} with matching mrn = {matched_mrn}"
)

if writer.write_frame(data, matched_mrn[2], matched_mrn[0]):
ch.basic_ack(method.delivery_tag)
33 changes: 21 additions & 12 deletions waveform_controller/settings.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
UDS_DBNAME = "fakeuds"
UDS_USERNAME = "inform_user"
UDS_PASSWORD = "inform"
UDS_HOST = "localhost"
UDS_PORT = "5433"
SCHEMA_NAME = "star_dev"

RABBITMQ_USERNAME = "my_name"
RABBITMQ_PASSWORD = "my_pw"
RABBITMQ_HOST = "localhost"
RABBITMQ_PORT = 5672
RABBITMQ_QUEUE = "waveform"
import os


def get_from_env(env_var, setting_name=None):
if setting_name is None:
setting_name = env_var
globals()[setting_name] = os.environ.get(env_var)


# read env vars into settings variables
get_from_env("UDS_DBNAME")
get_from_env("UDS_USERNAME")
get_from_env("UDS_PASSWORD")
get_from_env("UDS_HOST")
get_from_env("UDS_PORT")
get_from_env("SCHEMA_NAME")
get_from_env("RABBITMQ_USERNAME")
get_from_env("RABBITMQ_PASSWORD")
get_from_env("RABBITMQ_HOST")
get_from_env("RABBITMQ_PORT")
get_from_env("RABBITMQ_QUEUE")