Skip to content

Commit 44e76ab

Browse files
committed
2 parents 1749e83 + 780bead commit 44e76ab

File tree

7 files changed

+296
-65
lines changed

7 files changed

+296
-65
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

77
## [Unreleased]
8+
- Added functionality to make retries for the failed HTTP requests.
89

910
### Added
1011
- Optional merge_events parameter to @stream and @scheduled decorators.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
2+
3+
4+
@scheduled
5+
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
6+
api.max_retries = 5 # Enabling up to 5 retries when HTTP error happens.
7+
...

docs/modules/ROOT/pages/index.adoc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,23 @@ include::example$api/tutorial007.py[]
269269
<.> You can enable this flag
270270
to save and <<produce_messages,`produce`>> the data at once.
271271

272+
[#enabling_retries]
273+
=== Enabling re-tries
274+
275+
When request fails due to HTTP error, re-trying functionality can be used.
276+
It will try to re-send the same request again with exponential back-off for HTTP status codes below:
277+
278+
* 428
279+
* 500
280+
* 502
281+
* 503
282+
* 504
283+
284+
[source,python]
285+
----
286+
include::example$api/tutorial008.py[]
287+
----
288+
272289
[#cache]
273290
== Cache
274291
Apps might need to share some data between invokes.

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
"redis >=3.5.3, <4.0.0",
4545
"requests >=2.25.0, <3.0.0",
4646
"urllib3 <2", # lambda doesnt support version 2 yet
47+
"tenacity >=8.2.3, <9.0.0",
4748
],
4849
python_requires='>=3.8, <4.0',
4950
license='The Unlicense',

src/corva/api.py

Lines changed: 114 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
import json
22
import posixpath
33
import re
4+
from http import HTTPStatus
45
from typing import List, Optional, Sequence, Union
56

67
import requests
8+
from tenacity import (
9+
RetryError,
10+
retry,
11+
retry_if_result,
12+
stop_after_attempt,
13+
wait_random_exponential,
14+
)
715

816

917
class Api:
@@ -14,6 +22,7 @@ class Api:
1422
"""
1523

1624
TIMEOUT_LIMITS = (3, 30) # seconds
25+
DEFAULT_MAX_RETRIES = int(0)
1726

1827
def __init__(
1928
self,
@@ -31,28 +40,39 @@ def __init__(
3140
self.app_key = app_key
3241
self.app_connection_id = app_connection_id
3342
self.timeout = timeout or self.TIMEOUT_LIMITS[1]
43+
self._max_retries = self.DEFAULT_MAX_RETRIES
3444

3545
@property
3646
def default_headers(self):
3747
return {
38-
'Authorization': f'API {self.api_key}',
39-
'X-Corva-App': self.app_key,
48+
"Authorization": f"API {self.api_key}",
49+
"X-Corva-App": self.app_key,
4050
}
4151

52+
@property
53+
def max_retries(self) -> int:
54+
return self._max_retries
55+
56+
@max_retries.setter
57+
def max_retries(self, value: int):
58+
if not (0 <= value <= 10):
59+
raise ValueError("Values between 0 and 10 are allowed")
60+
self._max_retries = value
61+
4262
def get(self, path: str, **kwargs):
43-
return self._request('GET', path, **kwargs)
63+
return self._request("GET", path, **kwargs)
4464

4565
def post(self, path: str, **kwargs):
46-
return self._request('POST', path, **kwargs)
66+
return self._request("POST", path, **kwargs)
4767

4868
def patch(self, path: str, **kwargs):
49-
return self._request('PATCH', path, **kwargs)
69+
return self._request("PATCH", path, **kwargs)
5070

5171
def put(self, path: str, **kwargs):
52-
return self._request('PUT', path, **kwargs)
72+
return self._request("PUT", path, **kwargs)
5373

5474
def delete(self, path: str, **kwargs):
55-
return self._request('DELETE', path, **kwargs)
75+
return self._request("DELETE", path, **kwargs)
5676

5777
def _get_url(self, path: str):
5878
"""Builds complete url.
@@ -66,19 +86,50 @@ def _get_url(self, path: str):
6686
3 corva api url, if above points are False.
6787
"""
6888

69-
if path.startswith('http'):
89+
if path.startswith("http"):
7090
return path
7191

7292
path = path.lstrip(
73-
'/'
93+
"/"
7494
) # delete leading forward slash for posixpath.join to work correctly
7595

7696
# search text like api/v1 or api/v10 in path
77-
if bool(re.search(r'api/v\d+', path)):
97+
if bool(re.search(r"api/v\d+", path)):
7898
return posixpath.join(self.data_api_url, path)
7999

80100
return posixpath.join(self.api_url, path)
81101

102+
@staticmethod
103+
def _execute_request(
104+
method: str,
105+
url: str,
106+
params: Optional[dict],
107+
data: Optional[dict],
108+
headers: Optional[dict] = None,
109+
timeout: Optional[int] = None,
110+
):
111+
"""Executes the request.
112+
113+
Args:
114+
method: HTTP method.
115+
path: url to call.
116+
data: request body, that will be casted to json.
117+
params: url query string params.
118+
headers: additional headers to include in request.
119+
timeout: custom request timeout in seconds.
120+
121+
Returns:
122+
requests.Response instance.
123+
"""
124+
return requests.request(
125+
method=method,
126+
url=url,
127+
params=params,
128+
json=data,
129+
headers=headers,
130+
timeout=timeout,
131+
)
132+
82133
def _request(
83134
self,
84135
method: str,
@@ -89,7 +140,7 @@ def _request(
89140
headers: Optional[dict] = None,
90141
timeout: Optional[int] = None,
91142
) -> requests.Response:
92-
"""Executes the request.
143+
"""Prepares HTTP request.
93144
94145
Args:
95146
method: HTTP method.
@@ -102,6 +153,13 @@ def _request(
102153
Returns:
103154
requests.Response instance.
104155
"""
156+
retryable_status_codes = [
157+
HTTPStatus.TOO_MANY_REQUESTS, # 428
158+
HTTPStatus.INTERNAL_SERVER_ERROR, # 500
159+
HTTPStatus.BAD_GATEWAY, # 502
160+
HTTPStatus.SERVICE_UNAVAILABLE, # 503
161+
HTTPStatus.GATEWAY_TIMEOUT, # 504
162+
]
105163

106164
timeout = timeout or self.timeout
107165
self._validate_timeout(timeout)
@@ -113,22 +171,46 @@ def _request(
113171
**(headers or {}),
114172
}
115173

116-
response = requests.request(
117-
method=method,
118-
url=url,
119-
params=params,
120-
json=data,
121-
headers=headers,
122-
timeout=timeout,
123-
)
174+
if self.max_retries > 0:
175+
retry_decorator = retry(
176+
stop=stop_after_attempt(self.max_retries),
177+
wait=wait_random_exponential(multiplier=0.25, max=10),
178+
retry=retry_if_result(
179+
lambda r: r.status_code in retryable_status_codes
180+
),
181+
)
182+
retrying_request = retry_decorator(self._execute_request)
183+
try:
184+
response = retrying_request(
185+
method=method,
186+
url=url,
187+
params=params,
188+
data=data,
189+
headers=headers,
190+
timeout=timeout,
191+
)
192+
except RetryError as e:
193+
if not e.last_attempt.failed:
194+
response = e.last_attempt.result()
195+
else:
196+
raise
197+
else:
198+
response = self._execute_request(
199+
method=method,
200+
url=url,
201+
params=params,
202+
data=data,
203+
headers=headers,
204+
timeout=timeout,
205+
)
124206

125207
return response
126208

127209
def _validate_timeout(self, timeout: int) -> None:
128210
if self.TIMEOUT_LIMITS[0] > timeout or self.TIMEOUT_LIMITS[1] < timeout:
129211
raise ValueError(
130-
f'Timeout must be between {self.TIMEOUT_LIMITS[0]} and '
131-
f'{self.TIMEOUT_LIMITS[1]} seconds.'
212+
f"Timeout must be between {self.TIMEOUT_LIMITS[0]} and "
213+
f"{self.TIMEOUT_LIMITS[1]} seconds."
132214
)
133215

134216
def get_dataset(
@@ -166,13 +248,13 @@ def get_dataset(
166248
"""
167249

168250
response = self.get(
169-
f'/api/v1/data/{provider}/{dataset}/',
251+
f"/api/v1/data/{provider}/{dataset}/",
170252
params={
171-
'query': json.dumps(query),
172-
'sort': json.dumps(sort),
173-
'fields': fields,
174-
'limit': limit,
175-
'skip': skip,
253+
"query": json.dumps(query),
254+
"sort": json.dumps(sort),
255+
"fields": fields,
256+
"limit": limit,
257+
"skip": skip,
176258
},
177259
)
178260
response.raise_for_status()
@@ -195,8 +277,8 @@ def produce_messages(self, data: Sequence[dict]) -> None:
195277
"""
196278

197279
response = self.post(
198-
'/api/v1/message_producer/',
199-
json={'app_connection_id': self.app_connection_id, 'data': data},
280+
"/api/v1/message_producer/",
281+
data={"app_connection_id": self.app_connection_id, "data": data},
200282
)
201283
response.raise_for_status()
202284

@@ -226,13 +308,13 @@ def insert_data(
226308

227309
if produce:
228310
body = {
229-
'data': list(data),
230-
'producer': {'app_connection_id': self.app_connection_id},
311+
"data": list(data),
312+
"producer": {"app_connection_id": self.app_connection_id},
231313
}
232314
else:
233315
body = list(data)
234316

235-
response = self.post(f'/api/v1/data/{provider}/{dataset}/', data=body)
317+
response = self.post(f"/api/v1/data/{provider}/{dataset}/", data=body)
236318
response.raise_for_status()
237319

238320
return response.json()

0 commit comments

Comments
 (0)