Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ First time here? Check out our [Docs](https://dotimplement.github.io/HealthChain
## Why use HealthChain?
- **EHR integrations are manual and time-consuming** - **HealthChainAPI** abstracts away complexities so you can focus on AI development, not learning FHIR APIs, CDS Hooks, and authentication schemes.
- **Healthcare data is fragmented and complex** - **InteropEngine** handles the conversion between FHIR, CDA, and HL7v2 so you don't have to become an expert in healthcare data standards.
- [**Most healthcare data is unstructured**](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6372467/) - HealthChain **Pipelines** are optimized for real-time AI and NLP applications that deal with realistic healthcare data.
- [**Most healthcare data is unstructured**](https://pmc.ncbi.nlm.nih.gov/articles/PMC10566734/) - HealthChain **Pipelines** are optimized for real-time AI and NLP applications that deal with realistic healthcare data.
- **Built by health tech developers, for health tech developers** - HealthChain is tech stack agnostic, modular, and easily extensible with built-in compliance and audit features.

## HealthChainAPI
Expand Down
70 changes: 49 additions & 21 deletions cookbook/cds_discharge_summarizer_hf_chat.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import healthchain as hc

from healthchain.gateway import HealthChainAPI, CDSHooksService
from healthchain.pipeline import SummarizationPipeline
from healthchain.models import CDSRequest, CDSResponse, Prefetch
from healthchain.data_generators import CdsDataGenerator
from healthchain.sandbox.use_cases import ClinicalDecisionSupport
from healthchain.models import Prefetch, CDSRequest, CDSResponse
from healthchain.data_generators import CdsDataGenerator

from langchain_huggingface.llms import HuggingFaceEndpoint
from langchain_huggingface import ChatHuggingFace

from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

Expand All @@ -21,47 +20,76 @@

def create_summarization_chain():
hf = HuggingFaceEndpoint(
repo_id="HuggingFaceH4/zephyr-7b-beta",
repo_id="deepseek-ai/DeepSeek-R1-0528",
task="text-generation",
max_new_tokens=512,
do_sample=False,
repetition_penalty=1.03,
)

model = ChatHuggingFace(llm=hf)

template = """
You are a bed planner for a hospital. Provide a concise, objective summary of the input text in short bullet points separated by new lines,
focusing on key actions such as appointments and medication dispense instructions, without using second or third person pronouns.\n'''{text}'''
You are a discharge planning assistant for hospital operations.
Provide a concise, objective summary focusing on actionable items
for care coordination, including appointments, medications, and
follow-up instructions. Format as bullet points with no preamble.\n'''{text}'''
"""
prompt = PromptTemplate.from_template(template)

return prompt | model | StrOutputParser()


@hc.sandbox
# Create the healthcare application
app = HealthChainAPI(
title="Discharge Note Summarizer",
description="AI-powered discharge note summarization service",
)

chain = create_summarization_chain()
pipeline = SummarizationPipeline.load(
chain, source="langchain", template_path="templates/cds_card_template.json"
)

# Create CDS Hooks service
cds = CDSHooksService()


@cds.hook("encounter-discharge", id="discharge-summarizer")
def discharge_summarizer(request: CDSRequest) -> CDSResponse:
result = pipeline.process_request(request)
return result


# Register the CDS service
app.register_service(cds, path="/cds")


@hc.sandbox(api="http://localhost:8000")
class DischargeNoteSummarizer(ClinicalDecisionSupport):
def __init__(self):
# Initialize pipeline and data generator
chain = create_summarization_chain()
self.pipeline = SummarizationPipeline.load(
chain, source="langchain", template_path="templates/cds_card_template.json"
)
super().__init__(path="/cds/cds-services/discharge-summarizer")
self.data_generator = CdsDataGenerator()

@hc.ehr(workflow="encounter-discharge")
def load_data_in_client(self) -> Prefetch:
# Generate synthetic FHIR data for testing
data = self.data_generator.generate_prefetch(
free_text_path="data/discharge_notes.csv", column_name="text"
)
return data

@hc.api
def my_service(self, request: CDSRequest) -> CDSResponse:
# Process the request through our pipeline
result = self.pipeline.process_request(request)
return result


if __name__ == "__main__":
# Start the sandbox server
import uvicorn
import threading

# Start the API server in a separate thread
def start_api():
uvicorn.run(app, port=8000)

api_thread = threading.Thread(target=start_api, daemon=True)
api_thread.start()

# Start the sandbox
summarizer = DischargeNoteSummarizer()
summarizer.start_sandbox()
49 changes: 39 additions & 10 deletions cookbook/cds_discharge_summarizer_hf_trf.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import healthchain as hc

from healthchain.gateway import HealthChainAPI, CDSHooksService
from healthchain.pipeline import SummarizationPipeline
from healthchain.sandbox.use_cases import ClinicalDecisionSupport
from healthchain.models import Prefetch, CDSRequest, CDSResponse
Expand All @@ -13,12 +13,35 @@
os.environ["HUGGINGFACEHUB_API_TOKEN"] = getpass.getpass("Enter your token: ")


@hc.sandbox
# Create the healthcare application
app = HealthChainAPI(
title="Discharge Note Summarizer",
description="AI-powered discharge note summarization service",
)

# Initialize pipeline
pipeline = SummarizationPipeline.from_model_id(
"google/pegasus-xsum", source="huggingface", task="summarization"
)

# Create CDS Hooks service
cds = CDSHooksService()


@cds.hook("encounter-discharge", id="discharge-summarizer")
def discharge_summarizer(request: CDSRequest) -> CDSResponse:
result = pipeline.process_request(request)
return result


# Register the CDS service
app.register_service(cds, path="/cds")


@hc.sandbox(api="http://localhost:8000")
class DischargeNoteSummarizer(ClinicalDecisionSupport):
def __init__(self):
self.pipeline = SummarizationPipeline.from_model_id(
"google/pegasus-xsum", source="huggingface", task="summarization"
)
super().__init__(path="/cds/cds-services/discharge-summarizer")
self.data_generator = CdsDataGenerator()

@hc.ehr(workflow="encounter-discharge")
Expand All @@ -28,12 +51,18 @@ def load_data_in_client(self) -> Prefetch:
)
return data

@hc.api
def my_service(self, request: CDSRequest) -> CDSResponse:
result = self.pipeline.process_request(request)
return result


if __name__ == "__main__":
import uvicorn
import threading

# Start the API server in a separate thread
def start_api():
uvicorn.run(app, port=8000)

api_thread = threading.Thread(target=start_api, daemon=True)
api_thread.start()

# Start the sandbox
summarizer = DischargeNoteSummarizer()
summarizer.start_sandbox()
170 changes: 170 additions & 0 deletions cookbook/notereader_clinical_coding_fhir.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
A complete CDI service that processes clinical notes and extracts billing codes.
Demonstrates FHIR-native pipelines, legacy system integration, and multi-source data handling.

Requirements:
- pip install healthchain
- pip install scispacy
- pip install https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.5.4/en_core_sci_sm-0.5.4.tar.gz
- pip install python-dotenv

Run:
- python notereader_clinical_coding_fhir.py # Demo and start server
"""

import os
import uvicorn
from datetime import datetime, timezone

import healthchain as hc
from fhir.resources.documentreference import DocumentReference
from fhir.resources.meta import Meta
from spacy.tokens import Span
from dotenv import load_dotenv

from healthchain.fhir import create_document_reference
from healthchain.gateway.api import HealthChainAPI
from healthchain.gateway.fhir import FHIRGateway
from healthchain.gateway.soap import NoteReaderService
from healthchain.io import CdaAdapter, Document
from healthchain.models import CdaRequest
from healthchain.pipeline.medicalcodingpipeline import MedicalCodingPipeline
from healthchain.sandbox.use_cases import ClinicalDocumentation


load_dotenv()


BILLING_URL = (
f"fhir://api.medplum.com/fhir/R4/"
f"?client_id={os.environ.get('MEDPLUM_CLIENT_ID')}"
f"&client_secret={os.environ.get('MEDPLUM_CLIENT_SECRET')}"
f"&token_url={os.environ.get('MEDPLUM_TOKEN_URL', 'https://api.medplum.com/oauth2/token')}"
f"&scope={os.environ.get('MEDPLUM_SCOPE', 'openid')}"
)


def create_pipeline():
"""Build FHIR-native ML pipeline with automatic problem extraction."""
pipeline = MedicalCodingPipeline.from_model_id("en_core_sci_sm", source="spacy")

# Add custom entity linking
@pipeline.add_node(position="after", reference="SpacyNLP")
def link_entities(doc: Document) -> Document:
"""Add CUI codes to medical entities for problem extraction"""
if not Span.has_extension("cui"):
Span.set_extension("cui", default=None)

spacy_doc = doc.nlp.get_spacy_doc()

# Simple dummy linker for demo purposes
dummy_linker = {
"pneumonia": "233604007",
"type 2 diabetes mellitus": "44054006",
"congestive heart failure": "42343007",
"chronic kidney disease": "431855005",
"hypertension": "38341003",
"community acquired pneumonia": "385093006",
"ventilator associated pneumonia": "233717007",
"anaphylaxis": "39579001",
"delirium": "2776000",
"depression": "35489007",
"asthma": "195967001",
"copd": "13645005",
}

for ent in spacy_doc.ents:
if ent.text.lower() in dummy_linker:
ent._.cui = dummy_linker[ent.text.lower()]

return doc

return pipeline


def create_app():
"""Create production healthcare API."""
pipeline = create_pipeline()
cda_adapter = CdaAdapter()

# Modern FHIR sources
fhir_gateway = FHIRGateway()
fhir_gateway.add_source("billing", BILLING_URL)

# Legacy CDA processing
note_service = NoteReaderService()

@note_service.method("ProcessDocument")
def ai_coding_workflow(request: CdaRequest):
doc = cda_adapter.parse(request)
doc = pipeline(doc)

for condition in doc.fhir.problem_list:
# Add basic provenance tracking
condition.meta = Meta(
source="urn:healthchain:pipeline:cdi",
lastUpdated=datetime.now(timezone.utc).isoformat(),
)
fhir_gateway.create(condition, source="billing")

cda_response = cda_adapter.format(doc)

return cda_response

# Register services
app = HealthChainAPI(title="Epic CDI Service with FHIR integration")
app.register_gateway(fhir_gateway, path="/fhir")
app.register_service(note_service, path="/notereader")

return app


def create_sandbox():
@hc.sandbox(api="http://localhost:8000/")
class NotereaderSandbox(ClinicalDocumentation):
"""Sandbox for testing clinical documentation workflows"""

def __init__(self):
super().__init__()
self.data_path = "./resources/uclh_cda.xml"

@hc.ehr(workflow="sign-note-inpatient")
def load_clinical_document(self) -> DocumentReference:
"""Load a sample CDA document for processing"""
with open(self.data_path, "r") as file:
xml_content = file.read()

return create_document_reference(
data=xml_content,
content_type="text/xml",
description="Sample CDA document from sandbox",
)

return NotereaderSandbox()


# Create the app
app = create_app()


if __name__ == "__main__":
import threading
from time import sleep

# Start server
def run_server():
uvicorn.run(app, port=8000, log_level="warning")

server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
sleep(2) # Wait for startup

# Test sandbox
sandbox = create_sandbox()
sandbox.start_sandbox()

try:
server_thread.join()
except KeyboardInterrupt:
pass
Binary file added docs/assets/images/medplum_client.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading