Skip to content

Commit bc92fe5

Browse files
fixes: review feedback
1 parent 32838b9 commit bc92fe5

File tree

1 file changed

+47
-8
lines changed

1 file changed

+47
-8
lines changed

workflowai/core/client/client.py

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import importlib.metadata
33
import os
4+
from email.utils import parsedate_to_datetime
45
from typing import (
56
Any,
67
AsyncIterator,
@@ -138,24 +139,62 @@ async def run(
138139
if not stream:
139140
res = None
140141
delay = retry_delay / 1000
141-
for _ in range(max_retry_count):
142+
retry_count = 0
143+
while retry_count <= max_retry_count:
142144
try:
143145
res = await self.api.post(route, request, returns=TaskRunResponse)
144146
return res.to_domain(task)
145147
except HTTPStatusError as e:
146148
if e.response.status_code == 404:
147149
raise NotFoundError("Task not found")
148-
if e.response.status_code == 429:
150+
retry_after = e.response.headers.get("Retry-After")
151+
if retry_after:
152+
try:
153+
#for 429 errors this is non-negative decimal
154+
delay = float(retry_after)
155+
except ValueError:
156+
try:
157+
retry_after_date = parsedate_to_datetime(retry_after)
158+
current_time = asyncio.get_event_loop().time()
159+
delay = (retry_after_date.timestamp()- current_time)
160+
except (TypeError, ValueError, OverflowError):
161+
delay = min(delay * 2, max_retry_delay / 1000)
162+
await asyncio.sleep(delay)
163+
elif e.response.status_code == 429:
149164
if delay < max_retry_delay / 1000:
150165
delay = min(delay * 2, max_retry_delay / 1000)
151166
await asyncio.sleep(delay)
152-
167+
retry_count += 1
168+
153169
async def _stream():
154-
async for chunk in self.api.stream(
155-
method="POST", path=route, data=request, returns=RunTaskStreamChunk
156-
):
157-
yield task.output_class.model_construct(None, **chunk.task_output)
158-
170+
delay = retry_delay / 1000
171+
retry_count = 0
172+
while retry_count <= max_retry_count:
173+
try:
174+
async for chunk in self.api.stream(
175+
method="POST", path=route, data=request, returns=RunTaskStreamChunk
176+
):
177+
yield task.output_class.model_construct(None, **chunk.task_output)
178+
except HTTPStatusError as e:
179+
if e.response.status_code == 404:
180+
raise NotFoundError("Task not found")
181+
retry_after = e.response.headers.get("Retry-After")
182+
183+
if retry_after:
184+
try:
185+
delay = float(retry_after)
186+
except ValueError:
187+
try:
188+
retry_after_date = parsedate_to_datetime(retry_after)
189+
current_time = asyncio.get_event_loop().time()
190+
delay = (retry_after_date.timestamp() - current_time)
191+
except (TypeError, ValueError, OverflowError):
192+
delay = min(delay * 2, max_retry_delay / 1000)
193+
elif e.response.status_code == 429:
194+
if delay < max_retry_delay / 1000:
195+
delay = min(delay * 2, max_retry_delay / 1000)
196+
await asyncio.sleep(delay)
197+
retry_count += 1
159198
return _stream()
160199

161200
async def import_run(

0 commit comments

Comments
 (0)