Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions imednet/core/endpoint/mixins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
StrictListGetEndpoint,
)
from .caching import CacheMixin
from .create import CreateEndpointMixin
from .get import FilterGetEndpointMixin, PathGetEndpointMixin
from .list import ListEndpointMixin
from .params import ParamMixin
Expand All @@ -27,7 +26,6 @@
__all__ = [
"AsyncPaginator",
"CacheMixin",
"CreateEndpointMixin",
"EdcListEndpoint",
"EdcListGetEndpoint",
"EdcListPathGetEndpoint",
Expand Down
87 changes: 0 additions & 87 deletions imednet/core/endpoint/mixins/create.py

This file was deleted.

115 changes: 115 additions & 0 deletions imednet/core/endpoint/operations/record_create.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""
Operation for creating records.

This module encapsulates the logic for validating and creating records,
including handling schema validation and header security.
"""

from __future__ import annotations

from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union

from imednet.constants import HEADER_EMAIL_NOTIFY
from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol
from imednet.utils.security import validate_header_value
from imednet.validation.cache import SchemaCache, validate_record_entry

T = TypeVar("T")


class RecordCreateOperation(Generic[T]):
"""
Operation for creating records.

Encapsulates validation, header construction, and request execution.
"""

def __init__(
self,
path: str,
records_data: List[Dict[str, Any]],
email_notify: Union[bool, str, None],
schema: Optional[SchemaCache],
parse_func: Callable[[Any], T],
) -> None:
"""
Initialize the record creation operation.

Args:
path: The API endpoint path.
records_data: List of record data objects to create.
email_notify: Whether to send email notifications or specific email.
schema: Optional schema cache for validation.
parse_func: Function to parse the response JSON.

Raises:
ValidationError: If record data is invalid against the schema.
ValueError: If email headers contain invalid characters.
"""
self.path = path
self.records_data = records_data
self.email_notify = email_notify
self.schema = schema
self.parse_func = parse_func

self._validate()
self.headers = self._build_headers()

def _validate(self) -> None:
"""Validate records against schema if provided."""
if self.schema is not None:
for rec in self.records_data:
validate_record_entry(self.schema, rec)

def _build_headers(self) -> Dict[str, str]:
"""
Build headers for record creation request.

Returns:
Dictionary of headers.

Raises:
ValueError: If email_notify contains newlines (header injection prevention).
"""
headers: Dict[str, str] = {}
if self.email_notify is not None:
if isinstance(self.email_notify, str):
validate_header_value(self.email_notify)
headers[HEADER_EMAIL_NOTIFY] = self.email_notify
else:
headers[HEADER_EMAIL_NOTIFY] = str(self.email_notify).lower()
return headers

def execute_sync(self, client: RequestorProtocol) -> T:
"""
Execute synchronous creation request.

Args:
client: The synchronous HTTP client.

Returns:
The parsed response.
"""
response = client.post(
self.path,
json=self.records_data,
headers=self.headers,
)
return self.parse_func(response.json())

async def execute_async(self, client: AsyncRequestorProtocol) -> T:
"""
Execute asynchronous creation request.

Args:
client: The asynchronous HTTP client.

Returns:
The parsed response.
"""
response = await client.post(
self.path,
json=self.records_data,
headers=self.headers,
)
return self.parse_func(response.json())
87 changes: 20 additions & 67 deletions imednet/endpoints/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@

from typing import Any, Dict, List, Optional, Union

from imednet.constants import HEADER_EMAIL_NOTIFY
from imednet.core.endpoint.mixins import CreateEndpointMixin, EdcListGetEndpoint
from imednet.core.endpoint.mixins import EdcListGetEndpoint
from imednet.core.endpoint.operations.record_create import RecordCreateOperation
from imednet.core.endpoint.strategies import MappingParamProcessor
from imednet.models.jobs import Job
from imednet.models.records import Record
from imednet.utils.security import validate_header_value
from imednet.validation.cache import SchemaCache, validate_record_entry
from imednet.validation.cache import SchemaCache


class RecordsEndpoint(EdcListGetEndpoint[Record], CreateEndpointMixin[Job]):
class RecordsEndpoint(EdcListGetEndpoint[Record]):
"""
API endpoint for interacting with records (eCRF instances) in an iMedNet study.

Expand All @@ -24,54 +23,6 @@ class RecordsEndpoint(EdcListGetEndpoint[Record], CreateEndpointMixin[Job]):
_pop_study_filter = False
PARAM_PROCESSOR = MappingParamProcessor({"record_data_filter": "recordDataFilter"})

def _prepare_create_request(
self,
study_key: str,
records_data: List[Dict[str, Any]],
email_notify: Union[bool, str, None],
schema: Optional[SchemaCache],
) -> tuple[str, Dict[str, str]]:
self._validate_records_if_schema_present(schema, records_data)
headers = self._build_headers(email_notify)
path = self._build_path(study_key, self.PATH)
return path, headers

def _validate_records_if_schema_present(
self, schema: Optional[SchemaCache], records_data: List[Dict[str, Any]]
) -> None:
"""
Validate records against schema if provided.

Args:
schema: Optional schema cache for validation
records_data: List of record data to validate
"""
if schema is not None:
for rec in records_data:
validate_record_entry(schema, rec)

def _build_headers(self, email_notify: Union[bool, str, None]) -> Dict[str, str]:
"""
Build headers for record creation request.

Args:
email_notify: Email notification setting

Returns:
Dictionary of headers

Raises:
ValueError: If email_notify contains newlines (header injection prevention)
"""
headers = {}
if email_notify is not None:
if isinstance(email_notify, str):
validate_header_value(email_notify)
headers[HEADER_EMAIL_NOTIFY] = email_notify
else:
headers[HEADER_EMAIL_NOTIFY] = str(email_notify).lower()
return headers

def create(
self,
study_key: str,
Expand All @@ -97,15 +48,16 @@ def create(
Raises:
ValueError: If email_notify contains invalid characters
"""
path, headers = self._prepare_create_request(study_key, records_data, email_notify, schema)
client = self._require_sync_client()
return self._create_sync(
client,
path,
json=records_data,
headers=headers,
path = self._build_path(study_key, self.PATH)
operation = RecordCreateOperation[Job](
path=path,
records_data=records_data,
email_notify=email_notify,
schema=schema,
parse_func=Job.from_json,
)
client = self._require_sync_client()
return operation.execute_sync(client)

async def async_create(
self,
Expand Down Expand Up @@ -134,12 +86,13 @@ async def async_create(
Raises:
ValueError: If email_notify contains invalid characters
"""
path, headers = self._prepare_create_request(study_key, records_data, email_notify, schema)
client = self._require_async_client()
return await self._create_async(
client,
path,
json=records_data,
headers=headers,
path = self._build_path(study_key, self.PATH)
operation = RecordCreateOperation[Job](
path=path,
records_data=records_data,
email_notify=email_notify,
schema=schema,
parse_func=Job.from_json,
)
client = self._require_async_client()
return await operation.execute_async(client)