diff --git a/cookbook/cds_discharge_summarizer_hf_chat.py b/cookbook/cds_discharge_summarizer_hf_chat.py index 72df60d3..ad0a15c0 100644 --- a/cookbook/cds_discharge_summarizer_hf_chat.py +++ b/cookbook/cds_discharge_summarizer_hf_chat.py @@ -1,17 +1,18 @@ -import healthchain as hc +import os +import getpass + from healthchain.gateway import HealthChainAPI, CDSHooksService from healthchain.pipeline import SummarizationPipeline -from healthchain.sandbox.use_cases import ClinicalDecisionSupport -from healthchain.models import Prefetch, CDSRequest, CDSResponse -from healthchain.data_generators import CdsDataGenerator +from healthchain.models import CDSRequest, CDSResponse from langchain_huggingface.llms import HuggingFaceEndpoint from langchain_huggingface import ChatHuggingFace from langchain_core.prompts import PromptTemplate from langchain_core.output_parsers import StrOutputParser -import getpass -import os +from dotenv import load_dotenv + +load_dotenv() if not os.getenv("HUGGINGFACEHUB_API_TOKEN"): @@ -65,24 +66,12 @@ def discharge_summarizer(request: CDSRequest) -> CDSResponse: app.register_service(cds, path="/cds") -@hc.sandbox(api="http://localhost:8000") -class DischargeNoteSummarizer(ClinicalDecisionSupport): - def __init__(self): - super().__init__(path="/cds/cds-services/discharge-summarizer") - self.data_generator = CdsDataGenerator() - - @hc.ehr(workflow="encounter-discharge") - def load_data_in_client(self) -> Prefetch: - data = self.data_generator.generate_prefetch( - free_text_path="data/discharge_notes.csv", column_name="text" - ) - return data - - if __name__ == "__main__": import uvicorn import threading + from healthchain.sandbox import SandboxClient + # Start the API server in a separate thread def start_api(): uvicorn.run(app, port=8000) @@ -90,6 +79,24 @@ def start_api(): api_thread = threading.Thread(target=start_api, daemon=True) api_thread.start() - # Start the sandbox - summarizer = DischargeNoteSummarizer() - summarizer.start_sandbox() + # Create sandbox client and load test data + client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/cds/cds-services/discharge-summarizer", + ) + # Load discharge notes from CSV + client.load_free_text( + workflow="encounter-discharge", + csv_path="data/discharge_notes.csv", + column_name="text", + ) + # Send requests and get responses + responses = client.send_requests() + + # Save results + client.save_results("./output/") + + try: + api_thread.join() + except KeyboardInterrupt: + pass diff --git a/cookbook/cds_discharge_summarizer_hf_trf.py b/cookbook/cds_discharge_summarizer_hf_trf.py index e65e9916..2cb49baa 100644 --- a/cookbook/cds_discharge_summarizer_hf_trf.py +++ b/cookbook/cds_discharge_summarizer_hf_trf.py @@ -1,12 +1,13 @@ -import healthchain as hc +import os +import getpass + from healthchain.gateway import HealthChainAPI, CDSHooksService from healthchain.pipeline import SummarizationPipeline -from healthchain.sandbox.use_cases import ClinicalDecisionSupport -from healthchain.models import Prefetch, CDSRequest, CDSResponse -from healthchain.data_generators import CdsDataGenerator +from healthchain.models import CDSRequest, CDSResponse -import getpass -import os +from dotenv import load_dotenv + +load_dotenv() if not os.getenv("HUGGINGFACEHUB_API_TOKEN"): @@ -38,24 +39,12 @@ def discharge_summarizer(request: CDSRequest) -> CDSResponse: app.register_service(cds, path="/cds") -@hc.sandbox(api="http://localhost:8000") -class DischargeNoteSummarizer(ClinicalDecisionSupport): - def __init__(self): - super().__init__(path="/cds/cds-services/discharge-summarizer") - self.data_generator = CdsDataGenerator() - - @hc.ehr(workflow="encounter-discharge") - def load_data_in_client(self) -> Prefetch: - data = self.data_generator.generate_prefetch( - free_text_path="data/discharge_notes.csv", column_name="text" - ) - return data - - if __name__ == "__main__": import uvicorn import threading + from healthchain.sandbox import SandboxClient + # Start the API server in a separate thread def start_api(): uvicorn.run(app, port=8000) @@ -63,6 +52,24 @@ def start_api(): api_thread = threading.Thread(target=start_api, daemon=True) api_thread.start() - # Start the sandbox - summarizer = DischargeNoteSummarizer() - summarizer.start_sandbox() + # Create sandbox client and load test data + client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/cds/cds-services/discharge-summarizer", + ) + # Load discharge notes from CSV + client.load_free_text( + workflow="encounter-discharge", + csv_path="data/discharge_notes.csv", + column_name="text", + ) + # Send requests and get responses + responses = client.send_requests() + + # Save results + client.save_results("./output/") + + try: + api_thread.join() + except KeyboardInterrupt: + pass diff --git a/cookbook/notereader_clinical_coding_fhir.py b/cookbook/notereader_clinical_coding_fhir.py index f57925f6..76e9e8b3 100644 --- a/cookbook/notereader_clinical_coding_fhir.py +++ b/cookbook/notereader_clinical_coding_fhir.py @@ -14,14 +14,11 @@ """ import logging -import uvicorn -import healthchain as hc -from fhir.resources.documentreference import DocumentReference from spacy.tokens import Span from dotenv import load_dotenv -from healthchain.fhir import create_document_reference, add_provenance_metadata +from healthchain.fhir import add_provenance_metadata from healthchain.gateway.api import HealthChainAPI from healthchain.gateway.fhir import FHIRGateway from healthchain.gateway.clients.fhir.base import FHIRAuthConfig @@ -29,12 +26,11 @@ from healthchain.io import CdaAdapter, Document from healthchain.models import CdaRequest from healthchain.pipeline.medicalcodingpipeline import MedicalCodingPipeline -from healthchain.sandbox.use_cases import ClinicalDocumentation + # Suppress Spyne warnings logging.getLogger("spyne.model.complex").setLevel(logging.ERROR) - load_dotenv() # Load configuration from environment variables @@ -115,37 +111,16 @@ def ai_coding_workflow(request: CdaRequest): return app -def create_sandbox(): - @hc.sandbox(api="http://localhost:8000/") - class NotereaderSandbox(ClinicalDocumentation): - """Sandbox for testing clinical documentation workflows""" - - def __init__(self): - super().__init__() - self.data_path = "./data/notereader_cda.xml" - - @hc.ehr(workflow="sign-note-inpatient") - def load_clinical_document(self) -> DocumentReference: - """Load a sample CDA document for processing""" - with open(self.data_path, "r") as file: - xml_content = file.read() - - return create_document_reference( - data=xml_content, - content_type="text/xml", - description="Sample CDA document from sandbox", - ) - - return NotereaderSandbox() - - # Create the app app = create_app() if __name__ == "__main__": import threading + import uvicorn + from time import sleep + from healthchain.sandbox import SandboxClient # Start server def run_server(): @@ -155,9 +130,18 @@ def run_server(): server_thread.start() sleep(2) # Wait for startup - # Test sandbox - sandbox = create_sandbox() - sandbox.start_sandbox() + # Create sandbox client for testing + client = SandboxClient( + api_url="http://localhost:8000", endpoint="/notereader/fhir/", protocol="soap" + ) + # Load clinical document from file + client.load_from_path("./data/notereader_cda.xml") + + # Send request and save response + responses = client.send_requests() + + # Save results + client.save_results("./output/") try: server_thread.join() diff --git a/docs/api/clients.md b/docs/api/clients.md deleted file mode 100644 index 52fc7590..00000000 --- a/docs/api/clients.md +++ /dev/null @@ -1,3 +0,0 @@ -# Clients - -::: healthchain.sandbox.clients.ehr.EHRClient diff --git a/docs/api/data_generators.md b/docs/api/data_generators.md deleted file mode 100644 index fa53d591..00000000 --- a/docs/api/data_generators.md +++ /dev/null @@ -1,4 +0,0 @@ -# Data Generators - -::: healthchain.data_generators.cdsdatagenerator -::: healthchain.data_generators.encountergenerators diff --git a/docs/api/use_cases.md b/docs/api/sandbox.md similarity index 59% rename from docs/api/use_cases.md rename to docs/api/sandbox.md index 119a1fa9..d1b8ddd0 100644 --- a/docs/api/use_cases.md +++ b/docs/api/sandbox.md @@ -1,9 +1,11 @@ -# Use Cases +# Sandbox Client + +::: healthchain.sandbox.sandboxclient.SandboxClient + +::: healthchain.sandbox.generators.cdsdatagenerator -::: healthchain.sandbox.use_cases.cds ::: healthchain.models.requests.cdsrequest ::: healthchain.models.responses.cdsresponse -::: healthchain.sandbox.use_cases.clindoc ::: healthchain.models.requests.cdarequest ::: healthchain.models.responses.cdaresponse diff --git a/docs/api/service.md b/docs/api/service.md deleted file mode 100644 index fd4313d2..00000000 --- a/docs/api/service.md +++ /dev/null @@ -1,3 +0,0 @@ -# Service - -::: healthchain.service.service diff --git a/docs/cookbook/clinical_coding.md b/docs/cookbook/clinical_coding.md index 924220ee..9ea0b277 100644 --- a/docs/cookbook/clinical_coding.md +++ b/docs/cookbook/clinical_coding.md @@ -217,37 +217,21 @@ app.register_service(note_service, path="/notereader") ## Test with Sample Documents -HealthChain provides a [sandbox utility](../reference/utilities/sandbox.md) which simulates the NoteReader workflow end-to-end. It loads your sample CDA document, sends it to your service via the configured endpoint, and saves the request/response exchange in an `output/` directory. This lets you test the complete integration locally before connecting to Epic. +HealthChain provides a [sandbox client utility](../reference/utilities/sandbox.md) which simulates the NoteReader workflow end-to-end. It loads your sample CDA document, sends it to your service via the configured endpoint, and saves the request/response exchange in an `output/` directory. This lets you test the complete integration locally before connecting to Epic. ```python -import healthchain as hc - -from healthchain.sandbox.use_cases import ClinicalDocumentation -from healthchain.fhir import create_document_reference - -from fhir.resources.documentreference import DocumentReference - -def create_sandbox(): - @hc.sandbox(api="http://localhost:8000/") - class NotereaderSandbox(ClinicalDocumentation): - """Sandbox for testing clinical documentation workflows""" - def __init__(self): - super().__init__() - self.data_path = "./data/notereader_cda.xml" - - @hc.ehr(workflow="sign-note-inpatient") - def load_clinical_document(self) -> DocumentReference: - """Load a sample CDA document for processing""" - with open(self.data_path, "r") as file: - xml_content = file.read() - - return create_document_reference( - data=xml_content, - content_type="text/xml", - description="Sample CDA document from sandbox", - ) - - return NotereaderSandbox() +from healthchain.sandbox import SandboxClient + +# Create sandbox client for SOAP/CDA testing +client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/notereader/ProcessDocument", + workflow="sign-note-inpatient", + protocol="soap" +) + +# Load sample CDA document +client.load_from_path("./data/notereader_cda.xml") ``` ## Run the Complete Example @@ -256,13 +240,18 @@ Now for the moment of truth! Start your service and run the sandbox to see the c ```python import uvicorn +import threading -# Start the service on port 8000 -uvicorn.run(app) +# Start the API server in a separate thread +def start_api(): + uvicorn.run(app, port=8000) -# Create and start the sandbox -sandbox = create_sandbox() -sandbox.start_sandbox() +api_thread = threading.Thread(target=start_api, daemon=True) +api_thread.start() + +# Send requests and save responses with sandbox client +client.send_requests() +client.save_results("./output/") ``` !!! abstract "What happens when you run this" diff --git a/docs/cookbook/discharge_summarizer.md b/docs/cookbook/discharge_summarizer.md index 94bb2d19..96a9a7af 100644 --- a/docs/cookbook/discharge_summarizer.md +++ b/docs/cookbook/discharge_summarizer.md @@ -8,21 +8,14 @@ Check out the full working example [here](https://github.com/dotimplement/Health ## Setup +### Install Dependencies + ```bash -pip install healthchain +pip install healthchain python-dotenv ``` -Make sure you have a [Hugging Face API token](https://huggingface.co/docs/hub/security-tokens) and set it as the `HUGGINGFACEHUB_API_TOKEN` environment variable. +This example uses a Hugging Face model for the summarization task, so make sure you have a [Hugging Face API token](https://huggingface.co/docs/hub/security-tokens) and set it as the `HUGGINGFACEHUB_API_TOKEN` environment variable. -```python -import getpass -import os - -if not os.getenv("HUGGINGFACEHUB_API_TOKEN"): - os.environ["HUGGINGFACEHUB_API_TOKEN"] = getpass.getpass( - "Enter your token: " - ) -``` If you are using a chat model, make sure you have the necessary `langchain` packages installed. @@ -30,6 +23,16 @@ If you are using a chat model, make sure you have the necessary `langchain` pack pip install langchain langchain-huggingface ``` +### Download Sample Data + +Download the sample data `discharge_notes.csv` into a `data/` folder in your project root using `wget`: + +```bash +mkdir -p data +cd data +wget https://github.com/dotimplement/HealthChain/raw/main/cookbook/data/discharge_notes.csv +``` + ## Initialize the pipeline First, we'll create a [summarization pipeline](../reference/pipeline/pipeline.md) with domain-specific prompting for discharge workflows. You can choose between: @@ -85,7 +88,7 @@ The `SummarizationPipeline` automatically: - Parses FHIR resources from CDS Hooks requests - Extracts clinical text from discharge documents -- Formats outputs as CDS cards according to the CDS Hooks specification +- Formats outputs as CDS cards ## Add the CDS FHIR Adapter @@ -146,37 +149,36 @@ app = HealthChainAPI(title="Discharge Summary CDS Service") app.register_service(cds_service) ``` -## Test with Sandbox +## Test with Sample Data -Use the [sandbox utility](../reference/utilities/sandbox.md) to test the service with sample data: +HealthChain provides a [sandbox client utility](../reference/utilities/sandbox.md) which simulates the CDS hooks workflow end-to-end. It loads your sample free text data and formats it into CDS requests, sends it to your service, and saves the request/response exchange in an `output/` directory. This lets you test the complete integration locally and inspect the inputs and outputs before connecting to a real EHR instance. -!!! note "Download Sample Data" - - Download sample discharge note files from [cookbook/data](https://github.com/dotimplement/HealthChain/tree/main/cookbook/data) and place them in a `data/` folder in your project root. ```python -import healthchain as hc -from healthchain.sandbox.use_cases import ClinicalDecisionSupport -from healthchain.models import Prefetch -from healthchain.data_generators import CdsDataGenerator - -@hc.sandbox(api="http://localhost:8000") -class DischargeNoteSummarizer(ClinicalDecisionSupport): - def __init__(self): - super().__init__(path="/cds-services/discharge-summary") - self.data_generator = CdsDataGenerator() - - @hc.ehr(workflow="encounter-discharge") - def load_data_in_client(self) -> Prefetch: - data = self.data_generator.generate( - free_text_path="data/discharge_notes.csv", column_name="text" - ) - return data +from healthchain.sandbox import SandboxClient + +# Create sandbox client for testing +client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/cds/cds-services/discharge-summarizer", + workflow="encounter-discharge" +) + +# Load discharge notes from CSV and generate FHIR data +client.load_free_text( + csv_path="data/discharge_notes.csv", + column_name="text", + workflow="encounter-discharge" +) ``` +!!! tip "Learn More About Test Data Generation" + + Read more about the test FHIR data generator for CDS hooks [here](../reference/utilities/data_generator.md) + ## Run the Complete Example -Put it all together and run both the service and sandbox: +Put it all together and run both the service and sandbox client: ```python import uvicorn @@ -189,9 +191,9 @@ def start_api(): api_thread = threading.Thread(target=start_api, daemon=True) api_thread.start() -# Start the sandbox -summarizer = DischargeNoteSummarizer() -summarizer.start_sandbox() +# Send requests and save responses with sandbox client +client.send_requests() +client.save_results("./output/") ``` !!! tip "Service Endpoints" diff --git a/docs/quickstart.md b/docs/quickstart.md index 7f96c355..2a93fe66 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -178,24 +178,23 @@ cda_document = engine.from_fhir(fhir_resources, dest_format=FormatType.CDA) ### Sandbox Testing -Test your AI applications in realistic healthcare contexts with sandbox environments for CDS Hooks and clinical documentation workflows. +Test your AI applications in realistic healthcare contexts with `SandboxClient` for CDS Hooks and clinical documentation workflows. [(Full Documentation on Sandbox)](./reference/utilities/sandbox.md) ```python -import healthchain as hc -from healthchain.sandbox.use_cases import ClinicalDecisionSupport +from healthchain.sandbox import SandboxClient -@hc.sandbox -class MyCDS(ClinicalDecisionSupport): - def __init__(self): - self.pipeline = SummarizationPipeline.from_model_id("facebook/bart-large-cnn") - - @hc.ehr(workflow="encounter-discharge") - def ehr_database_client(self): - return self.data_generator.generate_prefetch() +# Create client and load test data +client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/cds/cds-services/my-service", + workflow="encounter-discharge" +) -# Run with: healthchain run mycds.py +# Load from datasets or files +client.load_from_registry("synthea", num_patients=5) +responses = client.send_requests() ``` ### FHIR Helpers @@ -218,59 +217,33 @@ condition = create_condition( ### Data Generator -You can use the data generator to generate synthetic data for your sandbox runs. +You can use the data generator to generate synthetic FHIR data for testing. -The `.generate_prefetch()` method is dependent on use case and workflow. For example, `CdsDataGenerator` will generate synthetic [FHIR](https://hl7.org/fhir/) data as [Pydantic](https://docs.pydantic.dev/) models suitable for the workflow specified by the use case. +The `CdsDataGenerator` generates synthetic [FHIR](https://hl7.org/fhir/) data as [Pydantic](https://docs.pydantic.dev/) models suitable for different CDS workflows. Use it standalone or with `SandboxClient.load_free_text()` to include text-based data. [(Full Documentation on Data Generators)](./reference/utilities/data_generator.md) -=== "Within client" - ```python - import healthchain as hc - - from healthchain.sandbox.use_cases import ClinicalDecisionSupport - from healthchain.models import Prefetch - from healthchain.data_generators import CdsDataGenerator - - @hc.sandbox - class MyCoolSandbox(ClinicalDecisionSupport): - def __init__(self) -> None: - self.data_generator = CdsDataGenerator() - - @hc.ehr(workflow="patient-view") - def load_data_in_client(self) -> Prefetch: - data = self.data_generator.generate_prefetch() - return data - - @hc.api - def my_server(self, request) -> None: - pass - ``` - - -=== "On its own" - ```python - from healthchain.data_generators import CdsDataGenerator - from healthchain.sandbox.workflows import Workflow - - # Initialize data generator - data_generator = CdsDataGenerator() - - # Generate FHIR resources for use case workflow - data_generator.set_workflow(Workflow.encounter_discharge) - data = data_generator.generate_prefetch() - - print(data.model_dump()) - - # { - # "prefetch": { - # "encounter": - # { - # "resourceType": ... - # } - # } - #} - ``` +```python +from healthchain.sandbox.generators import CdsDataGenerator +from healthchain.sandbox.workflows import Workflow + +# Initialize data generator +data_generator = CdsDataGenerator() + +# Generate FHIR resources for specific workflow +data_generator.set_workflow(Workflow.encounter_discharge) +data = data_generator.generate_prefetch() + +print(data.model_dump()) + +# { +# "prefetch": { +# "encounter": { +# "resourceType": ... +# } +# } +# } +``` ## Going further ✨ Check out our [Cookbook](cookbook/index.md) section for more worked examples! HealthChain is still in its early stages, so if you have any questions please feel free to reach us on [Github](https://github.com/dotimplement/HealthChain/discussions) or [Discord](https://discord.gg/UQC6uAepUz). diff --git a/docs/reference/pipeline/adapters/cdaadapter.md b/docs/reference/pipeline/adapters/cdaadapter.md index 909b14c1..1d05060f 100644 --- a/docs/reference/pipeline/adapters/cdaadapter.md +++ b/docs/reference/pipeline/adapters/cdaadapter.md @@ -10,7 +10,7 @@ This adapter is particularly useful for clinical documentation improvement (CDI) | Input | Output | Document Access | |-------|--------|-----------------| -| [**CdaRequest**](../../../api/use_cases.md#healthchain.models.requests.cdarequest.CdaRequest) | [**CdaResponse**](../../../api/use_cases.md#healthchain.models.responses.cdaresponse.CdaResponse) | `Document.fhir.problem_list`, `Document.fhir.medication_list`, `Document.text` | +| [**CdaRequest**](../../../api/sandbox.md#healthchain.models.requests.cdarequest.CdaRequest) | [**CdaResponse**](../../../api/sandbox.md#healthchain.models.responses.cdaresponse.CdaResponse) | `Document.fhir.problem_list`, `Document.fhir.medication_list`, `Document.text` | ## Document Data Access diff --git a/docs/reference/pipeline/adapters/cdsfhiradapter.md b/docs/reference/pipeline/adapters/cdsfhiradapter.md index 55ef62f1..4d21e1be 100644 --- a/docs/reference/pipeline/adapters/cdsfhiradapter.md +++ b/docs/reference/pipeline/adapters/cdsfhiradapter.md @@ -10,7 +10,7 @@ This adapter is specifically designed for building CDS services that receive FHI | Input | Output | Document Access | |-------|--------|-----------------| -| [**CDSRequest**](../../../api/use_cases.md#healthchain.models.requests.cdsrequest.CDSRequest) | [**CDSResponse**](../../../api/use_cases.md#healthchain.models.responses.cdsresponse.CDSResponse) | `Document.fhir.get_prefetch_resources()`, `Document.cds.cards` | +| [**CDSRequest**](../../../api/sandbox.md#healthchain.models.requests.cdsrequest.CDSRequest) | [**CDSResponse**](../../../api/sandbox.md#healthchain.models.responses.cdsresponse.CDSResponse) | `Document.fhir.get_prefetch_resources()`, `Document.cds.cards` | ## Document Data Access diff --git a/docs/reference/utilities/data_generator.md b/docs/reference/utilities/data_generator.md index 57c37aee..a7324cf8 100644 --- a/docs/reference/utilities/data_generator.md +++ b/docs/reference/utilities/data_generator.md @@ -25,34 +25,34 @@ Current implemented workflows: For more information on CDS workflows, see the [CDS Hooks Protocol](../gateway/cdshooks.md) documentation. -You can use the data generator within a client function or on its own. +You can use the data generator with `SandboxClient.load_free_text()` or standalone: -=== "Within client" +=== "With SandboxClient" ```python - import healthchain as hc - from healthchain.sandbox.use_cases import ClinicalDecisionSupport - from healthchain.models import Prefetch - from healthchain.data_generators import CdsDataGenerator - - @hc.sandbox - class MyCoolSandbox(ClinicalDecisionSupport): - def __init__(self) -> None: - self.data_generator = CdsDataGenerator() - - @hc.ehr(workflow="patient-view") - def load_data_in_client(self) -> Prefetch: - prefetch = self.data_generator.generate_prefetch() - return prefetch - - @hc.api - def my_server(self, request) -> None: - pass + from healthchain.sandbox import SandboxClient + + # Create client + client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/cds/cds-services/my-service", + workflow="encounter-discharge" + ) + + # Generate FHIR data from clinical notes + client.load_free_text( + csv_path="./data/discharge_notes.csv", + column_name="text", + workflow="encounter-discharge", + random_seed=42 + ) + + responses = client.send_requests() ``` -=== "On its own" +=== "Standalone" ```python - from healthchain.data_generators import CdsDataGenerator + from healthchain.sandbox.generators import CdsDataGenerator from healthchain.sandbox.workflows import Workflow # Initialize data generator @@ -85,10 +85,6 @@ data_generator.generate(constrain=["has_medication_requests"]) ``` --> -## Other synthetic data sources - -If you are looking for realistic datasets, you are also free to load your own data in a sandbox run! Check out [MIMIC](https://mimic.mit.edu/) for comprehensive continuity of care records and free-text data, or [Synthea](https://synthetichealth.github.io/synthea/) for synthetically generated FHIR resources. Both are open-source, although you will need to complete [PhysioNet Credentialing](https://mimic.mit.edu/docs/gettingstarted/) to access MIMIC. - ## Loading free-text You can specify the `free_text_csv` field of the `.generate_prefetch()` method to load in free-text sources into the data generator, e.g. discharge summaries. This will wrap the text into a FHIR [DocumentReference](https://build.fhir.org/documentreference.html) resource (N.B. currently we place the text directly in the resource attachment, although it is technically supposed to be base64 encoded). diff --git a/docs/reference/utilities/sandbox.md b/docs/reference/utilities/sandbox.md index 4bfe4c1f..b1d1a6a1 100644 --- a/docs/reference/utilities/sandbox.md +++ b/docs/reference/utilities/sandbox.md @@ -1,104 +1,168 @@ -# Sandbox Testing +# Sandbox Client -Sandbox environments provide testing utilities for validating your HealthChain applications in realistic healthcare contexts. These are primarily used for development and testing rather than production deployment. - -!!! info "For production applications, use [HealthChainAPI](../gateway/api.md) instead" - - Sandbox is a testing utility. For production healthcare AI applications, use the [Gateway](../gateway/gateway.md) with [HealthChainAPI](../gateway/api.md). +The sandbox client provides a simplified interface for testing and validating your applications in realistic healthcare contexts. Use `SandboxClient` to quickly spin up demos and test with various data sources and workflows. ## Quick Example Test CDS Hooks workflows with synthetic data: ```python -import healthchain as hc -from healthchain.sandbox.use_cases import ClinicalDecisionSupport +from healthchain.sandbox import SandboxClient + +# Create client +client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/cds/cds-services/my-service", + workflow="encounter-discharge" +) + +# Load data and send requests +client.load_from_registry("synthea", num_patients=5) +responses = client.send_requests() +``` -@hc.sandbox -class TestCDS(ClinicalDecisionSupport): - def __init__(self): - self.pipeline = SummarizationPipeline.from_model_id("facebook/bart-large-cnn") +## SandboxClient - @hc.ehr(workflow="encounter-discharge") - def ehr_database_client(self): - return self.data_generator.generate_prefetch() +### Initializing -# Run with: healthchain run test_cds.py +```python +from healthchain.sandbox import SandboxClient + +client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/cds/cds-services/my-service", + workflow="encounter-discharge", # Optional, auto-detected if not provided + protocol="rest", # "rest" for CDS Hooks, "soap" for CDA + timeout=10.0 +) ``` -## Available Testing Scenarios - -- **[CDS Hooks](../gateway/cdshooks.md)**: `ClinicalDecisionSupport` - Test clinical decision support workflows -- **[Clinical Documentation](../gateway/soap_cda.md)**: `ClinicalDocumentation` - Test SOAP/CDA document processing workflows - -## EHR Client Simulation +### Loading Data -The `@hc.ehr` decorator simulates EHR client behavior for testing. You must specify a **workflow** that determines how your data will be formatted. +=== "From Registry" + ```python + # Load from pre-configured datasets + client.load_from_registry("mimic-on-fhir", sample_size=10) + client.load_from_registry("synthea", num_patients=5) + # See available datasets + from healthchain.sandbox import list_available_datasets + print(list_available_datasets()) + ``` -=== "Clinical Decision Support" +=== "From Files" ```python - import healthchain as hc - from healthchain.sandbox.use_cases import ClinicalDecisionSupport - from healthchain.models import Prefetch - from fhir.resources.patient import Patient - - @hc.sandbox - class MyCoolSandbox(ClinicalDecisionSupport): - @hc.ehr(workflow="patient-view", num=10) - def load_data_in_client(self) -> Prefetch: - # Load your test data here - return Prefetch(prefetch={"patient": Patient(id="123")}) + # Load single file + client.load_from_path("./data/clinical_note.xml") + + # Load directory + client.load_from_path("./data/cda_files/", pattern="*.xml") ``` -=== "Clinical Documentation" +=== "From Free Text CSV" ```python - import healthchain as hc - from healthchain.sandbox.use_cases import ClinicalDocumentation - from healthchain.fhir import create_document_reference - from fhir.resources.documentreference import DocumentReference - - @hc.sandbox - class MyCoolSandbox(ClinicalDocumentation): - @hc.ehr(workflow="sign-note-inpatient", num=10) - def load_data_in_client(self) -> DocumentReference: - # Load your test data here - return create_document_reference(data="", content_type="text/xml") + # Generate synthetic FHIR from clinical notes + client.load_free_text( + csv_path="./data/discharge_notes.csv", + column_name="text", + workflow="encounter-discharge", + random_seed=42 + ) ``` -**Parameters:** +### Sending Requests -- `workflow`: The healthcare workflow to simulate (e.g., "patient-view", "sign-note-inpatient") -- `num`: Optional number of requests to generate for testing +```python +# Send all queued requests +responses = client.send_requests() -## Migration to Production +# Save results +client.save_results("./output/") -!!! warning "Sandbox Decorators are Deprecated" - `@hc.api` is deprecated. Use [HealthChainAPI](../gateway/api.md) for production. +# Get status +status = client.get_status() +print(status) +``` -**Quick Migration:** +## Available Testing Scenarios -```python -# Before (Testing) - Shows deprecation warning -@hc.sandbox -class TestCDS(ClinicalDecisionSupport): - @hc.api # ⚠️ DEPRECATED - def my_service(self, request): ... +**CDS Hooks** (REST protocol): + +- `workflow`: "patient-view", "encounter-discharge", "order-select", etc. +- Load FHIR Prefetch data +- Test clinical decision support services + +**Clinical Documentation** (SOAP protocol): + +- `workflow`: "sign-note-inpatient", "sign-note-outpatient" +- Load CDA XML documents +- Test SOAP/CDA document processing + +## Complete Examples + +=== "CDS Hooks Test" + ```python + from healthchain.sandbox import SandboxClient + + # Initialize for CDS Hooks + client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/cds/cds-services/discharge-summarizer", + workflow="encounter-discharge", + protocol="rest" + ) + + # Load and send + client.load_from_registry("synthea", num_patients=3) + responses = client.send_requests() + client.save_results("./output/") + ``` -# After (Production) -from healthchain.gateway import HealthChainAPI, CDSHooksService +=== "Clinical Documentation Test" + ```python + from healthchain.sandbox import SandboxClient + + # Initialize for SOAP/CDA + client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/notereader/fhir/", + workflow="sign-note-inpatient", + protocol="soap" + ) + + # Load CDA files + client.load_from_path("./data/cda_files/", pattern="*.xml") + responses = client.send_requests() + client.save_results("./output/") + ``` -app = HealthChainAPI() -cds = CDSHooksService() +## Migration Guide -@cds.hook("patient-view") -def my_service(request): ... +!!! warning "Decorator Pattern Deprecated" + The `@hc.sandbox` and `@hc.ehr` decorators with `ClinicalDecisionSupport` and `ClinicalDocumentation` base classes are deprecated. Use `SandboxClient` instead. + +**Before:** +```python +@hc.sandbox +class TestCDS(ClinicalDecisionSupport): + @hc.ehr(workflow="patient-view") + def load_data(self): + return prefetch_data +``` -app.register_service(cds) +**After:** +```python +client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/cds/cds-services/my-service", + workflow="patient-view" +) +client.load_from_registry("synthea", num_patients=5) +responses = client.send_requests() ``` -**Next Steps:** +## Next Steps -1. **Testing**: Continue using sandbox utilities with deprecation warnings +1. **Testing**: Use `SandboxClient` for local development and testing 2. **Production**: Migrate to [HealthChainAPI Gateway](../gateway/gateway.md) 3. **Protocols**: See [CDS Hooks](../gateway/cdshooks.md) and [SOAP/CDA](../gateway/soap_cda.md) diff --git a/healthchain/__init__.py b/healthchain/__init__.py index 62fab4bd..92fb633c 100644 --- a/healthchain/__init__.py +++ b/healthchain/__init__.py @@ -4,7 +4,6 @@ from .utils.logger import add_handlers from .config.base import ConfigManager, ValidationLevel -from .sandbox.decorator import sandbox, api, ehr # Enable deprecation warnings warnings.filterwarnings("always", category=DeprecationWarning, module="healthchain") @@ -16,3 +15,28 @@ # Export them at the top level __all__ = ["ConfigManager", "ValidationLevel", "api", "ehr", "sandbox"] + + +# Legacy import with warning +def __getattr__(name): + if name == "data_generators": + warnings.warn( + "Importing data_generators from healthchain is deprecated. " + "Use 'from healthchain.sandbox import generators' instead.", + DeprecationWarning, + stacklevel=2, + ) + from healthchain.sandbox import generators + + return generators + elif name == "use_cases": + warnings.warn( + "Importing use_cases from healthchain is deprecated. " + "Use 'from healthchain.sandbox import use_cases' instead.", + DeprecationWarning, + stacklevel=2, + ) + from healthchain.sandbox import use_cases + + return use_cases + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/healthchain/sandbox/__init__.py b/healthchain/sandbox/__init__.py index 0eaec44e..5c1c708c 100644 --- a/healthchain/sandbox/__init__.py +++ b/healthchain/sandbox/__init__.py @@ -1,21 +1,34 @@ -from .decorator import sandbox, api, ehr -from .environment import SandboxEnvironment -from .use_cases import ( - ClinicalDecisionSupport, - ClinicalDocumentation, - CdsRequestConstructor, - ClinDocRequestConstructor, -) -from .clients import EHRClient +import warnings + +from .sandboxclient import SandboxClient +from .datasets import DatasetRegistry, DatasetLoader, list_available_datasets + + +# Import loaders to trigger auto-registration __all__ = [ - "sandbox", - "api", - "ehr", - "SandboxEnvironment", - "ClinicalDecisionSupport", - "ClinicalDocumentation", - "CdsRequestConstructor", - "ClinDocRequestConstructor", - "EHRClient", + "SandboxClient", + "DatasetRegistry", + "DatasetLoader", + "list_available_datasets", ] + + +def __getattr__(name): + deprecated_names = [ + "sandbox", + "api", + "ehr", + "ClinicalDecisionSupport", + "ClinicalDocumentation", + ] + + if name in deprecated_names: + warnings.warn( + f"{name} is deprecated and has been removed. " + f"Use SandboxClient instead.", + DeprecationWarning, + stacklevel=2, + ) + raise AttributeError(f"{name} has been removed") + raise AttributeError(f"module 'healthchain.sandbox' has no attribute '{name}'") diff --git a/healthchain/sandbox/base.py b/healthchain/sandbox/base.py index e38ef298..5db7a742 100644 --- a/healthchain/sandbox/base.py +++ b/healthchain/sandbox/base.py @@ -1,20 +1,22 @@ from abc import ABC, abstractmethod -from typing import Dict, List, Optional +from typing import Dict +from enum import Enum -from healthchain.sandbox.workflows import UseCaseType, Workflow +from healthchain.models.hooks.prefetch import Prefetch +from healthchain.sandbox.workflows import Workflow -class BaseClient(ABC): - """Base client class - A client can be an EHR or CPOE etc. - The basic operation is that it sends data in a specified standard. +class ApiProtocol(Enum): """ + Enum defining the supported API protocols. - @abstractmethod - def send_request(self) -> None: - """ - Sends a request to AI service - """ + Available protocols: + - soap: SOAP protocol + - rest: REST protocol + """ + + soap = "SOAP" + rest = "REST" class BaseRequestConstructor(ABC): @@ -30,40 +32,38 @@ def construct_request(self, data, workflow: Workflow) -> Dict: pass -class BaseUseCase(ABC): +class DatasetLoader(ABC): """ - Abstract base class for healthcare use cases in the sandbox environment. + Abstract base class for dataset loaders. - This class provides a foundation for implementing different healthcare use cases - such as Clinical Decision Support (CDS) or Clinical Documentation (NoteReader). - Subclasses must implement the type and strategy properties. + Subclasses should implement the load() method to return Prefetch data + from their specific dataset source. """ - def __init__( - self, - client: Optional[BaseClient] = None, - ) -> None: - self._client: BaseClient = client + @abstractmethod + def load(self, **kwargs) -> Prefetch: + """ + Load dataset and return as Prefetch object. - self.responses: List[Dict[str, str]] = [] - self.sandbox_id = None - self.url = None + Args: + **kwargs: Loader-specific parameters - @property - @abstractmethod - def type(self) -> UseCaseType: + Returns: + Prefetch object containing FHIR resources + + Raises: + FileNotFoundError: If dataset files are not found + ValueError: If dataset parameters are invalid + """ pass @property @abstractmethod - def strategy(self) -> BaseRequestConstructor: + def name(self) -> str: + """Dataset name for registration.""" pass @property - def path(self) -> str: - path = self._path - if not path.startswith("/"): - path = "/" + path - if not path.endswith("/"): - path = path + "/" - return path + def description(self) -> str: + """Optional description of the dataset.""" + return "" diff --git a/healthchain/sandbox/clients/__init__.py b/healthchain/sandbox/clients/__init__.py deleted file mode 100644 index fbb6cce3..00000000 --- a/healthchain/sandbox/clients/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .ehr import EHRClient - -__all__ = ["EHRClient"] diff --git a/healthchain/sandbox/clients/ehr.py b/healthchain/sandbox/clients/ehr.py deleted file mode 100644 index 4cadaf71..00000000 --- a/healthchain/sandbox/clients/ehr.py +++ /dev/null @@ -1,121 +0,0 @@ -import logging -from typing import Any, Callable, Dict, List, Optional - -import httpx - -from healthchain.models import CDSRequest, CDSResponse -from healthchain.models.responses.cdaresponse import CdaResponse -from healthchain.sandbox.base import BaseClient, BaseRequestConstructor -from healthchain.sandbox.workflows import Workflow -from healthchain.service.endpoints import ApiProtocol - -log = logging.getLogger(__name__) - - -class EHRClient(BaseClient): - def __init__( - self, - func: Callable[..., Any], - workflow: Workflow, - strategy: BaseRequestConstructor, - timeout: Optional[float] = 10.0, - ): - """ - Initializes the EHRClient with a data generator function and optional workflow and use case. - Should be a subclass of BaseUseCase. Example - ClinicalDecisionSupport() - - Parameters: - func (Callable[..., Any]): A function to generate data for requests. - workflow ([Workflow]): The workflow context to apply to the data generator. - strategy (BaseStrategy): The strategy object to construct requests based on the generated data. - timeout (Optional[float], default=10.0) : The maximum time in seconds to wait for a response from the server. This parameter determines how long the client will wait before considering a request timed out. - - """ - # TODO: Add option to pass in different provider options - self.data_generator_func: Callable[..., Any] = func - self.workflow: Workflow = workflow - self.strategy: BaseRequestConstructor = strategy - self.vendor = None - self.request_data: List[CDSRequest] = [] - self.timeout = timeout - - def set_vendor(self, name) -> None: - self.vendor = name - - def generate_request(self, *args: Any, **kwargs: Any) -> None: - """ - Generates a request using the data produced by the data generator function, - and appends it to the internal request queue. - - Parameters: - *args (Any): Positional arguments passed to the data generator function. - **kwargs (Any): Keyword arguments passed to the data generator function. - - Raises: - ValueError: If the use case is not configured. - """ - data = self.data_generator_func(*args, **kwargs) - self.request_data.append(self.strategy.construct_request(data, self.workflow)) - - async def send_request(self, url: str) -> List[Dict]: - """ - Sends all queued requests to the specified URL and collects the responses. - - Parameters: - url (str): The URL to which the requests will be sent. - Returns: - List[dict]: A list of JSON responses from the server. - Notes: - This method logs errors rather than raising them, to avoid interrupting the batch processing of requests. - """ - async with httpx.AsyncClient(follow_redirects=True) as client: - responses: List[Dict] = [] - timeout = httpx.Timeout(self.timeout, read=None) - for request in self.request_data: - try: - if self.strategy.api_protocol == ApiProtocol.soap: - headers = {"Content-Type": "text/xml; charset=utf-8"} - response = await client.post( - url=url, - data=request.document, - headers=headers, - timeout=timeout, - ) - response.raise_for_status() - response_model = CdaResponse(document=response.text) - responses.append(response_model.model_dump_xml()) - else: - # TODO: use model_dump_json() once Pydantic V2 timezone serialization issue is resolved - log.debug(f"Making POST request to: {url}") - response = await client.post( - url=url, - json=request.model_dump(exclude_none=True), - timeout=timeout, - ) - response.raise_for_status() - response_data = response.json() - try: - cds_response = CDSResponse(**response_data) - responses.append(cds_response.model_dump(exclude_none=True)) - except Exception: - # Fallback to raw response if parsing fails - responses.append(response_data) - except httpx.HTTPStatusError as exc: - try: - error_content = exc.response.json() - except Exception: - error_content = exc.response.text - log.error( - f"Error response {exc.response.status_code} while requesting {exc.request.url!r}: {error_content}" - ) - responses.append({}) - except httpx.TimeoutException as exc: - log.error(f"Request to {exc.request.url!r} timed out!") - responses.append({}) - except httpx.RequestError as exc: - log.error( - f"An error occurred while requesting {exc.request.url!r}." - ) - responses.append({}) - - return responses diff --git a/healthchain/sandbox/datasets.py b/healthchain/sandbox/datasets.py new file mode 100644 index 00000000..336cacf7 --- /dev/null +++ b/healthchain/sandbox/datasets.py @@ -0,0 +1,132 @@ +""" +Dataset registry and loader infrastructure for SandboxClient. + +Provides a centralized registry for loading test datasets like MIMIC-on-FHIR and Synthea. +""" + +import logging + +from typing import Any, Dict, List + +from healthchain.models import Prefetch +from healthchain.sandbox.base import DatasetLoader + + +log = logging.getLogger(__name__) + + +class DatasetRegistry: + """ + Registry for managing available datasets. + + Datasets are registered at import time and can be loaded by name. + """ + + _datasets: Dict[str, DatasetLoader] = {} + + @classmethod + def register(cls, loader: DatasetLoader) -> None: + """ + Register a dataset loader. + + Args: + loader: DatasetLoader instance to register + + Raises: + ValueError: If dataset name is already registered + """ + name = loader.name + if name in cls._datasets: + log.warning(f"Dataset '{name}' is already registered. Overwriting.") + + cls._datasets[name] = loader + log.debug(f"Registered dataset: {name}") + + @classmethod + def load(cls, name: str, **kwargs) -> Prefetch: + """ + Load a dataset by name. + + Args: + name: Name of the dataset to load + **kwargs: Dataset-specific parameters + + Returns: + Prefetch object containing FHIR resources + + Raises: + KeyError: If dataset name is not registered + """ + if name not in cls._datasets: + raise KeyError( + f"Dataset '{name}' not found. " + f"Available datasets: {cls.list_datasets()}" + ) + + loader = cls._datasets[name] + log.info(f"Loading dataset: {name}") + return loader.load(**kwargs) + + @classmethod + def list_datasets(cls) -> List[str]: + """ + Get list of registered dataset names. + + Returns: + List of dataset names + """ + return list(cls._datasets.keys()) + + @classmethod + def get_dataset_info(cls, name: str) -> Dict[str, Any]: + """ + Get information about a registered dataset. + + Args: + name: Name of the dataset + + Returns: + Dictionary with dataset information + + Raises: + KeyError: If dataset name is not registered + """ + if name not in cls._datasets: + raise KeyError(f"Dataset '{name}' not found") + + loader = cls._datasets[name] + return { + "name": loader.name, + "description": loader.description, + "loader_class": loader.__class__.__name__, + } + + @classmethod + def clear(cls) -> None: + """Clear all registered datasets. Mainly for testing.""" + cls._datasets.clear() + + +def list_available_datasets() -> Dict[str, str]: + """ + Get a dictionary of all available datasets with their descriptions. + + This helper function provides an easy way to discover what datasets + are available in the registry without needing to check documentation. + + Returns: + Dictionary mapping dataset names to their descriptions + + Example: + >>> from healthchain.sandbox import list_available_datasets + >>> datasets = list_available_datasets() + >>> print(datasets) + { + 'mimic-on-fhir': 'MIMIC-IV-on-FHIR: Real de-identified clinical data...', + 'synthea-patients': 'Synthea: Synthetic patient data generator...' + } + """ + return { + name: DatasetRegistry.get_dataset_info(name)["description"] + for name in DatasetRegistry.list_datasets() + } diff --git a/healthchain/sandbox/decorator.py b/healthchain/sandbox/decorator.py deleted file mode 100644 index 103b8e22..00000000 --- a/healthchain/sandbox/decorator.py +++ /dev/null @@ -1,320 +0,0 @@ -import logging -import httpx -import logging.config - -from functools import wraps -from typing import Any, Type, TypeVar, Optional, Callable, Union, Dict - -from healthchain.sandbox.base import BaseUseCase -from healthchain.sandbox.environment import SandboxEnvironment -from healthchain.sandbox.workflows import Workflow, UseCaseType -from healthchain.sandbox.utils import ( - find_attributes_of_type, - assign_to_attribute, -) - -log = logging.getLogger(__name__) -# traceback.print_exc() - -F = TypeVar("F", bound=Callable) - - -def is_client(attr): - """Check if an attribute is marked as a client""" - return hasattr(attr, "is_client") - - -def validate_single_registration(count, attribute_name): - """ - Ensure only one method is registered for a role. - Raises RuntimeError if multiple methods are registered. - """ - if count > 1: - raise RuntimeError( - f"Multiple methods are registered as {attribute_name}. Only one is allowed." - ) - - -def register_method(instance, method, cls, name, attribute_name): - """ - Register a method for a specific role and execute it - """ - method_func = method.__get__(instance, cls) - log.debug(f"Set {name} as {attribute_name}") - return method_func() - - -def api(func: Optional[F] = None) -> Union[Callable[..., Any], Callable[[F], F]]: - """ - A decorator that wraps a function in an APIMethod; this wraps a function that handles LLM/NLP - processing and tags it as a service route to be mounted onto the main service endpoints. - - It does not take any additional arguments for now, but we may consider adding configs - - .. deprecated:: 1.0.0 - This decorator is deprecated and will be removed in a future version. - Please use the new HealthChainAPI to create services instead. - """ - import warnings - - warnings.warn( - "The @api decorator is deprecated and will be removed in a future version. " - "Please use the new HealthChainAPI to create services instead.", - DeprecationWarning, - stacklevel=2, - ) - - def decorator(func: F) -> F: - return func - - if func is None: - return decorator - else: - return decorator(func) - - -def ehr( - func: Optional[F] = None, *, workflow: Workflow, num: int = 1 -) -> Union[Callable[..., Any], Callable[[F], F]]: - """ - A decorator that wraps around a data generator function and returns an EHRClient - - Parameters: - func (Optional[Callable]): The function to be decorated. If None, this allows the decorator to - be used with arguments. - workflow ([str]): The workflow identifier which should match an item in the Workflow enum. - This specifies the context in which the EHR function will operate. - num (int): The number of requests to generate in the queue; defaults to 1. - - Returns: - Callable: A decorated callable that incorporates EHR functionality or the decorator itself - if 'func' is None, allowing it to be used as a parameterized decorator. - - Raises: - ValueError: If the workflow does not correspond to any defined enum or if use case is not configured. - NotImplementedError: If the use case class is not one of the supported types. - - Example: - @ehr(workflow='patient-view', num=2) - def generate_data(self, config): - # Function implementation - """ - - def decorator(func: F) -> F: - func.is_client = True - - @wraps(func) - def wrapper(self, *args: Any, **kwargs: Any) -> Any: - # Import here to avoid circular imports - from healthchain.data_generators import CdsDataGenerator - from healthchain.sandbox.clients.ehr import EHRClient - - # Validate function decorated is a use case base class - assert issubclass( - type(self), BaseUseCase - ), f"{self.__class__.__name__} must be subclass of valid Use Case strategy!" - - # Validate workflow is a valid workflow - try: - workflow_enum = Workflow(workflow) - except ValueError as e: - raise ValueError( - f"{e}: please select from {[x.value for x in Workflow]}" - ) - - # Set workflow in data generator if configured - data_generator_attributes = find_attributes_of_type(self, CdsDataGenerator) - for i in range(len(data_generator_attributes)): - attribute_name = data_generator_attributes[i] - try: - assign_to_attribute( - self, attribute_name, "set_workflow", workflow_enum - ) - except Exception as e: - log.error( - f"Could not set workflow {workflow_enum.value} for data generator method {attribute_name}: {e}" - ) - if i > 1: - log.warning("More than one DataGenerator instances found.") - - # Wrap the function in EHRClient with workflow and strategy passed in - if self.type in UseCaseType: - method = EHRClient(func, workflow=workflow_enum, strategy=self.strategy) - # Generate the number of requests specified with method - for _ in range(num): - method.generate_request(self, *args, **kwargs) - else: - raise NotImplementedError( - f"Use case {self.type} not recognised, check if implemented." - ) - return method - - return wrapper - - if func is None: - return decorator - else: - return decorator(func) - - -def sandbox(arg: Optional[Any] = None, **kwargs: Any) -> Callable: - """ - Decorator factory for creating a sandboxed environment. The sandbox provides a controlled - environment for testing healthcare applications by simulating EHR system interactions. - Should be used with a use case class, such as ClinicalDocumentation or ClinicalDecisionSupport. - - Parameters: - api: API URL as string - config: Dictionary of configuration options - - Returns: - A decorator function that sets up the sandbox environment for the decorated class. - - Raises: - ValueError: If no API URL is provided or if the URL is invalid - TypeError: If decorated class is not a valid use case - - Example: - ```python - # Using with API URL - @sandbox("http://localhost:8000") - class MyUseCase(ClinicalDocumentation): - def __init__(self): - super().__init__() - - # Using with config - @sandbox(api="http://localhost:8000", config={"timeout": 30}) - class MyUseCase(ClinicalDocumentation): - def __init__(self): - super().__init__() - ``` - """ - if callable(arg): - # Decorator used without parentheses - cls = arg - return sandbox_decorator()(cls) - else: - # Arguments were provided - api_url = None - - # Check if api was provided as a direct argument - if isinstance(arg, str): - api_url = arg - # Check if api was provided in kwargs - elif "api" in kwargs: - api_url = kwargs["api"] - - if api_url is None: - raise ValueError("'api' is a required argument") - - try: - api = httpx.URL(api_url) - except Exception as e: - raise ValueError(f"Invalid API URL: {str(e)}") - - config = ( - kwargs.get("config", {}) - if arg is None - else arg - if isinstance(arg, dict) - else {} - ) - - return sandbox_decorator(api, config) - - -def sandbox_decorator( - api: Optional[Union[str, httpx.URL]] = None, config: Optional[Dict] = None -) -> Callable: - """ - Internal decorator function that sets up a sandbox environment for a use case class. - This function modifies the class initialization to incorporate service and client management. - - Parameters: - api: The API URL to be used for the sandbox. Can be a string or httpx.URL object. - config: Optional dictionary containing configurations for the sandbox environment. - Defaults to an empty dictionary if not provided. - - Returns: - A wrapper function that modifies the class to which it is applied, adding sandbox - functionality including start_sandbox and stop_sandbox methods. - - Raises: - TypeError: If the decorated class is not a subclass of BaseUseCase. - ValueError: If the 'api' argument is not provided. - """ - if api is None: - raise ValueError("'api' is a required argument") - - if config is None: - config = {} - - def wrapper(cls: Type) -> Type: - if not issubclass(cls, BaseUseCase): - raise TypeError( - f"The 'sandbox' decorator can only be applied to subclasses of BaseUseCase, got {cls.__name__}" - ) - - original_init = cls.__init__ - - def new_init(self, *args: Any, **kwargs: Any) -> None: - # Initialize parent class - super(cls, self).__init__(*args, **kwargs) - original_init(self, *args, **kwargs) - - client_count = 0 - - for name in dir(self): - attr = getattr(self, name) - if callable(attr): - # Register client - if is_client(attr): - client_count += 1 - validate_single_registration(client_count, "_client") - self._client = register_method(self, attr, cls, name, "_client") - - # Initialize sandbox environment - # TODO: Path should be passed from a config not UseCase instance - self.sandbox_env = SandboxEnvironment( - client=self._client, - use_case_type=self.type, - api=api, - path=self.path, - config=config, - ) - - # Replace original __init__ with new_init - cls.__init__ = new_init - - def start_sandbox( - self, - service_id: Optional[str] = None, - save_data: bool = True, - save_dir: str = "./output/", - logging_config: Optional[Dict] = None, - ) -> None: - """ - Starts the sandbox: initializes service and sends request through the client. - - Args: - service_id: Service identifier (default None) - save_data: Whether to save request/response data - save_dir: Directory to save data - logging_config: Optional logging configuration - """ - self.sandbox_env.start_sandbox( - service_id=service_id, - save_data=save_data, - save_dir=save_dir, - ) - - def stop_sandbox(self) -> None: - """Shuts down sandbox instance""" - self.sandbox_env.stop_sandbox() - - cls.start_sandbox = start_sandbox - cls.stop_sandbox = stop_sandbox - - return cls - - return wrapper diff --git a/healthchain/sandbox/environment.py b/healthchain/sandbox/environment.py deleted file mode 100644 index 09af3d5e..00000000 --- a/healthchain/sandbox/environment.py +++ /dev/null @@ -1,129 +0,0 @@ -import asyncio -import logging -import uuid -import httpx - -from pathlib import Path -from typing import Dict, Optional - -from healthchain.sandbox.base import BaseClient -from healthchain.sandbox.utils import ensure_directory_exists, save_data_to_directory -from healthchain.sandbox.workflows import UseCaseType - -log = logging.getLogger(__name__) - - -class SandboxEnvironment: - """ - Manages the sandbox environment for testing and validation. - Handles service initialization, client requests, and data management. - - This class provides a controlled environment for testing healthcare services, - managing the lifecycle of sandbox instances, handling request/response data, - and providing utilities for data persistence and logging. - """ - - def __init__( - self, - api: httpx.URL, - path: str, - client: Optional[BaseClient] = None, - use_case_type: Optional[UseCaseType] = None, - config: Optional[Dict] = None, - ): - """ - Initialize the sandbox environment - - Args: - api: The API URL to be used for the sandbox - path: The endpoint path to send requests to - client: The client to use for sending requests - use_case_type: Type of use case (clindoc, cds) - config: Optional configuration dictionary for the sandbox - """ - self._client = client - self.type = use_case_type - self.api = api - self.path = path - self.config = config - - self.responses = [] - self.sandbox_id = None - self.url = None - - def start_sandbox( - self, - service_id: Optional[str] = None, - save_data: bool = True, - save_dir: str = "./output/", - ) -> None: - """ - Starts the sandbox: initializes service and sends request through the client. - - Args: - service_id: Service identifier (default None) - save_data: Whether to save request/response data - save_dir: Directory to save data - logging_config: Optional logging configuration - """ - if self._client is None: - raise RuntimeError( - "Client is not configured. Please check your class initialization." - ) - - self.sandbox_id = uuid.uuid4() - - log.info( - f"Starting sandbox {self.sandbox_id} with use case type {self.type.value}..." - ) - endpoint = self.api.join(self.path) - if service_id: - endpoint = endpoint.join(service_id) - - log.info( - f"Sending {len(self._client.request_data)} requests generated by {self._client.__class__.__name__} to {endpoint}" - ) - log.debug(f"API: {self.api}, Path: {self.path}, Endpoint: {endpoint}") - - try: - self.responses = asyncio.run(self._client.send_request(url=endpoint)) - except Exception as e: - log.error(f"Couldn't start client: {e}", exc_info=True) - - if save_data: - save_dir = Path(save_dir) - request_path = ensure_directory_exists(save_dir / "requests") - - if self.type == UseCaseType.clindoc: - extension = "xml" - save_data_to_directory( - [request.model_dump_xml() for request in self._client.request_data], - "request", - self.sandbox_id, - request_path, - extension, - ) - else: - extension = "json" - save_data_to_directory( - [ - request.model_dump(exclude_none=True) - for request in self._client.request_data - ], - "request", - self.sandbox_id, - request_path, - extension, - ) - - log.info(f"Saved request data at {request_path}/") - - response_path = ensure_directory_exists(save_dir / "responses") - save_data_to_directory( - self.responses, - "response", - self.sandbox_id, - response_path, - extension, - ) - log.info(f"Saved response data at {response_path}/") diff --git a/healthchain/data_generators/__init__.py b/healthchain/sandbox/generators/__init__.py similarity index 100% rename from healthchain/data_generators/__init__.py rename to healthchain/sandbox/generators/__init__.py diff --git a/healthchain/data_generators/basegenerators.py b/healthchain/sandbox/generators/basegenerators.py similarity index 100% rename from healthchain/data_generators/basegenerators.py rename to healthchain/sandbox/generators/basegenerators.py diff --git a/healthchain/data_generators/cdsdatagenerator.py b/healthchain/sandbox/generators/cdsdatagenerator.py similarity index 99% rename from healthchain/data_generators/cdsdatagenerator.py rename to healthchain/sandbox/generators/cdsdatagenerator.py index 115d7cf3..f9d9742f 100644 --- a/healthchain/data_generators/cdsdatagenerator.py +++ b/healthchain/sandbox/generators/cdsdatagenerator.py @@ -7,7 +7,7 @@ from fhir.resources.resource import Resource -from healthchain.data_generators.basegenerators import generator_registry +from healthchain.sandbox.generators.basegenerators import generator_registry from healthchain.models import Prefetch from healthchain.fhir import create_document_reference from healthchain.sandbox.workflows import Workflow diff --git a/healthchain/data_generators/conditiongenerators.py b/healthchain/sandbox/generators/conditiongenerators.py similarity index 97% rename from healthchain/data_generators/conditiongenerators.py rename to healthchain/sandbox/generators/conditiongenerators.py index ed393bcc..366b9984 100644 --- a/healthchain/data_generators/conditiongenerators.py +++ b/healthchain/sandbox/generators/conditiongenerators.py @@ -5,13 +5,13 @@ from fhir.resources.condition import ConditionStage, ConditionParticipant from healthchain.fhir.helpers import create_single_codeable_concept, create_condition -from healthchain.data_generators.basegenerators import ( +from healthchain.sandbox.generators.basegenerators import ( BaseGenerator, generator_registry, register_generator, CodeableConceptGenerator, ) -from healthchain.data_generators.value_sets.conditioncodes import ( +from healthchain.sandbox.generators.value_sets.conditioncodes import ( ConditionCodeSimple, ConditionCodeComplex, ) diff --git a/healthchain/data_generators/encountergenerators.py b/healthchain/sandbox/generators/encountergenerators.py similarity index 98% rename from healthchain/data_generators/encountergenerators.py rename to healthchain/sandbox/generators/encountergenerators.py index f41f53d4..133364f4 100644 --- a/healthchain/data_generators/encountergenerators.py +++ b/healthchain/sandbox/generators/encountergenerators.py @@ -7,7 +7,7 @@ from fhir.resources.codeableconcept import CodeableConcept from fhir.resources.period import Period from fhir.resources.reference import Reference -from healthchain.data_generators.basegenerators import ( +from healthchain.sandbox.generators.basegenerators import ( BaseGenerator, generator_registry, register_generator, diff --git a/healthchain/data_generators/generator_templates/templates.py b/healthchain/sandbox/generators/generator_templates/templates.py similarity index 100% rename from healthchain/data_generators/generator_templates/templates.py rename to healthchain/sandbox/generators/generator_templates/templates.py diff --git a/healthchain/data_generators/medicationadministrationgenerators.py b/healthchain/sandbox/generators/medicationadministrationgenerators.py similarity index 96% rename from healthchain/data_generators/medicationadministrationgenerators.py rename to healthchain/sandbox/generators/medicationadministrationgenerators.py index 90392308..ef42355a 100644 --- a/healthchain/data_generators/medicationadministrationgenerators.py +++ b/healthchain/sandbox/generators/medicationadministrationgenerators.py @@ -5,7 +5,7 @@ from fhir.resources.medicationadministration import MedicationAdministrationDosage from fhir.resources.reference import Reference from fhir.resources.codeablereference import CodeableReference -from healthchain.data_generators.basegenerators import ( +from healthchain.sandbox.generators.basegenerators import ( BaseGenerator, generator_registry, register_generator, diff --git a/healthchain/data_generators/medicationrequestgenerators.py b/healthchain/sandbox/generators/medicationrequestgenerators.py similarity index 93% rename from healthchain/data_generators/medicationrequestgenerators.py rename to healthchain/sandbox/generators/medicationrequestgenerators.py index 03dbec37..a1c48cfb 100644 --- a/healthchain/data_generators/medicationrequestgenerators.py +++ b/healthchain/sandbox/generators/medicationrequestgenerators.py @@ -1,13 +1,13 @@ from typing import Optional from faker import Faker -from healthchain.data_generators.basegenerators import ( +from healthchain.sandbox.generators.basegenerators import ( BaseGenerator, generator_registry, register_generator, CodeableConceptGenerator, ) -from healthchain.data_generators.value_sets.medicationcodes import ( +from healthchain.sandbox.generators.value_sets.medicationcodes import ( MedicationRequestMedication, ) from fhir.resources.medicationrequest import MedicationRequest diff --git a/healthchain/data_generators/patientgenerators.py b/healthchain/sandbox/generators/patientgenerators.py similarity index 98% rename from healthchain/data_generators/patientgenerators.py rename to healthchain/sandbox/generators/patientgenerators.py index 61b15704..f14c9ca6 100644 --- a/healthchain/data_generators/patientgenerators.py +++ b/healthchain/sandbox/generators/patientgenerators.py @@ -1,7 +1,7 @@ from typing import Optional from faker import Faker -from healthchain.data_generators.basegenerators import ( +from healthchain.sandbox.generators.basegenerators import ( BaseGenerator, generator_registry, register_generator, diff --git a/healthchain/data_generators/practitionergenerators.py b/healthchain/sandbox/generators/practitionergenerators.py similarity index 98% rename from healthchain/data_generators/practitionergenerators.py rename to healthchain/sandbox/generators/practitionergenerators.py index f0950cb2..284184a5 100644 --- a/healthchain/data_generators/practitionergenerators.py +++ b/healthchain/sandbox/generators/practitionergenerators.py @@ -1,7 +1,7 @@ from typing import Optional from faker import Faker -from healthchain.data_generators.basegenerators import ( +from healthchain.sandbox.generators.basegenerators import ( BaseGenerator, generator_registry, register_generator, diff --git a/healthchain/data_generators/proceduregenerators.py b/healthchain/sandbox/generators/proceduregenerators.py similarity index 93% rename from healthchain/data_generators/proceduregenerators.py rename to healthchain/sandbox/generators/proceduregenerators.py index 2c9f8156..a16f0ba6 100644 --- a/healthchain/data_generators/proceduregenerators.py +++ b/healthchain/sandbox/generators/proceduregenerators.py @@ -1,13 +1,13 @@ from typing import Optional from faker import Faker -from healthchain.data_generators.basegenerators import ( +from healthchain.sandbox.generators.basegenerators import ( BaseGenerator, generator_registry, register_generator, CodeableConceptGenerator, ) -from healthchain.data_generators.value_sets.procedurecodes import ( +from healthchain.sandbox.generators.value_sets.procedurecodes import ( ProcedureCodeSimple, ProcedureCodeComplex, ) diff --git a/healthchain/data_generators/value_sets/__init__.py b/healthchain/sandbox/generators/value_sets/__init__.py similarity index 100% rename from healthchain/data_generators/value_sets/__init__.py rename to healthchain/sandbox/generators/value_sets/__init__.py diff --git a/healthchain/data_generators/value_sets/base.py b/healthchain/sandbox/generators/value_sets/base.py similarity index 100% rename from healthchain/data_generators/value_sets/base.py rename to healthchain/sandbox/generators/value_sets/base.py diff --git a/healthchain/data_generators/value_sets/codesystems.py b/healthchain/sandbox/generators/value_sets/codesystems.py similarity index 100% rename from healthchain/data_generators/value_sets/codesystems.py rename to healthchain/sandbox/generators/value_sets/codesystems.py diff --git a/healthchain/data_generators/value_sets/conditioncodes.py b/healthchain/sandbox/generators/value_sets/conditioncodes.py similarity index 100% rename from healthchain/data_generators/value_sets/conditioncodes.py rename to healthchain/sandbox/generators/value_sets/conditioncodes.py diff --git a/healthchain/data_generators/value_sets/medicationcodes.py b/healthchain/sandbox/generators/value_sets/medicationcodes.py similarity index 100% rename from healthchain/data_generators/value_sets/medicationcodes.py rename to healthchain/sandbox/generators/value_sets/medicationcodes.py diff --git a/healthchain/data_generators/value_sets/procedurecodes.py b/healthchain/sandbox/generators/value_sets/procedurecodes.py similarity index 100% rename from healthchain/data_generators/value_sets/procedurecodes.py rename to healthchain/sandbox/generators/value_sets/procedurecodes.py diff --git a/healthchain/sandbox/requestconstructors.py b/healthchain/sandbox/requestconstructors.py new file mode 100644 index 00000000..600ef8c5 --- /dev/null +++ b/healthchain/sandbox/requestconstructors.py @@ -0,0 +1,193 @@ +""" +Request constructors for different clinical integration protocols. + +- CdsRequestConstructor: Builds CDS Hooks requests (REST/JSON) +- ClinDocRequestConstructor: Builds NoteReader requests (SOAP/XML) +""" + +import logging +import base64 +import pkgutil +import xmltodict + +from typing import Dict, Optional +from fhir.resources.resource import Resource + +from healthchain.sandbox.base import BaseRequestConstructor, ApiProtocol +from healthchain.sandbox.workflows import ( + UseCaseMapping, + Workflow, + validate_workflow, +) +from healthchain.models.requests import CDSRequest +from healthchain.models import CdaRequest +from healthchain.utils.utils import insert_at_key +from healthchain.models.hooks import ( + OrderSelectContext, + OrderSignContext, + PatientViewContext, + EncounterDischargeContext, + Prefetch, +) + + +log = logging.getLogger(__name__) + + +class CdsRequestConstructor(BaseRequestConstructor): + """ + Constructs and validates CDS Hooks requests for Clinical Decision Support (CDS) workflows. + + This class facilitates building HL7-compliant CDS Hooks requests for use with the REST API. + It supports multiple standard CDS workflows (e.g., patient-view, order-select, etc.) and + verifies both input workflow and prefetch data integrity. + + Attributes: + api_protocol (ApiProtocol): Specifies the supported API protocol (REST). + context_mapping (dict): Maps supported Workflow enums to their context model classes. + """ + + def __init__(self) -> None: + self.api_protocol = ApiProtocol.rest + self.context_mapping = { + Workflow.order_select: OrderSelectContext, + Workflow.order_sign: OrderSignContext, + Workflow.patient_view: PatientViewContext, + Workflow.encounter_discharge: EncounterDischargeContext, + } + + @validate_workflow(UseCaseMapping.ClinicalDecisionSupport) + def construct_request( + self, + prefetch_data: Dict[str, Resource], + workflow: Workflow, + context: Optional[Dict[str, str]] = {}, + ) -> CDSRequest: + """ + Build a CDS Hooks request including context and prefetch data. + + Args: + prefetch_data (Dict[str, Resource]): Dictionary mapping prefetch template names to FHIR resource objects. + workflow (Workflow): The name of the CDS Hooks workflow (e.g., Workflow.patient_view). + context (Optional[Dict[str, str]]): Optional context values for initializing the workflow's context model. + + Returns: + CDSRequest: Pydantic model representing a well-formed CDS Hooks request. + + Raises: + ValueError: If the workflow is not supported or lacks a defined context model. + TypeError: If prefetch_data is not an instance of Prefetch. + + Note: + Only CDS workflows supported by UseCaseMapping.ClinicalDecisionSupport are valid. + The expected prefetch_data argument is a Prefetch object encapsulating FHIR resources. + + # TODO: Add FhirServer support in future. + """ + + log.debug(f"Constructing CDS request for {workflow.value} from {prefetch_data}") + + context_model = self.context_mapping.get(workflow, None) + if context_model is None: + raise ValueError( + f"Invalid workflow {workflow.value} or workflow model not implemented." + ) + if not isinstance(prefetch_data, Prefetch): + raise TypeError( + f"Prefetch data must be a Prefetch object, but got {type(prefetch_data)}" + ) + request = CDSRequest( + hook=workflow.value, + context=context_model(**context), + prefetch=prefetch_data.prefetch, + ) + return request + + +class ClinDocRequestConstructor(BaseRequestConstructor): + """ + Constructs and validates CDA-based clinical documentation requests for NoteReader workflows. + + This constructor handles the preparation of a SOAP envelope containing a base64-encoded CDA XML document, + suitable for clinical documentation use cases (e.g., 'sign-note-inpatient', 'sign-note-outpatient'). + It ensures the input XML is valid, encodes it, and inserts it into the expected place within a SOAP envelope + template, producing a structured `CdaRequest` model for downstream processing or transmission. + + Attributes: + api_protocol (ApiProtocol): The protocol used for API communication (SOAP). + soap_envelope (Dict): Loaded SOAP envelope template as a dictionary. + + Methods: + construct_cda_xml_document(): + Not implemented. Intended to wrap FHIR Document resources into a CDA XML document. + construct_request(data: str, workflow: Workflow) -> CdaRequest: + Validates and encodes the input CDA XML, inserts it into the SOAP envelope, + and returns a structured CdaRequest object. + """ + + def __init__(self) -> None: + self.api_protocol: ApiProtocol = ApiProtocol.soap + self.soap_envelope: Dict = self._load_soap_envelope() + + def _load_soap_envelope(self): + data = pkgutil.get_data("healthchain", "templates/soap_envelope.xml") + return xmltodict.parse(data.decode("utf-8")) + + def construct_cda_xml_document(self): + """ + Placeholder for CDA construction logic. + + This function should take FHIR resources and construct a CDA XML document + using a suitable template. Currently not implemented. + """ + raise NotImplementedError("This function is not implemented yet.") + + @validate_workflow(UseCaseMapping.ClinicalDocumentation) + def construct_request(self, data: str, workflow: Workflow) -> CdaRequest: + """ + Construct a CdaRequest for clinical documentation workflows. + + This method creates a request containing the input CDA XML string wrapped in a SOAP envelope. + It validates that the input is a well-formed XML string, base64-encodes it, + and embeds it at the appropriate location in the SOAP envelope template. + + Args: + data (str): The raw CDA XML document as a string. + workflow (Workflow): The workflow type for the documentation use case (e.g., sign-note-inpatient). + + Returns: + CdaRequest: A model containing the finalized SOAP envelope with the base64-encoded CDA XML. + + Raises: + ValueError: If the input is not a string, or if the SOAP template is missing required keys. + None: If the input XML is not valid (logs a warning and returns None). + + Note: + This method does not implement workflow-specific logic. Extend if such handling is required. + """ + # TODO: Add workflow-specific handling if needed + if not isinstance(data, str): + raise ValueError(f"Expected str, got {type(data).__name__}") + + # Validate that the string is well-formed XML + import xml.etree.ElementTree as ET + + try: + ET.fromstring(data) + except ET.ParseError as e: + log.warning("Input is not valid XML: %s", e) + return None + + # Make a copy of the SOAP envelope template + soap_envelope = self.soap_envelope.copy() + + # Base64 encode the XML + cda_xml_encoded = base64.b64encode(data.encode("utf-8")).decode("utf-8") + + # Insert encoded cda in the Document section + if not insert_at_key(soap_envelope, "urn:Document", cda_xml_encoded): + raise ValueError("Key 'urn:Document' missing from SOAP envelope template!") + + request = CdaRequest.from_dict(soap_envelope) + + return request diff --git a/healthchain/sandbox/sandboxclient.py b/healthchain/sandbox/sandboxclient.py new file mode 100644 index 00000000..2587fcb0 --- /dev/null +++ b/healthchain/sandbox/sandboxclient.py @@ -0,0 +1,499 @@ +""" +SandboxClient for quickly spinning up demos and loading test datasets. + +Replaces the decorator-based sandbox pattern with direct instantiation. +""" + +import json +import logging +import uuid +import httpx + +from pathlib import Path +from typing import Any, Dict, List, Literal, Optional, Union + +from healthchain.sandbox.base import ApiProtocol +from healthchain.models import CDSRequest, CDSResponse, Prefetch +from healthchain.models.responses.cdaresponse import CdaResponse +from healthchain.sandbox.workflows import Workflow +from healthchain.sandbox.utils import ensure_directory_exists, save_data_to_directory +from healthchain.sandbox.requestconstructors import ( + CdsRequestConstructor, + ClinDocRequestConstructor, +) + + +log = logging.getLogger(__name__) + + +class SandboxClient: + """ + Simplified client for testing healthcare services with various data sources. + + This class provides an intuitive interface for: + - Loading test datasets (MIMIC-on-FHIR, Synthea, CSV) + - Generating synthetic FHIR data + - Sending requests to healthcare services + - Managing request/response lifecycle + + Examples: + Load from dataset registry: + >>> client = SandboxClient( + ... api_url="http://localhost:8000", + ... endpoint="/cds/cds-services/my-service" + ... ) + >>> client.load_from_registry("mimic-on-fhir", sample_size=10) + >>> responses = client.send_requests() + + Load CDA file from path: + >>> client = SandboxClient( + ... api_url="http://localhost:8000", + ... endpoint="/notereader/fhir/", + ... protocol="soap" + ... ) + >>> client.load_from_path("./data/clinical_note.xml") + >>> responses = client.send_requests() + + Generate data from free text: + >>> client = SandboxClient( + ... api_url="http://localhost:8000", + ... endpoint="/cds/cds-services/discharge-summarizer" + ... ) + >>> client.load_free_text( + ... csv_path="./data/notes.csv", + ... column_name="text", + ... workflow="encounter-discharge" + ... ) + >>> responses = client.send_requests() + """ + + def __init__( + self, + api_url: str, + endpoint: str, + workflow: Optional[Union[Workflow, str]] = None, + protocol: Literal["rest", "soap"] = "rest", + timeout: float = 10.0, + ): + """ + Initialize SandboxClient. + + Args: + api_url: Base URL of the service (e.g., "http://localhost:8000") + endpoint: Service endpoint path (e.g., "/cds/cds-services/my-service") + workflow: Optional workflow specification (auto-detected if not provided) + protocol: Communication protocol - "rest" for CDS Hooks, "soap" for CDA + timeout: Request timeout in seconds + + Raises: + ValueError: If api_url or endpoint is invalid + """ + try: + self.api = httpx.URL(api_url) + except Exception as e: + raise ValueError(f"Invalid API URL: {str(e)}") + + self.endpoint = endpoint + self.workflow = Workflow(workflow) if isinstance(workflow, str) else workflow + self.protocol = ApiProtocol.soap if protocol == "soap" else ApiProtocol.rest + self.timeout = timeout + + # Request/response management + self.request_data: List[Union[CDSRequest, Any]] = [] + self.responses: List[Dict] = [] + self.sandbox_id = uuid.uuid4() + + log.info( + f"Initialized SandboxClient {self.sandbox_id} for {self.api}{self.endpoint}" + ) + + def load_from_registry( + self, + source: str, + **kwargs: Any, + ) -> "SandboxClient": + """ + Load data from the dataset registry. + + Loads pre-configured datasets like MIMIC-on-FHIR, Synthea, or custom + registered datasets. + + Args: + source: Dataset name (e.g., "mimic-on-fhir", "synthea") + **kwargs: Dataset-specific parameters (e.g., sample_size, num_patients) + + Returns: + Self for method chaining + + Raises: + ValueError: If dataset not found in registry + + Examples: + Discover available datasets: + >>> from healthchain.sandbox import list_available_datasets + >>> print(list_available_datasets()) + + Load MIMIC dataset: + >>> client.load_from_registry("mimic-on-fhir", sample_size=10) + """ + from healthchain.sandbox.datasets import DatasetRegistry + + log.info(f"Loading dataset from registry: {source}") + try: + loaded_data = DatasetRegistry.load(source, **kwargs) + self._construct_request(loaded_data) + log.info(f"Loaded {source} dataset with {len(self.request_data)} requests") + except KeyError: + raise ValueError( + f"Unknown dataset: {source}. " + f"Available datasets: {DatasetRegistry.list_datasets()}" + ) + return self + + def load_from_path( + self, + path: Union[str, Path], + pattern: Optional[str] = None, + workflow: Optional[Union[Workflow, str]] = None, + ) -> "SandboxClient": + """ + Load data from file system path. + + Supports loading single files or directories. File type is auto-detected + from extension and protocol: + - .xml files with SOAP protocol → CDA documents + - .json files with REST protocol → Pre-formatted Prefetch data + + Args: + path: File path or directory path + pattern: Glob pattern for filtering files in directory (e.g., "*.xml") + workflow: Optional workflow override (auto-detected from protocol if not provided) + + Returns: + Self for method chaining + + Raises: + FileNotFoundError: If path doesn't exist + ValueError: If no matching files found or unsupported file type + + Examples: + Load single CDA file: + >>> client.load_from_path("./data/clinical_note.xml") + + Load directory of CDA files: + >>> client.load_from_path("./data/cda_files/", pattern="*.xml") + + Load with explicit workflow: + >>> client.load_from_path("./data/note.xml", workflow="sign-note-inpatient") + """ + path = Path(path) + if not path.exists(): + raise FileNotFoundError(f"Path not found: {path}") + + # Collect files to process + files_to_load = [] + if path.is_file(): + files_to_load = [path] + elif path.is_dir(): + pattern = pattern or "*" + files_to_load = list(path.glob(pattern)) + if not files_to_load: + raise ValueError( + f"No files found matching pattern '{pattern}' in {path}" + ) + else: + raise ValueError(f"Path must be a file or directory: {path}") + + log.info(f"Loading {len(files_to_load)} file(s) from {path}") + + # Process each file + for file_path in files_to_load: + # Determine file type from extension + extension = file_path.suffix.lower() + + if extension == ".xml": + with open(file_path, "r") as f: + xml_content = f.read() + workflow_enum = ( + Workflow(workflow) + if isinstance(workflow, str) + else workflow or self.workflow or Workflow.sign_note_inpatient + ) + self._construct_request(xml_content, workflow_enum) + log.info(f"Loaded CDA document from {file_path.name}") + + elif extension == ".json": + with open(file_path, "r") as f: + json_data = json.load(f) + + try: + # Validate and load as Prefetch object + prefetch_data = Prefetch(**json_data) + + workflow_enum = ( + Workflow(workflow) + if isinstance(workflow, str) + else workflow or self.workflow + ) + if not workflow_enum: + raise ValueError( + "Workflow must be specified when loading JSON Prefetch data. " + "Provide via 'workflow' parameter or set on client initialization." + ) + self._construct_request(prefetch_data, workflow_enum) + log.info(f"Loaded Prefetch data from {file_path.name}") + + except Exception as e: + log.error(f"Failed to parse {file_path} as Prefetch: {e}") + raise ValueError( + f"File {file_path} is not valid Prefetch format. " + f"Expected JSON with 'prefetch' key containing FHIR resources. " + f"Error: {e}" + ) + else: + log.warning(f"Skipping unsupported file type: {file_path}") + + log.info( + f"Loaded {len(self.request_data)} requests from {len(files_to_load)} file(s)" + ) + return self + + def load_free_text( + self, + csv_path: str, + column_name: str, + workflow: Union[Workflow, str], + random_seed: Optional[int] = None, + **kwargs: Any, + ) -> "SandboxClient": + """ + Generates a CDS prefetch from free text notes. + + Reads clinical notes from a CSV file and wraps it in FHIR DocumentReferences + in a CDS prefetch field for CDS Hooks workflows. Generates additional synthetic + FHIR resources as needed based on the specified workflow. + + Args: + csv_path: Path to CSV file containing clinical notes + column_name: Name of the column containing the text + workflow: CDS workflow type (e.g., "encounter-discharge", "patient-view") + random_seed: Seed for reproducible data generation + **kwargs: Additional parameters for data generation + + Returns: + Self for method chaining + + Raises: + FileNotFoundError: If CSV file doesn't exist + ValueError: If workflow is invalid or column not found + + Examples: + Generate discharge summaries: + >>> client.load_free_text( + ... csv_path="./data/discharge_notes.csv", + ... column_name="text", + ... workflow="encounter-discharge", + ... random_seed=42 + ... ) + """ + from .generators import CdsDataGenerator + + workflow_enum = Workflow(workflow) if isinstance(workflow, str) else workflow + + generator = CdsDataGenerator() + generator.set_workflow(workflow_enum) + + prefetch_data = generator.generate_prefetch( + random_seed=random_seed, + free_text_path=csv_path, + column_name=column_name, + **kwargs, + ) + + self._construct_request(prefetch_data, workflow_enum) + log.info( + f"Generated {len(self.request_data)} requests from free text for workflow {workflow_enum.value}" + ) + + return self + + def _construct_request( + self, data: Union[Prefetch, Any], workflow: Optional[Workflow] = None + ) -> None: + """ + Convert data to request format and add to queue. + + Args: + data: Data to convert (Prefetch for CDS, DocumentReference for CDA) + workflow: Workflow to use for request construction + """ + workflow = workflow or self.workflow + + if self.protocol == ApiProtocol.rest: + if not workflow: + raise ValueError( + "Workflow must be specified for REST/CDS Hooks requests" + ) + constructor = CdsRequestConstructor() + request = constructor.construct_request(data, workflow) + elif self.protocol == ApiProtocol.soap: + constructor = ClinDocRequestConstructor() + request = constructor.construct_request( + data, workflow or Workflow.sign_note_inpatient + ) + else: + raise ValueError(f"Unsupported protocol: {self.protocol}") + + self.request_data.append(request) + + def send_requests(self) -> List[Dict]: + """ + Send all queued requests to the service. + + Returns: + List of response dictionaries + + Raises: + RuntimeError: If no requests are queued + """ + if not self.request_data: + raise RuntimeError( + "No requests to send. Load data first using load_from_registry(), load_from_path(), or load_free_text()" + ) + + url = self.api.join(self.endpoint) + log.info(f"Sending {len(self.request_data)} requests to {url}") + + with httpx.Client(follow_redirects=True) as client: + responses: List[Dict] = [] + timeout = httpx.Timeout(self.timeout, read=None) + + for request in self.request_data: + try: + if self.protocol == ApiProtocol.soap: + headers = {"Content-Type": "text/xml; charset=utf-8"} + response = client.post( + url=str(url), + data=request.document, + headers=headers, + timeout=timeout, + ) + response.raise_for_status() + response_model = CdaResponse(document=response.text) + responses.append(response_model.model_dump_xml()) + else: + # REST/CDS Hooks + log.debug(f"Making POST request to: {url}") + response = client.post( + url=str(url), + json=request.model_dump(exclude_none=True), + timeout=timeout, + ) + response.raise_for_status() + response_data = response.json() + try: + cds_response = CDSResponse(**response_data) + responses.append(cds_response.model_dump(exclude_none=True)) + except Exception: + # Fallback to raw response if parsing fails + responses.append(response_data) + + except httpx.HTTPStatusError as exc: + try: + error_content = exc.response.json() + except Exception: + error_content = exc.response.text + log.error( + f"Error response {exc.response.status_code} while requesting " + f"{exc.request.url!r}: {error_content}" + ) + responses.append({}) + except httpx.TimeoutException as exc: + log.error(f"Request to {exc.request.url!r} timed out!") + responses.append({}) + except httpx.RequestError as exc: + log.error( + f"An error occurred while requesting {exc.request.url!r}." + ) + responses.append({}) + + self.responses = responses + log.info(f"Received {len(responses)} responses") + + return responses + + def save_results(self, directory: Union[str, Path] = "./output/") -> None: + """ + Save request and response data to disk. + + Args: + directory: Directory to save data to (default: "./output/") + + Raises: + RuntimeError: If no responses are available to save + """ + if not self.responses: + raise RuntimeError( + "No responses to save. Send requests first using send_requests()" + ) + + save_dir = Path(directory) + request_path = ensure_directory_exists(save_dir / "requests") + + # Determine file extension based on protocol + extension = "xml" if self.protocol == ApiProtocol.soap else "json" + + # Save requests + if self.protocol == ApiProtocol.soap: + request_data = [request.model_dump_xml() for request in self.request_data] + else: + request_data = [ + request.model_dump(exclude_none=True) for request in self.request_data + ] + + save_data_to_directory( + request_data, + "request", + self.sandbox_id, + request_path, + extension, + ) + log.info(f"Saved request data at {request_path}/") + + # Save responses + response_path = ensure_directory_exists(save_dir / "responses") + save_data_to_directory( + self.responses, + "response", + self.sandbox_id, + response_path, + extension, + ) + log.info(f"Saved response data at {response_path}/") + + def get_status(self) -> Dict[str, Any]: + """ + Get current client status and statistics. + + Returns: + Dictionary containing client status information + """ + return { + "sandbox_id": str(self.sandbox_id), + "api_url": str(self.api), + "endpoint": self.endpoint, + "protocol": self.protocol.value + if hasattr(self.protocol, "value") + else str(self.protocol), + "workflow": self.workflow.value if self.workflow else None, + "requests_queued": len(self.request_data), + "responses_received": len(self.responses), + } + + def __repr__(self) -> str: + """String representation of SandboxClient.""" + return ( + f"SandboxClient(api_url='{self.api}', endpoint='{self.endpoint}', " + f"protocol='{self.protocol.value if hasattr(self.protocol, 'value') else self.protocol}', " + f"requests={len(self.request_data)})" + ) diff --git a/healthchain/sandbox/use_cases/__init__.py b/healthchain/sandbox/use_cases/__init__.py deleted file mode 100644 index 1f6cf9cd..00000000 --- a/healthchain/sandbox/use_cases/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from .cds import ClinicalDecisionSupport, CdsRequestConstructor -from .clindoc import ClinicalDocumentation, ClinDocRequestConstructor - -__all__ = [ - "ClinicalDecisionSupport", - "CdsRequestConstructor", - "ClinicalDocumentation", - "ClinDocRequestConstructor", -] diff --git a/healthchain/sandbox/use_cases/cds.py b/healthchain/sandbox/use_cases/cds.py deleted file mode 100644 index babc3ce2..00000000 --- a/healthchain/sandbox/use_cases/cds.py +++ /dev/null @@ -1,124 +0,0 @@ -import logging - -from typing import Dict, Optional -from fhir.resources.resource import Resource - -from healthchain.service.endpoints import ApiProtocol -from healthchain.sandbox.base import BaseUseCase, BaseRequestConstructor, BaseClient -from healthchain.sandbox.workflows import ( - UseCaseMapping, - UseCaseType, - Workflow, - validate_workflow, -) -from healthchain.models.requests import CDSRequest -from healthchain.models.hooks import ( - OrderSelectContext, - OrderSignContext, - PatientViewContext, - EncounterDischargeContext, - Prefetch, -) - -log = logging.getLogger(__name__) - - -class CdsRequestConstructor(BaseRequestConstructor): - """ - Handles the request construction and validation - """ - - def __init__(self) -> None: - self.api_protocol = ApiProtocol.rest - self.context_mapping = { - Workflow.order_select: OrderSelectContext, - Workflow.order_sign: OrderSignContext, - Workflow.patient_view: PatientViewContext, - Workflow.encounter_discharge: EncounterDischargeContext, - } - - @validate_workflow(UseCaseMapping.ClinicalDecisionSupport) - def construct_request( - self, - prefetch_data: Dict[str, Resource], - workflow: Workflow, - context: Optional[Dict[str, str]] = {}, - ) -> CDSRequest: - """ - Constructs a HL7-compliant CDS request with prefetch data. - - Parameters: - prefetch_data (Dict[str, Resource]): Dictionary mapping prefetch keys to FHIR resources - workflow (Workflow): The CDS hook name, e.g. patient-view - context (Optional[Dict[str, str]]): Optional context data for the CDS hook - - Returns: - CDSRequest: A Pydantic model that wraps a CDS request for REST API - - Raises: - ValueError: If the workflow is invalid or not implemented - TypeError: If any prefetch value is not a valid FHIR resource - - # TODO: Add FhirServer support - """ - - log.debug(f"Constructing CDS request for {workflow.value} from {prefetch_data}") - - context_model = self.context_mapping.get(workflow, None) - if context_model is None: - raise ValueError( - f"Invalid workflow {workflow.value} or workflow model not implemented." - ) - if not isinstance(prefetch_data, Prefetch): - raise TypeError( - f"Prefetch data must be a Prefetch object, but got {type(prefetch_data)}" - ) - request = CDSRequest( - hook=workflow.value, - context=context_model(**context), - prefetch=prefetch_data.prefetch, - ) - return request - - -class ClinicalDecisionSupport(BaseUseCase): - """ - Implements EHR backend simulator for Clinical Decision Support (CDS). - - This class provides functionality to simulate CDS Hooks interactions between - an EHR system and a CDS service. It handles the construction and sending of - CDS Hook requests according to the HL7 CDS Hooks specification. - - Parameters: - path (str): The API endpoint path for CDS services - client (Optional[BaseClient]): The client used to send requests to the CDS service - - The class uses a CdsRequestConstructor strategy to build properly formatted - CDS Hook requests with appropriate context and prefetch data. - - See https://cds-hooks.org/ for the complete specification - """ - - def __init__( - self, - path: str = "/cds-services/", - client: Optional[BaseClient] = None, - ) -> None: - super().__init__( - client=client, - ) - self._type = UseCaseType.cds - self._strategy = CdsRequestConstructor() - self._path = path - - @property - def description(self) -> str: - return "Clinical decision support (HL7 CDS specification)" - - @property - def type(self) -> UseCaseType: - return self._type - - @property - def strategy(self) -> BaseRequestConstructor: - return self._strategy diff --git a/healthchain/sandbox/use_cases/clindoc.py b/healthchain/sandbox/use_cases/clindoc.py deleted file mode 100644 index e937f975..00000000 --- a/healthchain/sandbox/use_cases/clindoc.py +++ /dev/null @@ -1,135 +0,0 @@ -import base64 -import logging -import pkgutil -import xmltodict - -from typing import Dict, Optional -from fhir.resources.documentreference import DocumentReference - -from healthchain.service.endpoints import ApiProtocol -from healthchain.models import CdaRequest -from healthchain.utils.utils import insert_at_key -from healthchain.sandbox.base import BaseClient, BaseUseCase, BaseRequestConstructor -from healthchain.sandbox.workflows import ( - UseCaseMapping, - UseCaseType, - Workflow, - validate_workflow, -) - - -log = logging.getLogger(__name__) - - -class ClinDocRequestConstructor(BaseRequestConstructor): - """ - Handles the request construction and validation of a NoteReader CDA file - """ - - def __init__(self) -> None: - self.api_protocol: ApiProtocol = ApiProtocol.soap - self.soap_envelope: Dict = self._load_soap_envelope() - - def _load_soap_envelope(self): - data = pkgutil.get_data("healthchain", "templates/soap_envelope.xml") - return xmltodict.parse(data.decode("utf-8")) - - def construct_cda_xml_document(self): - """ - This function should wrap FHIR resources from Document into a template CDA file - TODO: implement this function - """ - raise NotImplementedError("This function is not implemented yet.") - - @validate_workflow(UseCaseMapping.ClinicalDocumentation) - def construct_request( - self, document_reference: DocumentReference, workflow: Workflow - ) -> CdaRequest: - """ - Constructs a CDA request for clinical documentation use cases (NoteReader) - - Parameters: - document_reference (DocumentReference): FHIR DocumentReference containing CDA XML data - workflow (Workflow): The NoteReader workflow type, e.g. notereader-sign-inpatient - - Returns: - CdaRequest: A Pydantic model containing the CDA XML wrapped in a SOAP envelope - - Raises: - ValueError: If the SOAP envelope template is invalid or missing required keys - """ - # TODO: handle different workflows - cda_xml = None - for content in document_reference.content: - if content.attachment.contentType == "text/xml": - cda_xml = content.attachment.data - break - - if cda_xml is not None: - # Make a copy of the SOAP envelope template - soap_envelope = self.soap_envelope.copy() - - cda_xml = base64.b64encode(cda_xml).decode("utf-8") - - # Insert encoded cda in the Document section - if not insert_at_key(soap_envelope, "urn:Document", cda_xml): - raise ValueError( - "Key 'urn:Document' missing from SOAP envelope template!" - ) - request = CdaRequest.from_dict(soap_envelope) - - return request - else: - log.warning("No CDA document found in the DocumentReference!") - - -class ClinicalDocumentation(BaseUseCase): - """ - Implements EHR backend strategy for clinical documentation (NoteReader) - - This class represents the backend strategy for clinical documentation using the NoteReader system. - It inherits from the `BaseUseCase` class and provides methods for processing NoteReader documents. - When used with the @sandbox decorator, it enables testing and validation of clinical documentation - workflows in a controlled environment. - - Attributes: - client (Optional[BaseClient]): The client to be used for communication with the service. - path (str): The endpoint path to send requests to. Defaults to "/notereader/". - Will be normalized to ensure it starts and ends with a forward slash. - type (UseCaseType): The type of use case, set to UseCaseType.clindoc. - strategy (BaseRequestConstructor): The strategy used for constructing requests. - - Example: - @sandbox("http://localhost:8000") - class MyNoteReader(ClinicalDocumentation): - def __init__(self): - super().__init__(path="/custom/notereader/") - - # Create instance and start sandbox - note_reader = MyNoteReader() - note_reader.start_sandbox(save_data=True) - """ - - def __init__( - self, - path: str = "/notereader/", - client: Optional[BaseClient] = None, - ) -> None: - super().__init__( - client=client, - ) - self._type = UseCaseType.clindoc - self._strategy = ClinDocRequestConstructor() - self._path = path - - @property - def description(self) -> str: - return "Clinical documentation (NoteReader)" - - @property - def type(self) -> UseCaseType: - return self._type - - @property - def strategy(self) -> BaseRequestConstructor: - return self._strategy diff --git a/healthchain/service/__init__.py b/healthchain/service/__init__.py deleted file mode 100644 index 1c3ea1ba..00000000 --- a/healthchain/service/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .service import Service - -__all__ = ["Service"] diff --git a/healthchain/service/endpoints.py b/healthchain/service/endpoints.py deleted file mode 100644 index 795aa781..00000000 --- a/healthchain/service/endpoints.py +++ /dev/null @@ -1,32 +0,0 @@ -from enum import Enum -from pydantic import BaseModel, field_validator -from typing import Optional, Callable - - -class ApiProtocol(Enum): - """ - Enum defining the supported API protocols. - - Available protocols: - - soap: SOAP protocol - - rest: REST protocol - """ - - soap = "SOAP" - rest = "REST" - - -class Endpoint(BaseModel): - path: str - method: str - function: Callable - description: Optional[str] = None - api_protocol: ApiProtocol = ApiProtocol.rest - - @field_validator("method") - @classmethod - def validate_method(cls, v: str) -> str: - method = v.upper() - if method not in ["GET", "POST", "PUT", "DELETE"]: - raise ValueError("Method must be 'GET', 'POST', 'PUT', or 'DELETE'.") - return method diff --git a/healthchain/service/service.py b/healthchain/service/service.py deleted file mode 100644 index 3a7ca036..00000000 --- a/healthchain/service/service.py +++ /dev/null @@ -1,115 +0,0 @@ -import os -import signal -import logging -import uvicorn -import warnings - -from typing import Dict - -from fastapi import FastAPI, APIRouter -from fastapi.responses import JSONResponse -from fastapi.middleware.wsgi import WSGIMiddleware -from contextlib import asynccontextmanager -from termcolor import colored - -from healthchain.gateway.soap.utils.wsgi import start_wsgi -from healthchain.service.endpoints import ApiProtocol, Endpoint - -log = logging.getLogger(__name__) - - -class Service: - """ - A service wrapper which registers routes and starts a FastAPI service - - DEPRECATED: This class is deprecated and will be removed in a future version. - Use `healthchain.gateway.api.app.HealthChainAPI` or `create_app()` instead. - - Parameters: - endpoints (Dict[str, Enpoint]): the list of endpoints to register, must be a dictionary - of Endpoint objects. Should have human-readable keys e.g. ["info", "service_mount"] - - """ - - def __init__(self, endpoints: Dict[str, Endpoint] = None): - warnings.warn( - "The Service class is deprecated and will be removed in a future version. " - "Use healthchain.gateway.api.app.HealthChainAPI or create_app() instead.", - DeprecationWarning, - stacklevel=2, - ) - self.app = FastAPI(lifespan=self.lifespan) - self.endpoints: Endpoint = endpoints - - if self.endpoints is not None: - self._register_routes() - - # Router to handle stopping the server - self.stop_router = APIRouter() - self.stop_router.add_api_route( - "/shutdown", self._shutdown, methods=["GET"], include_in_schema=False - ) - self.app.include_router(self.stop_router) - - def _register_routes(self) -> None: - # TODO: add kwargs - for endpoint in self.endpoints.values(): - if endpoint.api_protocol == ApiProtocol.soap: - wsgi_app = start_wsgi(endpoint.function) - self.app.mount(endpoint.path, WSGIMiddleware(wsgi_app)) - else: - self.app.add_api_route( - endpoint.path, - endpoint.function, - methods=[endpoint.method], - response_model_exclude_none=True, - ) - - @asynccontextmanager - async def lifespan(self, app: FastAPI): - self._startup() - yield - self._shutdown() - - def _startup(self) -> None: - healthchain_ascii = r""" - - __ __ ____ __ ________ _ - / / / /__ ____ _/ / /_/ /_ / ____/ /_ ____ _(_)___ - / /_/ / _ \/ __ `/ / __/ __ \/ / / __ \/ __ `/ / __ \ - / __ / __/ /_/ / / /_/ / / / /___/ / / / /_/ / / / / / -/_/ /_/\___/\__,_/_/\__/_/ /_/\____/_/ /_/\__,_/_/_/ /_/ - -""" # noqa: E501 - - colors = ["red", "yellow", "green", "cyan", "blue", "magenta"] - for i, line in enumerate(healthchain_ascii.split("\n")): - color = colors[i % len(colors)] - print(colored(line, color)) - for endpoint in self.endpoints.values(): - print( - f"{colored('HEALTHCHAIN', 'green')}: {endpoint.method} endpoint at {endpoint.path}/" - ) - print( - f"{colored('HEALTHCHAIN', 'green')}: See more details at {colored(self.app.docs_url, 'magenta')}" - ) - - def _shutdown(self): - """ - Shuts down server - """ - os.kill(os.getpid(), signal.SIGTERM) - return JSONResponse(content={"message": "Server is shutting down..."}) - - def run(self, config: Dict = None) -> None: - """ - Starts server on uvicorn. - - Parameters: - config (Dict): kwargs to pass into uvicorn. - - """ - if config is None: - config = {} - - uvicorn.run(self.app, **config) diff --git a/healthchain/use_cases.py b/healthchain/use_cases.py deleted file mode 100644 index c62f3ea5..00000000 --- a/healthchain/use_cases.py +++ /dev/null @@ -1,11 +0,0 @@ -import warnings - -# Issue deprecation warning -warnings.warn( - "The 'healthchain.use_cases' module is deprecated. Please use 'healthchain.sandbox.use_cases' instead.", - DeprecationWarning, - stacklevel=2, -) - -# Import everything from the new location -from healthchain.sandbox.use_cases import * # noqa: E402 F403 diff --git a/healthchain/utils/__init__.py b/healthchain/utils/__init__.py index 8dc05ffb..a4bcd2e2 100644 --- a/healthchain/utils/__init__.py +++ b/healthchain/utils/__init__.py @@ -1,4 +1,3 @@ -from .urlbuilder import UrlBuilder from .idgenerator import IdGenerator -__all__ = ["UrlBuilder", "IdGenerator"] +__all__ = ["IdGenerator"] diff --git a/healthchain/utils/urlbuilder.py b/healthchain/utils/urlbuilder.py deleted file mode 100644 index 79aff994..00000000 --- a/healthchain/utils/urlbuilder.py +++ /dev/null @@ -1,35 +0,0 @@ -from typing import Dict - -from healthchain.service.endpoints import Endpoint - - -class UrlBuilder: - def __init__(self) -> None: - self.base = "" - self.route = "" - self.service = "" - - @classmethod - def build_from_config( - cls, config: Dict[str, str], endpoints: Dict[str, Endpoint], service_id: str - ) -> str: - protocol = config.get("ssl_keyfile") - if protocol is None: - protocol = "http" - else: - protocol = "https" - host = config.get("host", "127.0.0.1") - port = config.get("port", "8000") - service = getattr(endpoints.get("service_mount"), "path", None) - if service is not None: - # TODO: revisit this - not all services are formatted with id! - service = service.format(id=service_id) - else: - raise ValueError( - f"Can't fetch service details: key 'service_mount' doesn't exist in {endpoints}" - ) - cls.base = f"{protocol}://{host}:{port}" - cls.route = service - cls.service = f"{protocol}://{host}:{port}{service}" - - return cls diff --git a/mkdocs.yml b/mkdocs.yml index a200274f..b3e54e5e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -64,12 +64,9 @@ nav: - api/component.md - api/containers.md - api/adapters.md - - api/use_cases.md - api/cds_hooks.md - - api/service.md - - api/clients.md + - api/sandbox.md - api/interop.md - - api/data_generators.md - api/fhir_helpers.md - Community: - community/index.md diff --git a/scripts/healthchainapi_e2e_demo.py b/scripts/healthchainapi_e2e_demo.py index cb16299d..0b6870d0 100644 --- a/scripts/healthchainapi_e2e_demo.py +++ b/scripts/healthchainapi_e2e_demo.py @@ -30,11 +30,9 @@ from dataclasses import dataclass, field from enum import Enum -import healthchain as hc from spacy.tokens import Span # HealthChain imports -from healthchain.data_generators.cdsdatagenerator import CdsDataGenerator from healthchain.gateway import ( HealthChainAPI, NoteReaderService, @@ -45,21 +43,19 @@ from healthchain.gateway.fhir import FHIRGateway from healthchain.gateway.events.dispatcher import local_handler from healthchain.io import Document -from healthchain.models.hooks.prefetch import Prefetch from healthchain.models.requests import CdaRequest from healthchain.models.requests.cdsrequest import CDSRequest from healthchain.models.responses import CdaResponse from healthchain.models.responses.cdsresponse import CDSResponse from healthchain.pipeline.medicalcodingpipeline import MedicalCodingPipeline from healthchain.pipeline.summarizationpipeline import SummarizationPipeline -from healthchain.sandbox.use_cases import ClinicalDocumentation -from healthchain.fhir import create_document_reference -from healthchain.sandbox.use_cases.cds import ClinicalDecisionSupport # FHIR imports from fhir.resources.documentreference import DocumentReference from fhir.resources.patient import Patient from fhir.resources.meta import Meta +from healthchain.sandbox import SandboxClient +from healthchain.fhir import create_document_reference # Configuration CONFIG = { @@ -409,49 +405,36 @@ def create_sandboxes(): """Create sandbox environments for testing""" print_section("Creating Sandbox Environments", "🏖️") - base_url = f"http://{CONFIG['server']['host']}:{CONFIG['server']['port']}/" + base_url = f"http://{CONFIG['server']['host']}:{CONFIG['server']['port']}" # NoteReader Sandbox - @hc.sandbox(base_url) - class NotereaderSandbox(ClinicalDocumentation): - """Sandbox for testing clinical documentation workflows""" - - def __init__(self): - super().__init__() - self.data_path = CONFIG["data"]["cda_document_path"] - - @hc.ehr(workflow=CONFIG["workflows"]["notereader"]) - def load_clinical_document(self) -> DocumentReference: - """Load a sample CDA document for processing""" - with open(self.data_path, "r") as file: - xml_content = file.read() - - return create_document_reference( - data=xml_content, - content_type="text/xml", - description="Sample CDA document from sandbox", - ) + notereader_client = SandboxClient( + api_url=base_url, + endpoint="/notereader/fhir/", + workflow=CONFIG["workflows"]["notereader"], + protocol="soap", + ) + + # Load CDA document + notereader_client.load_from_path(CONFIG["data"]["cda_document_path"]) # CDS Hooks Sandbox - @hc.sandbox(base_url) - class DischargeNoteSummarizer(ClinicalDecisionSupport): - """Sandbox for testing clinical decision support workflows""" - - def __init__(self): - super().__init__(path="/cds/cds-services/") - self.data_generator = CdsDataGenerator() - - @hc.ehr(workflow=CONFIG["workflows"]["cds"]) - def load_discharge_data(self) -> Prefetch: - """Generate synthetic discharge data for testing""" - data = self.data_generator.generate_prefetch( - free_text_path=CONFIG["data"]["discharge_notes_path"], - column_name="text", - ) - return data + cds_client = SandboxClient( + api_url=base_url, + endpoint="/cds/cds-services/discharge-summary", + workflow=CONFIG["workflows"]["cds"], + protocol="rest", + ) + + # Load discharge notes + cds_client.load_free_text( + csv_path=CONFIG["data"]["discharge_notes_path"], + column_name="text", + workflow=CONFIG["workflows"]["cds"], + ) print_success("Sandbox environments created") - return NotereaderSandbox(), DischargeNoteSummarizer() + return notereader_client, cds_client def start_server(app): @@ -479,15 +462,18 @@ def run_server(): return server_thread -def run_sandbox_demos(notereader_sandbox, cds_sandbox): +def run_sandbox_demos(notereader_client, cds_client): """Run the sandbox demonstrations""" print_section("Running Sandbox Demonstrations", "🎭") # Start NoteReader demo print_step("Starting Clinical Documentation sandbox...") try: - notereader_sandbox.start_sandbox() - print_success("Clinical Documentation sandbox completed") + responses = notereader_client.send_requests() + notereader_client.save_results("./output/") + print_success( + f"Clinical Documentation sandbox completed - {len(responses)} response(s)" + ) except Exception as e: print(f"❌ NoteReader sandbox error: {str(e)}") @@ -496,8 +482,11 @@ def run_sandbox_demos(notereader_sandbox, cds_sandbox): # Start CDS Hooks demo print_step("Starting Clinical Decision Support sandbox...") try: - cds_sandbox.start_sandbox(service_id="discharge-summary") - print_success("Clinical Decision Support sandbox completed") + responses = cds_client.send_requests() + cds_client.save_results("./output/") + print_success( + f"Clinical Decision Support sandbox completed - {len(responses)} response(s)" + ) except Exception as e: print(f"❌ CDS sandbox error: {str(e)}") diff --git a/tests/sandbox/conftest.py b/tests/sandbox/conftest.py deleted file mode 100644 index 048401d7..00000000 --- a/tests/sandbox/conftest.py +++ /dev/null @@ -1,124 +0,0 @@ -import pytest - -from unittest.mock import Mock -from healthchain.fhir import create_bundle -from healthchain.models.hooks.prefetch import Prefetch -from healthchain.sandbox.base import BaseRequestConstructor, BaseUseCase -from healthchain.sandbox.clients import EHRClient -from healthchain.sandbox.decorator import sandbox -from healthchain.sandbox.use_cases.cds import ClinicalDecisionSupport -from healthchain.sandbox.workflows import UseCaseType - - -class MockDataGenerator: - def __init__(self) -> None: - self.generated_data = Prefetch(prefetch={"document": create_bundle()}) - self.workflow = None - - def set_workflow(self, workflow): - self.workflow = workflow - - -@pytest.fixture -def mock_strategy(): - mock = Mock() - mock.construct_request = Mock( - return_value=Mock(model_dump_json=Mock(return_value="{}")) - ) - return mock - - -@pytest.fixture -def mock_function(): - return Mock() - - -@pytest.fixture -def mock_workflow(): - return Mock() - - -@pytest.fixture -def ehr_client(mock_function, mock_workflow, mock_strategy): - return EHRClient(mock_function, mock_workflow, mock_strategy) - - -@pytest.fixture -def mock_cds() -> BaseUseCase: - class MockClinicalDecisionSupportStrategy(BaseRequestConstructor): - # Add required api_protocol property - api_protocol = "rest" - - construct_request = Mock( - return_value=Mock(model_dump_json=Mock(return_value="{}")) - ) - - class MockClinicalDecisionSupport(BaseUseCase): - type = UseCaseType.cds - _path = "/cds" - strategy = MockClinicalDecisionSupportStrategy() - - @property - def path(self): - return self._path - - return MockClinicalDecisionSupport - - -@pytest.fixture -def mock_client_decorator(): - """Create a mock decorator for client methods""" - - def mock_client_decorator(func): - func.is_client = True - return func - - return mock_client_decorator - - -@pytest.fixture -def correct_sandbox_class(mock_client_decorator): - """Create a correct sandbox class with required API URL""" - - @sandbox("http://localhost:8000") - class TestSandbox(ClinicalDecisionSupport): - def __init__(self) -> None: - super().__init__(path="/cds-services/") - - @mock_client_decorator - def foo(self): - return "foo" - - return TestSandbox - - -@pytest.fixture -def incorrect_client_num_sandbox_class(mock_client_decorator): - """Create a sandbox class with too many client methods""" - - @sandbox("http://localhost:8000") - class TestSandbox(ClinicalDecisionSupport): - def __init__(self) -> None: - super().__init__(path="/cds-services/") - - @mock_client_decorator - def foo(self): - return "foo" - - @mock_client_decorator - def foo2(self): - return "foo" - - return TestSandbox - - -@pytest.fixture -def missing_funcs_sandbox_class(): - """Create a sandbox class with missing client methods""" - - @sandbox("http://localhost:8000") - class TestSandbox(ClinicalDecisionSupport): - def __init__(self) -> None: - super().__init__(path="/cds-services/") - - return TestSandbox diff --git a/tests/generators_tests/test_cds_data_generator.py b/tests/sandbox/generators/test_cds_data_generator.py similarity index 97% rename from tests/generators_tests/test_cds_data_generator.py rename to tests/sandbox/generators/test_cds_data_generator.py index 7b30fb26..eecea05a 100644 --- a/tests/generators_tests/test_cds_data_generator.py +++ b/tests/sandbox/generators/test_cds_data_generator.py @@ -5,7 +5,7 @@ from fhir.resources.procedure import Procedure from fhir.resources.patient import Patient -from healthchain.data_generators import CdsDataGenerator +from healthchain.sandbox.generators import CdsDataGenerator from healthchain.sandbox.workflows import Workflow diff --git a/tests/generators_tests/test_condition_generators.py b/tests/sandbox/generators/test_condition_generators.py similarity index 92% rename from tests/generators_tests/test_condition_generators.py rename to tests/sandbox/generators/test_condition_generators.py index 86185d02..6c9cb780 100644 --- a/tests/generators_tests/test_condition_generators.py +++ b/tests/sandbox/generators/test_condition_generators.py @@ -1,10 +1,10 @@ -from healthchain.data_generators.conditiongenerators import ( +from healthchain.sandbox.generators.conditiongenerators import ( ClinicalStatusGenerator, VerificationStatusGenerator, CategoryGenerator, ConditionGenerator, ) -from healthchain.data_generators.value_sets.conditioncodes import ( +from healthchain.sandbox.generators.value_sets.conditioncodes import ( ConditionCodeSimple, ConditionCodeComplex, ) diff --git a/tests/generators_tests/test_encounter_generators.py b/tests/sandbox/generators/test_encounter_generators.py similarity index 93% rename from tests/generators_tests/test_encounter_generators.py rename to tests/sandbox/generators/test_encounter_generators.py index 18ee27f6..122bf766 100644 --- a/tests/generators_tests/test_encounter_generators.py +++ b/tests/sandbox/generators/test_encounter_generators.py @@ -1,4 +1,4 @@ -from healthchain.data_generators.encountergenerators import ( +from healthchain.sandbox.generators.encountergenerators import ( ClassGenerator, EncounterTypeGenerator, EncounterGenerator, diff --git a/tests/generators_tests/test_medication_administration_generators.py b/tests/sandbox/generators/test_medication_administration_generators.py similarity index 81% rename from tests/generators_tests/test_medication_administration_generators.py rename to tests/sandbox/generators/test_medication_administration_generators.py index 673ccb6a..24062c8d 100644 --- a/tests/generators_tests/test_medication_administration_generators.py +++ b/tests/sandbox/generators/test_medication_administration_generators.py @@ -1,8 +1,8 @@ -from healthchain.data_generators.medicationadministrationgenerators import ( +from healthchain.sandbox.generators.medicationadministrationgenerators import ( MedicationAdministrationDosageGenerator, MedicationAdministrationGenerator, ) -from healthchain.data_generators.value_sets.medicationcodes import ( +from healthchain.sandbox.generators.value_sets.medicationcodes import ( MedicationRequestMedication, ) diff --git a/tests/generators_tests/test_medication_request_generators.py b/tests/sandbox/generators/test_medication_request_generators.py similarity index 84% rename from tests/generators_tests/test_medication_request_generators.py rename to tests/sandbox/generators/test_medication_request_generators.py index 88d24a7f..4d7437cf 100644 --- a/tests/generators_tests/test_medication_request_generators.py +++ b/tests/sandbox/generators/test_medication_request_generators.py @@ -1,8 +1,8 @@ -from healthchain.data_generators.medicationrequestgenerators import ( +from healthchain.sandbox.generators.medicationrequestgenerators import ( MedicationRequestGenerator, MedicationRequestContainedGenerator, ) -from healthchain.data_generators.value_sets.medicationcodes import ( +from healthchain.sandbox.generators.value_sets.medicationcodes import ( MedicationRequestMedication, ) diff --git a/tests/generators_tests/test_patient_generators.py b/tests/sandbox/generators/test_patient_generators.py similarity index 91% rename from tests/generators_tests/test_patient_generators.py rename to tests/sandbox/generators/test_patient_generators.py index 7651b4f7..cc7fc184 100644 --- a/tests/generators_tests/test_patient_generators.py +++ b/tests/sandbox/generators/test_patient_generators.py @@ -1,4 +1,4 @@ -from healthchain.data_generators.patientgenerators import ( +from healthchain.sandbox.generators.patientgenerators import ( PatientGenerator, HumanNameGenerator, ) diff --git a/tests/generators_tests/test_practitioner_generators.py b/tests/sandbox/generators/test_practitioner_generators.py similarity index 97% rename from tests/generators_tests/test_practitioner_generators.py rename to tests/sandbox/generators/test_practitioner_generators.py index affaec61..75e8c9f1 100644 --- a/tests/generators_tests/test_practitioner_generators.py +++ b/tests/sandbox/generators/test_practitioner_generators.py @@ -1,4 +1,4 @@ -from healthchain.data_generators.practitionergenerators import ( +from healthchain.sandbox.generators.practitionergenerators import ( PractitionerGenerator, Practitioner_QualificationGenerator, Practitioner_CommunicationGenerator, diff --git a/tests/generators_tests/test_procedure_generators.py b/tests/sandbox/generators/test_procedure_generators.py similarity index 77% rename from tests/generators_tests/test_procedure_generators.py rename to tests/sandbox/generators/test_procedure_generators.py index 526d18f8..52dffbec 100644 --- a/tests/generators_tests/test_procedure_generators.py +++ b/tests/sandbox/generators/test_procedure_generators.py @@ -1,5 +1,5 @@ -from healthchain.data_generators.proceduregenerators import ProcedureGenerator -from healthchain.data_generators.value_sets.procedurecodes import ( +from healthchain.sandbox.generators.proceduregenerators import ProcedureGenerator +from healthchain.sandbox.generators.value_sets.procedurecodes import ( ProcedureCodeSimple, ProcedureCodeComplex, ) diff --git a/tests/sandbox/test_cds_sandbox.py b/tests/sandbox/test_cds_sandbox.py index fc16f342..28450fb8 100644 --- a/tests/sandbox/test_cds_sandbox.py +++ b/tests/sandbox/test_cds_sandbox.py @@ -1,18 +1,17 @@ from unittest.mock import patch, MagicMock -import healthchain as hc +from healthchain.sandbox import SandboxClient from healthchain.gateway.cds import CDSHooksService from healthchain.gateway.api import HealthChainAPI from healthchain.models.requests.cdsrequest import CDSRequest from healthchain.models.responses.cdsresponse import CDSResponse, Card from healthchain.models.hooks.prefetch import Prefetch -from healthchain.sandbox.use_cases import ClinicalDecisionSupport from healthchain.fhir import create_bundle, create_condition def test_cdshooks_sandbox_integration(): - """Test CDSHooks service integration with sandbox decorator""" - # Create HealthChainAPI instead of FastAPI + """Test CDSHooks service integration with SandboxClient""" + # Create HealthChainAPI app = HealthChainAPI() cds_service = CDSHooksService() @@ -28,22 +27,28 @@ async def handle_patient_view(request: CDSRequest) -> CDSResponse: # Register the service with the HealthChainAPI app.register_service(cds_service, "/cds") - # Define a sandbox class using the CDSHooks service - @hc.sandbox("http://localhost:8000/") - class TestCDSHooksSandbox(ClinicalDecisionSupport): - def __init__(self): - super().__init__(path="/cds/cds-services/") - self.test_bundle = create_bundle() + # Create SandboxClient + client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/cds/cds-services/test-patient-view", + workflow="patient-view", + protocol="rest", + ) - @hc.ehr(workflow="patient-view") - def load_prefetch_data(self) -> Prefetch: - return Prefetch(prefetch={"patient": self.test_bundle}) + # Load test data + test_bundle = create_bundle() + prefetch_data = Prefetch(prefetch={"patient": test_bundle}) + client._construct_request(prefetch_data, client.workflow) - # Create an instance of the sandbox - sandbox_instance = TestCDSHooksSandbox() + # Verify request was constructed + assert len(client.request_data) == 1 + assert client.request_data[0].hook == "patient-view" + + # Mock HTTP response + with patch("httpx.Client") as mock_client_class: + mock_client = MagicMock() + mock_client_class.return_value.__enter__.return_value = mock_client - # Patch the client request method to avoid actual HTTP requests - with patch.object(sandbox_instance, "_client") as mock_client: mock_response = MagicMock() mock_response.json.return_value = { "cards": [ @@ -54,43 +59,52 @@ def load_prefetch_data(self) -> Prefetch: } ] } - mock_client.send_request.return_value = mock_response + mock_response.raise_for_status = MagicMock() + mock_client.post.return_value = mock_response + + # Send requests + responses = client.send_requests() - # Verify the sandbox can be initialized with the workflow - assert hasattr(sandbox_instance, "load_prefetch_data") + # Verify response + assert len(responses) == 1 + assert responses[0]["cards"][0]["summary"] == "Test Card" def test_cdshooks_workflows(): - """Test CDSHooks sandbox""" - - @hc.sandbox("http://localhost:8000/") - class TestCDSSandbox(ClinicalDecisionSupport): - def __init__(self): - super().__init__(path="/cds/cds-services/") - self.patient_bundle = create_bundle() - self.encounter_bundle = create_bundle() - - @hc.ehr(workflow="patient-view") - def load_patient_data(self) -> Prefetch: - # Add a condition to the bundle - condition = create_condition( - subject="Patient/123", code="123", display="Test Condition" - ) - self.patient_bundle.entry = [{"resource": condition}] - return Prefetch(prefetch={"patient": self.patient_bundle}) - - # Create sandbox instance - sandbox = TestCDSSandbox() - - # Verify both workflows are correctly registered - assert hasattr(sandbox, "load_patient_data") - - # Test the patient-view workflow - with patch.object(sandbox, "_client") as mock_client: + """Test CDSHooks sandbox with patient-view workflow""" + # Create SandboxClient + client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/cds/cds-services/patient-view", + workflow="patient-view", + protocol="rest", + ) + + # Create test data + patient_bundle = create_bundle() + condition = create_condition( + subject="Patient/123", code="123", display="Test Condition" + ) + patient_bundle.entry = [{"resource": condition}] + + # Load data into client + prefetch_data = Prefetch(prefetch={"patient": patient_bundle}) + client._construct_request(prefetch_data, client.workflow) + + # Verify request was constructed + assert len(client.request_data) == 1 + + # Mock HTTP response + with patch("httpx.Client") as mock_client_class: + mock_client = MagicMock() + mock_client_class.return_value.__enter__.return_value = mock_client + mock_response = MagicMock() mock_response.json.return_value = {"cards": []} - mock_client.send_request.return_value = mock_response + mock_response.raise_for_status = MagicMock() + mock_client.post.return_value = mock_response - # Mock client workflow - mock_client.workflow = MagicMock() - mock_client.workflow.value = "patient-view" + # Send requests and verify + responses = client.send_requests() + assert len(responses) == 1 + assert "cards" in responses[0] diff --git a/tests/sandbox/test_cds_usecase.py b/tests/sandbox/test_cds_usecase.py deleted file mode 100644 index 74943831..00000000 --- a/tests/sandbox/test_cds_usecase.py +++ /dev/null @@ -1,103 +0,0 @@ -import pytest -from unittest.mock import MagicMock - -from healthchain.sandbox.use_cases.cds import ( - CdsRequestConstructor, - ClinicalDecisionSupport, -) -from healthchain.sandbox.workflows import Workflow, UseCaseType -from healthchain.models.hooks.prefetch import Prefetch -from healthchain.service.endpoints import ApiProtocol -from healthchain.fhir import create_bundle - - -def test_cds_request_constructor_init(): - """Test CdsRequestConstructor initialization""" - constructor = CdsRequestConstructor() - - # Check protocol setting - assert constructor.api_protocol == ApiProtocol.rest - - # Check context mapping - assert Workflow.patient_view in constructor.context_mapping - assert Workflow.order_select in constructor.context_mapping - assert Workflow.order_sign in constructor.context_mapping - assert Workflow.encounter_discharge in constructor.context_mapping - - -def test_cds_request_constructor_validation(): - """Test validation of workflows in CdsRequestConstructor""" - constructor = CdsRequestConstructor() - - # Create a prefetch object - prefetch = Prefetch(prefetch={"patient": create_bundle()}) - - # Test with valid workflow - valid_workflow = Workflow.patient_view - # Should not raise error - constructor.construct_request(prefetch_data=prefetch, workflow=valid_workflow) - - # Test with invalid workflow - should raise ValueError - with pytest.raises(ValueError): - # Not a real workflow - invalid_workflow = MagicMock() - invalid_workflow.value = "invalid-workflow" - constructor.construct_request(prefetch_data=prefetch, workflow=invalid_workflow) - - -def test_cds_request_constructor_type_error(): - """Test type error handling in CdsRequestConstructor""" - constructor = CdsRequestConstructor() - - # Test with invalid prefetch data type - should raise TypeError - with pytest.raises(TypeError): - # Not a Prefetch object - invalid_prefetch = {"patient": create_bundle()} - constructor.construct_request( - prefetch_data=invalid_prefetch, workflow=Workflow.patient_view - ) - - -def test_cds_request_construction(): - """Test request construction in CdsRequestConstructor""" - constructor = CdsRequestConstructor() - - # Create a bundle and prefetch - bundle = create_bundle() - prefetch = Prefetch(prefetch={"patient": bundle}) - - # Construct a request - request = constructor.construct_request( - prefetch_data=prefetch, - workflow=Workflow.patient_view, - context={"patientId": "test-patient-123"}, - ) - - # Verify request properties - assert request.hook == "patient-view" - assert request.context.patientId == "test-patient-123" - assert request.prefetch == prefetch.prefetch - - -def test_clinical_decision_support_init(): - """Test ClinicalDecisionSupport initialization""" - # Test with default parameters - cds = ClinicalDecisionSupport() - assert cds.type == UseCaseType.cds - assert isinstance(cds.strategy, CdsRequestConstructor) - assert cds._path == "/cds-services/" - - # Test with custom path - custom_path = "/api/cds/" - cds_custom = ClinicalDecisionSupport(path=custom_path) - assert cds_custom._path == custom_path - - -def test_clinical_decision_support_properties(): - """Test ClinicalDecisionSupport properties""" - cds = ClinicalDecisionSupport() - - # Check properties - assert cds.description == "Clinical decision support (HL7 CDS specification)" - assert cds.type == UseCaseType.cds - assert isinstance(cds.strategy, CdsRequestConstructor) diff --git a/tests/sandbox/test_clients.py b/tests/sandbox/test_clients.py deleted file mode 100644 index 694653ac..00000000 --- a/tests/sandbox/test_clients.py +++ /dev/null @@ -1,56 +0,0 @@ -import pytest -import httpx - -from unittest.mock import Mock, patch - - -@pytest.fixture -def mock_strategy(): - mock = Mock() - mock.construct_request = Mock( - return_value=Mock(model_dump_json=Mock(return_value="{}")) - ) - return mock - - -def test_init(ehr_client, mock_function, mock_workflow, mock_strategy): - assert ehr_client.data_generator_func == mock_function - assert ehr_client.workflow == mock_workflow - assert ehr_client.strategy == mock_strategy - assert ehr_client.request_data == [] - - -def test_generate_request(ehr_client, mock_strategy): - ehr_client.generate_request(1, 2, test="data") - mock_strategy.construct_request.assert_called_once() - assert len(ehr_client.request_data) == 1 - - -@pytest.mark.asyncio -@patch.object( - httpx.AsyncClient, - "post", - return_value=httpx.Response(200, json={"response": "test successful"}), -) -async def test_send_request(ehr_client): - ehr_client.request_data = [Mock(model_dump_json=Mock(return_value="{}"))] - responses = await ehr_client.send_request("http://fakeurl.com") - assert all(response["status"] == "success" for response in responses) - - -@pytest.mark.asyncio -async def test_logging_on_send_request_error(caplog, ehr_client): - with patch.object(httpx.AsyncClient, "post") as mock_post: - mock_post.return_value = Mock() - mock_post.return_value.response.status_code = 400 - mock_post.return_value.raise_for_status.side_effect = httpx.HTTPStatusError( - "Bad Request", - request=Mock(url="http://fakeurl.com"), - response=Mock(status_code="400"), - ) - ehr_client.request_data = [ - Mock(model_dump_json=Mock(return_value="{'request': 'success'}")) - ] - responses = await ehr_client.send_request("http://fakeurl.com") - assert "Error response 400 while requesting 'http://fakeurl.com" in caplog.text - assert {} in responses diff --git a/tests/sandbox/test_clindoc_sandbox.py b/tests/sandbox/test_clindoc_sandbox.py index 800d7aaf..9d2b0fab 100644 --- a/tests/sandbox/test_clindoc_sandbox.py +++ b/tests/sandbox/test_clindoc_sandbox.py @@ -1,17 +1,16 @@ +import base64 from unittest.mock import patch, MagicMock -import healthchain as hc +from healthchain.sandbox import SandboxClient from healthchain.gateway.soap.notereader import NoteReaderService from healthchain.gateway.api import HealthChainAPI from healthchain.models.requests import CdaRequest from healthchain.models.responses.cdaresponse import CdaResponse -from healthchain.sandbox.use_cases import ClinicalDocumentation -from healthchain.fhir import create_document_reference def test_notereader_sandbox_integration(): - """Test NoteReaderService integration with sandbox decorator""" - # Use HealthChainAPI instead of FastAPI + """Test NoteReaderService integration with SandboxClient""" + # Use HealthChainAPI app = HealthChainAPI() note_service = NoteReaderService() @@ -23,63 +22,97 @@ def process_document(cda_request: CdaRequest) -> CdaResponse: # Register service with HealthChainAPI app.register_service(note_service, "/notereader") - # Define a sandbox class that uses the NoteReader service - @hc.sandbox("http://localhost:8000/") - class TestNotereaderSandbox(ClinicalDocumentation): - def __init__(self): - super().__init__() - self.test_document = "document" - - @hc.ehr(workflow="sign-note-inpatient") - def load_document_reference(self): - return create_document_reference( - data=self.test_document, - content_type="text/xml", - description="Test document", - ) - - # Create an instance of the sandbox - sandbox_instance = TestNotereaderSandbox() - - # Patch the client request method to avoid actual HTTP requests - with patch.object(sandbox_instance, "_client") as mock_client: + # Create SandboxClient for SOAP/CDA + client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/notereader/fhir/", + workflow="sign-note-inpatient", + protocol="soap", + ) + + # Load test document + test_document = "document" + client._construct_request(test_document, client.workflow) + + # Verify request was constructed + assert len(client.request_data) == 1 + + # Mock HTTP response with proper SOAP envelope structure + with patch("httpx.Client") as mock_client_class: + mock_client = MagicMock() + mock_client_class.return_value.__enter__.return_value = mock_client + + # Create proper SOAP response with base64-encoded CDA + cda_content = "document" + encoded_cda = base64.b64encode(cda_content.encode("utf-8")).decode("utf-8") + soap_response = f""" + + + + {encoded_cda} + + + + """ + mock_response = MagicMock() - mock_response.text = "document" - mock_client.send_soap_request.return_value = mock_response + mock_response.text = soap_response + mock_response.status_code = 200 + mock_response.raise_for_status = MagicMock() + mock_client.post.return_value = mock_response - # Verify the sandbox can be initialized with the workflow - assert hasattr(sandbox_instance, "load_document_reference") + # Send requests + responses = client.send_requests() + + # Verify response + assert len(responses) == 1 + assert "document" in responses[0] def test_notereader_sandbox_workflow_execution(): - """Test executing a NoteReader workflow in the sandbox""" - - # Create a sandbox class with NoteReader - @hc.sandbox("http://localhost:8000/") - class TestNotereaderWithData(ClinicalDocumentation): - def __init__(self): - super().__init__() - self.data_processed = False - - @hc.ehr(workflow="sign-note-inpatient") - def get_clinical_document(self): - return create_document_reference( - data="Test content", - content_type="text/xml", - description="Test CDA document", - ) - - # Create sandbox instance - sandbox = TestNotereaderWithData() - - # Mock the client to avoid HTTP requests - with patch.object(sandbox, "_client") as mock_client: - # Mock response from server + """Test executing a NoteReader workflow with SandboxClient""" + # Create SandboxClient + client = SandboxClient( + api_url="http://localhost:8000", + endpoint="/notereader/fhir/", + workflow="sign-note-inpatient", + protocol="soap", + ) + + # Load clinical document + clinical_document = "Test content" + client._construct_request(clinical_document, client.workflow) + + # Verify request was constructed + assert len(client.request_data) == 1 + + # Mock HTTP response with proper SOAP envelope structure + with patch("httpx.Client") as mock_client_class: + mock_client = MagicMock() + mock_client_class.return_value.__enter__.return_value = mock_client + + # Create proper SOAP response with base64-encoded CDA + cda_content = "document" + encoded_cda = base64.b64encode(cda_content.encode("utf-8")).decode("utf-8") + soap_response = f""" + + + + {encoded_cda} + + + + """ + mock_response = MagicMock() - mock_response.text = "document" + mock_response.text = soap_response mock_response.status_code = 200 - mock_client.send_soap_request.return_value = mock_response + mock_response.raise_for_status = MagicMock() + mock_client.post.return_value = mock_response - # Set up the sandbox with correct attributes for testing - sandbox._client.workflow = MagicMock() - sandbox._client.workflow.value = "sign-note-inpatient" + # Send requests and verify + responses = client.send_requests() + assert len(responses) == 1 + assert "document" in responses[0] diff --git a/tests/sandbox/test_clindoc_usecase.py b/tests/sandbox/test_clindoc_usecase.py deleted file mode 100644 index 46f22912..00000000 --- a/tests/sandbox/test_clindoc_usecase.py +++ /dev/null @@ -1,124 +0,0 @@ -import pytest -from unittest.mock import patch, MagicMock - -from healthchain.sandbox.use_cases import clindoc -from healthchain.sandbox.use_cases.clindoc import ( - ClinDocRequestConstructor, - ClinicalDocumentation, -) -from healthchain.sandbox.workflows import Workflow, UseCaseType -from healthchain.service.endpoints import ApiProtocol -from healthchain.fhir import create_document_reference - - -def test_clindoc_request_constructor_init(): - """Test ClinDocRequestConstructor initialization""" - constructor = ClinDocRequestConstructor() - - # Check protocol setting - assert constructor.api_protocol == ApiProtocol.soap - - # Check SOAP envelope was loaded - assert constructor.soap_envelope is not None - assert isinstance(constructor.soap_envelope, dict) - - -@patch("pkgutil.get_data") -def test_clindoc_request_constructor_load_envelope(mock_get_data): - """Test loading the SOAP envelope template""" - # Mock data returned from pkgutil - mock_get_data.return_value = ( - b"" - ) - - ClinDocRequestConstructor() - - # Check if pkgutil.get_data was called with correct parameters - mock_get_data.assert_called_once_with("healthchain", "templates/soap_envelope.xml") - - -def test_clindoc_request_constructor_not_implemented(): - """Test not implemented methods raise appropriate exceptions""" - constructor = ClinDocRequestConstructor() - - # Test that method raises NotImplementedError - with pytest.raises(NotImplementedError): - constructor.construct_cda_xml_document() - - -@patch.object(ClinDocRequestConstructor, "_load_soap_envelope") -def test_clindoc_request_construction(mock_load_envelope): - """Test CDA request construction from DocumentReference""" - # Create mock SOAP envelope - mock_envelope = { - "soapenv:Envelope": { - "soapenv:Body": {"urn:ProcessDocument": {"urn:Document": ""}} - } - } - mock_load_envelope.return_value = mock_envelope - - constructor = ClinDocRequestConstructor() - - # Create a DocumentReference with XML content - xml_content = "Test Document" - doc_ref = create_document_reference( - data=xml_content, content_type="text/xml", description="Test CDA Document" - ) - - # Mock CdaRequest.from_dict to avoid actual parsing - with patch("healthchain.models.CdaRequest.from_dict") as mock_from_dict: - mock_from_dict.return_value = MagicMock() - - # Construct the request - constructor.construct_request(doc_ref, Workflow.sign_note_inpatient) - - # Verify CdaRequest.from_dict was called with modified envelope - mock_from_dict.assert_called_once() - # XML should be base64 encoded - assert ( - "urn:Document" - in mock_envelope["soapenv:Envelope"]["soapenv:Body"]["urn:ProcessDocument"] - ) - - -def test_clindoc_request_construction_no_xml(): - """Test CDA request construction when no XML content is found""" - constructor = ClinDocRequestConstructor() - - # Create a DocumentReference without XML content - doc_ref = create_document_reference( - data="Not XML content", - content_type="text/plain", - description="Test non-XML Document", - ) - - mock_warning = MagicMock() - clindoc.log.warning = mock_warning - - result = constructor.construct_request(doc_ref, Workflow.sign_note_inpatient) - assert result is None - mock_warning.assert_called_once() - - -def test_clinical_documentation_init(): - """Test ClinicalDocumentation initialization""" - # Test with default parameters - clindoc = ClinicalDocumentation() - assert clindoc.type == UseCaseType.clindoc - assert isinstance(clindoc.strategy, ClinDocRequestConstructor) - assert clindoc._path == "/notereader/" - - # Test with custom path - custom_path = "/api/notereader/" - clindoc_custom = ClinicalDocumentation(path=custom_path) - assert clindoc_custom._path == custom_path - - -def test_clinical_documentation_properties(): - """Test ClinicalDocumentation properties""" - clindoc = ClinicalDocumentation() - - # Check properties - assert clindoc.description == "Clinical documentation (NoteReader)" - assert clindoc.type == UseCaseType.clindoc - assert isinstance(clindoc.strategy, ClinDocRequestConstructor) diff --git a/tests/sandbox/test_decorators.py b/tests/sandbox/test_decorators.py deleted file mode 100644 index e13bb142..00000000 --- a/tests/sandbox/test_decorators.py +++ /dev/null @@ -1,94 +0,0 @@ -import pytest -from unittest.mock import MagicMock - -from healthchain.sandbox.decorator import ehr -from healthchain.sandbox.utils import find_attributes_of_type, assign_to_attribute -from healthchain.sandbox.workflows import UseCaseType -from healthchain.sandbox.base import BaseUseCase - -from .conftest import MockDataGenerator - - -class MockUseCase: - def __init__(self) -> None: - self.data_gen = MockDataGenerator() - - -@pytest.fixture -def function(): - def func(self): - pass - - return func - - -def test_setting_workflow_attributes(): - instance = MockUseCase() - attributes = find_attributes_of_type(instance, MockDataGenerator) - assert attributes == ["data_gen"] - - -def test_assigning_workflow_attributes(): - instance = MockUseCase() - attributes = ["data_gen", "invalid"] - - assign_to_attribute(instance, attributes[0], "set_workflow", "workflow") - assert instance.data_gen.workflow == "workflow" - - with pytest.raises(AttributeError): - assign_to_attribute(instance, attributes[1], "set_workflow", "workflow") - - -def test_ehr_invalid_use_case(function): - instance = MockUseCase() - decorated = ehr(workflow="any_workflow")(function) - with pytest.raises(AssertionError) as excinfo: - decorated(instance) - assert "MockUseCase must be subclass of valid Use Case strategy!" in str( - excinfo.value - ) - - -def test_ehr_invalid_workflow(function, mock_cds): - with pytest.raises(ValueError) as excinfo: - decorated = ehr(workflow="invalid_workflow")(function) - decorated(mock_cds()) - assert "please select from" in str(excinfo.value) - - -def test_ehr_correct_behavior(function, mock_cds): - decorated = ehr(workflow="order-sign")(function) - result = decorated(mock_cds()) - assert len(result.request_data) == 1 - - -def test_ehr_multiple_calls(function, mock_cds): - decorated = ehr(workflow="order-select", num=3)(function) - result = decorated(mock_cds()) - assert len(result.request_data) == 3 - - -def test_ehr_decorator(): - """Test the ehr decorator functionality""" - - # Create a proper subclass of BaseUseCase to avoid patching - class MockUseCase(BaseUseCase): - type = UseCaseType.cds - path = "/test" - - # Mock strategy for testing - @property - def strategy(self): - return MagicMock() - - # Test the decorator with workflow - @ehr(workflow="patient-view") - def test_method(self): - return {"test": "data"} - - # Create an instance - mock_use_case = MockUseCase() - - # Verify method is marked as client - assert hasattr(mock_use_case.test_method, "is_client") - assert mock_use_case.test_method.is_client diff --git a/tests/sandbox/test_request_constructors.py b/tests/sandbox/test_request_constructors.py new file mode 100644 index 00000000..cd9e4ae5 --- /dev/null +++ b/tests/sandbox/test_request_constructors.py @@ -0,0 +1,210 @@ +import pytest +from unittest.mock import MagicMock, patch + +from healthchain.sandbox.requestconstructors import ( + CdsRequestConstructor, + ClinDocRequestConstructor, +) +from healthchain.sandbox.workflows import Workflow +from healthchain.models.hooks.prefetch import Prefetch +from healthchain.sandbox.base import ApiProtocol +from healthchain.fhir import create_bundle + + +def test_cds_request_constructor_init(): + """Test CdsRequestConstructor initialization""" + constructor = CdsRequestConstructor() + + # Check protocol setting + assert constructor.api_protocol == ApiProtocol.rest + + # Check context mapping + assert Workflow.patient_view in constructor.context_mapping + assert Workflow.order_select in constructor.context_mapping + assert Workflow.order_sign in constructor.context_mapping + assert Workflow.encounter_discharge in constructor.context_mapping + + +def test_cds_request_constructor_validation(): + """Test validation of workflows in CdsRequestConstructor""" + constructor = CdsRequestConstructor() + + # Create a prefetch object + prefetch = Prefetch(prefetch={"patient": create_bundle()}) + + # Test with valid workflow + valid_workflow = Workflow.patient_view + # Should not raise error + constructor.construct_request(prefetch_data=prefetch, workflow=valid_workflow) + + # Test with invalid workflow - should raise ValueError + with pytest.raises(ValueError): + # Not a real workflow + invalid_workflow = MagicMock() + invalid_workflow.value = "invalid-workflow" + constructor.construct_request(prefetch_data=prefetch, workflow=invalid_workflow) + + +def test_cds_request_constructor_type_error(): + """Test type error handling in CdsRequestConstructor""" + constructor = CdsRequestConstructor() + + # Test with invalid prefetch data type - should raise TypeError + with pytest.raises(TypeError): + # Not a Prefetch object + invalid_prefetch = {"patient": create_bundle()} + constructor.construct_request( + prefetch_data=invalid_prefetch, workflow=Workflow.patient_view + ) + + +def test_cds_request_construction(): + """Test request construction in CdsRequestConstructor""" + constructor = CdsRequestConstructor() + + # Create a bundle and prefetch + bundle = create_bundle() + prefetch = Prefetch(prefetch={"patient": bundle}) + + # Construct a request + request = constructor.construct_request( + prefetch_data=prefetch, + workflow=Workflow.patient_view, + context={"patientId": "test-patient-123"}, + ) + + # Verify request properties + assert request.hook == "patient-view" + assert request.context.patientId == "test-patient-123" + assert request.prefetch == prefetch.prefetch + + +def test_clindoc_request_constructor_init(): + """Test ClinDocRequestConstructor initialization""" + constructor = ClinDocRequestConstructor() + + # Check protocol setting + assert constructor.api_protocol == ApiProtocol.soap + + # Check SOAP envelope was loaded + assert constructor.soap_envelope is not None + assert isinstance(constructor.soap_envelope, dict) + + +@patch("pkgutil.get_data") +def test_clindoc_request_constructor_load_envelope(mock_get_data): + """Test loading the SOAP envelope template""" + # Mock data returned from pkgutil + mock_get_data.return_value = ( + b"" + ) + + ClinDocRequestConstructor() + + # Check if pkgutil.get_data was called with correct parameters + mock_get_data.assert_called_once_with("healthchain", "templates/soap_envelope.xml") + + +def test_clindoc_request_constructor_not_implemented(): + """Test not implemented methods raise appropriate exceptions""" + constructor = ClinDocRequestConstructor() + + # Test that method raises NotImplementedError + with pytest.raises(NotImplementedError): + constructor.construct_cda_xml_document() + + +@patch.object(ClinDocRequestConstructor, "_load_soap_envelope") +def test_clindoc_request_construction(mock_load_envelope): + """Test CDA request construction from DocumentReference""" + # Create mock SOAP envelope + mock_envelope = { + "soapenv:Envelope": { + "soapenv:Body": {"urn:ProcessDocument": {"urn:Document": ""}} + } + } + mock_load_envelope.return_value = mock_envelope + + constructor = ClinDocRequestConstructor() + + # Create a DocumentReference with XML content + xml_content = "Test Document" + + # Mock CdaRequest.from_dict to avoid actual parsing + with patch("healthchain.models.CdaRequest.from_dict") as mock_from_dict: + mock_from_dict.return_value = MagicMock() + + # Construct the request + constructor.construct_request(xml_content, Workflow.sign_note_inpatient) + + # Verify CdaRequest.from_dict was called with modified envelope + mock_from_dict.assert_called_once() + # XML should be base64 encoded + assert ( + "urn:Document" + in mock_envelope["soapenv:Envelope"]["soapenv:Body"]["urn:ProcessDocument"] + ) + + +def test_clindoc_request_handles_malformed_xml(): + """ClinDocRequestConstructor rejects malformed XML and returns None.""" + constructor = ClinDocRequestConstructor() + + # Test with invalid XML + malformed_xml = "tag" + result = constructor.construct_request(malformed_xml, Workflow.sign_note_inpatient) + + assert result is None + + +def test_clindoc_request_rejects_non_string_input(): + """ClinDocRequestConstructor raises ValueError for non-string data.""" + constructor = ClinDocRequestConstructor() + + with pytest.raises(ValueError, match="Expected str"): + constructor.construct_request({"not": "a string"}, Workflow.sign_note_inpatient) + + with pytest.raises(ValueError, match="Expected str"): + constructor.construct_request(123, Workflow.sign_note_inpatient) + + +def test_clindoc_request_missing_soap_envelope_key(): + """ClinDocRequestConstructor raises ValueError when SOAP template missing required key.""" + with patch.object(ClinDocRequestConstructor, "_load_soap_envelope") as mock_load: + # Mock envelope without required key + mock_load.return_value = {"soapenv:Envelope": {"soapenv:Body": {}}} + + constructor = ClinDocRequestConstructor() + xml_content = "Test" + + with pytest.raises(ValueError, match="Key 'urn:Document' missing"): + constructor.construct_request(xml_content, Workflow.sign_note_inpatient) + + +def test_cds_request_construction_with_custom_context(): + """CdsRequestConstructor includes custom context parameters in request.""" + constructor = CdsRequestConstructor() + bundle = create_bundle() + prefetch = Prefetch(prefetch={"patient": bundle}) + + # Test with custom context + custom_context = {"patientId": "patient-123", "encounterId": "encounter-456"} + + request = constructor.construct_request( + prefetch_data=prefetch, workflow=Workflow.patient_view, context=custom_context + ) + + assert request.context.patientId == "patient-123" + assert request.context.encounterId == "encounter-456" + + +def test_cds_request_validates_workflow_for_clinical_doc(): + """CdsRequestConstructor rejects ClinicalDocumentation workflows.""" + constructor = CdsRequestConstructor() + prefetch = Prefetch(prefetch={"patient": create_bundle()}) + + # Should reject sign-note workflows + with pytest.raises(ValueError, match="Invalid workflow"): + constructor.construct_request( + prefetch_data=prefetch, workflow=Workflow.sign_note_inpatient + ) diff --git a/tests/sandbox/test_sandbox_client.py b/tests/sandbox/test_sandbox_client.py new file mode 100644 index 00000000..e0498d79 --- /dev/null +++ b/tests/sandbox/test_sandbox_client.py @@ -0,0 +1,222 @@ +import pytest +import json + +from healthchain.sandbox import SandboxClient + + +def test_load_from_registry_unknown_dataset(): + """load_from_registry raises ValueError for unknown datasets.""" + client = SandboxClient(api_url="http://localhost:8000", endpoint="/test") + + with pytest.raises(ValueError, match="Unknown dataset"): + client.load_from_registry("nonexistent-dataset") + + +def test_load_from_path_single_xml_file(tmp_path): + """load_from_path loads single CDA XML file.""" + # Create test CDA file + cda_file = tmp_path / "test_cda.xml" + cda_file.write_text("Test CDA") + + client = SandboxClient( + api_url="http://localhost:8000", endpoint="/notereader/fhir/", protocol="soap" + ) + + result = client.load_from_path(str(cda_file)) + + assert result is client + assert len(client.request_data) == 1 + + +def test_load_from_path_directory_with_pattern(tmp_path): + """load_from_path loads multiple files from directory with pattern.""" + # Create test CDA files + (tmp_path / "note1.xml").write_text("Note 1") + (tmp_path / "note2.xml").write_text("Note 2") + (tmp_path / "other.txt").write_text("Not XML") + + client = SandboxClient( + api_url="http://localhost:8000", endpoint="/notereader/fhir/", protocol="soap" + ) + + client.load_from_path(str(tmp_path), pattern="*.xml") + + assert len(client.request_data) == 2 + + +def test_load_from_path_directory_all_files(tmp_path): + """load_from_path loads all matching files from directory.""" + # Create test files + (tmp_path / "note1.xml").write_text("Note 1") + (tmp_path / "note2.xml").write_text("Note 2") + + client = SandboxClient( + api_url="http://localhost:8000", endpoint="/notereader/fhir/", protocol="soap" + ) + + client.load_from_path(str(tmp_path)) + + assert len(client.request_data) == 2 + + +def test_load_from_path_error_handling(tmp_path): + """load_from_path raises FileNotFoundError for nonexistent path.""" + client = SandboxClient( + api_url="http://localhost:8000", endpoint="/notereader/fhir/", protocol="soap" + ) + + with pytest.raises(FileNotFoundError): + client.load_from_path("/nonexistent/path.xml") + + with pytest.raises(ValueError, match="No files found"): + client.load_from_path(str(tmp_path), pattern="*.xml") + + +def test_load_free_text_generates_data(tmp_path): + """load_free_text generates synthetic data from CSV.""" + # Create test CSV + csv_file = tmp_path / "test.csv" + csv_file.write_text("text\nSample discharge note\n") + + client = SandboxClient(api_url="http://localhost:8000", endpoint="/test") + + client.load_free_text( + csv_path=str(csv_file), + column_name="text", + workflow="encounter-discharge", + random_seed=42, + ) + assert len(client.request_data) > 0 + + +def test_send_requests_without_data(): + """send_requests raises RuntimeError if no data is loaded.""" + client = SandboxClient(api_url="http://localhost:8000", endpoint="/test") + + with pytest.raises(RuntimeError, match="No requests to send"): + client.send_requests() + + +def test_save_results_without_responses(): + """save_results raises RuntimeError if no responses available.""" + client = SandboxClient(api_url="http://localhost:8000", endpoint="/test") + + with pytest.raises(RuntimeError, match="No responses to save"): + client.save_results() + + +def test_get_status(): + """get_status returns client status information.""" + client = SandboxClient( + api_url="http://localhost:8000", endpoint="/test", workflow="patient-view" + ) + + status = client.get_status() + + assert "sandbox_id" in status + assert status["api_url"] == "http://localhost:8000" + assert status["endpoint"] == "/test" + assert status["protocol"] == "REST" + assert status["workflow"] == "patient-view" + assert status["requests_queued"] == 0 + assert status["responses_received"] == 0 + + +def test_repr(): + """__repr__ returns meaningful string representation.""" + client = SandboxClient(api_url="http://localhost:8000", endpoint="/test") + + repr_str = repr(client) + + assert "SandboxClient" in repr_str + assert "http://localhost:8000" in repr_str + assert "/test" in repr_str + + +def test_load_from_path_json_prefetch_file(tmp_path): + """load_from_path loads and validates JSON Prefetch files.""" + from healthchain.fhir import create_bundle + + # Create valid Prefetch JSON + json_file = tmp_path / "prefetch.json" + prefetch_data = {"prefetch": {"patient": create_bundle().model_dump()}} + json_file.write_text(json.dumps(prefetch_data)) + + client = SandboxClient( + api_url="http://localhost:8000", endpoint="/test", workflow="patient-view" + ) + + client.load_from_path(str(json_file)) + + assert len(client.request_data) == 1 + assert client.request_data[0].hook == "patient-view" + + +def test_load_from_path_json_without_workflow_fails(tmp_path): + """load_from_path requires workflow for JSON Prefetch files.""" + json_file = tmp_path / "prefetch.json" + json_file.write_text('{"prefetch": {}}') + + client = SandboxClient(api_url="http://localhost:8000", endpoint="/test") + + with pytest.raises(ValueError, match="Workflow must be specified"): + client.load_from_path(str(json_file)) + + +def test_load_from_path_invalid_json_prefetch(tmp_path): + """load_from_path rejects malformed JSON Prefetch data.""" + json_file = tmp_path / "invalid.json" + json_file.write_text('{"not_prefetch": "data"}') + + client = SandboxClient( + api_url="http://localhost:8000", endpoint="/test", workflow="patient-view" + ) + + with pytest.raises(ValueError, match="not valid Prefetch format"): + client.load_from_path(str(json_file)) + + +def test_save_results_distinguishes_protocols(tmp_path): + """save_results uses correct file extension based on protocol.""" + from healthchain.models import Prefetch + from healthchain.fhir import create_bundle + from healthchain.sandbox.workflows import Workflow + + # Test REST/JSON protocol + rest_client = SandboxClient( + api_url="http://localhost:8000", endpoint="/test", protocol="rest" + ) + prefetch = Prefetch(prefetch={"patient": create_bundle()}) + rest_client._construct_request(prefetch, Workflow.patient_view) + rest_client.responses = [{"cards": []}] + + rest_dir = tmp_path / "rest" + rest_client.save_results(rest_dir) + + assert len(list(rest_dir.glob("**/*.json"))) > 0 + assert len(list(rest_dir.glob("**/*.xml"))) == 0 + + # Test SOAP/XML protocol + soap_client = SandboxClient( + api_url="http://localhost:8000", endpoint="/test", protocol="soap" + ) + soap_client._construct_request("test", Workflow.sign_note_inpatient) + soap_client.responses = ["data"] + + soap_dir = tmp_path / "soap" + soap_client.save_results(soap_dir) + + assert len(list(soap_dir.glob("**/*.xml"))) > 0 + assert len(list(soap_dir.glob("**/*.json"))) == 0 + + +def test_construct_request_requires_workflow_for_rest(): + """_construct_request raises ValueError if workflow missing for REST protocol.""" + client = SandboxClient(api_url="http://localhost:8000", endpoint="/test") + from healthchain.models import Prefetch + from healthchain.fhir import create_bundle + + prefetch = Prefetch(prefetch={"patient": create_bundle()}) + + with pytest.raises(ValueError, match="Workflow must be specified for REST"): + client._construct_request(prefetch, None) diff --git a/tests/sandbox/test_sandbox_environment.py b/tests/sandbox/test_sandbox_environment.py deleted file mode 100644 index 488389e9..00000000 --- a/tests/sandbox/test_sandbox_environment.py +++ /dev/null @@ -1,148 +0,0 @@ -import pytest - -from unittest.mock import MagicMock - -from healthchain.sandbox.decorator import sandbox -from healthchain.sandbox.environment import SandboxEnvironment -from healthchain.sandbox.workflows import UseCaseType - - -def test_sandbox_init(correct_sandbox_class): - test_sandbox = correct_sandbox_class() - attributes = dir(test_sandbox) - - # Check that required attributes are present - assert "start_sandbox" in attributes - assert "stop_sandbox" in attributes - assert "_client" in attributes - assert "sandbox_env" in attributes - - # Check client is correctly initialized - assert test_sandbox._client == "foo" - - -def test_incorrect_sandbox_usage( - incorrect_client_num_sandbox_class, - missing_funcs_sandbox_class, -): - # Test multiple client methods - with pytest.raises( - RuntimeError, - match="Multiple methods are registered as _client. Only one is allowed.", - ): - incorrect_client_num_sandbox_class() - - # Test when no client is configured - with pytest.raises( - RuntimeError, - match="Client is not configured. Please check your class initialization.", - ): - incorrect_class = missing_funcs_sandbox_class() - incorrect_class.start_sandbox() - - # Test when decorator is applied to non-BaseUseCase class - with pytest.raises( - TypeError, - match="The 'sandbox' decorator can only be applied to subclasses of BaseUseCase, got testSandbox", - ): - - @sandbox("http://localhost:8000") - class testSandbox: - pass - - sandbox(testSandbox) - - -def test_start_sandbox(correct_sandbox_class): - """Test the start_sandbox function""" - test_sandbox = correct_sandbox_class() - - # Mock SandboxEnvironment to prevent actual execution - mock_env = MagicMock() - test_sandbox.sandbox_env = mock_env - - # Test with default parameters - test_sandbox.start_sandbox() - mock_env.start_sandbox.assert_called_once_with( - service_id=None, save_data=True, save_dir="./output/" - ) - - # Reset mock and test with custom parameters - mock_env.reset_mock() - service_id = "test-service" - save_dir = "./custom_dir/" - - test_sandbox.start_sandbox( - service_id=service_id, - save_data=False, - save_dir=save_dir, - ) - - mock_env.start_sandbox.assert_called_once_with( - service_id=service_id, - save_data=False, - save_dir=save_dir, - ) - - -def test_sandbox_environment_init(): - """Test SandboxEnvironment initialization""" - api = "http://localhost:8000" - path = "/test" - client = MagicMock() - use_case_type = UseCaseType.cds - config = {"test": "config"} - - env = SandboxEnvironment(api, path, client, use_case_type, config) - - assert env._client == client - assert env.type == use_case_type - assert str(env.api) == api - assert env.path == path - assert env.config == config - assert env.responses == [] - assert env.sandbox_id is None - - -def test_sandbox_environment_start_sandbox(): - """Test SandboxEnvironment.start_sandbox without patching""" - # Create mocks manually - test_uuid = "test-uuid" - test_responses = ["response1", "response2"] - - # Setup environment - client = MagicMock() - client.request_data = [MagicMock(), MagicMock()] - client.request_data[0].model_dump.return_value = {"request": "data1"} - client.request_data[1].model_dump.return_value = {"request": "data2"} - - # Create a customized SandboxEnvironment for testing - class TestSandboxEnvironment(SandboxEnvironment): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.test_uuid = test_uuid - self.test_responses = test_responses - - def start_sandbox( - self, - service_id=None, - save_data=True, - save_dir="./output/", - logging_config=None, - ): - self.sandbox_id = self.test_uuid - self.responses = self.test_responses - # We don't actually save data or make any real requests - return - - # Create our test environment - env = TestSandboxEnvironment( - "http://localhost:8000", "/test", client, UseCaseType.cds, {} - ) - - # Test start_sandbox - env.start_sandbox(service_id="test-service", save_data=True) - - # Verify results - assert env.sandbox_id == test_uuid - assert env.responses == test_responses diff --git a/tests/sandbox/test_utils.py b/tests/sandbox/test_utils.py new file mode 100644 index 00000000..dc097a0c --- /dev/null +++ b/tests/sandbox/test_utils.py @@ -0,0 +1,118 @@ +"""Tests for sandbox utility functions.""" + +import pytest +import json + +from unittest.mock import patch + +from healthchain.sandbox.utils import ( + generate_filename, + save_file, + ensure_directory_exists, + save_data_to_directory, +) + + +def test_generate_filename_format(): + """generate_filename creates properly formatted filenames with timestamp and identifiers.""" + with patch("healthchain.sandbox.utils.datetime") as mock_datetime: + mock_datetime.now.return_value.strftime.return_value = "2024-01-15_10:30:45" + + # Test JSON filename + filename = generate_filename("request", "abc123def456", 0, "json") + assert filename == "2024-01-15_10:30:45_sandbox_abc123de_request_0.json" + + # Test XML filename + filename = generate_filename("response", "xyz789abc123", 5, "xml") + assert filename == "2024-01-15_10:30:45_sandbox_xyz789ab_response_5.xml" + + +def test_save_file_handles_json_and_xml(tmp_path): + """save_file correctly saves JSON and XML data with proper formatting.""" + sandbox_id = "test123" + + # Test JSON save + json_data = {"key": "value", "nested": {"data": 123}} + save_file(json_data, "request", sandbox_id, 0, tmp_path, "json") + + json_files = list(tmp_path.glob("*_request_0.json")) + assert len(json_files) == 1 + with open(json_files[0]) as f: + loaded = json.load(f) + assert loaded == json_data + + # Test XML save + xml_data = "content" + save_file(xml_data, "response", sandbox_id, 1, tmp_path, "xml") + + xml_files = list(tmp_path.glob("*_response_1.xml")) + assert len(xml_files) == 1 + with open(xml_files[0]) as f: + assert f.read() == xml_data + + +def test_save_file_rejects_unsupported_extensions(tmp_path): + """save_file raises ValueError for unsupported file extensions.""" + with pytest.raises(ValueError, match="Unsupported extension: txt"): + save_file("data", "request", "test123", 0, tmp_path, "txt") + + +def test_ensure_directory_exists_creates_nested_paths(tmp_path): + """ensure_directory_exists creates nested directory structures.""" + nested_path = tmp_path / "level1" / "level2" / "level3" + + result = ensure_directory_exists(nested_path) + + assert result.exists() + assert result.is_dir() + assert result == nested_path + + +def test_ensure_directory_exists_idempotent(tmp_path): + """ensure_directory_exists safely handles already existing directories.""" + test_dir = tmp_path / "existing" + test_dir.mkdir() + + # Should not raise error + result = ensure_directory_exists(test_dir) + assert result.exists() + + +def test_save_data_to_directory_batch_processing(tmp_path): + """save_data_to_directory saves multiple data items with proper indexing.""" + data_list = [ + {"request": 1, "data": "first"}, + {"request": 2, "data": "second"}, + {"request": 3, "data": "third"}, + ] + + save_data_to_directory(data_list, "request", "test123", tmp_path, "json") + + # Verify all files created + json_files = sorted(tmp_path.glob("*_request_*.json")) + assert len(json_files) == 3 + + # Verify content + for idx, file_path in enumerate(json_files): + with open(file_path) as f: + loaded = json.load(f) + assert loaded["request"] == idx + 1 + + +def test_save_data_to_directory_handles_errors_gracefully(tmp_path, caplog): + """save_data_to_directory logs errors but continues processing remaining items.""" + # Mix valid and invalid data + data_list = [ + {"valid": "data1"}, + None, # Will cause error during JSON serialization + {"valid": "data2"}, + ] + + with patch("healthchain.sandbox.utils.save_file") as mock_save: + # Make second save fail + mock_save.side_effect = [None, Exception("Save failed"), None] + + save_data_to_directory(data_list, "request", "test123", tmp_path, "json") + + # Should attempt to save all three + assert mock_save.call_count == 3 diff --git a/tests/sandbox/test_workflows.py b/tests/sandbox/test_workflows.py new file mode 100644 index 00000000..0d216ccc --- /dev/null +++ b/tests/sandbox/test_workflows.py @@ -0,0 +1,72 @@ +"""Tests for sandbox workflow validation and mapping logic.""" + +import pytest + +from healthchain.sandbox.workflows import ( + Workflow, + UseCaseMapping, + is_valid_workflow, + validate_workflow, +) + + +def test_workflow_use_case_mapping_rules(): + """Workflow-UseCase mapping enforces correct associations.""" + # CDS workflows + assert is_valid_workflow( + UseCaseMapping.ClinicalDecisionSupport, Workflow.patient_view + ) + assert is_valid_workflow( + UseCaseMapping.ClinicalDecisionSupport, Workflow.order_select + ) + assert is_valid_workflow( + UseCaseMapping.ClinicalDecisionSupport, Workflow.encounter_discharge + ) + + # ClinDoc workflows + assert is_valid_workflow( + UseCaseMapping.ClinicalDocumentation, Workflow.sign_note_inpatient + ) + assert is_valid_workflow( + UseCaseMapping.ClinicalDocumentation, Workflow.sign_note_outpatient + ) + + # Invalid associations + assert not is_valid_workflow( + UseCaseMapping.ClinicalDocumentation, Workflow.patient_view + ) + assert not is_valid_workflow( + UseCaseMapping.ClinicalDecisionSupport, Workflow.sign_note_inpatient + ) + + +def test_validate_workflow_decorator_enforcement(): + """validate_workflow decorator rejects invalid workflow-usecase combinations.""" + + @validate_workflow(UseCaseMapping.ClinicalDecisionSupport) + def cds_function(data, workflow: Workflow): + return f"Processing {workflow.value}" + + # Valid workflow passes + result = cds_function("data", workflow=Workflow.patient_view) + assert result == "Processing patient-view" + + # Invalid workflow raises ValueError + with pytest.raises(ValueError, match="Invalid workflow .* for UseCase"): + cds_function("data", workflow=Workflow.sign_note_inpatient) + + +def test_validate_workflow_decorator_with_positional_args(): + """validate_workflow decorator handles positional workflow arguments.""" + + @validate_workflow(UseCaseMapping.ClinicalDocumentation) + def clindoc_function(data, workflow: Workflow): + return workflow.value + + # Positional argument + result = clindoc_function("data", Workflow.sign_note_outpatient) + assert result == "sign-note-outpatient" + + # Invalid positional argument + with pytest.raises(ValueError, match="Invalid workflow"): + clindoc_function("data", Workflow.order_select)