diff --git a/README.md b/README.md index 11383e1..53bc9fb 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,10 @@ ## Features - Collecting album and track metadata into db +- Tracking album changes - Working with `flac` + `cue` music libraries ## Technologies -- `SqlAlchemy orm` -- `Asyncio` +- `SqlAlchemy orm` for interacting with a db +- `Asyncio` for using async requests via `asyncpg` driver +- `PostgreSQL` as db diff --git a/audiostats/db/api.py b/audiostats/db/api.py index 8d612ae..87efa2d 100644 --- a/audiostats/db/api.py +++ b/audiostats/db/api.py @@ -11,24 +11,36 @@ logger = logging.getLogger(__name__) class DBApi: - def __init__(self, db_url : str): + def __init__(self, db_url : str, workers : int = 5, queue_sz : int = 10): self._session_factory = SessionFactory(db_url) + self._queue = asyncio.Queue(maxsize=queue_sz) + self._num_workers = workers - async def _upsert_album(self, album : AlbumDTO): + async def _album_upserter(self): async with self._session_factory as sf: unit_of_work = UnitOfWork(sf) async with unit_of_work() as uow: - await uow.albums.upsert(album) + while True: + album = await self._queue.get() + try: + if not album: + break + await uow.albums.upsert(album) + finally: + self._queue.task_done() async def upsert_albums(self, albums : Iterator[AlbumDTO]): - batch = [] + workers = [asyncio.create_task(self._album_upserter()) for _ in range(self._num_workers)] + for album in albums: - batch.append(self._upsert_album(album)) - if len(batch) > 10: - await asyncio.gather(*batch) - batch.clear() - if batch: - await asyncio.gather(*batch) + await self._queue.put(album) + + await self._queue.join() + + for _ in range(self._num_workers): + await self._queue.put(None) + + await asyncio.gather(*workers) async def get_all_albums(self): async with self._session_factory as sf: diff --git a/audiostats/db/models.py b/audiostats/db/models.py index fc4a37f..a798ad6 100644 --- a/audiostats/db/models.py +++ b/audiostats/db/models.py @@ -76,7 +76,15 @@ def __str__(self): class AlbumStatus(Base): - """Represents **album_statuses** line as orm object""" + """Represents **album_statuses** table line as orm object + + :ivar id: AlbumStatus id + :ivar album_id: Album id + :ivar time_stamp: timestamp when the entry was added + :ivar status: insertion status, may be `added` or `modified` + :ivar success: processing success, may be `success` or `warning` if there was a warning while process the album cue + :ivar album: Relationship to the parent album + """ __tablename__ = 'album_statuses' diff --git a/audiostats/handlers/models.py b/audiostats/handlers/models.py index ae2cc54..ef324b8 100644 --- a/audiostats/handlers/models.py +++ b/audiostats/handlers/models.py @@ -1,23 +1,17 @@ +from collections import Counter from dataclasses import dataclass, field -from datetime import datetime from audiostats.domain import Status, Success -@dataclass(slots=True) +@dataclass(slots=True, frozen=True) class StatusDTO: status : Status success : Success - # timestamp : datetime | None = field(default=None, compare=False) - - # def __eq__(self, other): - # if not isinstance(other, StatusDTO): - # return NotImplemented - # return self.status == other.status and self.success == other.success def __repr__(self): return f'' -@dataclass(slots=True) +@dataclass(slots=True, frozen=True) class TrackDTO: title : str number : int | None @@ -44,4 +38,17 @@ def __repr__(self): {'\n'.join(['\t' + repr(i) for i in self.statuses])} )>''' + def __eq__(self, other): + if not isinstance(other, AlbumDTO): + return False + return ( + self.title == other.title + and self.performer == other.performer + and self.year == other.year + and self.path == other.path + and self.cover == other.cover + and Counter(self.tracks) == Counter(other.tracks) + and Counter(self.statuses) == Counter(other.statuses) + ) +