From 74da80fc3d7aa2b2434a07684c7ea64eb6b693ba Mon Sep 17 00:00:00 2001 From: Olezhich Date: Tue, 9 Sep 2025 09:45:25 +0300 Subject: [PATCH] added parallel logics to upsert --- audiostats/db/api.py | 16 ++++++-- audiostats/db/models.py | 38 +++++++++++++------ audiostats/db/repositories.py | 18 ++++----- tests/db_fixture.py | 71 +++++------------------------------ 4 files changed, 58 insertions(+), 85 deletions(-) diff --git a/audiostats/db/api.py b/audiostats/db/api.py index a67771f..2b211e7 100644 --- a/audiostats/db/api.py +++ b/audiostats/db/api.py @@ -1,3 +1,4 @@ +import asyncio import logging from collections.abc import Iterator @@ -14,10 +15,19 @@ class DBApi: def __init__(self, session_factory : async_sessionmaker[AsyncSession]): self._session_factory = session_factory - async def upsert_albums(self, albums : Iterator[AlbumDTO]): + async def _upsert_album(self, album : AlbumDTO): async with UnitOfWork(self._session_factory) as uow: - for album in albums: - await uow.albums.upsert(album) + await uow.albums.upsert(album) + + async def upsert_albums(self, albums : Iterator[AlbumDTO]): + batch = [] + for album in albums: + batch.append(self._upsert_album(album)) + if len(batch) > 10: + await asyncio.gather(*batch) + batch.clear() + if batch: + await asyncio.gather(*batch) async def get_all_albums(self): async with UnitOfWork(self._session_factory) as uow: diff --git a/audiostats/db/models.py b/audiostats/db/models.py index f8d53dd..4aaeec1 100644 --- a/audiostats/db/models.py +++ b/audiostats/db/models.py @@ -1,4 +1,6 @@ -from sqlalchemy import String, Integer, UniqueConstraint, ForeignKey, Float +from datetime import datetime + +from sqlalchemy import String, Integer, UniqueConstraint, ForeignKey, Float, DATETIME from sqlalchemy.orm import declarative_base, Mapped, mapped_column, relationship Base = declarative_base() @@ -7,7 +9,7 @@ MAX_STR_FIELD_LEN = 50 class Album(Base): - """Represents database line as orm object + """Represents **albums** table line as orm object Pair ``performer`` - ``title`` should be **unique** @@ -41,7 +43,7 @@ def __str__(self): return f'{self.year} - {self.performer} - {self.title}' class Track(Base): - """Represents database line as orm object + """Represents **tracks** table line as orm object :ivar id: Track id :ivar title: Track title @@ -54,17 +56,31 @@ class Track(Base): """ __tablename__ = 'tracks' - id : Mapped[int] = mapped_column(Integer, primary_key=True) #track id - title : Mapped[str] = mapped_column(String(MAX_STR_FIELD_LEN), nullable=False) #track title - album_id : Mapped[int] = mapped_column(Integer, ForeignKey('albums.id', ondelete='CASCADE'), index=True) #album id - number : Mapped[int | None] = mapped_column(Integer, nullable=True) #track number in album - path : Mapped[str] = mapped_column(String(MAX_PATH_FIELD_LEN), nullable=True) #path to flac file with track - offset : Mapped[float | None] = mapped_column(Float, nullable=True) #offset from the beginning of the file to the beginning of the track - duration : Mapped[float | None] = mapped_column(Float, nullable=True) #track duration + id : Mapped[int] = mapped_column(Integer, primary_key=True) + title : Mapped[str] = mapped_column(String(MAX_STR_FIELD_LEN), nullable=False) + album_id : Mapped[int] = mapped_column(Integer, ForeignKey('albums.id', ondelete='CASCADE'), index=True) + number : Mapped[int | None] = mapped_column(Integer, nullable=True) + path : Mapped[str] = mapped_column(String(MAX_PATH_FIELD_LEN), nullable=True) + offset : Mapped[float | None] = mapped_column(Float, nullable=True) + duration : Mapped[float | None] = mapped_column(Float, nullable=True) album : Mapped["Album"]= relationship('Album', back_populates='tracks') def __repr__(self): return f'' def __str__(self): - return f'{self.number} - {self.title}' \ No newline at end of file + return f'{self.number} - {self.title}' + + +# class AlbumStatus(Base): +# """Represents **album_statuses** line as orm object""" +# +# __tablename__ = 'album_statuses' +# +# id : Mapped[int] = mapped_column(Integer, primary_key=True) +# album_id : Mapped[int] = mapped_column(Integer, ForeignKey('albums.id', ondelete='CASCADE'), index=True) +# time_stamp : Mapped[datetime] = mapped_column(DATETIME, nullable=False) +# status : Mapped[] +# success : Mapped[] + + diff --git a/audiostats/db/repositories.py b/audiostats/db/repositories.py index 04968dd..787a7d4 100644 --- a/audiostats/db/repositories.py +++ b/audiostats/db/repositories.py @@ -1,17 +1,16 @@ -#from sqlalchemy import select -from sqlalchemy import select, delete +import logging from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload -#from sqlalchemy.orm import Session - from .models import Album from audiostats.handlers import AlbumDTO from audiostats.application.dto_mappers import create_album_dto_f_orm, update_album_orm_meta_f_dto, update_track_orm_f_dto, create_track_orm_f_dto +logger = logging.getLogger(__name__) + class AlbumRepository: def __init__(self, session : AsyncSession): self._session = session @@ -37,12 +36,13 @@ async def upsert(self, album_data : AlbumDTO): for track in old_tracks_by_title.values(): await self._session.delete(track) + logger.info(f'Album upserted: {album_data}') + async def find_by_title_performer(self, title : str, performer : str | None) -> Album | None: - result = await self._session.execute(select(Album).where( - Album.title == title, - Album.performer == performer - ).options( - selectinload(Album.tracks) + result = await self._session.execute( + select(Album).where( + Album.title == title, Album.performer == performer).options( + selectinload(Album.tracks) )) return result.scalar_one_or_none() #return self._session.query(Album).filter(Album.title == title and Album.performer == performer if performer else Album.performer.is_(None)).first() diff --git a/tests/db_fixture.py b/tests/db_fixture.py index 6788c8d..c3f15d1 100644 --- a/tests/db_fixture.py +++ b/tests/db_fixture.py @@ -10,52 +10,6 @@ logger = logging.getLogger(__name__) -# @pytest.fixture(scope="session") -# def test_engine(): -# engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False}) -# Base.metadata.create_all(engine) -# yield engine -# engine.dispose() - -# @pytest.fixture(scope="session") -# def test_sessionmaker(test_engine): -# sessionfactory = sessionmaker(bind=test_engine) -# logger.debug(f'session type: {type(sessionfactory)}') -# return sessionfactory - -# @pytest.fixture(scope="function") -# def event_loop(): -# loop = asyncio.new_event_loop() -# asyncio.set_event_loop(loop) -# yield loop -# loop.close() -# -# @pytest.fixture(scope='function') -# async def test_engine(): -# engine: AsyncEngine = create_async_engine( -# "sqlite+aiosqlite:///:memory:", -# connect_args={"check_same_thread": False}, -# echo=False, -# ) -# async with engine.begin() as conn: -# await conn.run_sync(Base.metadata.create_all) -# yield engine -# await engine.dispose() -# -# @pytest.fixture(scope='function') -# async def test_sessionmaker(test_engine): -# factory = async_sessionmaker( -# bind=test_engine, -# class_=AsyncSession, -# expire_on_commit=False, -# autoflush=False, -# future=True, -# ) -# print("Type of test_engine:", -# type(test_engine)) # ← Должно быть: -# print("test_engine:", test_engine) -# return factory - @pytest.fixture(scope="session") def event_loop(): """Создаем event loop для тестов""" @@ -72,28 +26,14 @@ def test_engine(): connect_args={"check_same_thread": False}, echo=False, ) - yield engine - engine.sync_engine.dispose() - -# @pytest.fixture(scope="session", autouse=True) -# async def setup_database(test_engine): -# """Создание и удаление таблиц""" -# async with test_engine.begin() as conn: -# await conn.run_sync(Base.metadata.create_all) -# yield -# async with test_engine.begin() as conn: -# await conn.run_sync(Base.metadata.drop_all) + return engine @pytest.fixture(scope="session") def test_session_factory(test_engine): - """Создает async_sessionmaker который ожидает UnitOfWork""" - async def setup(): async with test_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) - # Запускаем setup - import asyncio asyncio.run(setup()) return async_sessionmaker( @@ -101,4 +41,11 @@ async def setup(): class_=AsyncSession, expire_on_commit=False, autoflush=False - ) \ No newline at end of file + ) + +@pytest.fixture(scope="session", autouse=True) +def cleanup(test_engine): + yield + async def dispose(): + await test_engine.dispose() + asyncio.run(dispose()) \ No newline at end of file