diff --git a/imednet/core/endpoint/mixins/__init__.py b/imednet/core/endpoint/mixins/__init__.py index 363de74d..776b1460 100644 --- a/imednet/core/endpoint/mixins/__init__.py +++ b/imednet/core/endpoint/mixins/__init__.py @@ -18,7 +18,6 @@ StrictListGetEndpoint, ) from .caching import CacheMixin -from .create import CreateEndpointMixin from .get import FilterGetEndpointMixin, PathGetEndpointMixin from .list import ListEndpointMixin from .params import ParamMixin @@ -27,7 +26,6 @@ __all__ = [ "AsyncPaginator", "CacheMixin", - "CreateEndpointMixin", "EdcListEndpoint", "EdcListGetEndpoint", "EdcListPathGetEndpoint", diff --git a/imednet/core/endpoint/mixins/create.py b/imednet/core/endpoint/mixins/create.py deleted file mode 100644 index 147459f0..00000000 --- a/imednet/core/endpoint/mixins/create.py +++ /dev/null @@ -1,87 +0,0 @@ -from __future__ import annotations - -from typing import Any, Callable, Dict, Generic, Optional, TypeVar, cast - -import httpx - -from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol - -T_RESP = TypeVar("T_RESP") - - -class CreateEndpointMixin(Generic[T_RESP]): - """Mixin implementing creation logic.""" - - def _prepare_kwargs( - self, - json: Any = None, - data: Any = None, - headers: Optional[Dict[str, str]] = None, - ) -> Dict[str, Any]: - """ - Prepare keyword arguments for the request. - - Filters out None values to preserve default behavior. - """ - kwargs: Dict[str, Any] = {} - if json is not None: - kwargs["json"] = json - if data is not None: - kwargs["data"] = data - if headers is not None: - kwargs["headers"] = headers - return kwargs - - def _process_response( - self, - response: httpx.Response, - parse_func: Optional[Callable[[Any], T_RESP]] = None, - ) -> T_RESP: - """ - Process the API response and parse the result. - - Args: - response: The HTTP response object. - parse_func: Optional function to parse the JSON payload. - - Returns: - The parsed response object. - """ - payload = response.json() - if parse_func: - return parse_func(payload) - return cast(T_RESP, payload) - - def _create_sync( - self, - client: RequestorProtocol, - path: str, - *, - json: Any = None, - data: Any = None, - headers: Optional[Dict[str, str]] = None, - parse_func: Optional[Callable[[Any], T_RESP]] = None, - ) -> T_RESP: - """ - Execute a synchronous creation request (POST). - """ - kwargs = self._prepare_kwargs(json=json, data=data, headers=headers) - response = client.post(path, **kwargs) - return self._process_response(response, parse_func) - - async def _create_async( - self, - client: AsyncRequestorProtocol, - path: str, - *, - json: Any = None, - data: Any = None, - headers: Optional[Dict[str, str]] = None, - parse_func: Optional[Callable[[Any], T_RESP]] = None, - ) -> T_RESP: - """ - Execute an asynchronous creation request (POST). - """ - kwargs = self._prepare_kwargs(json=json, data=data, headers=headers) - response = await client.post(path, **kwargs) - return self._process_response(response, parse_func) diff --git a/imednet/core/endpoint/operations/record_create.py b/imednet/core/endpoint/operations/record_create.py new file mode 100644 index 00000000..e449b1a4 --- /dev/null +++ b/imednet/core/endpoint/operations/record_create.py @@ -0,0 +1,115 @@ +""" +Operation for creating records. + +This module encapsulates the logic for validating and creating records, +including handling schema validation and header security. +""" + +from __future__ import annotations + +from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union + +from imednet.constants import HEADER_EMAIL_NOTIFY +from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol +from imednet.utils.security import validate_header_value +from imednet.validation.cache import SchemaCache, validate_record_entry + +T = TypeVar("T") + + +class RecordCreateOperation(Generic[T]): + """ + Operation for creating records. + + Encapsulates validation, header construction, and request execution. + """ + + def __init__( + self, + path: str, + records_data: List[Dict[str, Any]], + email_notify: Union[bool, str, None], + schema: Optional[SchemaCache], + parse_func: Callable[[Any], T], + ) -> None: + """ + Initialize the record creation operation. + + Args: + path: The API endpoint path. + records_data: List of record data objects to create. + email_notify: Whether to send email notifications or specific email. + schema: Optional schema cache for validation. + parse_func: Function to parse the response JSON. + + Raises: + ValidationError: If record data is invalid against the schema. + ValueError: If email headers contain invalid characters. + """ + self.path = path + self.records_data = records_data + self.email_notify = email_notify + self.schema = schema + self.parse_func = parse_func + + self._validate() + self.headers = self._build_headers() + + def _validate(self) -> None: + """Validate records against schema if provided.""" + if self.schema is not None: + for rec in self.records_data: + validate_record_entry(self.schema, rec) + + def _build_headers(self) -> Dict[str, str]: + """ + Build headers for record creation request. + + Returns: + Dictionary of headers. + + Raises: + ValueError: If email_notify contains newlines (header injection prevention). + """ + headers: Dict[str, str] = {} + if self.email_notify is not None: + if isinstance(self.email_notify, str): + validate_header_value(self.email_notify) + headers[HEADER_EMAIL_NOTIFY] = self.email_notify + else: + headers[HEADER_EMAIL_NOTIFY] = str(self.email_notify).lower() + return headers + + def execute_sync(self, client: RequestorProtocol) -> T: + """ + Execute synchronous creation request. + + Args: + client: The synchronous HTTP client. + + Returns: + The parsed response. + """ + response = client.post( + self.path, + json=self.records_data, + headers=self.headers, + ) + return self.parse_func(response.json()) + + async def execute_async(self, client: AsyncRequestorProtocol) -> T: + """ + Execute asynchronous creation request. + + Args: + client: The asynchronous HTTP client. + + Returns: + The parsed response. + """ + response = await client.post( + self.path, + json=self.records_data, + headers=self.headers, + ) + return self.parse_func(response.json()) diff --git a/imednet/endpoints/records.py b/imednet/endpoints/records.py index 2c485f38..e3af4fd2 100644 --- a/imednet/endpoints/records.py +++ b/imednet/endpoints/records.py @@ -2,16 +2,15 @@ from typing import Any, Dict, List, Optional, Union -from imednet.constants import HEADER_EMAIL_NOTIFY -from imednet.core.endpoint.mixins import CreateEndpointMixin, EdcListGetEndpoint +from imednet.core.endpoint.mixins import EdcListGetEndpoint +from imednet.core.endpoint.operations.record_create import RecordCreateOperation from imednet.core.endpoint.strategies import MappingParamProcessor from imednet.models.jobs import Job from imednet.models.records import Record -from imednet.utils.security import validate_header_value -from imednet.validation.cache import SchemaCache, validate_record_entry +from imednet.validation.cache import SchemaCache -class RecordsEndpoint(EdcListGetEndpoint[Record], CreateEndpointMixin[Job]): +class RecordsEndpoint(EdcListGetEndpoint[Record]): """ API endpoint for interacting with records (eCRF instances) in an iMedNet study. @@ -24,54 +23,6 @@ class RecordsEndpoint(EdcListGetEndpoint[Record], CreateEndpointMixin[Job]): _pop_study_filter = False PARAM_PROCESSOR = MappingParamProcessor({"record_data_filter": "recordDataFilter"}) - def _prepare_create_request( - self, - study_key: str, - records_data: List[Dict[str, Any]], - email_notify: Union[bool, str, None], - schema: Optional[SchemaCache], - ) -> tuple[str, Dict[str, str]]: - self._validate_records_if_schema_present(schema, records_data) - headers = self._build_headers(email_notify) - path = self._build_path(study_key, self.PATH) - return path, headers - - def _validate_records_if_schema_present( - self, schema: Optional[SchemaCache], records_data: List[Dict[str, Any]] - ) -> None: - """ - Validate records against schema if provided. - - Args: - schema: Optional schema cache for validation - records_data: List of record data to validate - """ - if schema is not None: - for rec in records_data: - validate_record_entry(schema, rec) - - def _build_headers(self, email_notify: Union[bool, str, None]) -> Dict[str, str]: - """ - Build headers for record creation request. - - Args: - email_notify: Email notification setting - - Returns: - Dictionary of headers - - Raises: - ValueError: If email_notify contains newlines (header injection prevention) - """ - headers = {} - if email_notify is not None: - if isinstance(email_notify, str): - validate_header_value(email_notify) - headers[HEADER_EMAIL_NOTIFY] = email_notify - else: - headers[HEADER_EMAIL_NOTIFY] = str(email_notify).lower() - return headers - def create( self, study_key: str, @@ -97,15 +48,16 @@ def create( Raises: ValueError: If email_notify contains invalid characters """ - path, headers = self._prepare_create_request(study_key, records_data, email_notify, schema) - client = self._require_sync_client() - return self._create_sync( - client, - path, - json=records_data, - headers=headers, + path = self._build_path(study_key, self.PATH) + operation = RecordCreateOperation[Job]( + path=path, + records_data=records_data, + email_notify=email_notify, + schema=schema, parse_func=Job.from_json, ) + client = self._require_sync_client() + return operation.execute_sync(client) async def async_create( self, @@ -134,12 +86,13 @@ async def async_create( Raises: ValueError: If email_notify contains invalid characters """ - path, headers = self._prepare_create_request(study_key, records_data, email_notify, schema) - client = self._require_async_client() - return await self._create_async( - client, - path, - json=records_data, - headers=headers, + path = self._build_path(study_key, self.PATH) + operation = RecordCreateOperation[Job]( + path=path, + records_data=records_data, + email_notify=email_notify, + schema=schema, parse_func=Job.from_json, ) + client = self._require_async_client() + return await operation.execute_async(client)