Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -166,5 +166,5 @@ scrap/
.ruff_cache/
.python-version
.cursor/
scripts/
.private/
.idea/
20 changes: 9 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,16 @@ fhir.add_source("epic", "fhir://fhir.epic.com/r4?client_id=epic_client_id")
fhir.add_source("cerner", "fhir://fhir.cerner.com/r4?client_id=cerner_client_id")

@fhir.aggregate(Patient)
def enrich_patient_data(id: str, source: str = None) -> Patient:
def enrich_patient_data(id: str, source: str) -> Patient:
"""Get patient data from any connected EHR and add AI enhancements"""
patient = fhir.read(Patient, id, source)

# Add AI processing metadata
patient.extension = patient.extension or []
patient.extension.append({
"url": "http://healthchain.org/fhir/ai-processed",
"valueString": f"Enhanced by HealthChain from {source}"
})

return patient
bundle = fhir.search(
Patient,
{"_id": id},
source,
add_provenance=True,
provenance_tag="ai-enhanced",
)
return bundle

app.register_gateway(fhir)

Expand Down
2,151 changes: 2,151 additions & 0 deletions cookbook/data/notereader_cda.xml

Large diffs are not rendered by default.

114 changes: 114 additions & 0 deletions cookbook/multi_ehr_data_aggregation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""
Multi-Source FHIR Data Aggregation

Demonstrates aggregating patient data from multiple FHIR sources with
simple pipeline processing and provenance tracking.

Requirements:
- pip install healthchain python-dotenv

Run:
- python data_aggregation.py
"""

from typing import List

from dotenv import load_dotenv

from fhir.resources.bundle import Bundle
from fhir.resources.condition import Condition
from fhir.resources.annotation import Annotation

from healthchain.gateway import FHIRGateway, HealthChainAPI
from healthchain.gateway.clients.fhir.base import FHIRAuthConfig
from healthchain.pipeline import Pipeline
from healthchain.io.containers import Document
from healthchain.fhir import merge_bundles


load_dotenv()


# Epic FHIR Sandbox - configure via environment, then build connection string
config = FHIRAuthConfig.from_env("EPIC")
EPIC_URL = config.to_connection_string()

# Cerner Open Sandbox
CERNER_URL = "fhir://fhir-open.cerner.com/r4/ec2458f2-1e24-41c8-b71b-0e701af7583d"


def create_pipeline() -> Pipeline[Document]:
"""Build simple pipeline for demo purposes."""
pipeline = Pipeline[Document]()

@pipeline.add_node
def deduplicate(doc: Document) -> Document:
"""Remove duplicate conditions by resource ID."""
conditions = doc.fhir.get_resources("Condition")
unique = list({c.id: c for c in conditions if c.id}.values())
doc.fhir.add_resources(unique, "Condition", replace=True)
print(f"Deduplicated {len(unique)} conditions")
return doc

@pipeline.add_node
def add_annotation(doc: Document) -> Document:
"""Add a note to each Condition indicating pipeline processing."""
conditions = doc.fhir.get_resources("Condition")
for condition in conditions:
note_text = "This resource has been processed by healthchain pipeline"
annotation = Annotation(text=note_text)
condition.note = (condition.note or []) + [annotation]
print(f"Added annotation to {len(conditions)} conditions")
return doc

return pipeline


def create_app():
# Initialize gateway and add sources
gateway = FHIRGateway()
gateway.add_source("epic", EPIC_URL)
gateway.add_source("cerner", CERNER_URL)

pipeline = create_pipeline()

@gateway.aggregate(Condition)
def get_unified_patient(patient_id: str, sources: List[str]) -> Bundle:
"""Aggregate conditions for a patient from multiple sources"""
bundles = []
for source in sources:
try:
bundle = gateway.search(
Condition,
{"patient": patient_id},
source,
add_provenance=True,
provenance_tag="aggregated",
)
bundles.append(bundle)
except Exception as e:
print(f"Error from {source}: {e}")

# Merge bundles - OperationOutcome resources are automatically extracted
merged_bundle = merge_bundles(bundles, deduplicate=True)

doc = Document(data=merged_bundle)
doc = pipeline(doc)

# print([outcome.model_dump() for outcome in doc.fhir.operation_outcomes])

return doc.fhir.bundle.model_dump()

app = HealthChainAPI()
app.register_gateway(gateway)

return app


if __name__ == "__main__":
import uvicorn

app = create_app()
uvicorn.run(app, port=8888)
# Runs at: http://127.0.0.1:8888/
27 changes: 9 additions & 18 deletions cookbook/notereader_clinical_coding_fhir.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,17 @@
- python notereader_clinical_coding_fhir.py # Demo and start server
"""

import os
import uvicorn
from datetime import datetime, timezone

import healthchain as hc

from fhir.resources.documentreference import DocumentReference
from fhir.resources.meta import Meta
from spacy.tokens import Span
from dotenv import load_dotenv

from healthchain.fhir import create_document_reference
from healthchain.fhir import create_document_reference, add_provenance_metadata
from healthchain.gateway.api import HealthChainAPI
from healthchain.gateway.fhir import FHIRGateway
from healthchain.gateway.clients.fhir.base import FHIRAuthConfig
from healthchain.gateway.soap import NoteReaderService
from healthchain.io import CdaAdapter, Document
from healthchain.models import CdaRequest
Expand All @@ -35,14 +33,9 @@

load_dotenv()


BILLING_URL = (
f"fhir://api.medplum.com/fhir/R4/"
f"?client_id={os.environ.get('MEDPLUM_CLIENT_ID')}"
f"&client_secret={os.environ.get('MEDPLUM_CLIENT_SECRET')}"
f"&token_url={os.environ.get('MEDPLUM_TOKEN_URL', 'https://api.medplum.com/oauth2/token')}"
f"&scope={os.environ.get('MEDPLUM_SCOPE', 'openid')}"
)
# Load configuration from environment variables
config = FHIRAuthConfig.from_env("MEDPLUM")
BILLING_URL = config.to_connection_string()


def create_pipeline():
Expand Down Expand Up @@ -84,7 +77,6 @@ def link_entities(doc: Document) -> Document:


def create_app():
"""Create production healthcare API."""
pipeline = create_pipeline()
cda_adapter = CdaAdapter()

Expand All @@ -102,9 +94,8 @@ def ai_coding_workflow(request: CdaRequest):

for condition in doc.fhir.problem_list:
# Add basic provenance tracking
condition.meta = Meta(
source="urn:healthchain:pipeline:cdi",
lastUpdated=datetime.now(timezone.utc).isoformat(),
condition = add_provenance_metadata(
condition, source="epic-notereader", tag_code="cdi"
)
fhir_gateway.create(condition, source="billing")

Expand All @@ -127,7 +118,7 @@ class NotereaderSandbox(ClinicalDocumentation):

def __init__(self):
super().__init__()
self.data_path = "./resources/uclh_cda.xml"
self.data_path = "./data/notereader_cda.xml"

@hc.ehr(workflow="sign-note-inpatient")
def load_clinical_document(self) -> DocumentReference:
Expand Down
Binary file added docs/assets/images/epicsandbox1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/assets/images/epicsandbox2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/assets/images/epicsandbox3.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/assets/images/epicsandbox4.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/assets/images/epicsandboxlogin.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading