1
1
from __future__ import annotations
2
2
3
3
import asyncio
4
+ import re
5
+ from base64 import b64encode
4
6
from collections import deque
5
7
from datetime import datetime , timedelta , timezone
8
+ from hashlib import sha256
6
9
from logging import getLogger
7
10
from typing import TYPE_CHECKING , Final
8
11
@@ -320,16 +323,16 @@ async def add_batch_of_requests(
320
323
return api_response
321
324
322
325
@override
323
- async def get_request (self , request_unique_key : str ) -> Request | None :
324
- """Get a request by ID .
326
+ async def get_request (self , unique_key : str ) -> Request | None :
327
+ """Get a request by unique key .
325
328
326
329
Args:
327
- request_unique_key : Unique key of the request to get.
330
+ unique_key : Unique key of the request to get.
328
331
329
332
Returns:
330
333
The request or None if not found.
331
334
"""
332
- response = await self ._api_client .get_request_by_unique_key ( request_unique_key )
335
+ response = await self ._api_client .get_request ( unique_key_to_request_id ( unique_key ) )
333
336
334
337
if response is None :
335
338
return None
@@ -357,23 +360,23 @@ async def fetch_next_request(self) -> Request | None:
357
360
return None
358
361
359
362
# Get the next request ID from the queue head
360
- next_request_id = self ._queue_head .popleft ()
363
+ next_unique_key = self ._queue_head .popleft ()
361
364
362
- request = await self ._get_or_hydrate_request (next_request_id )
365
+ request = await self ._get_or_hydrate_request (next_unique_key )
363
366
364
367
# Handle potential inconsistency where request might not be in the main table yet
365
368
if request is None :
366
369
logger .debug (
367
370
'Cannot find a request from the beginning of queue, will be retried later' ,
368
- extra = {'nextRequestId ' : next_request_id },
371
+ extra = {'nextRequestUniqueKey ' : next_unique_key },
369
372
)
370
373
return None
371
374
372
375
# If the request was already handled, skip it
373
376
if request .handled_at is not None :
374
377
logger .debug (
375
378
'Request fetched from the beginning of queue was already handled' ,
376
- extra = {'nextRequestId ' : next_request_id },
379
+ extra = {'nextRequestUniqueKey ' : next_unique_key },
377
380
)
378
381
return None
379
382
@@ -382,7 +385,7 @@ async def fetch_next_request(self) -> Request | None:
382
385
if request is None :
383
386
logger .debug (
384
387
'Request fetched from the beginning of queue was not found in the RQ' ,
385
- extra = {'nextRequestId ' : next_request_id },
388
+ extra = {'nextRequestUniqueKey ' : next_unique_key },
386
389
)
387
390
return None
388
391
@@ -509,29 +512,29 @@ async def _ensure_head_is_non_empty(self) -> None:
509
512
# Fetch requests from the API and populate the queue head
510
513
await self ._list_head (lock_time = self ._DEFAULT_LOCK_TIME )
511
514
512
- async def _get_or_hydrate_request (self , request_id : str ) -> Request | None :
513
- """Get a request by ID , either from cache or by fetching from API.
515
+ async def _get_or_hydrate_request (self , unique_key : str ) -> Request | None :
516
+ """Get a request by unique key , either from cache or by fetching from API.
514
517
515
518
Args:
516
- request_id: The ID of the request to get.
519
+ unique_key: Unique key of the request to get.
517
520
518
521
Returns:
519
522
The request if found and valid, otherwise None.
520
523
"""
521
524
# First check if the request is in our cache
522
- cached_entry = self ._requests_cache .get (request_id )
525
+ cached_entry = self ._requests_cache .get (unique_key )
523
526
524
527
if cached_entry and cached_entry .hydrated :
525
528
# If we have the request hydrated in cache, check if lock is expired
526
529
if cached_entry .lock_expires_at and cached_entry .lock_expires_at < datetime .now (tz = timezone .utc ):
527
530
# Try to prolong the lock if it's expired
528
531
try :
529
532
lock_secs = int (self ._DEFAULT_LOCK_TIME .total_seconds ())
530
- response = await self ._prolong_request_lock (request_id , lock_secs = lock_secs )
533
+ response = await self ._prolong_request_lock (unique_key , lock_secs = lock_secs )
531
534
cached_entry .lock_expires_at = response .lock_expires_at
532
535
except Exception :
533
536
# If prolonging the lock fails, we lost the request
534
- logger .debug (f'Failed to prolong lock for request { request_id } , returning None' )
537
+ logger .debug (f'Failed to prolong lock for request { unique_key } , returning None' )
535
538
return None
536
539
537
540
return cached_entry .hydrated
@@ -540,30 +543,29 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
540
543
try :
541
544
# Try to acquire or prolong the lock
542
545
lock_secs = int (self ._DEFAULT_LOCK_TIME .total_seconds ())
543
- await self ._prolong_request_lock (request_id , lock_secs = lock_secs )
546
+ await self ._prolong_request_lock (unique_key , lock_secs = lock_secs )
544
547
545
548
# Fetch the request data
546
- request = await self .get_request (request_id )
549
+ request = await self .get_request (unique_key )
547
550
548
551
# If request is not found, release lock and return None
549
552
if not request :
550
- await self ._delete_request_lock (request_id )
553
+ await self ._delete_request_lock (unique_key )
551
554
return None
552
555
553
556
# Update cache with hydrated request
554
557
cache_key = request .unique_key
555
558
self ._cache_request (
556
559
cache_key ,
557
560
ProcessedRequest (
558
- id = request_id ,
559
561
unique_key = request .unique_key ,
560
562
was_already_present = True ,
561
563
was_already_handled = request .handled_at is not None ,
562
564
),
563
565
hydrated_request = request ,
564
566
)
565
567
except Exception as exc :
566
- logger .debug (f'Error fetching or locking request { request_id } : { exc !s} ' )
568
+ logger .debug (f'Error fetching or locking request { unique_key } : { exc !s} ' )
567
569
return None
568
570
else :
569
571
return request
@@ -613,8 +615,8 @@ async def _list_head(
613
615
logger .debug (f'Using cached queue head with { len (self ._queue_head )} requests' )
614
616
# Create a list of requests from the cached queue head
615
617
items = []
616
- for request_id in list (self ._queue_head )[:limit ]:
617
- cached_request = self ._requests_cache .get (request_id )
618
+ for unique_key in list (self ._queue_head )[:limit ]:
619
+ cached_request = self ._requests_cache .get (unique_key )
618
620
if cached_request and cached_request .hydrated :
619
621
items .append (cached_request .hydrated )
620
622
@@ -671,28 +673,28 @@ async def _list_head(
671
673
)
672
674
self ._queue_head .append (request .unique_key )
673
675
674
- for leftover_request_id in leftover_buffer :
676
+ for leftover_unique_key in leftover_buffer :
675
677
# After adding new requests to the forefront, any existing leftover locked request is kept in the end.
676
- self ._queue_head .append (leftover_request_id )
678
+ self ._queue_head .append (leftover_unique_key )
677
679
return RequestQueueHead .model_validate (response )
678
680
679
681
async def _prolong_request_lock (
680
682
self ,
681
- request_unique_key : str ,
683
+ unique_key : str ,
682
684
* ,
683
685
lock_secs : int ,
684
686
) -> ProlongRequestLockResponse :
685
687
"""Prolong the lock on a specific request in the queue.
686
688
687
689
Args:
688
- request_unique_key : Unique key of the request whose lock is to be prolonged.
690
+ unique_key : Unique key of the request whose lock is to be prolonged.
689
691
lock_secs: The additional amount of time, in seconds, that the request will remain locked.
690
692
691
693
Returns:
692
694
A response containing the time at which the lock will expire.
693
695
"""
694
- response = await self ._api_client .prolong_request_lock_by_unique_key (
695
- request_unique_key = request_unique_key ,
696
+ response = await self ._api_client .prolong_request_lock (
697
+ request_id = unique_key_to_request_id ( unique_key ) ,
696
698
# All requests reaching this code were the tip of the queue at the moment when they were fetched,
697
699
# so if their lock expires, they should be put back to the forefront as their handling is long overdue.
698
700
forefront = True ,
@@ -705,37 +707,37 @@ async def _prolong_request_lock(
705
707
706
708
# Update the cache with the new lock expiration
707
709
for cached_request in self ._requests_cache .values ():
708
- if cached_request .unique_key == request_unique_key :
710
+ if cached_request .unique_key == unique_key :
709
711
cached_request .lock_expires_at = result .lock_expires_at
710
712
break
711
713
712
714
return result
713
715
714
716
async def _delete_request_lock(
    self,
    unique_key: str,
    *,
    forefront: bool = False,
) -> None:
    """Delete the lock on a specific request in the queue.

    Args:
        unique_key: Unique key of the request to delete the lock.
        forefront: Whether to put the request in the beginning or the end of the queue after the lock is deleted.
    """
    try:
        # The API identifies requests by their deterministic ID, derived from the unique key.
        request_id = unique_key_to_request_id(unique_key)
        await self._api_client.delete_request_lock(
            request_id=request_id,
            forefront=forefront,
        )

        # Clear the lock expiration on the matching cache entry, if one exists.
        for entry in self._requests_cache.values():
            if entry.unique_key != unique_key:
                continue
            entry.lock_expires_at = None
            break
    except Exception as err:
        # Best-effort operation: a failed lock deletion is logged, never raised.
        logger.debug(f'Failed to delete request lock for request {unique_key}', exc_info=err)
739
741
740
742
def _cache_request (
741
743
self ,
@@ -758,3 +760,26 @@ def _cache_request(
758
760
hydrated = hydrated_request ,
759
761
lock_expires_at = None ,
760
762
)
763
+
764
+
765
def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> str:
    """Generate a deterministic request ID based on a unique key.

    Args:
        unique_key: The unique key to convert into a request ID.
        request_id_length: The length of the request ID.

    Returns:
        A URL-safe, truncated request ID based on the unique key.
    """
    # Hash the unique key and base64-encode the raw digest.
    encoded_digest = b64encode(sha256(unique_key.encode('utf-8')).digest()).decode('utf-8')

    # Drop the characters that are not URL-safe ('+', '/' and the '=' padding),
    # then truncate to the requested length.
    url_safe_key = encoded_digest.replace('+', '').replace('/', '').replace('=', '')
    return url_safe_key[:request_id_length]
0 commit comments