
Commit 0886058

Increase unit test coverage to 95% (#147)
* test: increase coverage to ninety five percent
* build: apply ruff formatting
1 parent 89f6a1b commit 0886058

39 files changed (+755, -239 lines)

docker/pyproject.deps.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [project]
 name = "mcp-plex"
-version = "2.0.7"
+version = "2.0.9"
 requires-python = ">=3.11,<3.13"
 dependencies = [
     "fastmcp>=2.11.2",

mcp_plex/common/cache.py

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 """In-memory LRU cache for media payload and artwork data."""
+
 from __future__ import annotations
 
 from collections import OrderedDict
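
The docstring and the OrderedDict import above point at the usual OrderedDict-backed LRU pattern. A minimal sketch of that pattern, with hypothetical names rather than the module's actual API:

from collections import OrderedDict
from collections.abc import Hashable
from typing import Generic, TypeVar

_V = TypeVar("_V")


class IllustrativeLRUCache(Generic[_V]):
    """Minimal OrderedDict-backed LRU cache (illustrative only)."""

    def __init__(self, max_size: int) -> None:
        self._max_size = max_size
        self._data: OrderedDict[Hashable, _V] = OrderedDict()

    def get(self, key: Hashable) -> _V | None:
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: Hashable, value: _V) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self._max_size:
            self._data.popitem(last=False)  # evict the least recently used entry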

mcp_plex/common/types.py

Lines changed: 2 additions & 1 deletion
@@ -1,4 +1,5 @@
 """Type definitions for Plex metadata and external services."""
+
 from __future__ import annotations
 
 from dataclasses import dataclass
@@ -221,6 +222,7 @@ class ExternalIDs:
     imdb: Optional[str] = None
     tmdb: Optional[str] = None
 
+
 __all__ = [
     "IMDbRating",
     "IMDbTitle",
@@ -244,4 +246,3 @@ class ExternalIDs:
 JSONValue: TypeAlias = JSONScalar | Sequence["JSONValue"] | Mapping[str, "JSONValue"]
 JSONMapping: TypeAlias = Mapping[str, JSONValue]
 MutableJSONMapping: TypeAlias = MutableMapping[str, JSONValue]
-
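
For context, the recursive aliases above type arbitrarily nested JSON. A small usage sketch, assuming the aliases are imported from mcp_plex.common.types as defined in this diff (the payload contents are invented):

from mcp_plex.common.types import JSONMapping, JSONValue

# A nested payload satisfies JSONMapping because every value is a JSONValue:
# a scalar, a sequence of JSONValue, or a mapping of str to JSONValue.
payload: JSONMapping = {
    "title": "Example Movie",
    "year": 1999,
    "genres": ["drama", "sci-fi"],
    "ratings": {"imdb": 8.1, "tmdb": 7.9},
}


def collect_strings(value: JSONValue) -> list[str]:
    """Walk a JSONValue and gather every string scalar (illustrative helper)."""
    if isinstance(value, str):
        return [value]
    if isinstance(value, dict):
        return [s for v in value.values() for s in collect_strings(v)]
    if isinstance(value, (list, tuple)):
        return [s for v in value for s in collect_strings(v)]
    return []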

mcp_plex/loader/__init__.py

Lines changed: 12 additions & 9 deletions
@@ -1,4 +1,5 @@
 """Loader orchestration utilities and staged pipeline helpers."""
+
 from __future__ import annotations
 
 import asyncio
@@ -183,7 +184,9 @@ def _build_loader_orchestrator(
     max_concurrent_upserts: int,
     imdb_config: IMDbRuntimeConfig,
     qdrant_config: QdrantRuntimeConfig,
-) -> tuple[LoaderOrchestrator, list[AggregatedItem], asyncio.Queue[list[models.PointStruct]]]:
+) -> tuple[
+    LoaderOrchestrator, list[AggregatedItem], asyncio.Queue[list[models.PointStruct]]
+]:
     """Wire the staged loader pipeline and return the orchestrator helpers."""
 
     from .pipeline.ingestion import IngestionStage
@@ -206,8 +209,7 @@ async def _upsert_aggregated(
             return
         items.extend(batch)
         points = [
-            build_point(item, dense_model_name, sparse_model_name)
-            for item in batch
+            build_point(item, dense_model_name, sparse_model_name) for item in batch
         ]
         for point_chunk in chunk_sequence(points, upsert_buffer_size):
             await _upsert_in_batches(
@@ -332,7 +334,9 @@ async def run(
     require_positive(max_concurrent_upserts, name="max_concurrent_upserts")
     require_positive(qdrant_retry_attempts, name="qdrant_retry_attempts")
 
-    imdb_retry_queue = _load_imdb_retry_queue(imdb_queue_path) if imdb_queue_path else IMDbRetryQueue()
+    imdb_retry_queue = (
+        _load_imdb_retry_queue(imdb_queue_path) if imdb_queue_path else IMDbRetryQueue()
+    )
     imdb_config = IMDbRuntimeConfig(
         cache=imdb_cache,
         max_retries=imdb_max_retries,
@@ -443,7 +447,9 @@ async def run(
         if imdb_queue_path:
            _persist_imdb_retry_queue(imdb_queue_path, imdb_config.retry_queue)
 
-        json.dump([item.model_dump(mode="json") for item in items], fp=sys.stdout, indent=2)
+        json.dump(
+            [item.model_dump(mode="json") for item in items], fp=sys.stdout, indent=2
+        )
         sys.stdout.write("\n")
     finally:
         await client.close()
@@ -483,9 +489,7 @@ async def load_media(
     """Orchestrate one or more runs of :func:`run`."""
 
     if delay < 0:
-        raise ValueError(
-            f"Delay between runs must be non-negative; received {delay!r}"
-        )
+        raise ValueError(f"Delay between runs must be non-negative; received {delay!r}")
 
     while True:
         await run(
@@ -521,4 +525,3 @@ async def load_media(
             break
 
         await asyncio.sleep(delay)
-
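
The reflowed _upsert_aggregated hunk above buffers points through chunk_sequence before upserting. A rough sketch of that chunking idea only; this helper is illustrative and is not the repository's chunk_sequence:

from collections.abc import Iterator, Sequence
from typing import TypeVar

_T = TypeVar("_T")


def chunked(items: Sequence[_T], size: int) -> Iterator[list[_T]]:
    """Yield consecutive fixed-size batches from items (illustrative helper)."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield list(items[start : start + size])


# e.g. buffer upserts in groups of upsert_buffer_size points:
# for point_chunk in chunked(points, upsert_buffer_size):
#     await upsert(point_chunk)  # stand-in for the real upsert call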

mcp_plex/loader/__main__.py

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 """Module entrypoint for ``python -m mcp_plex.loader``."""
+
 from __future__ import annotations
 
 from .cli import main
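
The entrypoint module above re-exports main from the CLI. A typical shape for such a python -m entrypoint, sketched here rather than copied from the repository:

"""Module entrypoint sketch for ``python -m mcp_plex.loader`` (illustrative)."""

from __future__ import annotations

from .cli import main

if __name__ == "__main__":
    # Dispatch straight to the Click entrypoint when run as a module.
    main()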

mcp_plex/loader/cli.py

Lines changed: 5 additions & 1 deletion
@@ -1,4 +1,5 @@
 """Command-line interface for the loader pipeline."""
+
 from __future__ import annotations
 
 import asyncio
@@ -159,7 +160,10 @@
     "--log-level",
     envvar="LOG_LEVEL",
     show_envvar=True,
-    type=click.Choice(["critical", "error", "warning", "info", "debug", "notset"], case_sensitive=False),
+    type=click.Choice(
+        ["critical", "error", "warning", "info", "debug", "notset"],
+        case_sensitive=False,
+    ),
     default="info",
     show_default=True,
     help="Logging level for console output",
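
For reference, the reformatted --log-level option slots into a Click command unchanged. A self-contained sketch with a hypothetical command name; only the option arguments above come from the diff:

import click


@click.command()
@click.option(
    "--log-level",
    envvar="LOG_LEVEL",
    show_envvar=True,
    type=click.Choice(
        ["critical", "error", "warning", "info", "debug", "notset"],
        case_sensitive=False,
    ),
    default="info",
    show_default=True,
    help="Logging level for console output",
)
def demo(log_level: str) -> None:
    """Hypothetical command demonstrating the option declaration."""
    click.echo(f"log level: {log_level}")


if __name__ == "__main__":
    demo()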

mcp_plex/loader/pipeline/channels.py

Lines changed: 2 additions & 3 deletions
@@ -5,6 +5,7 @@
 constants. The loader still emits ``None`` as a completion token for
 compatibility while downstream components migrate to sentinel-only signaling.
 """
+
 from __future__ import annotations
 
 import asyncio
@@ -74,9 +75,7 @@ class SampleBatch:
 IngestBatch = MovieBatch | EpisodeBatch | SampleBatch
 
 IngestQueueItem: TypeAlias = IngestBatch | None | IngestSentinel
-PersistenceQueueItem: TypeAlias = (
-    PersistencePayload | None | PersistenceSentinel
-)
+PersistenceQueueItem: TypeAlias = PersistencePayload | None | PersistenceSentinel
 
 IngestQueue: TypeAlias = asyncio.Queue[IngestQueueItem]
 PersistenceQueue: TypeAlias = asyncio.Queue[PersistenceQueueItem]
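
The docstring above notes that None is still emitted as a legacy completion token while consumers migrate to sentinel-only signaling. A minimal consumer sketch of that convention, with an illustrative sentinel rather than the loader's actual IngestSentinel:

import asyncio
from typing import Final

DONE: Final = object()  # illustrative completion sentinel


async def consume(queue: asyncio.Queue[object]) -> None:
    """Drain a queue that may emit a legacy ``None`` token before the sentinel."""
    while True:
        item = await queue.get()
        try:
            if item is None:
                continue  # legacy completion token: ignore, keep waiting for the sentinel
            if item is DONE:
                break
            print("processing batch:", item)  # stand-in for real batch handling
        finally:
            queue.task_done()


async def main() -> None:
    queue: asyncio.Queue[object] = asyncio.Queue()
    for batch in ("movies", "episodes"):
        queue.put_nowait(batch)
    queue.put_nowait(None)   # legacy token still emitted for compatibility
    queue.put_nowait(DONE)   # sentinel that actually ends the consumer
    await consume(queue)


asyncio.run(main())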

mcp_plex/loader/pipeline/enrichment.py

Lines changed: 16 additions & 37 deletions
@@ -67,11 +67,9 @@ async def get(
         *,
         params: Mapping[str, str] | None = None,
         headers: Mapping[str, str] | None = None,
-    ) -> httpx.Response:
-        ...
+    ) -> httpx.Response: ...
 
-    async def aclose(self) -> None:
-        ...
+    async def aclose(self) -> None: ...
 
 
 HTTPClientResource = (
@@ -80,9 +78,7 @@ async def aclose(self) -> None:
     | AbstractContextManager[AsyncHTTPClient]
 )
 
-HTTPClientFactory = Callable[
-    [], HTTPClientResource | Awaitable[HTTPClientResource]
-]
+HTTPClientFactory = Callable[[], HTTPClientResource | Awaitable[HTTPClientResource]]
 
 
 def _extract_external_ids(item: PlexPartialObject) -> ExternalIDs:
@@ -207,9 +203,7 @@ async def _fetch_tmdb_episode(
 ) -> TMDBEpisode | None:
     """Fetch TMDb data for a TV episode."""
 
-    url = (
-        f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}/episode/{episode_number}"
-    )
+    url = f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}/episode/{episode_number}"
     try:
         resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"})
     except httpx.HTTPError:
@@ -245,9 +239,7 @@ async def _fetch_tmdb_episode_chunk(
             url, params=params, headers={"Authorization": f"Bearer {api_key}"}
         )
     except httpx.HTTPError:
-        LOGGER.exception(
-            "HTTP error fetching TMDb episode chunk for show %s", show_id
-        )
+        LOGGER.exception("HTTP error fetching TMDb episode chunk for show %s", show_id)
         return {}
     if not resp.is_success:
         return {}
@@ -364,9 +356,7 @@ def __init__(
             int(imdb_batch_limit), name="imdb_batch_limit"
         )
         if imdb_requests_per_window is not None and imdb_requests_per_window <= 0:
-            raise ValueError(
-                "imdb_requests_per_window must be positive when provided"
-            )
+            raise ValueError("imdb_requests_per_window must be positive when provided")
         if imdb_window_seconds <= 0:
            raise ValueError("imdb_window_seconds must be positive")
         self._imdb_throttle = _RequestThrottler(
@@ -416,9 +406,7 @@ async def run(self) -> None:
             got_item = True
             try:
                 if batch is None:
-                    self._logger.debug(
-                        "Received legacy completion token; ignoring."
-                    )
+                    self._logger.debug("Received legacy completion token; ignoring.")
                     continue
 
                 if batch is INGEST_DONE:
@@ -450,9 +438,7 @@ async def run(self) -> None:
                     )
                     await self._handle_sample_batch(batch)
                 else:  # pragma: no cover - defensive logging for future types
-                    self._logger.warning(
-                        "Received unsupported batch type: %r", batch
-                    )
+                    self._logger.warning("Received unsupported batch type: %r", batch)
             finally:
                 if got_item:
                     self._ingest_queue.task_done()
@@ -526,7 +512,9 @@ async def _acquire_http_client(self) -> AsyncIterator[AsyncHTTPClient]:
             return
 
         if hasattr(resource, "__aenter__") and hasattr(resource, "__aexit__"):
-            async with cast(AbstractAsyncContextManager[AsyncHTTPClient], resource) as client:
+            async with cast(
+                AbstractAsyncContextManager[AsyncHTTPClient], resource
+            ) as client:
                 yield client
             return
 
@@ -592,9 +580,7 @@ async def _enrich_movies(
             if not ids.tmdb:
                 continue
             tmdb_tasks.append(
-                asyncio.create_task(
-                    _fetch_tmdb_movie(client, ids.tmdb, api_key)
-                )
+                asyncio.create_task(_fetch_tmdb_movie(client, ids.tmdb, api_key))
             )
 
         imdb_map: dict[str, IMDbTitle | None] = {}
@@ -604,8 +590,7 @@ async def _enrich_movies(
             combined_results = await asyncio.gather(imdb_future, *tmdb_tasks)
             imdb_map = cast(dict[str, IMDbTitle | None], combined_results[0])
             tmdb_results = [
-                cast(TMDBMovie | None, result)
-                for result in combined_results[1:]
+                cast(TMDBMovie | None, result) for result in combined_results[1:]
             ]
             retry_snapshot = set(self._imdb_retry_queue.snapshot())
         elif tmdb_tasks:
@@ -686,9 +671,7 @@ async def _enrich_episodes(
         retry_snapshot: set[str] = set()
         tmdb_results: list[TMDBEpisode | None] = [None] * len(episodes)
         if imdb_future and tmdb_future:
-            imdb_map, tmdb_results = await asyncio.gather(
-                imdb_future, tmdb_future
-            )
+            imdb_map, tmdb_results = await asyncio.gather(imdb_future, tmdb_future)
             retry_snapshot = set(self._imdb_retry_queue.snapshot())
         elif imdb_future:
             imdb_map = await imdb_future
@@ -723,9 +706,7 @@ async def _get_tmdb_show(
         tmdb_id_str = str(tmdb_id)
         if tmdb_id_str in self._show_tmdb_cache:
             return self._show_tmdb_cache[tmdb_id_str]
-        show = await _fetch_tmdb_show(
-            client, tmdb_id_str, self._tmdb_api_key or ""
-        )
+        show = await _fetch_tmdb_show(client, tmdb_id_str, self._tmdb_api_key or "")
         self._show_tmdb_cache[tmdb_id_str] = show
         return show
 
@@ -972,9 +953,7 @@ async def _fetch_imdb_batch(
         try:
             response = await client.get(url, params=params)
         except httpx.HTTPError:
-            LOGGER.exception(
-                "HTTP error fetching IMDb IDs %s", ",".join(chunk)
-            )
+            LOGGER.exception("HTTP error fetching IMDb IDs %s", ",".join(chunk))
             for imdb_id in chunk:
                 results[imdb_id] = None
             break
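
Several of the reflowed call sites above share one httpx pattern: issue a GET, log httpx.HTTPError via LOGGER.exception, and bail out when resp.is_success is false. A stand-alone sketch of that pattern; URL handling and logger name are illustrative:

import logging

import httpx

LOGGER = logging.getLogger("enrichment-example")


async def fetch_json(client: httpx.AsyncClient, url: str, api_key: str) -> dict | None:
    """Return the decoded JSON body, or None on transport or HTTP-status failure."""
    try:
        resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"})
    except httpx.HTTPError:
        # Transport-level failure (timeout, DNS, protocol error): log with traceback.
        LOGGER.exception("HTTP error fetching %s", url)
        return None
    if not resp.is_success:
        # Non-2xx status: treat as missing data rather than raising.
        return None
    return resp.json()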

mcp_plex/loader/pipeline/ingestion.py

Lines changed: 6 additions & 7 deletions
@@ -5,6 +5,7 @@
 having the stage skeleton in place allows other components to depend on the
 interface.
 """
+
 from __future__ import annotations
 
 import asyncio
@@ -80,7 +81,9 @@ async def run(self) -> None:
         else:
             await self._run_plex_ingestion()
 
-        self._logger.debug("Publishing ingestion completion sentinels to downstream stages.")
+        self._logger.debug(
+            "Publishing ingestion completion sentinels to downstream stages."
+        )
         await enqueue_nowait(self._output_queue, None)
         await enqueue_nowait(self._output_queue, self._completion_sentinel)
         self._logger.info(
@@ -346,19 +349,15 @@ def _fetch_shows(start: int) -> Sequence[Show]:
             episode_total,
         )
 
-    async def _enqueue_sample_batches(
-        self, items: Sequence[AggregatedItem]
-    ) -> None:
+    async def _enqueue_sample_batches(self, items: Sequence[AggregatedItem]) -> None:
         """Place sample items onto the ingest queue in configured batch sizes."""
 
         for chunk in chunk_sequence(items, self._sample_batch_size):
             batch_items = list(chunk)
             if not batch_items:
                 continue
 
-            await enqueue_nowait(
-                self._output_queue, SampleBatch(items=batch_items)
-            )
+            await enqueue_nowait(self._output_queue, SampleBatch(items=batch_items))
             self._items_ingested += len(batch_items)
             self._batches_ingested += 1
             self._logger.debug(

mcp_plex/loader/pipeline/orchestrator.py

Lines changed: 4 additions & 9 deletions
@@ -31,18 +31,15 @@ def __init__(self, spec: _StageSpec, error: BaseException) -> None:
 
 
 class IngestionStageProtocol(Protocol):
-    async def run(self) -> None:
-        ...
+    async def run(self) -> None: ...
 
 
 class EnrichmentStageProtocol(Protocol):
-    async def run(self) -> None:
-        ...
+    async def run(self) -> None: ...
 
 
 class PersistenceStageProtocol(Protocol):
-    async def run(self, worker_id: int) -> None:
-        ...
+    async def run(self, worker_id: int) -> None: ...
 
 
 class LoaderOrchestrator:
@@ -127,9 +124,7 @@ async def _run_stage(
             self._logger.debug("%s cancelled.", stage_name)
             raise
         except BaseException as exc:
-            self._logger.debug(
-                "%s raised %s", stage_name, exc, exc_info=exc
-            )
+            self._logger.debug("%s raised %s", stage_name, exc, exc_info=exc)
             raise _StageFailure(spec, exc) from exc
         else:
             self._logger.info("%s completed successfully.", stage_name)
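
The stage protocols above now use the compact ... stub form. Because typing.Protocol matching is structural, any object exposing a compatible run coroutine satisfies them; a small sketch with an invented stage class:

import asyncio
from typing import Protocol


class IngestionStageProtocol(Protocol):
    # Mirrors the protocol shape shown in the diff above.
    async def run(self) -> None: ...


class FakeIngestionStage:
    """Satisfies the protocol structurally; no inheritance required."""

    async def run(self) -> None:
        await asyncio.sleep(0)  # stand-in for real ingestion work


async def drive(stage: IngestionStageProtocol) -> None:
    await stage.run()


asyncio.run(drive(FakeIngestionStage()))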
