Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,4 @@ test.db

# dump of scripts
scripts/*csv
*.zim
12 changes: 12 additions & 0 deletions backend/src/cms_backend/db/book.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.orm import Session as OrmSession

from cms_backend.db.models import Book, BookLocation, WarehousePath, ZimfarmNotification
Expand Down Expand Up @@ -97,3 +98,14 @@ def create_book_location(
)

return location


def get_next_book_to_move_files_or_none(
    session: OrmSession,
) -> Book | None:
    """Return the next book whose files must be moved, if any.

    Only books with status "pending_move" are considered; they are ordered by
    ``created_at`` and at most one row is fetched.

    Returns:
        The selected book, or None when no book has status "pending_move".
    """
    next_pending_move = (
        select(Book)
        .where(Book.status == "pending_move")
        .order_by(Book.created_at)
        .limit(1)
    )
    return session.scalars(next_pending_move).one_or_none()
21 changes: 21 additions & 0 deletions backend/src/cms_backend/db/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime
from ipaddress import IPv4Address
from pathlib import Path
from typing import Any, Optional
from uuid import UUID

Expand Down Expand Up @@ -143,6 +144,12 @@ class Book(Base):
postgresql_where=text("status = 'errored'"),
)

# Partial index on Book.status restricted (through the PostgreSQL-specific
# ``postgresql_where`` option) to rows whose status is 'pending_move'.
Index(
"idx_book_status_pending_move",
Book.status,
postgresql_where=text("status = 'pending_move'"),
)


class Title(Base):
__tablename__ = "title"
Expand Down Expand Up @@ -225,3 +232,17 @@ class BookLocation(Base):

book: Mapped["Book"] = relationship(back_populates="locations", init=False)
warehouse_path: Mapped["WarehousePath"] = relationship(init=False)

def full_local_path(self, warehouse_local_folders_map: dict[UUID, str]) -> Path:
    """Build the local filesystem path of this location.

    Args:
        warehouse_local_folders_map: local folder of each warehouse, keyed by
            warehouse ID.

    Returns:
        ``<warehouse local folder>/<warehouse path folder name>/<filename>``.

    Raises:
        KeyError: if the warehouse of this location is missing from the map.
    """
    # path of the file relative to the root of its warehouse
    relative_path = Path(self.warehouse_path.folder_name) / self.filename
    warehouse_id = self.warehouse_path.warehouse.id
    local_root = Path(warehouse_local_folders_map[warehouse_id])
    return local_root / relative_path

@property
def full_str(self) -> str:
    """Human-readable location: ``<warehouse name>:<folder name>/<filename>``."""
    path_in_warehouse = self.warehouse_path
    warehouse_name = path_in_warehouse.warehouse.name
    return f"{warehouse_name}:{path_in_warehouse.folder_name}/{self.filename}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Add partial index for pending_move book status

Revision ID: add_pending_move_index
Revises: add_book_location_table
Create Date: 2025-11-13 00:00:00.000000

"""

from alembic import op

# revision identifiers, used by Alembic.
# This revision is declared as the direct successor of "add_book_location_table".
revision = "add_pending_move_index"
down_revision = "add_book_location_table"
branch_labels = None
depends_on = None


def upgrade():
    """Create the partial index on ``book.status`` for 'pending_move' rows."""
    # predicate limiting the index to books waiting for their files to be moved
    pending_move_only = "status = 'pending_move'"
    indexed_columns = ["status"]
    op.create_index(
        "idx_book_status_pending_move",
        "book",
        indexed_columns,
        postgresql_where=pending_move_only,
    )


def downgrade():
    """Drop the partial index created by ``upgrade``."""
    index_name = "idx_book_status_pending_move"
    op.drop_index(index_name, table_name="book")
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
def process_zimfarm_notifications(session: OrmSession):
logger.info("Processing Zimfarm notifications")
nb_notifications_processed = 0
raise Exception("foo")
while True:
with session.begin_nested():
notification = get_next_notification_to_process_or_none(session)
Expand Down
3 changes: 3 additions & 0 deletions backend/src/cms_backend/processors/book.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def create_book_target_locations(
f"{getnow()}: book already at all target locations, skipping target "
"creation"
)
book.status = "published"
return

# Create target locations for each applicable warehouse path
Expand All @@ -168,3 +169,5 @@ def create_book_target_locations(
filename=target_filename,
status="target",
)

book.status = "pending_move"
1 change: 0 additions & 1 deletion backend/src/cms_backend/processors/title.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ def add_book_to_title(session: OrmSession, book: Book, title: Title):
title.books.append(book)
book.events.append(f"{getnow()}: book added to title {title.id}")
title.events.append(f"{getnow()}: book {book.id} added to title")
book.status = "processed"

if title.name != book.name:
title.events.append(f"{getnow()}: updating title name to {book.name}")
Expand Down
30 changes: 22 additions & 8 deletions backend/src/cms_backend/shuttle/context.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
import dataclasses
import os
from dataclasses import dataclass
from datetime import timedelta
from typing import TypeVar
from typing import ClassVar
from uuid import UUID

from humanfriendly import parse_timespan

T = TypeVar("T")
WarehouseId = str
LocalWarehousePath = str


@dataclasses.dataclass(kw_only=True)
def _parse_local_warehouse_paths() -> dict[UUID, str]:
env_value = os.getenv("LOCAL_WAREHOUSE_PATHS", default="")
if not env_value:
return {}
return {
UUID(warehouse_id): local_path
for item in env_value.split(",")
if item
for (warehouse_id, local_path) in [item.split(":", 1)]
}


@dataclass(kw_only=True)
class Context:
"""Class holding every contextual / configuration bits which can be moved

Expand All @@ -20,8 +34,8 @@ class Context:
os.getenv("PAUSE_IN_THE_LOOP", default="10s")
)

process_zimfarm_notifications_interval: timedelta = timedelta(
seconds=parse_timespan(
os.getenv("PROCESS_ZIMFARM_NOTIFICATIONS_INTERVAL", default="1m")
)
move_files_interval: timedelta = timedelta(
seconds=parse_timespan(os.getenv("MOVE_FILES_INTERVAL", default="1m"))
)

local_warehouse_paths: ClassVar[dict[UUID, str]] = _parse_local_warehouse_paths()
8 changes: 7 additions & 1 deletion backend/src/cms_backend/shuttle/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@
from cms_backend.context import Context
from cms_backend.db import Session
from cms_backend.shuttle.context import Context as ShuttleContext
from cms_backend.shuttle.move_files import move_files
from cms_backend.utils.database import upgrade_db_schema
from cms_backend.utils.datetime import getnow
from cms_backend.utils.task_config import TaskConfig

# Configure background tasks with their execution intervals
tasks: list[TaskConfig] = []
tasks: list[TaskConfig] = [
TaskConfig(
func=move_files,
interval=ShuttleContext.move_files_interval,
),
]


def main():
Expand Down
132 changes: 132 additions & 0 deletions backend/src/cms_backend/shuttle/move_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import shutil

from sqlalchemy.orm import Session as OrmSession

from cms_backend import logger
from cms_backend.db.book import get_next_book_to_move_files_or_none
from cms_backend.db.models import Book, BookLocation
from cms_backend.shuttle.context import Context as ShuttleContext
from cms_backend.utils.datetime import getnow


def move_files(session: OrmSession):
    """Move the files of books waiting in "pending_move" status.

    Books are fetched one at a time with get_next_book_to_move_files_or_none
    and handed to move_book_files, each inside its own nested transaction.

    move_book_files may return without changing the book status (it does so
    when one of the book warehouses is not locally accessible). Such a book
    would be returned again by the next query, so the run stops as soon as a
    book already attempted in this run comes back, instead of looping forever.

    Args:
        session: ORM session used for queries and updates.
    """
    logger.info("Moving ZIM files")
    nb_zim_files_moved = 0
    # IDs of the books already handed to move_book_files during this run
    attempted_book_ids = set()
    while True:
        with session.begin_nested():
            book = get_next_book_to_move_files_or_none(session)
            if not book:
                break
            if book.id in attempted_book_ids:
                # NOTE(review): books queued behind this one are not reached
                # until it leaves "pending_move"; the query would need to
                # exclude already-attempted books to avoid this.
                logger.debug(
                    f"Book {book.id} is still pending after a move attempt, "
                    "stopping this run"
                )
                break
            attempted_book_ids.add(book.id)
            logger.debug(f"Processing ZIM file of book {book.id}")
            move_book_files(session, book)
            # a book left in "pending_move" has been skipped, not moved
            if book.status != "pending_move":
                nb_zim_files_moved += 1

    logger.info(f"Done moving {nb_zim_files_moved} ZIM files")


def move_book_files(session: OrmSession, book: Book):
    """Bring the files of a book from its "current" to its "target" locations.

    Nothing is done when at least one warehouse used by the book locations has
    no entry in ShuttleContext.local_warehouse_paths. Otherwise:

    - without any "current" location, the book is marked "errored";
    - without any "target" location, the book is marked "published";
    - else files are copied while there are more targets than current
      locations, then moved pairwise, then leftover current files are deleted,
      and the book is marked "published".

    Args:
        session: ORM session used to delete obsolete locations.
        book: book whose files have to be moved.
    """
    local_paths = ShuttleContext.local_warehouse_paths

    def local_path_of(location: BookLocation):
        # path of the location file on the local filesystem
        return location.full_local_path(ShuttleContext.local_warehouse_paths)

    unreachable_warehouses = {
        location.warehouse_path.warehouse.name
        for location in book.locations
        if location.warehouse_path.warehouse_id not in local_paths
    }

    # every warehouse must be reachable locally, more complex scenarios are not
    # implemented yet
    if unreachable_warehouses:
        logger.debug(
            f"Ignoring book {book.id}, no access to "
            f"{','.join(unreachable_warehouses)} warehouses"
        )
        return

    sources: list[BookLocation] = []
    destinations: list[BookLocation] = []
    for location in book.locations:
        if location.status == "current":
            sources.append(location)
        elif location.status == "target":
            destinations.append(location)

    if not sources:
        book.events.append(
            f"{getnow()}: error encountered while moving files, no current location"
        )
        book.status = "errored"
        return

    if not destinations:
        book.events.append(
            f"{getnow()}: ignoring move files operation, no target location set"
        )
        book.status = "published"
        return

    # copies first: one per target in excess of the current locations, always
    # reading from the first current location
    for _ in range(len(destinations) - len(sources)):
        source = sources[0]
        destination = destinations.pop(0)

        source_path = local_path_of(source)
        destination_path = local_path_of(destination)

        destination_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy(source_path, destination_path)
        logger.debug(
            f"Copied book {book.id} from {source_path} to {destination_path}"
        )
        book.events.append(
            f"{getnow()}: copied book from {source.full_str} to "
            f"{destination.full_str}"
        )
        destination.status = "current"

    # then moves: pair each remaining target with a current location
    while sources and destinations:
        source = sources.pop(0)
        destination = destinations.pop(0)

        source_path = local_path_of(source)
        destination_path = local_path_of(destination)

        destination_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(source_path, destination_path)
        logger.debug(f"Moved book {book.id} from {source_path} to {destination_path}")
        book.events.append(
            f"{getnow()}: moved book from {source.full_str} to "
            f"{destination.full_str}"
        )
        book.locations.remove(source)
        session.delete(source)
        session.flush()
        destination.status = "current"

    # finally drop the current locations which have no target left
    for leftover in sources:
        leftover_path = local_path_of(leftover)

        leftover_path.unlink(missing_ok=True)
        logger.debug(
            f"Deleted extra current location for book {book.id} at {leftover_path}"
        )
        book.events.append(f"{getnow()}: deleted old location {leftover.full_str}")
        book.locations.remove(leftover)
        session.delete(leftover)

    book.status = "published"
12 changes: 12 additions & 0 deletions backend/tests/processors/test_book_location_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,9 @@ def test_no_target_when_current_matches_single_path(
"book already at all target locations" in event for event in book.events
)

# Book should be marked as published (no move needed)
assert book.status == "published"

def test_no_target_when_current_matches_multiple_paths(
self,
dbsession: OrmSession,
Expand Down Expand Up @@ -522,6 +525,9 @@ def test_no_target_when_current_matches_multiple_paths(
"book already at all target locations" in event for event in book.events
)

# Book should be marked as published (no move needed)
assert book.status == "published"

def test_target_created_when_partial_match(
self,
dbsession: OrmSession,
Expand Down Expand Up @@ -574,6 +580,9 @@ def test_target_created_when_partial_match(
"book already at all target locations" in event for event in book.events
)

# Book should be marked as pending_move (needs file movement)
assert book.status == "pending_move"

def test_target_created_when_filename_differs(
self,
dbsession: OrmSession,
Expand Down Expand Up @@ -619,3 +628,6 @@ def test_target_created_when_filename_differs(
assert not any(
"book already at all target locations" in event for event in book.events
)

# Book should be marked as pending_move (needs file movement)
assert book.status == "pending_move"
4 changes: 2 additions & 2 deletions backend/tests/processors/test_zimfarm_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def test_process_notification_success(
assert notification.book is not None
assert notification.book.title == title
assert notification.book.title_id == title.id
assert notification.book.status == "processed"
assert notification.book.status == "pending_move"
assert any(
event
for event in notification.events
Expand Down Expand Up @@ -421,7 +421,7 @@ def test_process_notification_with_existing_books(

assert notification.book is not None
assert notification.book.title == title
assert notification.book.status == "processed"
assert notification.book.status == "pending_move"
assert len(title.books) == 2
assert existing_book in title.books
assert notification.book in title.books
Expand Down
1 change: 1 addition & 0 deletions backend/tests/shuttle/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Tests for the shuttle module."""
Loading