diff --git a/imednet/core/endpoint/mixins/list.py b/imednet/core/endpoint/mixins/list.py index f391a13e..8b964295 100644 --- a/imednet/core/endpoint/mixins/list.py +++ b/imednet/core/endpoint/mixins/list.py @@ -4,6 +4,7 @@ from imednet.constants import DEFAULT_PAGE_SIZE from imednet.core.endpoint.abc import EndpointABC +from imednet.core.endpoint.operations import ListOperation from imednet.core.endpoint.structs import ListRequestState from imednet.core.paginator import AsyncPaginator, Paginator from imednet.core.parsing import get_model_parser @@ -65,28 +66,6 @@ def _process_list_result( self._update_local_cache(result, study, has_filters, cache) return result - async def _execute_async_list( - self, - paginator: AsyncPaginator, - parse_func: Callable[[Any], T], - study: Optional[str], - has_filters: bool, - cache: Any, - ) -> List[T]: - result = [parse_func(item) async for item in paginator] - return self._process_list_result(result, study, has_filters, cache) - - def _execute_sync_list( - self, - paginator: Paginator, - parse_func: Callable[[Any], T], - study: Optional[str], - has_filters: bool, - cache: Any, - ) -> List[T]: - result = [parse_func(item) for item in paginator] - return self._process_list_result(result, study, has_filters, cache) - def _prepare_list_request( self, study_key: Optional[str], @@ -143,17 +122,16 @@ def _list_sync( if state.cached_result is not None: return state.cached_result - paginator = paginator_cls(client, state.path, params=state.params, page_size=self.PAGE_SIZE) - parse_func = self._resolve_parse_func() - - return self._execute_sync_list( - paginator, - parse_func, - state.study, - state.has_filters, - state.cache, + operation = ListOperation[T]( + path=state.path, + params=state.params, + page_size=self.PAGE_SIZE, + parse_func=self._resolve_parse_func(), ) + result = operation.execute_sync(client, paginator_cls) + return self._process_list_result(result, state.study, state.has_filters, state.cache) + async def _list_async( self, client: AsyncRequestorProtocol, @@ -169,17 +147,16 @@ async def _list_async( if state.cached_result is not None: return state.cached_result - paginator = paginator_cls(client, state.path, params=state.params, page_size=self.PAGE_SIZE) - parse_func = self._resolve_parse_func() - - return await self._execute_async_list( - paginator, - parse_func, - state.study, - state.has_filters, - state.cache, + operation = ListOperation[T]( + path=state.path, + params=state.params, + page_size=self.PAGE_SIZE, + parse_func=self._resolve_parse_func(), ) + result = await operation.execute_async(client, paginator_cls) + return self._process_list_result(result, state.study, state.has_filters, state.cache) + def list( self, study_key: Optional[str] = None, diff --git a/imednet/core/endpoint/mixins/params.py b/imednet/core/endpoint/mixins/params.py index 7ebe7182..d9778f42 100644 --- a/imednet/core/endpoint/mixins/params.py +++ b/imednet/core/endpoint/mixins/params.py @@ -23,6 +23,7 @@ class ParamMixin: _pop_study_filter: bool = False _missing_study_exception: type[Exception] = ValueError + PARAM_PROCESSOR: Optional[ParamProcessor] = None PARAM_PROCESSOR_CLS: type[ParamProcessor] = DefaultParamProcessor STUDY_KEY_STRATEGY: Optional[StudyKeyStrategy] = None @@ -44,6 +45,18 @@ def study_key_strategy(self) -> StudyKeyStrategy: return KeepStudyKeyStrategy(exception_cls=self._missing_study_exception) return OptionalStudyKeyStrategy() + @property + def param_processor(self) -> ParamProcessor: + """ + Get the configured parameter processor. + + Returns: + The processor instance to use. + """ + if self.PARAM_PROCESSOR: + return self.PARAM_PROCESSOR + return self.PARAM_PROCESSOR_CLS() + def _resolve_params( self, study_key: Optional[str], @@ -55,7 +68,7 @@ def _resolve_params( filters = cast(EndpointProtocol, self)._auto_filter(filters) # Use the configured parameter processor strategy - processor = self.PARAM_PROCESSOR_CLS() + processor = self.param_processor filters, special_params = processor.process_filters(filters) if special_params: diff --git a/imednet/core/endpoint/operations/__init__.py b/imednet/core/endpoint/operations/__init__.py new file mode 100644 index 00000000..6072c5c9 --- /dev/null +++ b/imednet/core/endpoint/operations/__init__.py @@ -0,0 +1,3 @@ +from .list import ListOperation + +__all__ = ["ListOperation"] diff --git a/imednet/core/endpoint/operations/list.py b/imednet/core/endpoint/operations/list.py new file mode 100644 index 00000000..52cfdfb7 --- /dev/null +++ b/imednet/core/endpoint/operations/list.py @@ -0,0 +1,91 @@ +""" +Operation for executing list requests. + +This module encapsulates the logic for fetching and parsing a list of resources +from the API, handling pagination seamlessly. +""" + +from __future__ import annotations + +from typing import Any, Callable, Dict, Generic, List, TypeVar + +from imednet.core.paginator import AsyncPaginator, Paginator +from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol + +T = TypeVar("T") + + +class ListOperation(Generic[T]): + """ + Operation for executing list requests. + + Encapsulates the logic for setting up a paginator, iterating through pages, + and parsing the results. + """ + + def __init__( + self, + path: str, + params: Dict[str, Any], + page_size: int, + parse_func: Callable[[Any], T], + ) -> None: + """ + Initialize the list operation. + + Args: + path: The API endpoint path. + params: Query parameters for the request. + page_size: The number of items per page. + parse_func: A function to parse a raw JSON item into the model T. + """ + self.path = path + self.params = params + self.page_size = page_size + self.parse_func = parse_func + + def execute_sync( + self, + client: RequestorProtocol, + paginator_cls: type[Paginator], + ) -> List[T]: + """ + Execute synchronous list request. + + Args: + client: The synchronous HTTP client. + paginator_cls: The paginator class to use. + + Returns: + A list of parsed items. + """ + paginator = paginator_cls( + client, + self.path, + params=self.params, + page_size=self.page_size, + ) + return [self.parse_func(item) for item in paginator] + + async def execute_async( + self, + client: AsyncRequestorProtocol, + paginator_cls: type[AsyncPaginator], + ) -> List[T]: + """ + Execute asynchronous list request. + + Args: + client: The asynchronous HTTP client. + paginator_cls: The async paginator class to use. + + Returns: + A list of parsed items. + """ + paginator = paginator_cls( + client, + self.path, + params=self.params, + page_size=self.page_size, + ) + return [self.parse_func(item) async for item in paginator] diff --git a/imednet/core/endpoint/strategies.py b/imednet/core/endpoint/strategies.py index afef88ac..3571a09d 100644 --- a/imednet/core/endpoint/strategies.py +++ b/imednet/core/endpoint/strategies.py @@ -30,6 +30,60 @@ def process_filters(self, filters: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict return filters.copy(), {} +class MappingParamProcessor(ParamProcessor): + """ + ParamProcessor that maps specific filter keys to API parameters. + + Extracts keys defined in the mapping and returns them as special parameters, + optionally renaming them according to the mapping values. + """ + + def __init__( + self, + mapping: Dict[str, str], + defaults: Optional[Dict[str, Any]] = None, + ) -> None: + """ + Initialize the processor with a mapping. + + Args: + mapping: A dictionary where keys are the filter keys to look for, + and values are the API parameter names to map them to. + defaults: A dictionary of default values for keys not found in filters. + """ + self.mapping = mapping + self.defaults = defaults or {} + + def process_filters(self, filters: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]: + """ + Process filters using the configured mapping. + + Args: + filters: The input filters dictionary. + + Returns: + A tuple of (cleaned filters, special parameters). + """ + filters = filters.copy() + special_params = {} + + for filter_key, api_key in self.mapping.items(): + value = None + if filter_key in filters: + value = filters.pop(filter_key) + elif filter_key in self.defaults: + value = self.defaults[filter_key] + + if value is not None: + # Convert boolean to lowercase string if necessary, or just pass through + if isinstance(value, bool): + special_params[api_key] = str(value).lower() + else: + special_params[api_key] = value + + return filters, special_params + + @runtime_checkable class StudyKeyStrategy(Protocol): """Protocol for study key handling strategies.""" diff --git a/imednet/endpoints/records.py b/imednet/endpoints/records.py index 0e9e9a83..2c485f38 100644 --- a/imednet/endpoints/records.py +++ b/imednet/endpoints/records.py @@ -1,37 +1,16 @@ """Endpoint for managing records (eCRF instances) in a study.""" -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Union from imednet.constants import HEADER_EMAIL_NOTIFY from imednet.core.endpoint.mixins import CreateEndpointMixin, EdcListGetEndpoint -from imednet.core.protocols import ParamProcessor +from imednet.core.endpoint.strategies import MappingParamProcessor from imednet.models.jobs import Job from imednet.models.records import Record from imednet.utils.security import validate_header_value from imednet.validation.cache import SchemaCache, validate_record_entry -class RecordsParamProcessor(ParamProcessor): - """Parameter processor for Records endpoint.""" - - def process_filters(self, filters: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]: - """ - Extract 'record_data_filter' parameter. - - Args: - filters: The filters dictionary. - - Returns: - Tuple of (cleaned filters, special parameters). - """ - filters = filters.copy() - record_data_filter = filters.pop("record_data_filter", None) - special_params = {} - if record_data_filter: - special_params["recordDataFilter"] = record_data_filter - return filters, special_params - - class RecordsEndpoint(EdcListGetEndpoint[Record], CreateEndpointMixin[Job]): """ API endpoint for interacting with records (eCRF instances) in an iMedNet study. @@ -43,7 +22,7 @@ class RecordsEndpoint(EdcListGetEndpoint[Record], CreateEndpointMixin[Job]): MODEL = Record _id_param = "recordId" _pop_study_filter = False - PARAM_PROCESSOR_CLS = RecordsParamProcessor + PARAM_PROCESSOR = MappingParamProcessor({"record_data_filter": "recordDataFilter"}) def _prepare_create_request( self, diff --git a/imednet/endpoints/users.py b/imednet/endpoints/users.py index 2b4fe5b5..41bcc734 100644 --- a/imednet/endpoints/users.py +++ b/imednet/endpoints/users.py @@ -1,31 +1,10 @@ """Endpoint for managing users in a study.""" -from typing import Any, Dict, Tuple - from imednet.core.endpoint.mixins import EdcListGetEndpoint -from imednet.core.protocols import ParamProcessor +from imednet.core.endpoint.strategies import MappingParamProcessor from imednet.models.users import User -class UsersParamProcessor(ParamProcessor): - """Parameter processor for Users endpoint.""" - - def process_filters(self, filters: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]: - """ - Extract 'include_inactive' parameter. - - Args: - filters: The filters dictionary. - - Returns: - Tuple of (cleaned filters, special parameters). - """ - filters = filters.copy() - include_inactive = filters.pop("include_inactive", False) - special_params = {"includeInactive": str(include_inactive).lower()} - return filters, special_params - - class UsersEndpoint(EdcListGetEndpoint[User]): """ API endpoint for interacting with users in an iMedNet study. @@ -37,4 +16,7 @@ class UsersEndpoint(EdcListGetEndpoint[User]): MODEL = User _id_param = "userId" _pop_study_filter = True - PARAM_PROCESSOR_CLS = UsersParamProcessor + PARAM_PROCESSOR = MappingParamProcessor( + mapping={"include_inactive": "includeInactive"}, + defaults={"include_inactive": False}, + )