diff --git a/redcap/coroutine.py b/redcap/coroutine.py
new file mode 100644
index 0000000..348ce52
--- /dev/null
+++ b/redcap/coroutine.py
@@ -0,0 +1,535 @@
+from __future__ import annotations
+
+import asyncio
+import itertools
+import json
+import logging
+from copy import copy
+from datetime import datetime
+from io import StringIO
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, overload
+from uuid import uuid4
+
+import pandas as pd
+import tqdm
+from aiohttp import ClientResponse, ClientResponseError, ClientSession
+from dict2xml import dict2xml
+from mergedeep import Strategy, merge
+from tqdm.contrib.logging import logging_redirect_tqdm
+from typing_extensions import Literal
+
+# imported lazily to avoid a circular import: redcap.request imports this module
+if TYPE_CHECKING:
+    from .request import EmptyJson, FileUpload, Json
+
+RedcapError = ClientResponseError
+
+__author__ = "Shane Buckley"
+
+
+class _RCCoroutine:
+
+    """
+    Private class wrapping the request/response cycle for a single call to the
+    REDCap API. Stores the responses along with their headers and some metadata.
+    """
+
+    def __init__(
+        self,
+        url: str,
+        payload: Dict,
+        fmt: Optional[Literal["json", "csv", "xml"]],
+        verify_ssl: Union[bool, str],
+        def_field: str,
+        return_headers: bool,
+        file: Optional[FileUpload],
+        sleep_time: int,
+        chunks: int,
+    ):
+        """Constructor"""
+        self.creation_time = datetime.now()
+        self._id = self._get_id(__name__, date_time=self.creation_time)
+        self.logger = logging.getLogger(self._id)
+        self.content = dict()
+        self.headers = dict()
+        self.payload = payload
+        self.url = url
+        self.fmt = fmt
+        self.verify_ssl = verify_ssl
+        self.def_field = def_field
+        self.return_headers = return_headers
+        self.file = file
+        self.response = dict()
+        self.errors = dict()
+        self.error_payloads = dict()
+        self.records = None
+        self.sleep_time = sleep_time
+        self.chunks = chunks
+        self.chunked = False
+
+    def _get_id(
+        self,
+        name: str,
+        date_time: Optional[datetime] = None,
+    ) -> str:
+        """Build a unique id from a uuid, a timestamp, and a name."""
+        unique_id = str(uuid4())
+        dt = date_time if date_time is not None else datetime.now()
+        return f"{unique_id}_{dt}_{name}"
+
+    # method to get the records for a project asynchronously
+    async def _get_records(
+        self,
+    ) -> List[str]:
+        # determine if the payload is exporting records
+        has_records = self.payload['content'] in [
+            'record',
+            'file',
+            'log',
+            'participantList'
+        ]
+        records = [
+            self.payload[key] for key in self.payload.keys() if 'record' in key
+        ]
+        # if the payload has defined records
+        if len(records) > 0:
+            return records
+        # if the payload does not have defined records
+        elif has_records:
+            # generate the simple payload
+            payload = {
+                'token': self.payload['token'],
+                'content': 'record',
+                'format': 'json',
+                'type': 'flat',
+                'csvDelimiter': '',
+                'fields[0]': self.def_field,
+                'rawOrLabel': 'raw',
+                'rawOrLabelHeaders': 'raw',
+                'exportCheckboxLabel': 'false',
+                'exportSurveyFields': 'false',
+                'exportDataAccessGroups': 'false',
+                'returnFormat': 'json',
+            }
+            # run a basic request to fetch these ids
+            async with ClientSession() as session:
+                async with session.post(self.url, data=payload) as response:
+                    resp_dict = await response.json()
+            # keep only the unique ids from the response
+            records = list({x[self.def_field] for x in resp_dict})
+            # return the resultant list
+            return records
+        else:
+            return []
+
+    def _chunks(
+        self,
+        lst: List,
+        n: int
+    ) -> List:
"""Yield successive n-sized chunks from lst.""" + for i in range(0, n): + yield lst[i::n] + + @staticmethod + def _merge_chunk( + self, + chunk_as_list_of_dicts: List[Dict[str, Any]] + ) -> Dict[str, Any]: + """Runs the deep merge over a chunk to create a single dictionary.""" + chunk_as_dict = dict() + merge(chunk_as_dict, *chunk_as_list_of_dicts, strategy=Strategy.TYPESAFE_ADDITIVE) + for key in chunk_as_dict.keys(): + if isinstance(chunk_as_dict[key], list): + chunk_as_dict[key] = list(set(chunk_as_dict[key])) + return chunk_as_dict + + @staticmethod + def _get_payloads( + self, + extended_by: str = 'record', + ) -> Dict[str, Json]: + """Gets all possible payloads and then chunks them""" + + # create a copy of the overall payload + pload = copy(self.payload) + # we want to handle the payload as json + pload['format'] = 'json' + pload['returnFormat'] = 'json' + # get the set of criteria to not extend by + try: + not_extended_by = set(pload.keys()) - set(extended_by) + # get the criteria not being extended by while removing them from the selection_criteria + not_extended_by = {key: pload.pop(key) for key in not_extended_by} + except: + not_extended_by = set() + # if not_extended_by is empty, then set it to None + if len(not_extended_by) == 0: + not_extended_by = None + # converts the dict into lists with tags identifying the criteria: from criteria: value to _value + criteria_list = [[key + '_' + item for item in pload[key]] for key in pload.keys()] + # gets all permutations to get all individual calls + extended_call_list_of_tuples = list(itertools.product(*criteria_list)) + # method to convert the resultant list of tuples into a list of dicts + def crit_tuple_to_dict(this_tuple, extend_to_dicts=None): + # get the list of key + keys = {x.split('_')[0] for x in this_tuple} + # initialize the dicts + this_dict = {this_key: [] for this_key in keys} + # fill the list of dicts + for item in this_tuple: + # get the key + key = item.split('_')[0] + # get the value + value = item.replace(key + '_', '', 1) + # add the value + this_dict[key].append(value) + # if there were fields the calls were not extended by + if extend_to_dicts != None: + this_dict.update(not_extended_by) + # return the list of dicts + return this_dict + # convert the list of lists back into a list of dicts + extended_call_list_of_dicts = [crit_tuple_to_dict(this_tuple=x, extend_to_dicts=not_extended_by) for x in extended_call_list_of_tuples] + # method to re-combine the max-width jobs split into n chunks + def condense_to_chunks(all_api_calls, num_chunks): + # chunk the api_calls list + chunked_calls_unmerged = list(self._chunks(lst=all_api_calls, n=num_chunks)) + # merge the chunks idividual calls + chunked_calls_merged = [self._merge_chunk(x) for x in chunked_calls_unmerged] + # return the api calls + return chunked_calls_merged + # chunk the calls + final_call_list = condense_to_chunks(all_api_calls=extended_call_list_of_dicts, num_chunks=self.chunks) + # drop any empty api_calls + final_call_list = [x for x in final_call_list if x != {}] + print(final_call_list) + print(len(final_call_list)) + # convert the list to a dictionaries, assigning ids to each payload + final_call_dict = {self._get_id("RCCoroutine"): x for x in final_call_list} + # return the list of api requests + return final_call_dict + + @overload + @staticmethod + async def _get_content( + self, + response: ClientResponse, + format_type: None, + return_empty_json: Literal[True], + return_bytes: Literal[False], + ) -> EmptyJson: + ... 
+
+    @overload
+    async def _get_content(
+        self,
+        response: ClientResponse,
+        format_type: None,
+        return_empty_json: Literal[False],
+        return_bytes: Literal[True],
+    ) -> bytes:
+        ...
+
+    @overload
+    async def _get_content(
+        self,
+        response: ClientResponse,
+        format_type: Literal["json"],
+        return_empty_json: Literal[False],
+        return_bytes: Literal[False],
+    ) -> Union[Json, Dict[str, str]]:
+        """This should return json, but might also return an error dict"""
+        ...
+
+    @overload
+    async def _get_content(
+        self,
+        response: ClientResponse,
+        format_type: Literal["csv", "xml"],
+        return_empty_json: Literal[False],
+        return_bytes: Literal[False],
+    ) -> str:
+        ...
+
+    async def _get_content(
+        self,
+        response: ClientResponse,
+        format_type: Optional[Literal["json", "csv", "xml"]],
+        return_empty_json: bool,
+        return_bytes: bool,
+    ):
+        """Abstraction for grabbing content from a returned response"""
+        # flag any payloads that had errors
+        self._has_errors(self.content)
+        # check if any payloads had errors
+        if len(self.errors.keys()) > 0:
+            # build the error string
+            error_str = f"Request {self._id} did not complete.\n"
+            for e in self.errors.keys():
+                error_str += f"Coroutine {e} from request {self._id} had the following errors:\n"
+                error_str += "\nFrom using the payload:\n"
+                error_str += str(self.error_payloads[e])
+            # log the error
+            self.logger.error(error_str)
+            # aiohttp's ClientResponseError needs request_info and history;
+            # neither is available here, so pass placeholders
+            raise RedcapError(None, (), message=error_str)
+
+        # if the payloads were extended
+        if self.chunked:
+            # recombine the payloads
+            content = self._recombine_content()  # TODO: add method
+            return content
+        # otherwise, unlist the single content item and do original PyCap check
+        else:
+            response = list(self.response.values())[0]
+            # return them to their correct return type
+            if return_bytes:
+                return await response.read()
+
+            if return_empty_json:
+                return [{}]
+
+            if format_type == "json":
+                return await response.json()
+
+            # don't do anything to csv/xml strings
+            return await response.text()
+
+    def _has_errors(
+        self,
+        content: Dict[str, ClientResponse]
+    ):
+        """Determines if any requests had errors."""
+        if len(content.keys()) > 1:
+            def has_error(response):
+                bad_request = False
+                try:
+                    bad_request = "error" in response.text
+                    bad_request |= response.status != 200
+                except AttributeError:
+                    # we're not dealing with a response at all
+                    bad_request = True
+                return bad_request
+            # TODO: loop over the responses here, run has_error on each, and
+            # fill self.errors and self.error_payloads
+        # original method
+        else:
+            if self.fmt == "json":
+                try:
+                    bad_request = "error" in content.keys()
+                except AttributeError:
+                    # we're not dealing with an error dict
+                    bad_request = False
+            elif self.fmt == "csv":
+                bad_request = content.lower().startswith("error:")
+            # xml is the default returnFormat for error messages
+            elif self.fmt == "xml" or self.fmt is None:
+                bad_request = "<error>" in str(content).lower()
+
+            if bad_request:
+                raise RedcapError(None, (), message=str(content))
+
+    async def run(
+        self,
+    ) -> Union[Tuple[Dict[str, Any], Dict[str, Any]], Dict[str, Any]]:
+        # get the records touched by this request
+        self.records = await self._get_records()
+        # if the payloads need extending (chunks > 1 and has records)
+        if len(self.records) > 0 and self.chunks > 1:
+            # extend by records to get a dict of payloads
+            payloads = self._get_payloads()
+            self.chunked = True
+        # otherwise, wrap the single payload in a dictionary
+        else:
+            payloads = {self._get_id("RCCoroutine"): self.payload}
+        # execute the payload(s)
+        await self._run_request(payloads)  # TODO: add method
+
+        # get the content from the responses
+        await self._get_content()  # TODO: pass the format/config arguments
+
+        # check for errors
+        self._has_errors(self.content)
+
+        # run the responses through _return_data to clean them up;
+        # self.content is the dictionary of content and the result of
+        # this call is the final content
+        content = await self._return_data()  # TODO: pass the response arguments
+
+        if self.return_headers:
+            return content, self.headers
+        else:
+            return content
+
+    # pylint: disable=import-outside-toplevel
+    @staticmethod
+    def _read_csv(buf: StringIO, **df_kwargs) -> "pd.DataFrame":
+        """Wrapper around pandas read_csv that handles EmptyDataError"""
+        import pandas as pd
+        from pandas.errors import EmptyDataError
+
+        try:
+            dataframe = pd.read_csv(buf, **df_kwargs)
+        except EmptyDataError:
+            dataframe = pd.DataFrame()
+
+        return dataframe
+    # pylint: enable=import-outside-toplevel
+
+    @overload
+    async def _return_data(
+        self,
+        response: Json,
+        content: Literal[
+            "exportFieldNames",
+            "formEventMapping",
+            "metadata",
+            "participantList",
+            "project",
+            "record",
+            "report",
+            "user",
+        ],
+        format_type: Literal["json"],
+        df_kwargs: None,
+        record_type: Literal["flat", "eav"] = "flat",
+        chunked: bool = False,
+    ) -> Json:
+        ...
+
+    @overload
+    async def _return_data(
+        self,
+        response: str,
+        content: Literal[
+            "exportFieldNames",
+            "formEventMapping",
+            "metadata",
+            "participantList",
+            "project",
+            "record",
+            "report",
+            "user",
+        ],
+        format_type: Literal["csv", "xml"],
+        df_kwargs: None,
+        record_type: Literal["flat", "eav"] = "flat",
+        chunked: bool = False,
+    ) -> str:
+        ...
+
+    @overload
+    async def _return_data(
+        self,
+        response: str,
+        content: Literal[
+            "exportFieldNames",
+            "formEventMapping",
+            "metadata",
+            "participantList",
+            "project",
+            "record",
+            "report",
+            "user",
+        ],
+        format_type: Literal["df"],
+        df_kwargs: Optional[Dict[str, Any]],
+        record_type: Literal["flat", "eav"] = "flat",
+        chunked: bool = False,
+    ) -> "pd.DataFrame":
+        ...
+
+    async def _return_data(
+        self,
+        response: Union[Json, str],
+        content: Literal[
+            "exportFieldNames",
+            "formEventMapping",
+            "metadata",
+            "participantList",
+            "project",
+            "record",
+            "report",
+            "user",
+        ],
+        format_type: Literal["json", "csv", "xml", "df"],
+        df_kwargs: Optional[Dict[str, Any]] = None,
+        record_type: Literal["flat", "eav"] = "flat",
+        chunked: bool = False,
+    ):
+        """Handle returning data for export methods
+
+        This mostly just stores the logic for the default
+        `df_kwargs` value for export methods, when returning
+        a dataframe.
+
+        Args:
+            response: Output from _call_api
+            content:
+                The 'content' parameter for the API call.
+                Same one used in _initialize_payload
+            format_type:
+                The format of the response.
+                Same one used in _initialize_payload
+            df_kwargs:
+                Passed to `pandas.read_csv` to control construction of
+                returned DataFrame. Different defaults exist for
+                different content
+            record_type:
+                Database output structure type.
+                Used only for records content
+        """
+        # original method
+        if self.records != []:
+            if format_type != "df":
+                return response
+
+            if not df_kwargs:
+                if (
+                    content in ["formEventMapping", "participantList", "project", "user"]
+                    or record_type == "eav"
+                ):
+                    df_kwargs = {}
+                elif content == "exportFieldNames":
+                    df_kwargs = {"index_col": "original_field_name"}
+                elif content == "metadata":
+                    df_kwargs = {"index_col": "field_name"}
+                elif content in ["report", "record"]:
+                    if self.is_longitudinal:
+                        df_kwargs = {"index_col": [self.def_field, "redcap_event_name"]}
+                    else:
+                        df_kwargs = {"index_col": self.def_field}
+
+            buf = StringIO(response)  # make this async
+            dataframe = self._read_csv(buf, **df_kwargs)
+            buf.close()
+
+            return dataframe
+        # recombine the response if it was chunked
+        # else:
diff --git a/redcap/methods/base.py b/redcap/methods/base.py
index ea9ad18..dda5262 100644
--- a/redcap/methods/base.py
+++ b/redcap/methods/base.py
@@ -5,6 +5,7 @@
 from typing import (
     Any,
+    Callable,
     Dict,
     List,
     Optional,
@@ -127,20 +128,6 @@ def _validate_url_and_token(url: str, token: str) -> None:
                 f"{ expected_token_len } characters long",
             )
 
-    # pylint: disable=import-outside-toplevel
-    @staticmethod
-    def _read_csv(buf: StringIO, **df_kwargs) -> "pd.DataFrame":
-        """Wrapper around pandas read_csv that handles EmptyDataError"""
-        import pandas as pd
-        from pandas.errors import EmptyDataError
-
-        try:
-            dataframe = pd.read_csv(buf, **df_kwargs)
-        except EmptyDataError:
-            dataframe = pd.DataFrame()
-
-        return dataframe
-
-    # pylint: enable=import-outside-toplevel
     @staticmethod
     def _lookup_return_type(
@@ -323,200 +310,108 @@ def _initialize_import_payload(
         payload["format"] = import_format
         return payload
 
-    @overload
-    def _return_data(
-        self,
-        response: Json,
-        content: Literal[
-            "exportFieldNames",
-            "formEventMapping",
-            "metadata",
-            "participantList",
-            "project",
-            "record",
-            "report",
-            "user",
-        ],
-        format_type: Literal["json"],
-        df_kwargs: None,
-        record_type: Literal["flat", "eav"] = "flat",
-    ) -> Json:
-        ...
-
-    @overload
-    def _return_data(
-        self,
-        response: str,
-        content: Literal[
-            "exportFieldNames",
-            "formEventMapping",
-            "metadata",
-            "participantList",
-            "project",
-            "record",
-            "report",
-            "user",
-        ],
-        format_type: Literal["csv", "xml"],
-        df_kwargs: None,
-        record_type: Literal["flat", "eav"] = "flat",
-    ) -> str:
-        ...
-
-    @overload
-    def _return_data(
-        self,
-        response: str,
-        content: Literal[
-            "exportFieldNames",
-            "formEventMapping",
-            "metadata",
-            "participantList",
-            "project",
-            "record",
-            "report",
-            "user",
-        ],
-        format_type: Literal["df"],
-        df_kwargs: Optional[Dict[str, Any]],
-        record_type: Literal["flat", "eav"] = "flat",
-    ) -> "pd.DataFrame":
-        ...
-
-    def _return_data(
-        self,
-        response: Union[Json, str],
-        content: Literal[
-            "exportFieldNames",
-            "formEventMapping",
-            "metadata",
-            "participantList",
-            "project",
-            "record",
-            "report",
-            "user",
-        ],
-        format_type: Literal["json", "csv", "xml", "df"],
-        df_kwargs: Optional[Dict[str, Any]] = None,
-        record_type: Literal["flat", "eav"] = "flat",
-    ):
-        """Handle returning data for export methods
-
-        This mostly just stores the logic for the default
-        `df_kwargs` value for export methods, when returning
-        a dataframe.
-
-        Args:
-            response: Output from _call_api
-            content:
-                The 'content' parameter for the API call.
-                Same one used in _initialize_payload
-            format_type:
-                The format of the response.
-                Same one used in _initialize_payload
-            df_kwargs:
-                Passed to `pandas.read_csv` to control construction of
-                returned DataFrame. Different defaults exist for
-                different content
-            record_type:
-                Database output structure type.
-                Used only for records content
-        """
-        if format_type != "df":
-            return response
-
-        if not df_kwargs:
-            if (
-                content in ["formEventMapping", "participantList", "project", "user"]
-                or record_type == "eav"
-            ):
-                df_kwargs = {}
-            elif content == "exportFieldNames":
-                df_kwargs = {"index_col": "original_field_name"}
-            elif content == "metadata":
-                df_kwargs = {"index_col": "field_name"}
-            elif content in ["report", "record"]:
-                if self.is_longitudinal:
-                    df_kwargs = {"index_col": [self.def_field, "redcap_event_name"]}
-                else:
-                    df_kwargs = {"index_col": self.def_field}
-
-        buf = StringIO(response)
-        dataframe = self._read_csv(buf, **df_kwargs)
-        buf.close()
-
-        return dataframe
-
     @overload
     def _call_api(
         self,
-        payload: Dict[str, Any],
+        payload: Callable,
         return_type: Literal["file_map"],
         file: None,
+        coroutine: bool,
+        sleep_time: int,
+        chunks: int,
+        **kwargs: Dict,
     ) -> FileMap:
         ...
 
     @overload
     def _call_api(
         self,
-        payload: Dict[str, Any],
+        payload: Callable,
         return_type: Literal["json"],
+        coroutine: bool,
+        sleep_time: int,
+        chunks: int,
         file: None = None,
+        **kwargs: Dict,
     ) -> Json:
         ...
 
     @overload
     def _call_api(
         self,
-        payload: Dict[str, Any],
+        payload: Callable,
         return_type: Literal["empty_json"],
         file: FileUpload,
+        coroutine: bool,
+        sleep_time: int,
+        chunks: int,
+        **kwargs: Dict,
    ) -> EmptyJson:
         ...
 
     @overload
     def _call_api(
         self,
-        payload: Dict[str, Any],
+        payload: Callable,
         return_type: Literal["count_dict"],
+        coroutine: bool,
+        sleep_time: int,
+        chunks: int,
         file: None = None,
+        **kwargs: Dict,
     ) -> Dict[str, int]:
         ...
 
     @overload
     def _call_api(
         self,
-        payload: Dict[str, Any],
+        payload: Callable,
         return_type: Literal["ids_list"],
+        coroutine: bool,
+        sleep_time: int,
+        chunks: int,
         file: None = None,
+        **kwargs: Dict,
     ) -> List[str]:
         ...
 
     @overload
     def _call_api(
         self,
-        payload: Dict[str, Any],
+        payload: Callable,
         return_type: Literal["int"],
+        coroutine: bool,
+        sleep_time: int,
+        chunks: int,
         file: None = None,
+        **kwargs: Dict,
     ) -> int:
         ...
 
     @overload
     def _call_api(
         self,
-        payload: Dict[str, Any],
+        payload: Callable,
         return_type: Literal["str"],
+        coroutine: bool,
+        sleep_time: int,
+        chunks: int,
         file: None = None,
+        **kwargs: Dict,
     ) -> str:
         ...
     def _call_api(
         self,
-        payload: Dict[str, Any],
+        payload: Callable,
         return_type: Literal[
             "file_map", "json", "empty_json", "count_dict", "ids_list", "str", "int"
         ],
+        # defaults keep the original synchronous, unchunked behavior
+        coroutine: bool = False,
+        sleep_time: int = 0,
+        chunks: int = 1,
         file: Optional[FileUpload] = None,
+        **kwargs: Dict,
     ) -> Union[FileMap, Json, Dict[str, int], List[dict], List[str], int, str]:
         """Make a POST Request to the REDCap API
 
@@ -538,5 +433,10 @@
-        rcr = _RCRequest(url=self.url, payload=payload, config=config)
+        rcr = _RCRequest(
+            url=self.url, payload=payload, config=config, def_field=self.def_field
+        )
         return rcr.execute(
-            verify_ssl=self.verify_ssl, return_headers=return_headers, file=file
+            verify_ssl=self.verify_ssl,
+            return_headers=return_headers,
+            file=file,
+            coroutine=coroutine,
+            sleep_time=sleep_time,
+            chunks=chunks,
         )
diff --git a/redcap/methods/field_names.py b/redcap/methods/field_names.py
index ad93ab0..fa20f05 100644
--- a/redcap/methods/field_names.py
+++ b/redcap/methods/field_names.py
@@ -1,5 +1,5 @@
 """REDCap API methods for Project field names"""
-from typing import TYPE_CHECKING, Any, Dict, Optional, overload
+from typing import TYPE_CHECKING, Any, Coroutine, Dict, Optional, overload
 
 from typing_extensions import Literal
 
@@ -18,6 +18,7 @@ def export_field_names(
         format_type: Literal["json"],
         field: Optional[str],
         df_kwargs: Optional[Dict[str, Any]] = None,
+        return_coroutine: bool = False,
     ) -> Json:
         ...
 
@@ -27,6 +28,7 @@
         format_type: Literal["csv", "xml"],
         field: Optional[str],
         df_kwargs: Optional[Dict[str, Any]] = None,
+        return_coroutine: bool = False,
     ) -> str:
         ...
 
@@ -36,14 +38,26 @@
         format_type: Literal["df"],
         field: Optional[str],
         df_kwargs: Optional[Dict[str, Any]] = None,
+        return_coroutine: bool = False,
     ) -> "pd.DataFrame":
         ...
 
+    @overload
     def export_field_names(
         self,
         format_type: Literal["json", "csv", "xml", "df"] = "json",
         field: Optional[str] = None,
         df_kwargs: Optional[Dict[str, Any]] = None,
+        return_coroutine: bool = False,
+    ) -> Coroutine:
+        ...
+
+    def export_field_names(
+        self,
+        format_type: Literal["json", "csv", "xml", "df"] = "json",
+        field: Optional[str] = None,
+        df_kwargs: Optional[Dict[str, Any]] = None,
+        return_coroutine: bool = False,
     ):
         # pylint: disable=line-too-long
         """
@@ -73,15 +87,20 @@
             {'original_field_name': 'form_1_complete', 'choice_value': '', 'export_field_name': 'form_1_complete'}]
         """
         # pylint: enable=line-too-long
-        payload = self._initialize_payload(
-            content="exportFieldNames", format_type=format_type
-        )
-
-        if field:
-            payload["field"] = field
+        def _build_payload(**kwargs):
+            payload = self._initialize_payload(
+                content="exportFieldNames", format_type=kwargs["format_type"]
+            )
+            if kwargs["field"]:
+                payload["field"] = kwargs["field"]
+            return payload
 
         return_type = self._lookup_return_type(format_type, request_type="export")
-        response = self._call_api(payload, return_type)
+        response = self._call_api(
+            payload=_build_payload,
+            return_type=return_type,
+            coroutine=return_coroutine,
+            kwargs=locals(),
+        )
 
         return self._return_data(
             response=response,
diff --git a/redcap/methods/files.py b/redcap/methods/files.py
index 581a129..8abd2c5 100644
--- a/redcap/methods/files.py
+++ b/redcap/methods/files.py
@@ -29,6 +29,7 @@ def export_file(
         field: str,
         event: Optional[str] = None,
         repeat_instance: Optional[int] = None,
+        return_coroutine: bool = False,
     ) -> FileMap:
         """
         Export the contents of a file stored for a particular record
@@ -61,19 +62,26 @@
         """
         self._check_file_field(field)
         # load up payload
-        payload = self._initialize_payload(content="file")
-        # there's no format field in this call
-        payload["action"] = "export"
-        payload["field"] = field
-        payload["record"] = record
-        if event:
-            payload["event"] = event
-        if repeat_instance:
-            payload["repeat_instance"] = str(repeat_instance)
+        def _build_payload(**kwargs):
+            payload = self._initialize_payload(content="file")
+            # there's no format field in this call
+            payload["action"] = "export"
+            payload["field"] = field
+            payload["record"] = record
+            if event:
+                payload["event"] = event
+            if repeat_instance:
+                payload["repeat_instance"] = str(repeat_instance)
+            return payload
 
         # This might just be due to some typing issues, maybe we can come back and
         # remove this disable eventually.
         # pylint: disable=unpacking-non-sequence
-        content, headers = self._call_api(payload=payload, return_type="file_map")
+        content, headers = self._call_api(
+            payload=_build_payload,
+            return_type="file_map",
+            coroutine=return_coroutine,
+            kwargs=locals(),
+        )
         # pylint: enable=unpacking-non-sequence
         # REDCap adds some useful things in content-type
         content_map = {}
@@ -98,6 +106,7 @@ def import_file(
         file_object: "TextIOWrapper",
         event: Optional[str] = None,
         repeat_instance: Optional[Union[int, str]] = None,
+        return_coroutine: bool = False,
     ) -> EmptyJson:
         """
         Import the contents of a file represented by file_object to a
@@ -138,18 +147,25 @@
         """
         self._check_file_field(field)
         # load up payload
-        payload = self._initialize_payload(content="file")
-        payload["action"] = "import"
-        payload["field"] = field
-        payload["record"] = record
-        if event:
-            payload["event"] = event
-        if repeat_instance:
-            payload["repeat_instance"] = repeat_instance
+        def _build_payload(**kwargs):
+            payload = self._initialize_payload(content="file")
+            payload["action"] = "import"
+            payload["field"] = kwargs["field"]
+            payload["record"] = kwargs["record"]
+            if kwargs["event"]:
+                payload["event"] = kwargs["event"]
+            if kwargs["repeat_instance"]:
+                payload["repeat_instance"] = kwargs["repeat_instance"]
+            return payload
+
         file_upload_dict = {"file": (file_name, file_object)}
 
         return self._call_api(
-            payload=payload, return_type="empty_json", file=file_upload_dict
+            payload=_build_payload,
+            return_type="empty_json",
+            file=file_upload_dict,
+            coroutine=return_coroutine,
+            kwargs=locals(),
         )
 
     def delete_file(
@@ -157,6 +173,7 @@
         self,
         record: str,
         field: str,
         event: Optional[str] = None,
+        return_coroutine: bool = False,
     ) -> EmptyJson:
         """
         Delete a file from REDCap
@@ -194,11 +211,18 @@
         """
         self._check_file_field(field)
         # Load up payload
-        payload = self._initialize_payload(content="file")
-        payload["action"] = "delete"
-        payload["record"] = record
-        payload["field"] = field
-        if event:
-            payload["event"] = event
-
-        return self._call_api(payload=payload, return_type="empty_json")
+        def _build_payload(**kwargs):
+            payload = self._initialize_payload(content="file")
+            payload["action"] = "delete"
+            payload["record"] = record
+            payload["field"] = field
+            if event:
+                payload["event"] = event
+            return payload
+
+        return self._call_api(
+            payload=_build_payload,
+            return_type="empty_json",
+            coroutine=return_coroutine,
+            kwargs=locals(),
+        )
diff --git a/redcap/request.py b/redcap/request.py
index 18c6cbf..a63c069 100644
--- a/redcap/request.py
+++ b/redcap/request.py
@@ -2,12 +2,13 @@
 # -*- coding: utf-8 -*-
 """Low-level HTTP functionality"""
 
+import asyncio
 from collections import namedtuple
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, overload
+from typing import TYPE_CHECKING, Any, Coroutine, Dict, List, Optional, Tuple, Union, overload
 
 from typing_extensions import Literal, TypedDict
 
-from requests import RequestException, Response, Session
+from .coroutine import _RCCoroutine
 
 if TYPE_CHECKING:
     from io import TextIOWrapper
@@ -19,11 +20,6 @@
 __license__ = "MIT"
 __copyright__ = "2014, Vanderbilt University"
 
-RedcapError = RequestException
-
-_session = Session()
-
-
 class FileUpload(TypedDict):
     """Typing for the file upload API"""
 
@@ -32,7 +28,6 @@
 
 _ContentConfig = namedtuple("_ContentConfig", ["return_empty_json", "return_bytes"])
 
-
 class _RCRequest:
     """
     Private class wrapping the REDCap API.
     Decodes response from redcap
     """
 
@@ -44,7 +39,7 @@ def __init__(
         url: str,
         payload: Dict[str, Any],
         config: _ContentConfig,
-        session=_session,
+        def_field: str,
     ):
         """Constructor
 
@@ -56,8 +51,8 @@
         self.url = url
         self.payload = payload
         self.config = config
-        self.session = session
         self.fmt = self._get_format_key(payload)
+        self.def_field = def_field
 
     @staticmethod
     def _get_format_key(
@@ -85,73 +80,15 @@
 
         return payload[fmt_key]
 
-    @overload
-    @staticmethod
-    def get_content(
-        response: Response,
-        format_type: None,
-        return_empty_json: Literal[True],
-        return_bytes: Literal[False],
-    ) -> EmptyJson:
-        ...
-
-    @overload
-    @staticmethod
-    def get_content(
-        response: Response,
-        format_type: None,
-        return_empty_json: Literal[False],
-        return_bytes: Literal[True],
-    ) -> bytes:
-        ...
-
-    @overload
-    @staticmethod
-    def get_content(
-        response: Response,
-        format_type: Literal["json"],
-        return_empty_json: Literal[False],
-        return_bytes: Literal[False],
-    ) -> Union[Json, Dict[str, str]]:
-        """This should return json, but might also return an error dict"""
-        ...
-
-    @overload
-    @staticmethod
-    def get_content(
-        response: Response,
-        format_type: Literal["csv", "xml"],
-        return_empty_json: Literal[False],
-        return_bytes: Literal[False],
-    ) -> str:
-        ...
-
-    @staticmethod
-    def get_content(
-        response: Response,
-        format_type: Optional[Literal["json", "csv", "xml"]],
-        return_empty_json: bool,
-        return_bytes: bool,
-    ):
-        """Abstraction for grabbing content from a returned response"""
-        if return_bytes:
-            return response.content
-
-        if return_empty_json:
-            return [{}]
-
-        if format_type == "json":
-            return response.json()
-
-        # don't do anything to csv/xml strings
-        return response.text
-
     @overload
     def execute(
         self,
         verify_ssl: Union[bool, str],
         return_headers: Literal[True],
         file: Optional[FileUpload],
+        coroutine: bool,
+        sleep_time: int,
+        chunks: int,
     ) -> Tuple[Union[Json, str, bytes], dict]:
         ...
 
@@ -161,14 +98,32 @@
         verify_ssl: Union[bool, str],
         return_headers: Literal[False],
         file: Optional[FileUpload],
+        coroutine: bool,
+        sleep_time: int,
+        chunks: int,
     ) -> Union[List[Dict[str, Any]], str, bytes]:
         ...
 
+    @overload
+    def execute(
+        self,
+        verify_ssl: Union[bool, str],
+        return_headers: Literal[False],
+        file: Optional[FileUpload],
+        coroutine: Literal[True],
+        sleep_time: int,
+        chunks: int,
+    ) -> Coroutine:
+        ...
+
     def execute(
         self,
         verify_ssl: Union[bool, str],
         return_headers: bool,
         file: Optional[FileUpload],
+        coroutine: bool,
+        sleep_time: int,
+        chunks: int,
     ):
         """Execute the API request and return data
 
@@ -188,33 +143,18 @@
             Badly formed request i.e record doesn't exist, field
             doesn't exist, etc.
""" - response = self.session.post( - self.url, data=self.payload, verify=verify_ssl, files=file - ) - content = self.get_content( - response, - format_type=self.fmt, - return_empty_json=self.config.return_empty_json, - return_bytes=self.config.return_bytes, + request_coroutine = _RCCorutine(self.url, + self.payload, + self.fmt, verify_ssl, + self.def_field, + return_headers, + file, + sleep_time, + chunks ) - if self.fmt == "json": - try: - bad_request = "error" in content.keys() - except AttributeError: - # we're not dealing with an error dict - bad_request = False - elif self.fmt == "csv": - bad_request = content.lower().startswith("error:") - # xml is the default returnFormat for error messages - elif self.fmt == "xml" or self.fmt is None: - bad_request = "" in str(content).lower() - - if bad_request: - raise RedcapError(content) - - if return_headers: - return content, response.headers - - return content + if coroutine: + return request_coroutine.run() + else: + return asyncio.run(request_coroutine.run())