Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 17 additions & 40 deletions imednet/core/endpoint/mixins/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from imednet.constants import DEFAULT_PAGE_SIZE
from imednet.core.endpoint.abc import EndpointABC
from imednet.core.endpoint.operations import ListOperation
from imednet.core.endpoint.structs import ListRequestState
from imednet.core.paginator import AsyncPaginator, Paginator
from imednet.core.parsing import get_model_parser
Expand Down Expand Up @@ -65,28 +66,6 @@ def _process_list_result(
self._update_local_cache(result, study, has_filters, cache)
return result

async def _execute_async_list(
self,
paginator: AsyncPaginator,
parse_func: Callable[[Any], T],
study: Optional[str],
has_filters: bool,
cache: Any,
) -> List[T]:
result = [parse_func(item) async for item in paginator]
return self._process_list_result(result, study, has_filters, cache)

def _execute_sync_list(
self,
paginator: Paginator,
parse_func: Callable[[Any], T],
study: Optional[str],
has_filters: bool,
cache: Any,
) -> List[T]:
result = [parse_func(item) for item in paginator]
return self._process_list_result(result, study, has_filters, cache)

def _prepare_list_request(
self,
study_key: Optional[str],
Expand Down Expand Up @@ -143,17 +122,16 @@ def _list_sync(
if state.cached_result is not None:
return state.cached_result

paginator = paginator_cls(client, state.path, params=state.params, page_size=self.PAGE_SIZE)
parse_func = self._resolve_parse_func()

return self._execute_sync_list(
paginator,
parse_func,
state.study,
state.has_filters,
state.cache,
operation = ListOperation[T](
path=state.path,
params=state.params,
page_size=self.PAGE_SIZE,
parse_func=self._resolve_parse_func(),
)

result = operation.execute_sync(client, paginator_cls)
return self._process_list_result(result, state.study, state.has_filters, state.cache)

async def _list_async(
self,
client: AsyncRequestorProtocol,
Expand All @@ -169,17 +147,16 @@ async def _list_async(
if state.cached_result is not None:
return state.cached_result

paginator = paginator_cls(client, state.path, params=state.params, page_size=self.PAGE_SIZE)
parse_func = self._resolve_parse_func()

return await self._execute_async_list(
paginator,
parse_func,
state.study,
state.has_filters,
state.cache,
operation = ListOperation[T](
path=state.path,
params=state.params,
page_size=self.PAGE_SIZE,
parse_func=self._resolve_parse_func(),
)

result = await operation.execute_async(client, paginator_cls)
return self._process_list_result(result, state.study, state.has_filters, state.cache)

def list(
self,
study_key: Optional[str] = None,
Expand Down
15 changes: 14 additions & 1 deletion imednet/core/endpoint/mixins/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class ParamMixin:
_pop_study_filter: bool = False
_missing_study_exception: type[Exception] = ValueError

PARAM_PROCESSOR: Optional[ParamProcessor] = None
PARAM_PROCESSOR_CLS: type[ParamProcessor] = DefaultParamProcessor
STUDY_KEY_STRATEGY: Optional[StudyKeyStrategy] = None

Expand All @@ -44,6 +45,18 @@ def study_key_strategy(self) -> StudyKeyStrategy:
return KeepStudyKeyStrategy(exception_cls=self._missing_study_exception)
return OptionalStudyKeyStrategy()

@property
def param_processor(self) -> ParamProcessor:
"""
Get the configured parameter processor.

Returns:
The processor instance to use.
"""
if self.PARAM_PROCESSOR:
return self.PARAM_PROCESSOR
return self.PARAM_PROCESSOR_CLS()

def _resolve_params(
self,
study_key: Optional[str],
Expand All @@ -55,7 +68,7 @@ def _resolve_params(
filters = cast(EndpointProtocol, self)._auto_filter(filters)

# Use the configured parameter processor strategy
processor = self.PARAM_PROCESSOR_CLS()
processor = self.param_processor
filters, special_params = processor.process_filters(filters)

if special_params:
Expand Down
3 changes: 3 additions & 0 deletions imednet/core/endpoint/operations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .list import ListOperation

__all__ = ["ListOperation"]
91 changes: 91 additions & 0 deletions imednet/core/endpoint/operations/list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""
Operation for executing list requests.

This module encapsulates the logic for fetching and parsing a list of resources
from the API, handling pagination seamlessly.
"""

from __future__ import annotations

from typing import Any, Callable, Dict, Generic, List, TypeVar

from imednet.core.paginator import AsyncPaginator, Paginator
from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol

T = TypeVar("T")


class ListOperation(Generic[T]):
"""
Operation for executing list requests.

Encapsulates the logic for setting up a paginator, iterating through pages,
and parsing the results.
"""

def __init__(
self,
path: str,
params: Dict[str, Any],
page_size: int,
parse_func: Callable[[Any], T],
) -> None:
"""
Initialize the list operation.

Args:
path: The API endpoint path.
params: Query parameters for the request.
page_size: The number of items per page.
parse_func: A function to parse a raw JSON item into the model T.
"""
self.path = path
self.params = params
self.page_size = page_size
self.parse_func = parse_func

def execute_sync(
self,
client: RequestorProtocol,
paginator_cls: type[Paginator],
) -> List[T]:
"""
Execute synchronous list request.

Args:
client: The synchronous HTTP client.
paginator_cls: The paginator class to use.

Returns:
A list of parsed items.
"""
paginator = paginator_cls(
client,
self.path,
params=self.params,
page_size=self.page_size,
)
return [self.parse_func(item) for item in paginator]

async def execute_async(
self,
client: AsyncRequestorProtocol,
paginator_cls: type[AsyncPaginator],
) -> List[T]:
"""
Execute asynchronous list request.

Args:
client: The asynchronous HTTP client.
paginator_cls: The async paginator class to use.

Returns:
A list of parsed items.
"""
paginator = paginator_cls(
client,
self.path,
params=self.params,
page_size=self.page_size,
)
return [self.parse_func(item) async for item in paginator]
54 changes: 54 additions & 0 deletions imednet/core/endpoint/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,60 @@ def process_filters(self, filters: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict
return filters.copy(), {}


class MappingParamProcessor(ParamProcessor):
"""
ParamProcessor that maps specific filter keys to API parameters.

Extracts keys defined in the mapping and returns them as special parameters,
optionally renaming them according to the mapping values.
"""

def __init__(
self,
mapping: Dict[str, str],
defaults: Optional[Dict[str, Any]] = None,
) -> None:
"""
Initialize the processor with a mapping.

Args:
mapping: A dictionary where keys are the filter keys to look for,
and values are the API parameter names to map them to.
defaults: A dictionary of default values for keys not found in filters.
"""
self.mapping = mapping
self.defaults = defaults or {}

def process_filters(self, filters: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""
Process filters using the configured mapping.

Args:
filters: The input filters dictionary.

Returns:
A tuple of (cleaned filters, special parameters).
"""
filters = filters.copy()
special_params = {}

for filter_key, api_key in self.mapping.items():
value = None
if filter_key in filters:
value = filters.pop(filter_key)
elif filter_key in self.defaults:
value = self.defaults[filter_key]

if value is not None:
# Convert boolean to lowercase string if necessary, or just pass through
if isinstance(value, bool):
special_params[api_key] = str(value).lower()
else:
special_params[api_key] = value

return filters, special_params


@runtime_checkable
class StudyKeyStrategy(Protocol):
"""Protocol for study key handling strategies."""
Expand Down
27 changes: 3 additions & 24 deletions imednet/endpoints/records.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,16 @@
"""Endpoint for managing records (eCRF instances) in a study."""

from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Union

from imednet.constants import HEADER_EMAIL_NOTIFY
from imednet.core.endpoint.mixins import CreateEndpointMixin, EdcListGetEndpoint
from imednet.core.protocols import ParamProcessor
from imednet.core.endpoint.strategies import MappingParamProcessor
from imednet.models.jobs import Job
from imednet.models.records import Record
from imednet.utils.security import validate_header_value
from imednet.validation.cache import SchemaCache, validate_record_entry


class RecordsParamProcessor(ParamProcessor):
"""Parameter processor for Records endpoint."""

def process_filters(self, filters: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""
Extract 'record_data_filter' parameter.

Args:
filters: The filters dictionary.

Returns:
Tuple of (cleaned filters, special parameters).
"""
filters = filters.copy()
record_data_filter = filters.pop("record_data_filter", None)
special_params = {}
if record_data_filter:
special_params["recordDataFilter"] = record_data_filter
return filters, special_params


class RecordsEndpoint(EdcListGetEndpoint[Record], CreateEndpointMixin[Job]):
"""
API endpoint for interacting with records (eCRF instances) in an iMedNet study.
Expand All @@ -43,7 +22,7 @@ class RecordsEndpoint(EdcListGetEndpoint[Record], CreateEndpointMixin[Job]):
MODEL = Record
_id_param = "recordId"
_pop_study_filter = False
PARAM_PROCESSOR_CLS = RecordsParamProcessor
PARAM_PROCESSOR = MappingParamProcessor({"record_data_filter": "recordDataFilter"})

def _prepare_create_request(
self,
Expand Down
28 changes: 5 additions & 23 deletions imednet/endpoints/users.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,10 @@
"""Endpoint for managing users in a study."""

from typing import Any, Dict, Tuple

from imednet.core.endpoint.mixins import EdcListGetEndpoint
from imednet.core.protocols import ParamProcessor
from imednet.core.endpoint.strategies import MappingParamProcessor
from imednet.models.users import User


class UsersParamProcessor(ParamProcessor):
"""Parameter processor for Users endpoint."""

def process_filters(self, filters: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""
Extract 'include_inactive' parameter.

Args:
filters: The filters dictionary.

Returns:
Tuple of (cleaned filters, special parameters).
"""
filters = filters.copy()
include_inactive = filters.pop("include_inactive", False)
special_params = {"includeInactive": str(include_inactive).lower()}
return filters, special_params


class UsersEndpoint(EdcListGetEndpoint[User]):
"""
API endpoint for interacting with users in an iMedNet study.
Expand All @@ -37,4 +16,7 @@ class UsersEndpoint(EdcListGetEndpoint[User]):
MODEL = User
_id_param = "userId"
_pop_study_filter = True
PARAM_PROCESSOR_CLS = UsersParamProcessor
PARAM_PROCESSOR = MappingParamProcessor(
mapping={"include_inactive": "includeInactive"},
defaults={"include_inactive": False},
)