import json
import logging
import sys
+ from collections import deque
from pathlib import Path
- from typing import Awaitable, List, Optional, Sequence, TypeVar
+ from typing import Awaitable, Iterable, List, Optional, Sequence, TypeVar

import click
import httpx
_imdb_cache: IMDbCache | None = None
_imdb_max_retries: int = 3
_imdb_backoff: float = 1.0
- _imdb_retry_queue: asyncio.Queue[str] | None = None
+ _imdb_retry_queue: "_IMDbRetryQueue" | None = None
_imdb_batch_limit: int = 5
_qdrant_batch_size: int = 1000

+
+ class _IMDbRetryQueue(asyncio.Queue[str]):
+     """Queue that mirrors its items in a deque for safe serialization."""
+
+     def __init__(self, initial: Iterable[str] | None = None):
+         super().__init__()
+         self._items: deque[str] = deque()
+         if initial:
+             for imdb_id in initial:
+                 imdb_id_str = str(imdb_id)
+                 super().put_nowait(imdb_id_str)
+                 self._items.append(imdb_id_str)
+
+     def put_nowait(self, item: str) -> None:  # type: ignore[override]
+         super().put_nowait(item)
+         self._items.append(item)
+
+     def get_nowait(self) -> str:  # type: ignore[override]
+         if not self._items:
+             if self.qsize():
+                 raise RuntimeError(
+                     "Desynchronization: asyncio.Queue is not empty but self._items is empty."
+                 )
+             raise asyncio.QueueEmpty
+         try:
+             item = super().get_nowait()
+         except asyncio.QueueEmpty as exc:
+             raise RuntimeError(
+                 "Desynchronization: self._items is not empty but asyncio.Queue is empty."
+             ) from exc
+         self._items.popleft()
+         return item
+
+     def snapshot(self) -> list[str]:
+         """Return a list of the current queue contents."""
+
+         return list(self._items)
+
# Known Qdrant-managed dense embedding models with their dimensionality and
# similarity metric. To support a new server-side embedding model, add an entry
# here with the appropriate vector size and `models.Distance` value.
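A minimal usage sketch of the new queue class (not part of this diff; the IMDb ids below are made up) showing that the deque mirror keeps `snapshot()` and `qsize()` in step with normal queue operations:

    import asyncio

    async def demo() -> None:
        queue = _IMDbRetryQueue(["tt0111161", "tt0068646"])  # hypothetical ids
        queue.put_nowait("tt0133093")
        # snapshot() reads the mirrored deque; no private asyncio.Queue state is touched
        assert queue.snapshot() == ["tt0111161", "tt0068646", "tt0133093"]
        assert queue.get_nowait() == "tt0111161"
        assert queue.qsize() == 2

    asyncio.run(demo())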
@@ -182,14 +215,20 @@ def _load_imdb_retry_queue(path: Path) -> None:
    """Populate the retry queue from a JSON file if it exists."""

    global _imdb_retry_queue
-     _imdb_retry_queue = asyncio.Queue()
+     ids: list[str] = []
    if path.exists():
        try:
-             ids = json.loads(path.read_text())
-             for imdb_id in ids:
-                 _imdb_retry_queue.put_nowait(str(imdb_id))
+             data = json.loads(path.read_text())
+             if isinstance(data, list):
+                 ids = [str(imdb_id) for imdb_id in data]
+             else:
+                 logger.warning(
+                     "IMDb retry queue file %s did not contain a list; ignoring its contents",
+                     path,
+                 )
        except Exception:
            logger.exception("Failed to load IMDb retry queue from %s", path)
+     _imdb_retry_queue = _IMDbRetryQueue(ids)


async def _process_imdb_retry_queue(client: httpx.AsyncClient) -> None:
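For context, a hedged sketch of the on-disk format `_load_imdb_retry_queue` now accepts: a JSON array of IMDb ids, with anything else logged and ignored. `ingest` is a stand-in name for the module under review, the ids are made up, and the test uses pytest's `tmp_path` fixture:

    import json
    from pathlib import Path

    import ingest  # assumption: the reviewed script, importable as a module

    def test_load_ignores_non_list(tmp_path: Path) -> None:
        path = tmp_path / "imdb_retry_queue.json"
        path.write_text(json.dumps(["tt0111161", "tt0068646"]))  # made-up ids
        ingest._load_imdb_retry_queue(path)
        assert ingest._imdb_retry_queue.snapshot() == ["tt0111161", "tt0068646"]

        path.write_text(json.dumps({"not": "a list"}))  # malformed contents
        ingest._load_imdb_retry_queue(path)  # warns and leaves the queue empty
        assert ingest._imdb_retry_queue.snapshot() == []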
@@ -210,8 +249,7 @@ def _persist_imdb_retry_queue(path: Path) -> None:

    if _imdb_retry_queue is None:
        return
-     ids = list(_imdb_retry_queue._queue)  # type: ignore[attr-defined]
-     path.write_text(json.dumps(ids))
+     path.write_text(json.dumps(_imdb_retry_queue.snapshot()))


async def _upsert_in_batches(
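With `snapshot()` replacing the private `_queue` access, persist and load form a plain JSON round trip; continuing the sketch above (same hypothetical `ingest` module and imports):

    def test_retry_queue_round_trip(tmp_path: Path) -> None:
        path = tmp_path / "imdb_retry_queue.json"
        ingest._imdb_retry_queue = ingest._IMDbRetryQueue(["tt0133093"])  # made-up id
        ingest._persist_imdb_retry_queue(path)
        assert json.loads(path.read_text()) == ["tt0133093"]

        ingest._load_imdb_retry_queue(path)  # reload into a fresh _IMDbRetryQueue
        assert ingest._imdb_retry_queue.snapshot() == ["tt0133093"]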
@@ -598,7 +636,7 @@ async def run(
        async with httpx.AsyncClient(timeout=30) as client:
            await _process_imdb_retry_queue(client)
    else:
-         _imdb_retry_queue = asyncio.Queue()
+         _imdb_retry_queue = _IMDbRetryQueue()

    items: List[AggregatedItem]
    if sample_dir is not None: