import tempfile
import uuid
from collections.abc import Awaitable
+ from concurrent import futures
from functools import partial
from pathlib import Path
from typing import Any, Coroutine, Optional, Tuple, Union, cast, Generator, BinaryIO

import aiofiles
import httpx
- import nest_asyncio  # type: ignore
from httpx import AsyncClient
from pypdf import PdfReader, PdfWriter

HI_RES_STRATEGY = 'hi_res'
MAX_PAGE_LENGTH = 4000

+ def _run_coroutines_in_separate_thread(
+     coroutines_task: Coroutine[Any, Any, list[tuple[int, httpx.Response]]],
+ ) -> list[tuple[int, httpx.Response]]:
+     return asyncio.run(coroutines_task)
+

async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Response]:
    response = await coro
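
For reference, `_order_keeper` tags each awaited response with its submission index so results can be re-sorted after concurrent execution finishes in arbitrary order. A minimal standalone sketch of the pattern, using stand-in coroutines rather than real API calls:

    import asyncio

    async def order_keeper(index, coro):
        # Tag the awaited result with its original position.
        return index, await coro

    async def demo():
        # These coroutines deliberately finish in reverse order.
        coros = [asyncio.sleep(0.05 * (3 - i), result=f"page {i}") for i in range(1, 4)]
        tagged = [order_keeper(i, c) for i, c in enumerate(coros, start=1)]
        results = [await task for task in asyncio.as_completed(tagged)]
        # Restore submission order, as run_tasks does with sorted(..., key=lambda x: x[0]).
        return sorted(results, key=lambda x: x[0])

    print(asyncio.run(demo()))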
@@ -64,7 +69,8 @@ async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Respons
async def run_tasks(
    coroutines: list[partial[Coroutine[Any, Any, httpx.Response]]],
-     allow_failed: bool = False
+     allow_failed: bool = False,
+     concurrency_level: int = 10,
) -> list[tuple[int, httpx.Response]]:
    """Run a list of coroutines in parallel and return the results in order.
@@ -80,13 +86,14 @@ async def run_tasks(
    # Use a variable to adjust the httpx client timeout, or default to 30 minutes
    # When we're able to reuse the SDK to make these calls, we can remove this var
    # The SDK timeout will be controlled by parameter
+     limiter = asyncio.Semaphore(concurrency_level)
    client_timeout_minutes = 60
    if timeout_var := os.getenv("UNSTRUCTURED_CLIENT_TIMEOUT_MINUTES"):
        client_timeout_minutes = int(timeout_var)
    client_timeout = httpx.Timeout(60 * client_timeout_minutes)

    async with httpx.AsyncClient(timeout=client_timeout) as client:
-         armed_coroutines = [coro(async_client=client) for coro in coroutines]  # type: ignore
+         armed_coroutines = [coro(async_client=client, limiter=limiter) for coro in coroutines]  # type: ignore
        if allow_failed:
            responses = await asyncio.gather(*armed_coroutines, return_exceptions=False)
            return list(enumerate(responses, 1))
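
The semaphore created above is handed to every armed coroutine, which is expected to acquire it around its request so that at most `concurrency_level` calls are in flight at once. A minimal sketch of that limiter pattern, with a hypothetical `fetch_chunk` standing in for the hook's `call_api_partial` and a placeholder URL:

    import asyncio
    import httpx

    async def fetch_chunk(async_client, url, limiter):
        # Only `concurrency_level` requests run concurrently;
        # the rest wait here until a slot frees up.
        async with limiter:
            return await async_client.get(url)

    async def demo():
        limiter = asyncio.Semaphore(10)
        async with httpx.AsyncClient() as client:
            return await asyncio.gather(
                *(fetch_chunk(client, f"https://example.com/{i}", limiter) for i in range(25))
            )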
@@ -110,16 +117,6 @@ async def run_tasks(
    return sorted(results, key=lambda x: x[0])


- def context_is_uvloop():
-     """Return true if uvloop is installed and we're currently in a uvloop context. Our asyncio splitting code currently doesn't work under uvloop."""
-     try:
-         import uvloop  # type: ignore[import]  # pylint: disable=import-outside-toplevel
-         loop = asyncio.get_event_loop()
-         return isinstance(loop, uvloop.Loop)
-     except (ImportError, RuntimeError):
-         return False
-
-
def get_optimal_split_size(num_pages: int, concurrency_level: int) -> int:
    """Distributes pages to workers evenly based on the number of pages and desired concurrency level."""
    if num_pages < MAX_PAGES_PER_SPLIT * concurrency_level:
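
The body of `get_optimal_split_size` is collapsed in this diff; the condition above suggests a ceiling-division heuristic. A plausible sketch under that assumption, with illustrative MIN/MAX bounds (the real constants are defined elsewhere in the module):

    import math

    MIN_PAGES_PER_SPLIT = 2    # assumed values, for illustration only
    MAX_PAGES_PER_SPLIT = 20

    def optimal_split_size(num_pages, concurrency_level):
        if num_pages < MAX_PAGES_PER_SPLIT * concurrency_level:
            # Few enough pages: spread them evenly across the workers.
            split_size = math.ceil(num_pages / concurrency_level)
        else:
            # Otherwise cap each chunk so no single request grows too large.
            split_size = MAX_PAGES_PER_SPLIT
        return max(split_size, MIN_PAGES_PER_SPLIT)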
@@ -163,8 +160,10 @@ def __init__(self) -> None:
        self.coroutines_to_execute: dict[
            str, list[partial[Coroutine[Any, Any, httpx.Response]]]
        ] = {}
+         self.concurrency_level: dict[str, int] = {}
        self.api_successful_responses: dict[str, list[httpx.Response]] = {}
        self.api_failed_responses: dict[str, list[httpx.Response]] = {}
+         self.executors: dict[str, futures.ThreadPoolExecutor] = {}
        self.tempdirs: dict[str, tempfile.TemporaryDirectory] = {}
        self.allow_failed: bool = DEFAULT_ALLOW_FAILED
        self.cache_tmp_data_feature: bool = DEFAULT_CACHE_TMP_DATA
@@ -264,14 +263,6 @@ def before_request(
            logger.warning("HTTP client not accessible! Continuing without splitting.")
            return request

-         if context_is_uvloop():
-             logger.warning("Splitting is currently incompatible with uvloop. Continuing without splitting.")
-             return request
-
-         # This allows us to use an event loop in an env with an existing loop
-         # Temporary fix until we can improve the async splitting behavior
-         nest_asyncio.apply()
-
        # This is our key into coroutines_to_execute
        # We need to pass it on to after_success so
        # we know which results are ours
@@ -317,13 +308,15 @@ def before_request(
            fallback_value=DEFAULT_ALLOW_FAILED,
        )

-         concurrency_level = form_utils.get_split_pdf_concurrency_level_param(
+         self.concurrency_level[operation_id] = form_utils.get_split_pdf_concurrency_level_param(
            form_data,
            key=PARTITION_FORM_CONCURRENCY_LEVEL_KEY,
            fallback_value=DEFAULT_CONCURRENCY_LEVEL,
            max_allowed=MAX_CONCURRENCY_LEVEL,
        )
-         limiter = asyncio.Semaphore(concurrency_level)
+
+         executor = futures.ThreadPoolExecutor(max_workers=1)
+         self.executors[operation_id] = executor

        self.cache_tmp_data_feature = form_utils.get_split_pdf_cache_tmp_data(
            form_data,
@@ -346,7 +339,7 @@ def before_request(
        page_count = page_range_end - page_range_start + 1

        split_size = get_optimal_split_size(
-             num_pages=page_count, concurrency_level=concurrency_level
+             num_pages=page_count, concurrency_level=self.concurrency_level[operation_id]
        )

        # If the doc is small enough, and we aren't slicing it with a page range:
@@ -389,7 +382,6 @@ def before_request(
            # in `after_success`.
            coroutine = partial(
                self.call_api_partial,
-                 limiter=limiter,
                operation_id=operation_id,
                pdf_chunk_request=pdf_chunk_request,
                pdf_chunk_file=pdf_chunk_file,
@@ -607,10 +599,16 @@ def _await_elements(self, operation_id: str) -> Optional[list]:
        if tasks is None:
            return None

-         ioloop = asyncio.get_event_loop()
-         task_responses: list[tuple[int, httpx.Response]] = ioloop.run_until_complete(
-             run_tasks(tasks, allow_failed=self.allow_failed)
-         )
+         concurrency_level = self.concurrency_level.get(operation_id, DEFAULT_CONCURRENCY_LEVEL)
+         coroutines = run_tasks(tasks, allow_failed=self.allow_failed, concurrency_level=concurrency_level)
+
+         # Send the coroutines to a separate thread to avoid blocking the current event loop.
+         # This workaround should be removed once the SDK supports async hooks.
+         executor = self.executors.get(operation_id)
+         if executor is None:
+             raise RuntimeError(f"Executor not found for operation_id {operation_id}")
+         task_responses_future = executor.submit(_run_coroutines_in_separate_thread, coroutines)
+         task_responses = task_responses_future.result()

        if task_responses is None:
            return None
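
`asyncio.run` raises RuntimeError when called from a thread whose event loop is already running, which is why `_run_coroutines_in_separate_thread` is submitted to the per-operation single-worker executor instead of being run inline; this replaces the removed `nest_asyncio` workaround. A standalone sketch of the offload pattern, with a stand-in coroutine:

    import asyncio
    from concurrent import futures

    async def gather_results():
        await asyncio.sleep(0.1)
        return [1, 2, 3]

    def run_in_worker_thread():
        # The worker thread has no running loop, so asyncio.run() is safe
        # there even if the calling thread is itself inside an event loop.
        executor = futures.ThreadPoolExecutor(max_workers=1)
        try:
            return executor.submit(asyncio.run, gather_results()).result()
        finally:
            executor.shutdown(wait=True)

Calling `.result()` blocks the hook's thread until the worker finishes, which matches the synchronous `_await_elements` interface the SDK currently expects.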
@@ -714,6 +712,10 @@ def _clear_operation(self, operation_id: str) -> None:
        """
        self.coroutines_to_execute.pop(operation_id, None)
        self.api_successful_responses.pop(operation_id, None)
+         self.concurrency_level.pop(operation_id, None)
+         executor = self.executors.pop(operation_id, None)
+         if executor is not None:
+             executor.shutdown(wait=True)
        tempdir = self.tempdirs.pop(operation_id, None)
        if tempdir:
            tempdir.cleanup()
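
One design note on the cleanup above: `shutdown(wait=True)` blocks until any still-running split request finishes, so a worker thread never outlives its operation. A reduced sketch of the pop-and-shutdown pattern, with a module-level dict standing in for the hook's instance state:

    from concurrent import futures

    executors: dict[str, futures.ThreadPoolExecutor] = {}

    def clear_operation(operation_id):
        # Popping first guarantees the executor is released exactly once,
        # even if cleanup runs twice for the same operation.
        executor = executors.pop(operation_id, None)
        if executor is not None:
            executor.shutdown(wait=True)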