@@ -10,7 +10,6 @@
 from typing_extensions import override
 
 from apify_client import ApifyClientAsync
-from crawlee._utils.requests import unique_key_to_request_id
 from crawlee.storage_clients._base import RequestQueueClient
 from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
 
@@ -59,10 +58,10 @@ def __init__(
         """The name of the request queue."""
 
         self._queue_head = deque[str]()
-        """A deque to store request IDs in the queue head."""
+        """A deque to store request unique keys in the queue head."""
 
         self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=self._MAX_CACHED_REQUESTS)
-        """A cache to store request objects. Request ID is used as the cache key."""
+        """A cache to store request objects. Request unique key is used as the cache key."""
 
         self._queue_has_locked_requests: bool | None = None
         """Whether the queue has requests locked by another client."""
@@ -248,14 +247,13 @@ async def add_batch_of_requests(
         already_present_requests: list[ProcessedRequest] = []
 
         for request in requests:
-            if self._requests_cache.get(request.id):
+            if self._requests_cache.get(request.unique_key):
                 # We are not sure if it was already handled at this point, and it is not worth calling API for it.
                 # It could have been handled by another client in the meantime, so cached information about
                 # `request.was_already_handled` is not reliable.
                 already_present_requests.append(
                     ProcessedRequest.model_validate(
                         {
-                            'id': request.id,
                             'uniqueKey': request.unique_key,
                             'wasAlreadyPresent': True,
                             'wasAlreadyHandled': request.was_already_handled,
@@ -267,14 +265,13 @@ async def add_batch_of_requests(
                 # Add new request to the cache.
                 processed_request = ProcessedRequest.model_validate(
                     {
-                        'id': request.id,
                         'uniqueKey': request.unique_key,
                         'wasAlreadyPresent': True,
                         'wasAlreadyHandled': request.was_already_handled,
                     }
                 )
                 self._cache_request(
-                    unique_key_to_request_id(request.unique_key),
+                    request.unique_key,
                     processed_request,
                 )
                 new_requests.append(request)
@@ -299,7 +296,7 @@ async def add_batch_of_requests(
 
                 # Remove unprocessed requests from the cache
                 for unprocessed_request in api_response.unprocessed_requests:
-                    self._requests_cache.pop(unique_key_to_request_id(unprocessed_request.unique_key), None)
+                    self._requests_cache.pop(unprocessed_request.unique_key, None)
 
         else:
             api_response = AddRequestsResponse.model_validate(
@@ -323,16 +320,16 @@ async def add_batch_of_requests(
         return api_response
 
     @override
-    async def get_request(self, request_id: str) -> Request | None:
-        """Get a request by ID.
+    async def get_request(self, request_unique_key: str) -> Request | None:
+        """Get a request by unique key.
 
         Args:
-            request_id: The ID of the request to get.
+            request_unique_key: Unique key of the request to get.
 
         Returns:
             The request or None if not found.
         """
-        response = await self._api_client.get_request(request_id)
+        response = await self._api_client.get_request_by_unique_key(request_unique_key)
 
         if response is None:
             return None
@@ -381,7 +378,7 @@ async def fetch_next_request(self) -> Request | None:
             return None
 
         # Use get request to ensure we have the full request object.
-        request = await self.get_request(request.id)
+        request = await self.get_request(request.unique_key)
         if request is None:
             logger.debug(
                 'Request fetched from the beginning of queue was not found in the RQ',
@@ -407,7 +404,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
         if request.handled_at is None:
             request.handled_at = datetime.now(tz=timezone.utc)
 
-        if cached_request := self._requests_cache[request.id]:
+        if cached_request := self._requests_cache[request.unique_key]:
             cached_request.was_already_handled = request.was_already_handled
         try:
             # Update the request in the API
@@ -419,14 +416,14 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
             self._assumed_handled_count += 1
 
             # Update the cache with the handled request
-            cache_key = unique_key_to_request_id(request.unique_key)
+            cache_key = request.unique_key
             self._cache_request(
                 cache_key,
                 processed_request,
                 hydrated_request=request,
             )
         except Exception as exc:
-            logger.debug(f'Error marking request {request.id} as handled: {exc!s}')
+            logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
             return None
         else:
             return processed_request
@@ -467,7 +464,7 @@ async def reclaim_request(
             self._assumed_handled_count -= 1
 
             # Update the cache
-            cache_key = unique_key_to_request_id(request.unique_key)
+            cache_key = request.unique_key
             self._cache_request(
                 cache_key,
                 processed_request,
@@ -481,11 +478,11 @@ async def reclaim_request(
 
             # Try to release the lock on the request
             try:
-                await self._delete_request_lock(request.id, forefront=forefront)
+                await self._delete_request_lock(request.unique_key, forefront=forefront)
             except Exception as err:
-                logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err)
+                logger.debug(f'Failed to delete request lock for request {request.unique_key}', exc_info=err)
         except Exception as exc:
-            logger.debug(f'Error reclaiming request {request.id}: {exc!s}')
+            logger.debug(f'Error reclaiming request {request.unique_key}: {exc!s}')
             return None
         else:
             return processed_request
@@ -554,7 +551,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
             return None
 
         # Update cache with hydrated request
-        cache_key = unique_key_to_request_id(request.unique_key)
+        cache_key = request.unique_key
         self._cache_request(
             cache_key,
             ProcessedRequest(
@@ -592,7 +589,7 @@ async def _update_request(
         )
 
         return ProcessedRequest.model_validate(
-            {'id': request.id, 'uniqueKey': request.unique_key} | response,
+            {'uniqueKey': request.unique_key} | response,
         )
 
     async def _list_head(
async def _list_head (
@@ -653,28 +650,26 @@ async def _list_head(
653
650
request = Request .model_validate (request_data )
654
651
655
652
# Skip requests without ID or unique key
656
- if not request .id or not request . unique_key :
653
+ if not request .unique_key :
657
654
logger .debug (
658
655
'Skipping request from queue head, missing ID or unique key' ,
659
656
extra = {
660
- 'id' : request .id ,
661
657
'unique_key' : request .unique_key ,
662
658
},
663
659
)
664
660
continue
665
661
666
662
# Cache the request
667
663
self ._cache_request (
668
- unique_key_to_request_id ( request .unique_key ) ,
664
+ request .unique_key ,
669
665
ProcessedRequest (
670
- id = request .id ,
671
666
unique_key = request .unique_key ,
672
667
was_already_present = True ,
673
668
was_already_handled = False ,
674
669
),
675
670
hydrated_request = request ,
676
671
)
677
- self ._queue_head .append (request .id )
672
+ self ._queue_head .append (request .unique_key )
678
673
679
674
for leftover_request_id in leftover_buffer :
680
675
# After adding new requests to the forefront, any existing leftover locked request is kept in the end.
@@ -683,21 +678,21 @@ async def _list_head(
 
     async def _prolong_request_lock(
         self,
-        request_id: str,
+        request_unique_key: str,
         *,
         lock_secs: int,
     ) -> ProlongRequestLockResponse:
         """Prolong the lock on a specific request in the queue.
 
         Args:
-            request_id: The identifier of the request whose lock is to be prolonged.
+            request_unique_key: Unique key of the request whose lock is to be prolonged.
             lock_secs: The additional amount of time, in seconds, that the request will remain locked.
 
         Returns:
             A response containing the time at which the lock will expire.
         """
-        response = await self._api_client.prolong_request_lock(
-            request_id=request_id,
+        response = await self._api_client.prolong_request_lock_by_unique_key(
+            request_unique_key=request_unique_key,
             # All requests reaching this code were the tip of the queue at the moment when they were fetched,
             # so if their lock expires, they should be put back to the forefront as their handling is long overdue.
             forefront=True,
@@ -710,37 +705,37 @@ async def _prolong_request_lock(
 
         # Update the cache with the new lock expiration
        for cached_request in self._requests_cache.values():
-            if cached_request.id == request_id:
+            if cached_request.unique_key == request_unique_key:
                 cached_request.lock_expires_at = result.lock_expires_at
                 break
 
         return result
 
     async def _delete_request_lock(
         self,
-        request_id: str,
+        request_unique_key: str,
         *,
         forefront: bool = False,
     ) -> None:
         """Delete the lock on a specific request in the queue.
 
         Args:
-            request_id: ID of the request to delete the lock.
+            request_unique_key: Unique key of the request to delete the lock.
             forefront: Whether to put the request in the beginning or the end of the queue after the lock is deleted.
         """
         try:
-            await self._api_client.delete_request_lock(
-                request_id=request_id,
+            await self._api_client.delete_request_lock_by_unique_key(
+                request_unique_key=request_unique_key,
                 forefront=forefront,
             )
 
             # Update the cache to remove the lock
             for cached_request in self._requests_cache.values():
-                if cached_request.id == request_id:
+                if cached_request.unique_key == request_unique_key:
                     cached_request.lock_expires_at = None
                     break
         except Exception as err:
-            logger.debug(f'Failed to delete request lock for request {request_id}', exc_info=err)
+            logger.debug(f'Failed to delete request lock for request {request_unique_key}', exc_info=err)
 
     def _cache_request(
         self,
@@ -758,7 +753,7 @@ def _cache_request(
             hydrated_request: The hydrated request object, if available.
         """
         self._requests_cache[cache_key] = CachedRequest(
-            id=processed_request.id,
+            unique_key=processed_request.unique_key,
             was_already_handled=processed_request.was_already_handled,
             hydrated=hydrated_request,
             lock_expires_at=None,
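
Taken together, the hunks above key every queue-side structure (the head deque, the LRU cache, the lock bookkeeping) by `unique_key`, so the `unique_key_to_request_id` translation step disappears entirely. A minimal, hypothetical sketch of that dedup-by-unique-key caching idea follows; the `add_request` helper and the two-field `CachedRequest` are illustrative stand-ins under assumed names, not the SDK's actual API:

from dataclasses import dataclass

from cachetools import LRUCache


@dataclass
class CachedRequest:
    unique_key: str
    was_already_handled: bool


# The cache is keyed directly by unique_key, mirroring the diff:
# lookups, inserts, and removals all use the same key, with no ID translation.
requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=1_000)


def add_request(unique_key: str) -> bool:
    """Return True if the request was already present (deduplicated locally)."""
    if requests_cache.get(unique_key) is not None:
        return True  # Known request; skip the API round trip.
    requests_cache[unique_key] = CachedRequest(unique_key, was_already_handled=False)
    return False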