Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cg/cli/post_process/post_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from cg.cli.utils import CLICK_CONTEXT_SETTINGS
from cg.constants.cli_options import DRY_RUN
from cg.models.cg_config import CGConfig
from cg.services.run_devices.abstract_classes import PostProcessingService
from cg.services.run_devices.protocols import PostProcessingService

LOG = logging.getLogger(__name__)

Expand Down
2 changes: 1 addition & 1 deletion cg/cli/post_process/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

from cg.exc import CgError
from cg.models.cg_config import CGConfig
from cg.services.run_devices.abstract_classes import PostProcessingService
from cg.services.run_devices.pacbio.post_processing_service import PacBioPostProcessingService
from cg.services.run_devices.protocols import PostProcessingService
from cg.services.run_devices.run_names.service import RunNamesService
from cg.utils.mapping import get_item_by_pattern_in_source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from datetime import datetime

from cg.exc import PacbioSequencingRunAlreadyExistsError
from cg.services.run_devices.abstract_classes import PostProcessingStoreService
from cg.services.run_devices.error_handler import handle_post_processing_errors
from cg.services.run_devices.exc import (
PostProcessingDataTransferError,
Expand All @@ -21,13 +20,14 @@
PacBioSMRTCellMetricsDTO,
)
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
from cg.services.run_devices.protocols import PostProcessingStoreService
from cg.store.models import PacbioSMRTCell, PacbioSMRTCellMetrics
from cg.store.store import Store

LOG = logging.getLogger(__name__)


class PacBioStoreService(PostProcessingStoreService):
class PacBioStoreService(PostProcessingStoreService[PacBioRunData]):
def __init__(self, store: Store, data_transfer_service: PacBioDataTransferService):
self.store = store
self.data_transfer_service = data_transfer_service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from pydantic import ValidationError

from cg.services.run_devices.abstract_classes import PostProcessingDataTransferService
from cg.services.run_devices.error_handler import handle_post_processing_errors
from cg.services.run_devices.exc import (
PostProcessingDataTransferError,
Expand All @@ -26,11 +25,12 @@
from cg.services.run_devices.pacbio.metrics_parser.metrics_parser import PacBioMetricsParser
from cg.services.run_devices.pacbio.metrics_parser.models import PacBioMetrics
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
from cg.services.run_devices.protocols import PostProcessingDataTransferService

LOG = logging.getLogger(__name__)


class PacBioDataTransferService(PostProcessingDataTransferService):
class PacBioDataTransferService(PostProcessingDataTransferService[PacBioRunData, PacBioDTOs]):
def __init__(self, metrics_service: PacBioMetricsParser):
self.metrics_service: PacBioMetricsParser = metrics_service

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
PacBioHousekeeperTags,
file_pattern_to_bundle_type,
)
from cg.services.run_devices.abstract_classes import PostProcessingHKService
from cg.services.run_devices.error_handler import handle_post_processing_errors
from cg.services.run_devices.exc import (
PostProcessingParsingError,
Expand All @@ -24,12 +23,13 @@
from cg.services.run_devices.pacbio.metrics_parser.models import PacBioMetrics
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
from cg.services.run_devices.pacbio.run_file_manager.run_file_manager import PacBioRunFileManager
from cg.services.run_devices.protocols import PostProcessingHKService
from cg.utils.mapping import get_item_by_pattern_in_source

LOG = logging.getLogger(__name__)


class PacBioHousekeeperService(PostProcessingHKService):
class PacBioHousekeeperService(PostProcessingHKService[PacBioRunData]):

def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from pydantic import ValidationError

from cg.constants.pacbio import PacBioDirsAndFiles
from cg.services.run_devices.abstract_classes import PostProcessingMetricsParser
from cg.services.run_devices.error_handler import handle_post_processing_errors
from cg.services.run_devices.exc import (
PostProcessingParsingError,
Expand All @@ -28,11 +27,12 @@
)
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
from cg.services.run_devices.pacbio.run_file_manager.run_file_manager import PacBioRunFileManager
from cg.services.run_devices.protocols import PostProcessingMetricsParser

LOG = logging.getLogger(__name__)


class PacBioMetricsParser(PostProcessingMetricsParser):
class PacBioMetricsParser(PostProcessingMetricsParser[PacBioRunData, PacBioMetrics]):
"""Class for parsing PacBio sequencing metrics."""

def __init__(self, file_manager: PacBioRunFileManager):
Expand Down
4 changes: 2 additions & 2 deletions cg/services/run_devices/pacbio/post_processing_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
from pathlib import Path

from cg.services.run_devices.abstract_classes import PostProcessingService
from cg.services.run_devices.constants import POST_PROCESSING_COMPLETED
from cg.services.run_devices.error_handler import handle_post_processing_errors
from cg.services.run_devices.exc import (
Expand All @@ -22,11 +21,12 @@
)
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
from cg.services.run_devices.pacbio.run_validator.pacbio_run_validator import PacBioRunValidator
from cg.services.run_devices.protocols import PostProcessingService

LOG = logging.getLogger(__name__)


class PacBioPostProcessingService(PostProcessingService):
class PacBioPostProcessingService(PostProcessingService[PacBioRunData]):
"""Service for handling post-processing of PacBio sequencing runs."""

def __init__(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from pathlib import Path

from cg.services.run_devices.abstract_classes import RunDataGenerator
from cg.services.run_devices.error_handler import handle_post_processing_errors
from cg.services.run_devices.exc import PostProcessingRunDataGeneratorError
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
from cg.services.run_devices.protocols import RunDataGenerator
from cg.services.run_devices.validators import validate_has_expected_parts, validate_name_pre_fix


class PacBioRunDataGenerator(RunDataGenerator):
class PacBioRunDataGenerator(RunDataGenerator[PacBioRunData]):

@staticmethod
def _validate_run_name(run_name) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

from cg.constants import FileExtensions
from cg.constants.pacbio import MANIFEST_FILE_PATTERN, ZIPPED_REPORTS_PATTERN, PacBioDirsAndFiles
from cg.services.run_devices.abstract_classes import RunFileManager
from cg.services.run_devices.error_handler import handle_post_processing_errors
from cg.services.run_devices.exc import PostProcessingRunFileManagerError
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
from cg.services.run_devices.pacbio.run_file_manager.models import PacBioRunValidatorFiles
from cg.services.run_devices.protocols import RunFileManager
from cg.services.run_devices.validators import validate_files_or_directories_exist
from cg.utils.files import get_files_matching_pattern


class PacBioRunFileManager(RunFileManager):
class PacBioRunFileManager(RunFileManager[PacBioRunData]):

@handle_post_processing_errors(
to_except=(FileNotFoundError,), to_raise=PostProcessingRunFileManagerError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
from cg.constants.constants import FileFormat
from cg.constants.pacbio import PacBioDirsAndFiles
from cg.services.decompression_service.decompressor import Decompressor
from cg.services.run_devices.abstract_classes import RunValidator
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
from cg.services.run_devices.pacbio.run_file_manager.models import PacBioRunValidatorFiles
from cg.services.run_devices.pacbio.run_file_manager.run_file_manager import PacBioRunFileManager
from cg.services.run_devices.protocols import RunValidator
from cg.services.validate_file_transfer_service.validate_file_transfer_service import (
ValidateFileTransferService,
)

LOG = logging.getLogger(__name__)


class PacBioRunValidator(RunValidator):
class PacBioRunValidator(RunValidator[PacBioRunData]):
"""
PacBio run validator.
Ensure that the post-processing of a pacbio run can start.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,89 +1,98 @@
"""Post-processing service abstract classes."""

import logging
from abc import ABC, abstractmethod
from abc import abstractmethod
from pathlib import Path
from typing import Protocol, TypeVar, runtime_checkable

from cg.services.run_devices.abstract_models import PostProcessingDTOs, RunData, RunMetrics
from cg.services.run_devices.constants import POST_PROCESSING_COMPLETED

LOG = logging.getLogger(__name__)

RunDataT_contra = TypeVar("RunDataT_contra", bound=RunData, contravariant=True)
RunDataT_co = TypeVar("RunDataT_co", bound=RunData, covariant=True)

class RunDataGenerator(ABC):
RunMetricsT_co = TypeVar("RunMetricsT_co", bound=RunMetrics, covariant=True)

Comment on lines +13 to +17
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not clear to me, could we discuss it in coding guidelines if there is time?

PostProcessingDTOsT_co = TypeVar("PostProcessingDTOsT_co", bound=PostProcessingDTOs, covariant=True)


class RunDataGenerator(Protocol[RunDataT_co]):
"""Abstract class that holds functionality to create a run data object."""

@abstractmethod
def get_run_data(self, run_name: str, sequencing_dir: str) -> RunData:
def get_run_data(self, run_name: str, sequencing_dir: str) -> RunDataT_co:
"""Get the run data for a sequencing run."""
pass


class RunValidator(ABC):
class RunValidator(Protocol[RunDataT_contra]):
"""Abstract class that holds functionality to validate a run and ensure it can start post processing."""

@abstractmethod
def ensure_post_processing_can_start(self, run_data: RunData):
def ensure_post_processing_can_start(self, run_data: RunDataT_contra):
"""Ensure a post processing run can start."""
pass

@abstractmethod
def validate_run_files(self, run_data: RunData):
def validate_run_files(self, run_data: RunDataT_contra):
"""Validate presence run files."""
pass


class RunFileManager(ABC):
class RunFileManager(Protocol[RunDataT_contra]):
"""Abstract class that manages files related to an instrument run."""

@abstractmethod
def get_files_to_parse(self, run_data: RunData) -> list[Path]:
def get_files_to_parse(self, run_data: RunDataT_contra) -> list[Path]:
"""Get the files required for the PostProcessingMetricsService."""
pass

@abstractmethod
def get_files_to_store(self, run_data: RunData) -> list[Path]:
def get_files_to_store(self, run_data: RunDataT_contra) -> list[Path]:
"""Get the files to store for the PostProcessingHKService."""
pass


class PostProcessingMetricsParser(ABC):
class PostProcessingMetricsParser(Protocol[RunDataT_contra, RunMetricsT_co]):
"""Abstract class that manages the metrics parsing related to an instrument run."""

@abstractmethod
def parse_metrics(self, run_data: RunData) -> RunMetrics:
def parse_metrics(self, run_data: RunDataT_contra) -> RunMetricsT_co:
"""Parse the sequencing run metrics."""
pass


class PostProcessingDataTransferService(ABC):
class PostProcessingDataTransferService(Protocol[RunDataT_contra, PostProcessingDTOsT_co]):
"""Abstract class that manages data transfer from parsed metrics to the database structure."""

@abstractmethod
def get_post_processing_dtos(self, run_data: RunData) -> PostProcessingDTOs:
def get_post_processing_dtos(self, run_data: RunDataT_contra) -> PostProcessingDTOsT_co:
"""Get the data transfer objects for the PostProcessingStoreService."""
pass


class PostProcessingStoreService(ABC):
class PostProcessingStoreService(Protocol[RunDataT_contra]):
"""Abstract class that manages storing data transfer objects in the database."""

@abstractmethod
def store_post_processing_data(self, run_data: RunData, dry_run: bool = False):
def store_post_processing_data(self, run_data: RunDataT_contra, dry_run: bool = False):
"""Store the data transfer objects in the database."""
pass


class PostProcessingHKService(ABC):
class PostProcessingHKService(Protocol[RunDataT_contra]):
"""Abstract class that manages storing of files for an instrument run."""

@abstractmethod
def store_files_in_housekeeper(self, run_data: RunData, dry_run: bool = False):
def store_files_in_housekeeper(self, run_data: RunDataT_contra, dry_run: bool = False):
"""Store the files in Housekeeper."""
pass


class PostProcessingService(ABC):
@runtime_checkable
class PostProcessingService(Protocol[RunDataT_contra]):
"""Abstract class that encapsulates the logic required for post-processing a sequencing run."""

@abstractmethod
Expand All @@ -102,7 +111,7 @@ def can_post_processing_start(self, run_name: str) -> bool:
pass

@staticmethod
def _touch_post_processing_complete(run_data: RunData, dry_run: bool = False) -> None:
def _touch_post_processing_complete(run_data: RunDataT_contra, dry_run: bool = False) -> None:
"""Touch the post-processing complete file."""
processing_complete_file = Path(run_data.full_path, POST_PROCESSING_COMPLETED)
if not dry_run:
Expand Down
Loading