Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions audiostats/db/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@ async def get_all_albums(self):
async with unit_of_work() as uow:
return await uow.albums.all()

async def get_all_albums_w_status(self):
async with self._session_factory as sf:
unit_of_work = UnitOfWork(sf)
async with unit_of_work() as uow:
return await uow.albums.all_w_status()

7 changes: 7 additions & 0 deletions audiostats/handlers/models.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from dataclasses import dataclass, field
from datetime import datetime

from audiostats.domain import Status, Success

@dataclass(slots=True)
class StatusDTO:
status : Status
success : Success
# timestamp : datetime | None = field(default=None, compare=False)

# def __eq__(self, other):
# if not isinstance(other, StatusDTO):
# return NotImplemented
# return self.status == other.status and self.success == other.success

def __repr__(self):
return f'<StatusDTO(status={self.status}, success={self.success})>'
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,4 @@ def processed_album_dtos():
@pytest.fixture()
def processed_album_dtos_w_status(processed_album_dtos):
for i in processed_album_dtos:
i.statuses=[StatusDTO(status=Status.ADDED, success=Success.SUCCESS)]
i.statuses.append(StatusDTO(status=Status.ADDED, success=Success.SUCCESS))
71 changes: 36 additions & 35 deletions tests/test_db_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,63 @@

import pytest
import dotenv
import logging
import copy

from audiostats.db.api import DBApi
from audiostats.handlers import TrackDTO
from audiostats.handlers import TrackDTO, StatusDTO
from audiostats.domain import Status, Success

logger = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_async_upsert_albums(processed_album_dtos):
#db api initialisation
dotenv.load_dotenv()
db_url = os.getenv('ASYNC_DB_URL')

api = DBApi(db_url)
await api.upsert_albums(processed_album_dtos)

albums = await api.get_all_albums()
#Upserting base album set
logger.info(f'incoming dtos: {processed_album_dtos}')

assert albums.sort(key=lambda x: x.title) == processed_album_dtos.sort(
key=lambda x: x.title), 'Insert some albums to db'
await api.upsert_albums(processed_album_dtos)

processed_album_dtos[0].tracks.pop(-1)
processed_album_dtos[0].tracks.pop(2)
processed_album_dtos[1].tracks.append(
TrackDTO(title='New_Track_1', number=6, path='music/new_track_1.flac', offset=None, duration=None))
processed_album_dtos[1].year = None
albums = await api.get_all_albums_w_status()

await api.upsert_albums(processed_album_dtos)
logger.info(f'db state after first albums upsert: {albums}')

albums = await api.get_all_albums()
target_album_dtos = copy.deepcopy(processed_album_dtos)
for i in target_album_dtos:
i.statuses.append(StatusDTO(status=Status.ADDED, success=Success.SUCCESS))

assert albums.sort(key=lambda x: x.title) == processed_album_dtos.sort(
key=lambda x: x.title), 'Update some albums in db'
assert sorted(albums, key=lambda x: x.title) == sorted(target_album_dtos,
key=lambda x: x.title), 'Insert some albums to db'

#Upserting muted album set
muted_album_dtos = copy.deepcopy(processed_album_dtos)

muted_album_dtos[0].tracks.pop(-1)
muted_album_dtos[0].tracks.pop(2)
muted_album_dtos[1].tracks.append(
TrackDTO(title='New_Track_1', number=6, path='music/new_track_1.flac', offset=None, duration=None))
muted_album_dtos[1].year = None

# @pytest.mark.asyncio
# async def test_upsert_albums(test_session_factory, processed_album_dtos):
# session_factory = await test_session_factory
# api = DBApi(session_factory)
#
# await api.upsert_albums(processed_album_dtos)
#
# albums = await api.get_all_albums()
#
# assert albums.sort(key=lambda x: x.title) == processed_album_dtos.sort(key=lambda x: x.title), 'Insert some albums to db'
#
# processed_album_dtos[0].tracks.pop(-1)
# processed_album_dtos[0].tracks.pop(2)
# processed_album_dtos[1].tracks.append(TrackDTO(title='New_Track_1', number=6, path='music/new_track_1.flac', offset=None, duration=None))
# processed_album_dtos[1].year = None
#
# await api.upsert_albums(processed_album_dtos)
#
# albums = await api.get_all_albums()
#
# assert albums.sort(key=lambda x: x.title) == processed_album_dtos.sort(key=lambda x: x.title), 'Update some albums in db'
logger.info(f'muted album dtos: {muted_album_dtos}')

await api.upsert_albums(muted_album_dtos)

albums = await api.get_all_albums_w_status()

logger.info(f'db state after first albums update: {albums}')

target_muted_album_dtos = copy.deepcopy(muted_album_dtos)
for i in target_muted_album_dtos:
i.statuses.append(StatusDTO(status=Status.ADDED, success=Success.SUCCESS))
target_muted_album_dtos[0].statuses.append(StatusDTO(status=Status.MODIFIED, success=Success.SUCCESS))
target_muted_album_dtos[1].statuses.append(StatusDTO(status=Status.MODIFIED, success=Success.SUCCESS))

assert sorted(albums, key=lambda x: x.title) == sorted(target_muted_album_dtos,
key=lambda x: x.title), 'Update some albums in db'