From 96489b0e1f40764a769f439de3c78e8cdcee6364 Mon Sep 17 00:00:00 2001
From: mike0sv
Date: Tue, 30 Jun 2020 17:47:30 +0300
Subject: [PATCH 01/20] EBNT-407 dataset and metric deletion

---
 src/ebonite/core/errors.py                  |  42 +++++-
 src/ebonite/core/objects/core.py            |  91 ++++++++++--
 src/ebonite/ext/s3/artifact.py              |   3 +-
 src/ebonite/repository/artifact/__init__.py |   4 +-
 src/ebonite/repository/artifact/base.py     |  24 ----
 src/ebonite/repository/artifact/inmemory.py |   2 +-
 src/ebonite/repository/artifact/local.py    |   3 +-
 src/ebonite/repository/dataset/__init__.py  |   3 +
 src/ebonite/repository/dataset/artifact.py  |  15 +-
 src/ebonite/repository/dataset/base.py      |   7 +
 src/ebonite/repository/metadata/local.py    |   1 +
 tests/core/objects/conftest.py              |  24 +++-
 tests/core/objects/test_core.py             | 132 +++++++++++++++++-
 tests/repository/artifact/meta_common.py    |   3 +-
 tests/repository/dataset/__init__.py        |   0
 tests/repository/dataset/conftest.py        |  39 ++++++
 tests/repository/dataset/dataset_common.py  |  32 +++++
 .../dataset/test_artifact/__init__.py       |   0
 .../dataset/test_artifact/conftest.py       |  13 ++
 19 files changed, 392 insertions(+), 46 deletions(-)
 create mode 100644 tests/repository/dataset/__init__.py
 create mode 100644 tests/repository/dataset/conftest.py
 create mode 100644 tests/repository/dataset/dataset_common.py
 create mode 100644 tests/repository/dataset/test_artifact/__init__.py
 create mode 100644 tests/repository/dataset/test_artifact/conftest.py

diff --git a/src/ebonite/core/errors.py b/src/ebonite/core/errors.py
index 4453ea24..8c3b38ab 100644
--- a/src/ebonite/core/errors.py
+++ b/src/ebonite/core/errors.py
@@ -15,14 +15,12 @@ class EboniteError(Exception):
     """
     General Ebonite error
    """
-    pass


 class MetadataError(EboniteError):
     """
     General Ebonite Metadata Error
     """
-    pass


 class ExistingProjectError(MetadataError):
@@ -179,3 +177,43 @@ def __init__(self, environment: RuntimeEnvironment):
         environment = environment.name if isinstance(environment, RuntimeEnvironment) else environment
         super(EnvironmentWithInstancesError, self).__init__(f'Environment {environment} '
                                                             f'has foreign key and can not be deleted')
+
+
+class DatasetError(EboniteError):
+    """
+    Base class for exceptions in :class:`~ebonite.repository.dataset.DatasetRepository`
+    """
+
+
+class NoSuchDataset(DatasetError):
+    def __init__(self, dataset_id, repo, e=None):
+        super(NoSuchDataset, self).__init__(f'No dataset with id {dataset_id} found in {repo}', e)
+
+
+class DatasetExistsError(DatasetError):
+    def __init__(self, dataset_id, repo, e=None):
+        super(DatasetExistsError, self).__init__(f'Dataset with id {dataset_id} already in {repo}', e)
+
+
+class ArtifactError(EboniteError):
+    """
+    Base class for exceptions in :class:`ArtifactRepository`
+    """
+
+
+class NoSuchArtifactError(ArtifactError):
+    """
+    Exception which is thrown if artifact is not found in the repository
+    """
+
+    def __init__(self, artifact_id, repo):
+        super(NoSuchArtifactError, self).__init__(f'No artifact with id {artifact_id} found in {repo}')
+
+
+class ArtifactExistsError(ArtifactError):
+    """
+    Exception which is thrown if artifact already exists in the repository
+    """
+
+    def __init__(self, artifact_id, repo):
+        super(ArtifactExistsError, self).__init__(f'Artifact with id {artifact_id} already in {repo}')
diff --git a/src/ebonite/core/objects/core.py b/src/ebonite/core/objects/core.py
index 03913535..d1a21bb9 100644
--- a/src/ebonite/core/objects/core.py
+++ b/src/ebonite/core/objects/core.py
@@ -569,7 +569,8 @@ def _resolve_dataset(self, dataset: AnyDataset, name: str) -> str:
         """Resolves existing dataset or creates a new one

         :param dataset: dataset name, Dataset, DatasetSource or raw data object
-        :param name: name for dataset if it is new"""
+        :param name: name for dataset if it is new
+        """
         if isinstance(dataset, str):
             if dataset not in self.datasets:
                 raise ValueError(f'no dataset named {dataset} in task {self}')  # TODO maybe other error?
@@ -582,7 +583,8 @@ def _resolve_metric(self, metric: AnyMetric, name: str):
         """Resolves existing metric or creates a new one

         :param metric: metric name, Metric or raw metric object
-        :param name: name for metric if it is new"""
+        :param name: name for metric if it is new
+        """
         if isinstance(metric, str):
             if metric not in self.metrics:
                 raise ValueError(f'no metric named {metric} in task {self}')  # TODO maybe other error?
@@ -599,9 +601,10 @@ def add_evaluation(self, name: str,
         :param name: name of the evaluation set
         :param data: input dataset for evaluation
         :param target: ground truth for input data
-        :param metrics: one or more metrics to measure"""
+        :param metrics: one or more metrics to measure
+        """
         if name in self.evaluation_sets:
-            raise ValueError(f'evalset {name} already in task {self}')
+            raise errors.EboniteError(f'evalset {name} already in task {self}')
         data = self._resolve_dataset(data, f'{name}_input')
         target = self._resolve_dataset(target, f'{name}_output')
         if not isinstance(metrics, list):
             metrics = [metrics]
         metrics = [self._resolve_metric(m, f'{name}_{i}') for i, m in enumerate(metrics)]
         self.evaluation_sets[name] = EvaluationSet(data, target, metrics)

+    def delete_evaluation(self, name: str, save: bool = True):
+        """Deletes evaluation set from task
+
+        :param name: name of the evaluation to delete
+        :param save: also update task metadata in repo
+        """
+        if save:
+            self._check_meta()
+
+        if name not in self.evaluation_sets:
+            raise errors.EboniteError(f'cannot delete evalset from task {self}: no evalset with name {name}')
+        del self.evaluation_sets[name]
+        if save:
+            self.save()
+
     @_with_dataset
     def add_dataset(self, name, dataset: Union[DatasetSource, AbstractDataset, Any]):
         """Adds new dataset to this task

         :param name: name of the dataset
-        :param dataset: Dataset, DatasetSource or raw dataset object"""
+        :param dataset: Dataset, DatasetSource or raw dataset object
+        """
         if name in self.datasets:
-            raise ValueError(f'dataset {name} already in task {self}')
+            raise errors.EboniteError(f'dataset {name} already in task {self}')
         if not isinstance(dataset, DatasetSource):
             if not isinstance(dataset, AbstractDataset):
                 dataset = Dataset.from_object(dataset)
@@ -630,18 +649,71 @@ def push_datasets(self):
             if isinstance(dataset, InMemoryDatasetSource):
                 self.datasets[name] = self._dataset.save(f'{self.id}/{name}', dataset.read())

+    @_with_dataset
+    def delete_dataset(self, name: str, force: bool = False, save: bool = True):
+        """Deletes dataset (and its persisted artifacts) from task
+
+        :param name: name of the dataset to delete
+        :param force: whether to remove evalsets that use this dataset instead of raising an error
+        :param save: also update task metadata in repo
+        """
+        if name not in self.datasets:
+            raise errors.EboniteError(f'cannot delete dataset from {self}: no dataset with name {name}')
+        if save:
+            self._check_meta()
+        evalsets = {n: es for n, es in self.evaluation_sets.items() if
+                    name in (es.input_dataset, es.output_dataset)}
+        if len(evalsets) > 0 and not force:
+            raise errors.EboniteError(
+                f'cannot delete dataset from {self}: it is used by evalsets {list(evalsets.keys())}; '
+                f'remove them or set force=True')
+        try:
+            self._dataset.delete(f'{self.id}/{name}')
+        except errors.NoSuchDataset:
+            pass  # already deleted or was not saved
+        del self.datasets[name]
+        for es in evalsets.keys():
+            self.delete_evaluation(es, save)
+        if save:
+            self.save()
+
     def add_metric(self, name, metric: Union[Metric, Any]):
         """Adds metric to this task

         :param name: name of the metric
-        :param metric: Metric or raw metric object"""
+        :param metric: Metric or raw metric object
+        """
         if name in self.metrics:
-            raise ValueError(f'metric {name} already in task {self}')
+            raise errors.EboniteError(f'metric {name} already in task {self}')
         if not isinstance(metric, Metric):
             metric = MetricAnalyzer.analyze(metric)  # TODO checks
         self.metrics[name] = metric

+    def delete_metric(self, name: str, force: bool = False, save: bool = True):
+        """Deletes metric from task
+
+        :param name: name of the metric to delete
+        :param force: whether to remove evalsets that use this metric instead of raising an error
+        :param save: also update task metadata in repo
+        """
+        if name not in self.metrics:
+            raise errors.EboniteError(f'cannot delete metric from {self}: no metric with name {name}')
+        if save:
+            self._check_meta()
+        evalsets = {n: es for n, es in self.evaluation_sets.items() if
+                    name in es.metrics}
+
+        if len(evalsets) > 0 and not force:
+            raise errors.EboniteError(
+                f'cannot delete metric from {self}: it is used by evalsets {list(evalsets.keys())}; '
+                f'remove them or set force=True')
+        del self.metrics[name]
+        for es in evalsets.keys():
+            self.delete_evaluation(es, save)
+        if save:
+            self.save()
+
     def evaluate_all(self) -> Dict[str, 'EvaluationResult']:
         """Evaluates all viable pairs of evalsets and models/pipelines"""
         result = {}
@@ -704,7 +776,8 @@ class EvaluationResult(EboniteParams):
     """Represents result of evaluation of one evalset on multiple evaluatable objects

-    :param scores: mapping 'object name' -> ('metric' -> 'score')"""
+    :param scores: mapping 'object name' -> ('metric' -> 'score')
+    """

     def __init__(self, scores: Dict[str, Dict[str, float]] = None):
         self.scores = scores or {}
diff --git a/src/ebonite/ext/s3/artifact.py b/src/ebonite/ext/s3/artifact.py
index 87820c01..88c63f9c 100644
--- a/src/ebonite/ext/s3/artifact.py
+++ b/src/ebonite/ext/s3/artifact.py
@@ -8,8 +8,9 @@
 from pyjackson.decorators import cached_property

 from ebonite.config import Config, Core, Param
+from ebonite.core.errors import ArtifactExistsError, NoSuchArtifactError
 from ebonite.core.objects.artifacts import ArtifactCollection, Blob, Blobs, StreamContextManager
-from ebonite.repository.artifact import ArtifactExistsError, ArtifactRepository, NoSuchArtifactError
+from ebonite.repository.artifact import ArtifactRepository
 from ebonite.utils.log import logger
diff --git a/src/ebonite/repository/artifact/__init__.py b/src/ebonite/repository/artifact/__init__.py
index b875e976..c5b8bd0c 100644
--- a/src/ebonite/repository/artifact/__init__.py
+++ b/src/ebonite/repository/artifact/__init__.py
@@ -1,3 +1,3 @@
-from .base import ArtifactExistsError, ArtifactRepository, NoSuchArtifactError, RepoArtifactBlob
+from .base import ArtifactRepository, RepoArtifactBlob

-__all__ = ['ArtifactRepository', 'NoSuchArtifactError', 'RepoArtifactBlob', 'ArtifactExistsError']
+__all__ = ['ArtifactRepository', 'RepoArtifactBlob']
diff --git a/src/ebonite/repository/artifact/base.py b/src/ebonite/repository/artifact/base.py
index dd30039d..53831af8 100644
--- a/src/ebonite/repository/artifact/base.py
+++ b/src/ebonite/repository/artifact/base.py
@@ -7,30 +7,6 @@
 from ebonite.core.objects.artifacts import ArtifactCollection, Blob


-class ArtifactError(Exception):
-    """
-    Base class for exceptions in :class:`ArtifactRepository`
-    """
-
-
-class NoSuchArtifactError(ArtifactError):
-    """
-    Exception which is thrown if artifact is not found in the repository
-    """
-
-    def __init__(self, artifact_id, repo: 'ArtifactRepository'):
-        super(NoSuchArtifactError, self).__init__('No artifact with id {} found in {}'.format(artifact_id, repo))
-
-
-class ArtifactExistsError(ArtifactError):
-    """
-    Exception which is thrown if artifact already exists in the repository
-    """
-
-    def __init__(self, artifact_id, repo: 'ArtifactRepository'):
-        super(ArtifactExistsError, self).__init__('Artifact with id {} already in {}'.format(artifact_id, repo))
-
-
 @type_field('type')
 class ArtifactRepository:
     """
diff --git a/src/ebonite/repository/artifact/inmemory.py b/src/ebonite/repository/artifact/inmemory.py
index eec41284..97c6d2ef 100644
--- a/src/ebonite/repository/artifact/inmemory.py
+++ b/src/ebonite/repository/artifact/inmemory.py
@@ -1,8 +1,8 @@
 import typing

+from ebonite.core.errors import ArtifactExistsError, NoSuchArtifactError
 from ebonite.core.objects.artifacts import ArtifactCollection, Blob, Blobs, InMemoryBlob
 from ebonite.repository.artifact import ArtifactRepository
-from ebonite.repository.artifact.base import ArtifactExistsError, NoSuchArtifactError


 class InMemoryArtifactRepository(ArtifactRepository):
diff --git a/src/ebonite/repository/artifact/local.py b/src/ebonite/repository/artifact/local.py
index d264c2f8..b0955bcd 100644
--- a/src/ebonite/repository/artifact/local.py
+++ b/src/ebonite/repository/artifact/local.py
@@ -3,8 +3,9 @@
 import shutil
 import typing

+from ebonite.core.errors import ArtifactExistsError, NoSuchArtifactError
 from ebonite.core.objects.artifacts import ArtifactCollection, Blob, Blobs, LocalFileBlob
-from ebonite.repository.artifact import ArtifactExistsError, ArtifactRepository, NoSuchArtifactError
+from ebonite.repository.artifact import ArtifactRepository
 from ebonite.utils.fs import get_lib_path
 from ebonite.utils.log import logger
diff --git a/src/ebonite/repository/dataset/__init__.py b/src/ebonite/repository/dataset/__init__.py
index e69de29b..ab7934df 100644
--- a/src/ebonite/repository/dataset/__init__.py
+++ b/src/ebonite/repository/dataset/__init__.py
@@ -0,0 +1,3 @@
+from .base import DatasetRepository
+
+__all__ = ['DatasetRepository']
diff --git a/src/ebonite/repository/dataset/artifact.py b/src/ebonite/repository/dataset/artifact.py
index 691abf56..9acd1e1e 100644
--- a/src/ebonite/repository/dataset/artifact.py
+++ b/src/ebonite/repository/dataset/artifact.py
@@ -3,6 +3,7 @@

 from pyjackson.decorators import type_field

+from ebonite.core.errors import ArtifactExistsError, DatasetExistsError, NoSuchArtifactError, NoSuchDataset
 from ebonite.core.objects import ArtifactCollection, DatasetType
 from ebonite.core.objects.base import EboniteParams
 from ebonite.core.objects.dataset_source import Dataset, DatasetSource
@@ -38,6 +39,7 @@ class ArtifactDatasetRepository(DatasetRepository):
     """DatasetRepository implementation that saves datasets as artifacts to ArtifactRepository

     :param repo: underlying ArtifactRepository"""
+
     ARTIFACT_TYPE = 'datasets'

     def __init__(self, repo: ArtifactRepository):
@@ -46,13 +48,22 @@ def save(self, dataset_id: str, dataset: Dataset) -> DatasetSource:
         writer = dataset.get_writer()
         if writer is None:
-            raise ValueError(f'{dataset.dataset_type} does not support artifacat persistance')
+            raise ValueError(f'{dataset.dataset_type} does not support artifact persistence')
         reader, artifacts = writer.write(dataset)
         with artifacts.blob_dict() as blobs:
-            pushed = self.repo.push_artifact(self.ARTIFACT_TYPE, dataset_id, blobs)
+            try:
+                pushed = self.repo.push_artifact(self.ARTIFACT_TYPE, dataset_id, blobs)
+            except ArtifactExistsError as e:
+                raise DatasetExistsError(dataset_id, self, e)

         return ArtifactDatasetSource(reader, pushed, dataset.dataset_type)

+    def delete(self, dataset_id: str):
+        try:
+            self.repo.delete_artifact(self.ARTIFACT_TYPE, dataset_id)
+        except NoSuchArtifactError as e:
+            raise NoSuchDataset(dataset_id, self, e)
+

 class ArtifactDatasetSource(DatasetSource):
     """DatasetSource for reading datasets from ArtifactDatasetRepository
diff --git a/src/ebonite/repository/dataset/base.py b/src/ebonite/repository/dataset/base.py
index 554c38a2..8f04a7af 100644
--- a/src/ebonite/repository/dataset/base.py
+++ b/src/ebonite/repository/dataset/base.py
@@ -13,3 +13,10 @@ def save(self, dataset_id: str, dataset: Dataset) -> DatasetSource:
         :param dataset_id: string identifier
         :param dataset: dataset to save
         :returns: DatasetSource that produces same Dataset"""
+
+    @abstractmethod
+    def delete(self, dataset_id: str):
+        """Method to delete dataset from this repository
+
+        :param dataset_id: dataset identifier
+        """
diff --git a/src/ebonite/repository/metadata/local.py b/src/ebonite/repository/metadata/local.py
index 91c67185..a4867041 100644
--- a/src/ebonite/repository/metadata/local.py
+++ b/src/ebonite/repository/metadata/local.py
@@ -90,6 +90,7 @@ def get_and_increment(self, name):

     def add_project(self, project: Project):
         assert project.id is not None
+        # project._tasks =
         self.projects[project.id] = project
         self.project_name_index[project.name] = project.id
diff --git a/tests/core/objects/conftest.py b/tests/core/objects/conftest.py
index 02d3913d..4d3ffb4f 100644
--- a/tests/core/objects/conftest.py
+++ b/tests/core/objects/conftest.py
@@ -4,9 +4,12 @@
 import pytest
 from pyjackson.generics import Serializer

+from ebonite.core.analyzer.metric import MetricAnalyzer
 from ebonite.core.objects.core import Buildable, Image, Model, Pipeline, Project, Task
-from ebonite.repository import MetadataRepository
+from ebonite.core.objects.dataset_source import Dataset
+from ebonite.repository import DatasetRepository, MetadataRepository
 from ebonite.repository.artifact.inmemory import InMemoryArtifactRepository
+from ebonite.repository.dataset.artifact import ArtifactDatasetRepository
 from ebonite.repository.metadata.local import LocalMetadataRepository
 from tests.conftest import DummyModelWrapper

@@ -21,6 +24,11 @@ def artifact_repo():
     return InMemoryArtifactRepository()


+@pytest.fixture
+def dataset_repo(artifact_repo):
+    return ArtifactDatasetRepository(artifact_repo)
+
+
 @pytest.fixture
 def project_factory(meta: MetadataRepository, artifact_repo: InMemoryArtifactRepository):
     counter = 0
@@ -39,7 +47,7 @@ def factory(saved=False):


 @pytest.fixture
-def task_factory(project_factory):
+def task_factory(project_factory, dataset_repo: DatasetRepository):
     counter = 0

     def factory(saved=False):
@@ -49,6 +57,7 @@ def factory(saved=False):
         if saved:
             project = project_factory(True)
             project.add_task(task)
+        task.bind_dataset_repo(dataset_repo)
         return task

     return factory
@@ -143,6 +152,17 @@ def model(sklearn_model_obj, pandas_data):
     return Model.create(sklearn_model_obj, pandas_data)


+@pytest.fixture
+def metric():
+    from sklearn.metrics import roc_auc_score
+    return MetricAnalyzer.analyze(roc_auc_score)
+
+
+@pytest.fixture
+def dataset(pandas_data):
+    return Dataset.from_object(pandas_data)
+
+
 @pytest.fixture
 def pipeline(pipeline_factory):
     return pipeline_factory()
diff --git a/tests/core/objects/test_core.py b/tests/core/objects/test_core.py
index a7d7948f..bd5625c9 100644
--- a/tests/core/objects/test_core.py
+++ b/tests/core/objects/test_core.py
@@ -1,14 +1,19 @@
 import numpy as np
 import pytest
 from pyjackson import deserialize, serialize
+from pyjackson.core import Unserializable

-from ebonite.core.errors import MetadataError, NonExistingModelError, NonExistingTaskError, UnboundObjectError
+from ebonite.core.errors import (EboniteError, MetadataError, NonExistingModelError, NonExistingTaskError,
+                                 UnboundObjectError)
 from ebonite.core.objects import ModelWrapper
 from ebonite.core.objects.artifacts import Blobs, InMemoryBlob
 from ebonite.core.objects.core import Model, Pipeline, Project, Task, _WrapperMethodAccessor
+from ebonite.core.objects.dataset_source import DatasetSource
+from ebonite.core.objects.metric import Metric
 from ebonite.core.objects.requirements import InstallableRequirement, Requirement, Requirements
 from ebonite.ext.sklearn import SklearnModelWrapper
 from ebonite.repository import MetadataRepository
+from ebonite.repository.artifact.inmemory import InMemoryArtifactRepository
 from tests.core.objects.conftest import serde_and_compare


@@ -203,6 +208,131 @@ def test_task__push_model(task_saved_art, created_model):
     assert created_model.id in task_saved_art.models


+def test_task__add_metric(task_saved):
+    from sklearn.metrics import roc_auc_score
+    task_saved.add_metric('auc', roc_auc_score)
+    task_saved.save()
+
+    task = task_saved._meta.get_task_by_name(task_saved.project, task_saved.name)
+    assert 'auc' in task.metrics
+    assert isinstance(task.metrics['auc'], Metric)
+
+
+def test_task__add_metric_exists(task_saved, metric):
+    task_saved.add_metric('auc', metric)
+    with pytest.raises(EboniteError):
+        task_saved.add_metric('auc', metric)
+
+
+def test_task__delete_metric(task_saved, metric):
+    task_saved.add_metric('auc', metric)
+    task_saved.save()
+    task_saved.delete_metric('auc')
+
+    task = task_saved._meta.get_task_by_name(task_saved.project, task_saved.name)
+    assert 'auc' not in task.metrics
+
+
+def test_task__delete_metric_non_existing(task_saved):
+    with pytest.raises(EboniteError):
+        task_saved.delete_metric('auc')
+
+
+def test_task__add_dataset(task_saved, dataset):
+    task_saved.add_dataset('data', dataset)
+    assert 'data' in task_saved.datasets
+    assert isinstance(task_saved.datasets['data'], DatasetSource)
+    assert isinstance(task_saved.datasets['data'], Unserializable)
+    task_saved.save()
+
+    task = task_saved._meta.get_task_by_name(task_saved.project, task_saved.name)
+    assert 'data' in task.datasets
+    assert isinstance(task.datasets['data'], DatasetSource)
+    assert not isinstance(task.datasets['data'], Unserializable)
+
+
+def test_task__add_dataset_exists(task_saved, dataset):
+    task_saved.add_dataset('data', dataset)
+    with pytest.raises(EboniteError):
+        task_saved.add_dataset('data', dataset)
+
+
+def test_task__delete_dataset(task_saved, dataset):
+    task_saved.add_dataset('data', dataset)
+    task_saved.save()
+    art_repo: InMemoryArtifactRepository = task_saved._art
+    assert len(art_repo._cache) > 0
+
+    task_saved.delete_dataset('data')
+
+    task = task_saved._meta.get_task_by_name(task_saved.project, task_saved.name)
+    assert 'data' not in task.datasets
+    assert len(art_repo._cache) == 0
+
+
+def test_task__delete_dataset_non_existing(task_saved):
+    with pytest.raises(EboniteError):
+        task_saved.delete_dataset('data')
+
+
+def test_task__add_evalset(task_saved, dataset, metric):
+    task_saved.add_evaluation('eval', dataset, dataset, metric)
+    assert 'eval' in task_saved.evaluation_sets
+    assert all(isinstance(d, DatasetSource) for d in task_saved.datasets.values())
+    assert all(isinstance(m, Metric) for m in task_saved.metrics.values())
+    task_saved.save()
+
+    task = task_saved._meta.get_task_by_name(task_saved.project, task_saved.name)
+    assert 'eval' in task.evaluation_sets
+
+
+def test_task__add_evalset_exists(task_saved, dataset, metric):
+    task_saved.add_evaluation('data', dataset, dataset, metric)
+    with pytest.raises(EboniteError):
+        task_saved.add_evaluation('data', dataset, dataset, metric)
+
+
+def test_task__delete_evalset(task_saved, dataset, metric):
+    task_saved.add_evaluation('data', dataset, dataset, metric)
+    task_saved.save()
+
+    task_saved.delete_evaluation('data')
+
+    task = task_saved._meta.get_task_by_name(task_saved.project, task_saved.name)
+    assert 'data' not in task.evaluation_sets
+
+
+def test_task__delete_evalset_non_existing(task_saved):
+    with pytest.raises(EboniteError):
+        task_saved.delete_evaluation('data')
+
+
+def test_task__delete_metric_with_evalset(task_saved, dataset, metric):
+    task_saved.add_metric('metric', metric)
+    task_saved.add_dataset('data', dataset)
+    task_saved.add_evaluation('eval', 'data', 'data', 'metric')
+
+    with pytest.raises(EboniteError):
+        task_saved.delete_metric('metric')
+
+    task_saved.delete_metric('metric', force=True)
+    assert 'metric' not in task_saved.metrics
+    assert 'eval' not in task_saved.evaluation_sets
+
+
+def test_task__delete_dataset_with_evalset(task_saved, dataset, metric):
+    task_saved.add_metric('metric', metric)
+    task_saved.add_dataset('data', dataset)
+    task_saved.add_evaluation('eval', 'data', 'data', 'metric')
+
+    with pytest.raises(EboniteError):
+        task_saved.delete_dataset('data')
+
+    task_saved.delete_dataset('data', force=True)
+    assert 'data' not in task_saved.datasets
+    assert 'eval' not in task_saved.evaluation_sets
+
+
 # ###############MODEL##################
 def test_create_model(sklearn_model_obj, pandas_data):
     model = Model.create(sklearn_model_obj, pandas_data)
diff --git a/tests/repository/artifact/meta_common.py b/tests/repository/artifact/meta_common.py
index 2ce1a23b..b94b15c5 100644
--- a/tests/repository/artifact/meta_common.py
+++ b/tests/repository/artifact/meta_common.py
@@ -3,9 +3,10 @@

 import pytest

+from ebonite.core.errors import ArtifactExistsError, NoSuchArtifactError
 from ebonite.core.objects.artifacts import ArtifactCollection, InMemoryBlob
 from ebonite.core.objects.core import Model
-from ebonite.repository.artifact import ArtifactExistsError, ArtifactRepository, NoSuchArtifactError
+from ebonite.repository.artifact import ArtifactRepository

 # from tests.repository.artifact.test_local.conftest import local_artifact as repo_fixture
 # from tests.ext.s3.conftest import s3_artifact as repo_fixture
diff --git a/tests/repository/dataset/__init__.py b/tests/repository/dataset/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/repository/dataset/conftest.py b/tests/repository/dataset/conftest.py
new file mode 100644
index 00000000..82545611
--- /dev/null
+++ b/tests/repository/dataset/conftest.py
@@ -0,0 +1,39 @@
+from typing import Tuple
+
+import pytest
+
+from ebonite.core.objects import ArtifactCollection, DatasetType
+from ebonite.core.objects.artifacts import Blobs, InMemoryBlob
+from ebonite.core.objects.dataset_source import Dataset
+from ebonite.repository.dataset.artifact import DatasetReader, DatasetWriter
+from tests.conftest import interface_hook_creator
+
+
+class TestDatasetWriter(DatasetWriter):
+    def write(self, dataset: Dataset) -> Tuple[DatasetReader, ArtifactCollection]:
+        return TestDatasetReader(), Blobs({'data': InMemoryBlob(dataset.data.encode('utf8'))})
+
+
+class TestDatasetReader(DatasetReader):
+    def read(self, artifacts: ArtifactCollection) -> Dataset:
+        return Dataset(artifacts.bytes_dict()['data'].decode('utf8'), TestDatasetType())
+
+
+class TestDatasetType(DatasetType):
+
+    def get_writer(self):
+        return TestDatasetWriter()
+
+    def deserialize(self, obj: dict) -> object:
+        return obj
+
+    def serialize(self, instance: object) -> dict:
+        return instance
+
+
+@pytest.fixture
+def data() -> Dataset:
+    return Dataset('abcdefg', TestDatasetType())
+
+
+create_dataset_hooks = interface_hook_creator('tests/repository/dataset/', 'dataset_common.py', 'dataset_repo')
diff --git a/tests/repository/dataset/dataset_common.py b/tests/repository/dataset/dataset_common.py
new file mode 100644
index 00000000..0fe4f87b
--- /dev/null
+++ b/tests/repository/dataset/dataset_common.py
@@ -0,0 +1,32 @@
+import pytest
+
+from ebonite.core.errors import DatasetExistsError, NoSuchDataset
+from ebonite.core.objects.dataset_source import Dataset
+from ebonite.repository import DatasetRepository
+
+
+def test_save(dataset_repo: DatasetRepository, data: Dataset):
+    source = dataset_repo.save('a', data)
+    data2 = source.read()
+
+    assert data2.data == data.data
+    assert data2.dataset_type == data.dataset_type
+
+
+def test_save_existing(dataset_repo: DatasetRepository, data: Dataset):
+    dataset_repo.save('a', data)
+
+    with pytest.raises(DatasetExistsError):
+        dataset_repo.save('a', data)
+
+
+def test_delete(dataset_repo: DatasetRepository, data: Dataset):
+    source = dataset_repo.save('a', data)
+    dataset_repo.delete('a')
+    with pytest.raises(Exception):
+        source.read()
+
+
+def test_delete_not_existing(dataset_repo: DatasetRepository):
+    with pytest.raises(NoSuchDataset):
+        dataset_repo.delete('a')
diff --git a/tests/repository/dataset/test_artifact/__init__.py b/tests/repository/dataset/test_artifact/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/repository/dataset/test_artifact/conftest.py b/tests/repository/dataset/test_artifact/conftest.py
new file mode 100644
index 00000000..ea6f224f
--- /dev/null
+++ b/tests/repository/dataset/test_artifact/conftest.py
@@ -0,0 +1,13 @@
+import pytest
+
+from ebonite.repository.artifact.local import LocalArtifactRepository
+from ebonite.repository.dataset.artifact import ArtifactDatasetRepository
+from tests.repository.dataset.conftest import create_dataset_hooks
+
+
+@pytest.fixture
+def artifact_dataset_repo(tmpdir_factory):
+    return ArtifactDatasetRepository(LocalArtifactRepository(tmpdir_factory.mktemp('repo')))
+
+
+pytest_runtest_protocol, pytest_collect_file = create_dataset_hooks(artifact_dataset_repo, 'artifact')
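A minimal usage sketch of the Task-level API this patch introduces (method names are taken from the diff above; the `task`, `val_x` and `val_y` objects are hypothetical and assumed to exist):

    from sklearn.metrics import roc_auc_score

    # `task` is assumed to be a saved Task bound to metadata, artifact and dataset repositories
    task.add_dataset('val_x', val_x)       # raw objects are wrapped via Dataset.from_object
    task.add_dataset('val_y', val_y)
    task.add_metric('auc', roc_auc_score)  # raw callables go through MetricAnalyzer.analyze
    task.add_evaluation('val', 'val_x', 'val_y', 'auc')
    task.save()

    # deletion refuses to orphan evalsets unless forced
    # task.delete_metric('auc')            # would raise EboniteError: used by evalset 'val'
    task.delete_metric('auc', force=True)  # drops the dependent 'val' evalset as well
    task.delete_dataset('val_x')           # also removes the pushed artifacts from the dataset repo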
From a890e1eecafcea26f9f0a33fded4f12a7850f3e9 Mon Sep 17 00:00:00 2001
From: mike0sv
Date: Fri, 3 Jul 2020 15:32:36 +0300
Subject: [PATCH 02/20] EBNT-407 small fixes

---
 src/ebonite/core/objects/core.py         | 9 +++++++++
 src/ebonite/repository/metadata/local.py | 1 -
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/src/ebonite/core/objects/core.py b/src/ebonite/core/objects/core.py
index d1a21bb9..b092c0e6 100644
--- a/src/ebonite/core/objects/core.py
+++ b/src/ebonite/core/objects/core.py
@@ -409,6 +409,9 @@ def delete(self, cascade: bool = False):
         for pipeline in self._meta.get_pipelines(self):
             self.delete_pipeline(pipeline)

+        for dataset in list(self.datasets):  # copy keys: delete_dataset mutates the dict
+            self.delete_dataset(dataset, force=True, save=False)
+
         self._meta.delete_task(self)

     @_with_meta
@@ -786,6 +789,12 @@ def __iadd__(self, other: 'EvaluationResult'):
         self.scores.update(other.scores)
         return self

+    def __add__(self, other: 'EvaluationResult'):
+        scores = {}
+        scores.update(self.scores)
+        scores.update(other.scores)
+        return EvaluationResult(scores)
+
diff --git a/src/ebonite/repository/metadata/local.py b/src/ebonite/repository/metadata/local.py
index a4867041..91c67185 100644
--- a/src/ebonite/repository/metadata/local.py
+++ b/src/ebonite/repository/metadata/local.py
@@ -90,7 +90,6 @@ def get_and_increment(self, name):

     def add_project(self, project: Project):
         assert project.id is not None
-        # project._tasks =
         self.projects[project.id] = project
         self.project_name_index[project.name] = project.id

From cc3400d8828fb72af01a1563e575b8056cfcd6fc Mon Sep 17 00:00:00 2001
From: mike0sv
Date: Mon, 6 Jul 2020 01:07:10 +0300
Subject: [PATCH 03/20] EBNT-404 evaluation results

---
 src/ebonite/core/objects/core.py    | 245 +++++++++++++++++++---------
 src/ebonite/core/objects/wrapper.py |   8 +
 tox.ini                             |   2 +-
 3 files changed, 174 insertions(+), 81 deletions(-)

diff --git a/src/ebonite/core/objects/core.py b/src/ebonite/core/objects/core.py
index b092c0e6..c41873cb 100644
--- a/src/ebonite/core/objects/core.py
+++ b/src/ebonite/core/objects/core.py
@@ -3,6 +3,7 @@
 import json
 import re
 import tempfile
+import time
 import warnings
 from abc import abstractmethod
 from copy import copy
@@ -717,39 +718,31 @@ def delete_metric(self, name: str, force: bool = False, save: bool = True):
         if save:
             self.save()

-    def evaluate_all(self) -> Dict[str, 'EvaluationResult']:
-        """Evaluates all viable pairs of evalsets and models/pipelines"""
-        result = {}
+    def evaluate_all(self, force=False, save_result=True) -> Dict[str, 'EvaluationResult']:
+        """Evaluates all viable pairs of evalsets and models/pipelines

+        :param force: force re-evaluation of already evaluated pairs
+        :param save_result: save evaluation results to meta"""
+        result = {}
+        timestamp = time.time()
         for name, evalset in self.evaluation_sets.items():
-            res = EvaluationResult()
             input, output, metrics = evalset.get(self, cache=True)
             for model in self._models.values():
-                evaluate = model.evaluate(input, output, metrics)
-                res += evaluate
+                evaluate = model.evaluate(input, output, metrics, evaluation_name=name,
+                                          timestamp=timestamp, force=force, save=save_result)
+                if isinstance(evaluate, dict):
+                    for method, res in evaluate.items():
+                        result[f'{model.name}.{method}'] = res
+                elif isinstance(evaluate, EvaluationResult):
+                    result[model.name] = evaluate
             for pipeline in self._pipelines.values():
-                evaluate = pipeline.evaluate(input, output, metrics)
-                res += evaluate
-            result[name] = res
+                evaluate = pipeline.evaluate(input, output, metrics, evaluation_name=name,
+                                             timestamp=timestamp, force=force, save=save_result)
+                if evaluate is not None:
+                    result[pipeline.name] = evaluate
         return result


-class _WrapperMethodAccessor:
-    """Class to access ModelWrapper methods from model
-
-    :param model: model to access
-    :param method_name: name of the wrapper method"""
-
-    def __init__(self, model: 'Model', method_name: str):
-        if model.wrapper.methods is None or method_name not in model.wrapper.exposed_methods:
-            raise AttributeError(f'{model} does not have {method_name} method')
-        self.model = model
-        self.method_name = method_name
-
-    def __call__(self, data):
-        return self.model.wrapper.call_method(self.method_name, data)
-
-
 class _InTask(EboniteObject):
     """Intermediate abstract class for object inside task"""

@@ -779,52 +772,57 @@ def task(self, task: Task):

 class EvaluationResult(EboniteParams):
     """Represents result of evaluation of one evalset on multiple evaluatable objects

-    :param scores: mapping 'object name' -> ('metric' -> 'score')
+    :param scores: mapping 'metric' -> 'score'
+    :param timestamp: time of evaluation
     """

-    def __init__(self, scores: Dict[str, Dict[str, float]] = None):
+    def __init__(self, timestamp: float, scores: Dict[str, float] = None):
+        self.timestamp = timestamp
         self.scores = scores or {}

-    def __iadd__(self, other: 'EvaluationResult'):
-        self.scores.update(other.scores)
-        return self
-
-    def __add__(self, other: 'EvaluationResult'):
-        scores = {}
-        scores.update(self.scores)
-        scores.update(other.scores)
-        return EvaluationResult(scores)
+
+@make_string
+class EvaluationResultCollection(EboniteParams):
+    """Collection of evaluation results for single evalset
+
+    :param results: list of results"""
+
+    def __init__(self, results: List[EvaluationResult]):
+        self.results = results
+
+    def add(self, result: EvaluationResult):
+        if result != self.latest:
+            self.results.append(result)
+
+    @property
+    def latest(self) -> Optional[EvaluationResult]:
+        if len(self.results) == 0:
+            return
+        return max(self.results, key=lambda r: r.timestamp)


-class _InTaskEvaluatable(_InTask):
-    """Intermediate abstract class for object inside task that can be evaluated"""
-
-    def run_evalset(self, evalset: Union[str, EvaluationSet]) -> EvaluationResult:
-        """r"""
-        task = self.task
-        if isinstance(evalset, str):
-            try:
-                evalset = task.evaluation_sets[evalset]
-            except KeyError:
-                raise ValueError(f'no evalset {evalset} in task {task}')
-
-        data = task.datasets[evalset.input_dataset].cache()
-        target = task.datasets[evalset.output_dataset].cache()
-        metrics = {m: task.metrics[m] for m in evalset.metrics}
-        return self.evaluate(data, target, metrics)
-
-    @abstractmethod
-    def evaluate(self, input: DatasetSource, output: DatasetSource, metrics: Dict[str, Metric]) -> EvaluationResult:
-        """Evaluates this object
-
-        :param input: input data
-        :param output: target
-        :param metrics: dict of metrics to evaluate
-        """
+EvaluationResults = Dict[str, EvaluationResultCollection]  # Evaluation results for all evalsets
+MultipleResults = Dict[str, EvaluationResult]  # Evaluation results for multiple objects but one evalset
+
+
+class _WrapperMethodAccessor:
+    """Class to access ModelWrapper methods from model
+
+    :param model: model to access
+    :param method_name: name of the wrapper method"""
+
+    def __init__(self, model: 'Model', method_name: str):
+        if model.wrapper.methods is None or method_name not in model.wrapper.exposed_methods:
+            raise AttributeError(f'{model} does not have {method_name} method')
+        self.model = model
+        self.method_name = method_name
+
+    def __call__(self, data):
+        return self.model.wrapper.call_method(self.method_name, data)


 @make_string('id', 'name')
-class Model(_InTaskEvaluatable):
+class Model(_InTask):
     """
     Model contains metadata for machine learning model
@@ -849,9 +847,11 @@ def __init__(self, name: str,
                  wrapper_meta: Optional[dict] = None,
                  description: str = None,
                  id: int = None,
                  task_id: int = None,
-                 author: str = None, creation_date: datetime.datetime = None):
+                 author: str = None, creation_date: datetime.datetime = None,
+                 evaluations: Dict[str, EvaluationResults] = None):
         super().__init__(id, name, author, creation_date, task_id)

+        self.evaluations = evaluations or {}
         self.description = description
         self.params = params or {}
         try:
@@ -1042,6 +1042,7 @@ def create(cls, model_object, input_data, model_name: str = None,
         requirements = RequirementAnalyzer.analyze(requirements)
         params = params or {}
         params[cls.PYTHON_VERSION] = params.get(cls.PYTHON_VERSION, get_python_version())
+        # noinspection PyTypeChecker
         model = Model(name, wrapper, None, requirements, params, description)
         model._unpersisted_artifacts = artifact
         return model
@@ -1120,24 +1121,63 @@ def save(self):
         self._art.push_model_artifacts(self)
         self._meta.save_model(self)

-    def evaluate(self, input: DatasetSource, output: DatasetSource, metrics: Dict[str, Metric]) -> EvaluationResult:
+    def evaluate(self, input: DatasetSource, output: DatasetSource, metrics: Dict[str, Metric],
+                 evaluation_name: str = None, method_name: str = None,
+                 timestamp=None, save=True, force=False,
+                 raise_on_error=False) -> Optional[Union[EvaluationResult, Dict[str, EvaluationResult]]]:
         """Evaluates this model

         :param input: input data
         :param output: target
         :param metrics: dict of metrics to evaluate
-        """
-        result = {}
-        for name in self.wrapper.exposed_methods:
-            tin, tout = self.wrapper.method_signature(name)
-            if input.dataset_type == tin and output.dataset_type == tout:
-                call = self.wrapper.call_method(name, input.read().data)
-                method_result = {}
-                for mname, metric in metrics.items():
-                    method_result[mname] = metric.evaluate(output.read().data, call)
-                result[f'{self.name}_{name}'] = method_result
-
-        return EvaluationResult(result)
+        :param evaluation_name: name of this evaluation
+        :param method_name: name of wrapper method. If None, all methods with consistent datatypes will be evaluated
+        :param timestamp: time of the evaluation (defaults to now)
+        :param save: save results to meta
+        :param force: force re-evaluation
+        :param raise_on_error: raise an error if datatypes are incompatible instead of returning None
+        """
+        if method_name is None:
+            methods = self.wrapper.match_methods_by_type(input.dataset_type, output.dataset_type)
+            if len(methods) == 0:
+                if raise_on_error:
+                    raise ValueError('incompatible dataset types for evaluation')
+                return
+        else:
+            method_name = self.wrapper.resolve_method(method_name)
+            tin, tout = self.wrapper.method_signature(method_name)
+            if tin != input.dataset_type or tout != output.dataset_type:
+                if raise_on_error:
+                    raise ValueError('incompatible dataset types for evaluation')
+                return
+            methods = [method_name]

+        if save:
+            if evaluation_name is None:
+                raise ValueError('Provide evaluation_name to save evaluation or set save = False')
+            self._check_meta(True)
+
+        results: Dict[str, EvaluationResult] = {}  # method -> result
+        timestamp = timestamp or time.time()
+
+        for method_name in methods:
+            if evaluation_name in self.evaluations[method_name] and not force:
+                results[method_name] = self.evaluations[method_name][evaluation_name].latest
+                continue
+
+            call = self.wrapper.call_method(method_name, input.read().data)
+            scores = {}
+            for mname, metric in metrics.items():
+                scores[mname] = metric.evaluate(output.read().data, call)
+            results[method_name] = EvaluationResult(timestamp, scores)
+
+        if save:
+            for method_name, result in results.items():
+                self.evaluations[method_name][evaluation_name].add(result)
+            self.save()
+
+        if len(results) == 1:
+            return list(results.values())[0]
+        return results

 def _generate_name(prefix='', postfix=''):
@@ -1172,7 +1212,7 @@ def __init__(self, model_name: str, method_name: str):


 @make_string('id', 'name')
-class Pipeline(_InTaskEvaluatable):
+class Pipeline(_InTask):
     """Pipeline is a class to represent a sequence of different Model's methods.
     They can be used to reuse different models (for example, pre-processing functions)
     in different pipelines. Pipelines must have exact same in and out data types as tasks they are in
@@ -1192,8 +1232,10 @@ def __init__(self, name: str,
                  output_data: DatasetType,
                  id: int = None,
                  author: str = None, creation_date: datetime.datetime = None,
-                 task_id: int = None):
+                 task_id: int = None,
+                 evaluations: EvaluationResults = None):
         super().__init__(id, name, author, creation_date, task_id)
+        self.evaluations = evaluations or EvaluationResults()
         self.output_data = output_data
         self.input_data = input_data
         self.steps = steps
@@ -1244,22 +1286,65 @@ def save(self):
         """Saves this pipeline to metadata repository"""
         self._meta.save_pipeline(self)

-    def evaluate(self, input: DatasetSource, output: DatasetSource, metrics: Dict[str, Metric]) -> EvaluationResult:
+    @_with_meta
+    def evaluate_set(self, evalset: Union[str, EvaluationSet],
+                     evaluation_name: str = None,
+                     timestamp=None, save=True, force=False,
+                     raise_on_error=False) -> Optional[EvaluationResult]:
+        """Evaluates this pipeline
+
+        :param evalset: evalset or its name
+        :param evaluation_name: name of this evaluation
+        :param timestamp: time of the evaluation (defaults to now)
+        :param save: save results to meta
+        :param force: force re-evaluation
+        :param raise_on_error: raise an error if datatypes are incompatible instead of returning None
+        """
+        task = self.task
+        if isinstance(evalset, str):
+            try:
+                evaluation_name = evaluation_name or evalset
+                evalset = task.evaluation_sets[evalset]
+            except KeyError:
+                raise ValueError(f'No evalset {evalset} in {task}')
+        input, output, metrics = evalset.get(task, False)
+        return self.evaluate(input, output, metrics, evaluation_name, timestamp, save, force, raise_on_error)
+
+    def evaluate(self, input: DatasetSource, output: DatasetSource, metrics: Dict[str, Metric],
+                 evaluation_name: str = None,
+                 timestamp=None, save=True, force=False,
+                 raise_on_error=False) -> Optional[EvaluationResult]:
         """Evaluates this pipeline

         :param input: input data
         :param output: target
         :param metrics: dict of metrics to evaluate
-        """
-        result = {}
+        :param evaluation_name: name of this evaluation
+        :param timestamp: time of the evaluation (defaults to now)
+        :param save: save results to meta
+        :param force: force re-evaluation
+        :param raise_on_error: raise an error if datatypes are incompatible instead of returning None
+        """
+        if evaluation_name in self.evaluations and not force:
+            return self.evaluations[evaluation_name].latest
+        if save:
+            if evaluation_name is None:
+                raise ValueError('Provide evaluation_name to save evaluation or set save = False')
+            self._check_meta(True)
+        timestamp = timestamp or time.time()
+
+        if input.dataset_type == self.input_data and output.dataset_type == self.output_data:
+            scores = {}
             call = self.run(input.read().data)
-            method_result = {}
             for mname, metric in metrics.items():
-                method_result[mname] = metric.evaluate(output.read().data, call)
+                scores[mname] = metric.evaluate(output.read().data, call)
-            result[f'{self.name}'] = method_result
-
-        return EvaluationResult(result)
+            result = EvaluationResult(timestamp, scores)
+            if save:
+                self.evaluations[evaluation_name].add(result)
+                self.save()
+            return result
+        elif raise_on_error:
+            raise ValueError('incompatible dataset types for evaluation')


 @type_field('type')
diff --git a/src/ebonite/core/objects/wrapper.py b/src/ebonite/core/objects/wrapper.py
index 2ad9d490..3d006598 100644
--- a/src/ebonite/core/objects/wrapper.py
+++ b/src/ebonite/core/objects/wrapper.py
@@ -219,6 +219,7 @@ def __deepcopy__(self, memo):
         obj.model = self.model
         obj.methods = self.methods
         obj.requirements = self.requirements
+        obj.curdir = self.curdir
         for field in get_class_fields(cls):
             setattr(obj, field.name, getattr(self, field.name))
         return obj
@@ -237,6 +238,13 @@ def resolve_method(self, method_name=None):
             self._check_method(method_name)
         return method_name

+    def match_methods_by_type(self, input: DatasetType, output: DatasetType) -> typing.List[str]:
+        methods = []
+        for method_name, (_, method_input, method_output) in self.methods.items():
+            if method_input == input and method_output == output:
+                methods.append(method_name)
+        return methods
+

 class LibModelWrapperMixin(ModelWrapper):
     """
diff --git a/tox.ini b/tox.ini
index eb266962..28675b16 100644
--- a/tox.ini
+++ b/tox.ini
@@ -61,7 +61,7 @@ deps =
     flake8
     readme-renderer
     pygments
-    isort
+    isort<5.0.0
 skip_install = true
 commands =
     python setup.py check --strict --metadata --restructuredtext
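A sketch of how the new result objects are meant to compose, based on the signatures in the diff above (the task, model and pipeline names are hypothetical, and the score values are made up):

    results = task.evaluate_all(save_result=True)
    # models contribute one entry per type-compatible wrapper method, pipelines one entry each:
    # {'my_model.predict1': EvaluationResult(...), 'my_pipeline': EvaluationResult(...)}

    res = results['my_pipeline']
    print(res.timestamp, res.scores)  # e.g. 1594000000.0 {'auc': 0.93}

    # saved results accumulate per evalset name; `latest` picks the max by timestamp
    collection = pipeline.evaluations['val']  # EvaluationResultCollection
    best_known = collection.latest            # None if nothing was evaluated yet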
From 8b1a7e1fcbac6b43bb96ff52011e41cdbb55450a Mon Sep 17 00:00:00 2001
From: mike0sv
Date: Mon, 6 Jul 2020 20:51:02 +0300
Subject: [PATCH 04/20] EBNT-404 tests

---
 src/ebonite/core/objects/core.py      |  40 +++++-
 src/ebonite/core/objects/wrapper.py   |   1 -
 tests/core/objects/test_evaluation.py | 183 ++++++++++++++++++++++++++
 3 files changed, 218 insertions(+), 6 deletions(-)
 create mode 100644 tests/core/objects/test_evaluation.py

diff --git a/src/ebonite/core/objects/core.py b/src/ebonite/core/objects/core.py
index c41873cb..bb63ddf1 100644
--- a/src/ebonite/core/objects/core.py
+++ b/src/ebonite/core/objects/core.py
@@ -344,7 +344,7 @@ def get(self, task: 'Task', cache=True) -> Tuple[DatasetSource, DatasetSource, D
         output = task.datasets[self.output_dataset]
         if cache:
             input = input.cache()
-            output = input.cache()
+            output = output.cache()
         return input, output, {m: task.metrics[m] for m in self.metrics}
@@ -787,8 +787,8 @@ class EvaluationResultCollection(EboniteParams):

     :param results: list of results"""

-    def __init__(self, results: List[EvaluationResult]):
-        self.results = results
+    def __init__(self, results: List[EvaluationResult] = None):
+        self.results = results or []

     def add(self, result: EvaluationResult):
         if result != self.latest:
@@ -1118,9 +1118,36 @@ def __getattr__(self, item: str):
     @_with_artifact
     def save(self):
         """Saves model to metadata repo and pushes unpersisted artifacts"""
-        self._art.push_model_artifacts(self)
+        if self._unpersisted_artifacts is not None:
+            self._art.push_model_artifacts(self)
         self._meta.save_model(self)

+    @_with_meta
+    def evaluate_set(self, evalset: Union[str, EvaluationSet],
+                     evaluation_name: str = None, method_name: str = None,
+                     timestamp=None, save=True, force=False,
+                     raise_on_error=False) -> Optional[EvaluationResult]:
+        """Evaluates this model
+
+        :param evalset: evalset or its name
+        :param evaluation_name: name of this evaluation
+        :param method_name: name of wrapper method. If None, all methods with consistent datatypes will be evaluated
+        :param timestamp: time of the evaluation (defaults to now)
+        :param save: save results to meta
+        :param force: force re-evaluation
+        :param raise_on_error: raise an error if datatypes are incompatible instead of returning None
+        """
+        task = self.task
+        if isinstance(evalset, str):
+            try:
+                evaluation_name = evaluation_name or evalset
+                evalset = task.evaluation_sets[evalset]
+            except KeyError:
+                raise ValueError(f'No evalset {evalset} in {task}')
+        input, output, metrics = evalset.get(task, False)
+        return self.evaluate(input, output, metrics, evaluation_name, method_name, timestamp, save, force,
+                             raise_on_error)
+
     def evaluate(self, input: DatasetSource, output: DatasetSource, metrics: Dict[str, Metric],
                  evaluation_name: str = None, method_name: str = None,
                  timestamp=None, save=True, force=False,
@@ -1160,6 +1187,7 @@ def evaluate(self, input: DatasetSource, output: DatasetSource, metrics: Dict[st
         timestamp = timestamp or time.time()

         for method_name in methods:
+            self.evaluations.setdefault(method_name, {})
             if evaluation_name in self.evaluations[method_name] and not force:
                 results[method_name] = self.evaluations[method_name][evaluation_name].latest
                 continue
@@ -1172,6 +1200,7 @@ def evaluate(self, input: DatasetSource, output: DatasetSource, metrics: Dict[st

         if save:
             for method_name, result in results.items():
+                self.evaluations[method_name].setdefault(evaluation_name, EvaluationResultCollection())
                 self.evaluations[method_name][evaluation_name].add(result)
             self.save()
@@ -1235,7 +1264,7 @@ def __init__(self, name: str,
                  task_id: int = None,
                  evaluations: EvaluationResults = None):
         super().__init__(id, name, author, creation_date, task_id)
-        self.evaluations = evaluations or EvaluationResults()
+        self.evaluations = evaluations or {}
         self.output_data = output_data
         self.input_data = input_data
         self.steps = steps
@@ -1340,6 +1369,7 @@ def evaluate(self, input: DatasetSource, output: DatasetSource, metrics: Dict[st
                 scores[mname] = metric.evaluate(output.read().data, call)
             result = EvaluationResult(timestamp, scores)
             if save:
+                self.evaluations.setdefault(evaluation_name, EvaluationResultCollection())
                 self.evaluations[evaluation_name].add(result)
                 self.save()
             return result
diff --git a/src/ebonite/core/objects/wrapper.py b/src/ebonite/core/objects/wrapper.py
index 3d006598..cf112449 100644
--- a/src/ebonite/core/objects/wrapper.py
+++ b/src/ebonite/core/objects/wrapper.py
@@ -193,7 +193,6 @@ def _exposed_methods_mapping(self) -> typing.Dict[str, str]:
         this allows to wrap existing API with your own pre/postprocessing.
         Otherwise, wrapped model object method is going to be called.
         """
-        pass  # pragma: no cover

     @staticmethod
     def with_model(f):
diff --git a/tests/core/objects/test_evaluation.py b/tests/core/objects/test_evaluation.py
new file mode 100644
index 00000000..58579ce0
--- /dev/null
+++ b/tests/core/objects/test_evaluation.py
@@ -0,0 +1,183 @@
+import typing
+
+import numpy as np
+import pytest
+
+from ebonite.core.objects import Model, ModelWrapper
+from ebonite.core.objects.artifacts import Blobs
+from ebonite.core.objects.core import EvaluationResults
+from ebonite.core.objects.metric import Metric
+from ebonite.core.objects.wrapper import PickleModelIO
+
+
+class EvalModelWrapper(ModelWrapper):
+
+    def _exposed_methods_mapping(self) -> typing.Dict[str, str]:
+        return {'predict1': 'predict1', 'predict2': 'predict2', 'predict3': 'predict3'}
+
+    def predict1(self, data):
+        return np.mean(data, axis=1)
+
+    def predict2(self, data):
+        return np.mean(data, axis=1)
+
+    def predict3(self, data):
+        return np.mean(data, axis=1) >= 0.5
+
+
+@pytest.fixture
+def float_data():
+    return np.ones((5, 10)) * 0.3
+
+
+@pytest.fixture
+def float_target(float_data):
+    return np.mean(float_data, axis=1)
+
+
+@pytest.fixture
+def float_target2(float_target):
+    return 1. - float_target
+
+
+@pytest.fixture
+def bool_target(float_target):
+    return float_target >= .5
+
+
+@pytest.fixture
+def eval_model(float_data):
+    return Model('model', EvalModelWrapper(PickleModelIO()).bind_model('None', input_data=float_data), Blobs({}))
+
+
+@pytest.fixture
+def eval_model_saved(eval_model, task_saved):
+    return task_saved.push_model(eval_model)
+
+
+@pytest.fixture
+def eval_pipeline(eval_model_saved: Model):
+    pipeline = eval_model_saved.as_pipeline('predict1')
+    pipeline.name = 'pipeline'
+    return pipeline
+
+
+class AccMetric(Metric):
+    def evaluate(self, truth, prediction):
+        return np.sum(truth == prediction) / len(truth)
+
+
+@pytest.fixture
+def accuracy_metric():
+    return AccMetric()
+
+
+class MaeMetric(Metric):
+    def evaluate(self, truth: np.ndarray, prediction):
+        return np.mean(np.abs(truth.astype(float) - prediction.astype(float)))
+
+
+@pytest.fixture
+def mae_metric():
+    return MaeMetric()
+
+
+@pytest.fixture
+def task_with_evals(task_saved, eval_model_saved, eval_pipeline, float_data, float_target, float_target2, bool_target,
+                    accuracy_metric, mae_metric):
+    task_saved.add_pipeline(eval_pipeline)
+
+    task_saved.add_metric('accuracy_score', accuracy_metric)
+    task_saved.add_metric('mean_absolute_error', mae_metric)
+    task_saved.add_evaluation('test_float1', float_data, float_target, ['accuracy_score', 'mean_absolute_error'])
+    task_saved.add_evaluation('test_float2', float_data, float_target2, ['accuracy_score', 'mean_absolute_error'])
+    task_saved.add_evaluation('test_bool', float_data, bool_target, ['accuracy_score', 'mean_absolute_error'])
+
+    task_saved.save()
+    return task_saved
+
+
+def _check_float_eval(result: EvaluationResults, name: str, good):
+    assert name in result
+    eval = result[name]
+    assert len(eval.results) == 1
+    scores1 = eval.latest.scores
+    assert 'accuracy_score' in scores1
+    assert scores1['accuracy_score'] == (1 if good else 0)
+    assert 'mean_absolute_error' in scores1
+    if good:
+        assert scores1['mean_absolute_error'] == 0
+    else:
+        assert scores1['mean_absolute_error'] > 0
+
+
+def test_task_evaluation(task_with_evals):
+    task_with_evals.evaluate_all()
+    pipeline = task_with_evals._meta.get_pipeline_by_name('pipeline', task_with_evals)
+
+    _check_float_eval(pipeline.evaluations, 'test_float1', True)
+    _check_float_eval(pipeline.evaluations, 'test_float2', False)
+    assert 'test_bool' not in pipeline.evaluations
+
+    model = task_with_evals._meta.get_model_by_name('model', task_with_evals)
+
+    assert 'predict1' in model.evaluations
+    predict1 = model.evaluations['predict1']
+
+    _check_float_eval(predict1, 'test_float1', True)
+    _check_float_eval(predict1, 'test_float2', False)
+    assert 'test_bool' not in predict1
+
+    assert 'predict2' in model.evaluations
+    predict2 = model.evaluations['predict2']
+
+    _check_float_eval(predict2, 'test_float1', True)
+    _check_float_eval(predict2, 'test_float2', False)
+    assert 'test_bool' not in predict2
+
+    assert 'predict3' in model.evaluations
+    predict3 = model.evaluations['predict3']
+
+    assert 'test_float1' not in predict3
+    assert 'test_float2' not in predict3
+    _check_float_eval(predict3, 'test_bool', True)
+
+
+def test_evaluation_no_save(task_with_evals):
+    task_with_evals.evaluate_all(save_result=False)
+    pipeline = task_with_evals._meta.get_pipeline_by_name('pipeline', task_with_evals)
+    assert len(pipeline.evaluations) == 0
+    model = task_with_evals._meta.get_model_by_name('model', task_with_evals)
+    assert len(model.evaluations) == 0
+
+
+def test_reevaluation(task_with_evals):
+    task_with_evals.evaluate_all()
+    task_with_evals.evaluate_all()
+    pipeline = task_with_evals._meta.get_pipeline_by_name('pipeline', task_with_evals)
+    assert len(pipeline.evaluations['test_float1'].results) == 1
+    model = task_with_evals._meta.get_model_by_name('model', task_with_evals)
+    assert len(model.evaluations['predict1']['test_float1'].results) == 1
+
+
+def test_reevaluation_force(task_with_evals):
+    task_with_evals.evaluate_all()
+    task_with_evals.evaluate_all(force=True)
+    pipeline = task_with_evals._meta.get_pipeline_by_name('pipeline', task_with_evals)
+    assert len(pipeline.evaluations['test_float1'].results) == 2
+    model = task_with_evals._meta.get_model_by_name('model', task_with_evals)
+    assert len(model.evaluations['predict1']['test_float1'].results) == 2
+
+
+def test_wrong_evaluation_raise(task_with_evals):
+    pipeline = task_with_evals._meta.get_pipeline_by_name('pipeline', task_with_evals)
+    with pytest.raises(ValueError):
+        pipeline.evaluate_set('aaa')
+    with pytest.raises(ValueError):
+        pipeline.evaluate_set('test_bool', raise_on_error=True)
+
+    model = task_with_evals._meta.get_model_by_name('model', task_with_evals)
+    with pytest.raises(ValueError):
+        model.evaluate_set('aaa')
+    with pytest.raises(ValueError):
+        model.evaluate_set('test_bool', method_name='predict1', raise_on_error=True)
From 69b64628327aa6e85341f9c4a2c245ac761290dd Mon Sep 17 00:00:00 2001
From: mike0sv
Date: Mon, 6 Jul 2020 23:06:40 +0300
Subject: [PATCH 05/20] EBNT-404 add fields to sql model

---
 src/ebonite/ext/sqlalchemy/models.py     | 34 ++++++++++++++++++------
 tests/repository/metadata/meta_common.py | 22 ++++++++++-----
 2 files changed, 41 insertions(+), 15 deletions(-)

diff --git a/src/ebonite/ext/sqlalchemy/models.py b/src/ebonite/ext/sqlalchemy/models.py
index a8b03af7..1903d8aa 100644
--- a/src/ebonite/ext/sqlalchemy/models.py
+++ b/src/ebonite/ext/sqlalchemy/models.py
@@ -8,8 +8,10 @@

 from ebonite.core.objects import DatasetType
 from ebonite.core.objects.artifacts import ArtifactCollection
-from ebonite.core.objects.core import (Buildable, Image, Model, Pipeline, PipelineStep, Project, RuntimeEnvironment,
-                                       RuntimeInstance, Task)
+from ebonite.core.objects.core import (Buildable, EvaluationResults, EvaluationSet, Image, Model, Pipeline,
+                                       PipelineStep, Project, RuntimeEnvironment, RuntimeInstance, Task)
+from ebonite.core.objects.dataset_source import DatasetSource
+from ebonite.core.objects.metric import Metric
 from ebonite.core.objects.requirements import Requirements

 SQL_OBJECT_FIELD = '_sqlalchemy_object'
@@ -104,6 +106,10 @@ class STask(Base, Attaching):
     pipelines: Iterable['SPipeline'] = relationship("SPipeline", back_populates='task')
     images: Iterable['SImage'] = relationship("SImage", back_populates='task')

+    datasets = Column(Text)
+    metrics = Column(Text)
+    evaluation_sets = Column(Text)
+
     __table_args__ = (UniqueConstraint('name', 'project_id', name='tasks_name_and_ref'),)

     def to_obj(self) -> Task:
@@ -111,7 +117,10 @@ def to_obj(self) -> Task:
                     name=self.name,
                     author=self.author,
                     creation_date=self.creation_date,
-                    project_id=self.project_id)
+                    project_id=self.project_id,
+                    datasets=safe_loads(self.datasets, Dict[str, DatasetSource]),
+                    metrics=safe_loads(self.metrics, Dict[str, Metric]),
+                    evaluation_sets=safe_loads(self.evaluation_sets, Dict[str, EvaluationSet]))

         for model in self.models:
             task._models.add(model.to_obj())
@@ -131,7 +140,10 @@ def get_kwargs(cls, task: Task) -> dict:
                    project_id=task.project_id,
                    models=[SModel.from_obj(m) for m in task.models.values()],
                    images=[SImage.from_obj(i) for i in task.images.values()],
-                   pipelines=[SPipeline.from_obj(p) for p in task.pipelines.values()])
+                   pipelines=[SPipeline.from_obj(p) for p in task.pipelines.values()],
+                   datasets=dumps(task.datasets),
+                   metrics=dumps(task.metrics),
+                   evaluation_sets=dumps(task.evaluation_sets))


 class SModel(Base, Attaching):
@@ -151,6 +163,7 @@ class SModel(Base, Attaching):

     task_id = Column(Integer, ForeignKey('tasks.id'), nullable=False)
     task = relationship("STask", back_populates="models")
+    evaluations = Column(Text)

     __table_args__ = (UniqueConstraint('name', 'task_id', name='models_name_and_ref'),)

     def to_obj(self) -> Model:
@@ -163,7 +176,8 @@ def to_obj(self) -> Model:
                       description=self.description,
                       params=safe_loads(self.params, Dict[str, Any]),
                       id=self.id,
-                      task_id=self.task_id)
+                      task_id=self.task_id,
+                      evaluations=safe_loads(self.evaluations, Dict[str, EvaluationResults]))
         return self.attach(model)

     @classmethod
@@ -177,7 +191,8 @@ def get_kwargs(cls, model: Model) -> dict:
                    requirements=dumps(model.requirements),
                    description=model.description,
                    params=dumps(model.params),
-                   task_id=model.task_id)
+                   task_id=model.task_id,
+                   evaluations=dumps(model.evaluations))


 class SPipeline(Base, Attaching):
@@ -196,6 +211,7 @@ class SPipeline(Base, Attaching):

     task_id = Column(Integer, ForeignKey('tasks.id'), nullable=False)
     task = relationship("STask", back_populates="pipelines")
+    evaluations = Column(Text)

     __table_args__ = (UniqueConstraint('name', 'task_id', name='pipelines_name_and_ref'),)

     def to_obj(self) -> Pipeline:
@@ -206,7 +222,8 @@ def to_obj(self) -> Pipeline:
                             author=self.author,
                             creation_date=self.creation_date,
                             id=self.id,
-                            task_id=self.task_id)
+                            task_id=self.task_id,
+                            evaluations=safe_loads(self.evaluations, EvaluationResults))
         return self.attach(pipeline)

     @classmethod
@@ -218,7 +235,8 @@ def get_kwargs(cls, pipeline: Pipeline) -> dict:
                    steps=dumps(pipeline.steps),
                    input_data=dumps(pipeline.input_data),
                    output_data=dumps(pipeline.output_data),
-                   task_id=pipeline.task_id)
+                   task_id=pipeline.task_id,
+                   evaluations=dumps(pipeline.evaluations))


 class SImage(Base, Attaching):
diff --git a/tests/repository/metadata/meta_common.py b/tests/repository/metadata/meta_common.py
index 5e9516b4..91ed673d 100644
--- a/tests/repository/metadata/meta_common.py
+++ b/tests/repository/metadata/meta_common.py
@@ -13,6 +13,7 @@
 from ebonite.core.objects.core import Model, Pipeline, Project, Task
 from ebonite.repository.metadata import MetadataRepository

+
 # from tests.ext.sqlalchemy.conftest import sqlalchemy_meta as meta
 # from tests.repository.metadata.test_local.conftest import local_meta as meta
 # _ = [meta]
@@ -359,7 +360,8 @@ def test_update_task_with_models(meta: MetadataRepository, project: Project, tas
                                excepted_fields=['id', 'models', 'pipelines', 'datasets', 'evaluation_sets', 'metrics', 'project_id'])
     model = update_object_fields(model, excepted_fields=['id', 'wrapper', 'artifact', 'requirements',
-                                                         'wrapper_meta', 'task_id', 'wrapper_obj', 'params'])
+                                                         'wrapper_meta', 'task_id', 'wrapper_obj', 'params',
+                                                         'evaluations'])
     updated_task = meta.update_task(task)

     assert id == task.id
@@ -628,7 +630,8 @@ def test_update_model(meta: MetadataRepository, project: Project, task: Task, mo
     id = model.id

     model = update_object_fields(model, excepted_fields=['id', 'wrapper', 'artifact', 'requirements',
-                                                         'wrapper_meta', 'task_id', 'wrapper_obj', 'params'])
+                                                         'wrapper_meta', 'task_id', 'wrapper_obj', 'params',
+                                                         'evaluations'])
     model = meta.update_model(model)

     assert id == model.id
@@ -648,7 +651,8 @@ def test_update_model_source_is_changed(meta: MetadataRepository, project: Proje
     id = saved_model.id

     saved_model = update_object_fields(model, excepted_fields=['id', 'wrapper', 'artifact', 'requirements',
-                                                               'wrapper_meta', 'task_id', 'wrapper_obj', 'params'])
+                                                               'wrapper_meta', 'task_id', 'wrapper_obj', 'params',
+                                                               'evaluations'])
     saved_model = meta.update_model(saved_model)

     assert id == saved_model.id
@@ -726,7 +730,8 @@ def test_save_updated_existing_model(meta: MetadataRepository, project: Project,
     model = meta.create_model(model)

     model = update_object_fields(model, excepted_fields=['id', 'wrapper', 'artifact', 'requirements',
-                                                         'wrapper_meta', 'task_id', 'wrapper_obj', 'params'])
+                                                         'wrapper_meta', 'task_id', 'wrapper_obj', 'params',
+                                                         'evaluations'])
     saved_model = meta.save_model(model)

     assert saved_model == model
@@ -900,7 +905,8 @@ def test_update_pipeline(meta: MetadataRepository, project: Project, task: Task,

     id = pipeline.id

-    pipeline = update_object_fields(pipeline, excepted_fields=['id', 'input_data', 'output_data', 'task_id'])
+    pipeline = update_object_fields(pipeline,
+                                    excepted_fields=['id', 'input_data', 'output_data', 'task_id', 'evaluations'])
     pipeline = meta.update_pipeline(pipeline)

     assert id == pipeline.id
@@ -919,7 +925,8 @@ def test_update_pipeline_source_is_changed(meta: MetadataRepository, project: Pr

     id = saved_pipeline.id

-    saved_pipeline = update_object_fields(pipeline, excepted_fields=['id', 'input_data', 'output_data', 'task_id'])
+    saved_pipeline = update_object_fields(pipeline,
+                                          excepted_fields=['id', 'input_data', 'output_data', 'task_id', 'evaluations'])
     saved_pipeline = meta.update_pipeline(saved_pipeline)

     assert id == saved_pipeline.id
@@ -996,7 +1003,8 @@ def test_save_updated_existing_pipeline(meta: MetadataRepository, project: Proje
     pipeline.task_id = task.id
     pipeline = meta.create_pipeline(pipeline)

-    pipeline = update_object_fields(pipeline, excepted_fields=['id', 'input_data', 'output_data', 'task_id'])
+    pipeline = update_object_fields(pipeline,
+                                    excepted_fields=['id', 'input_data', 'output_data', 'task_id', 'evaluations'])
     saved_pipeline = meta.save_pipeline(pipeline)

     assert saved_pipeline == pipeline

From add294bc41024746de96eb4f6bcfc16c0cd495d4 Mon Sep 17 00:00:00 2001
From: mike0sv
Date: Tue, 7 Jul 2020 00:44:59
+0300 Subject: [PATCH 06/20] EBNT-404 add fields to sql model --- tests/repository/metadata/meta_common.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/repository/metadata/meta_common.py b/tests/repository/metadata/meta_common.py index 91ed673d..b631bd47 100644 --- a/tests/repository/metadata/meta_common.py +++ b/tests/repository/metadata/meta_common.py @@ -13,7 +13,6 @@ from ebonite.core.objects.core import Model, Pipeline, Project, Task from ebonite.repository.metadata import MetadataRepository - # from tests.ext.sqlalchemy.conftest import sqlalchemy_meta as meta # from tests.repository.metadata.test_local.conftest import local_meta as meta # _ = [meta] From 58794b03e85e63b590a5cd496a5d67a086f271c7 Mon Sep 17 00:00:00 2001 From: mike0sv Date: Mon, 13 Jul 2020 09:37:39 +0300 Subject: [PATCH 07/20] EBNT-384 dvc dataset source --- src/ebonite/ext/dvc/__init__.py | 0 src/ebonite/ext/dvc/dataset_source.py | 33 +++++++ src/ebonite/ext/pandas/dataset_source.py | 17 ++-- src/ebonite/repository/dataset/artifact.py | 19 ++-- test.requirements.txt | 2 + tests/ext/test_dvc/__init__.py | 0 tests/ext/test_dvc/data1.csv | 3 + tests/ext/test_dvc/test_dataset_source.py | 101 +++++++++++++++++++++ tests/ext/test_s3/conftest.py | 2 +- 9 files changed, 163 insertions(+), 14 deletions(-) create mode 100644 src/ebonite/ext/dvc/__init__.py create mode 100644 src/ebonite/ext/dvc/dataset_source.py create mode 100644 tests/ext/test_dvc/__init__.py create mode 100644 tests/ext/test_dvc/data1.csv create mode 100644 tests/ext/test_dvc/test_dataset_source.py diff --git a/src/ebonite/ext/dvc/__init__.py b/src/ebonite/ext/dvc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/ebonite/ext/dvc/dataset_source.py b/src/ebonite/ext/dvc/dataset_source.py new file mode 100644 index 00000000..4af0384b --- /dev/null +++ b/src/ebonite/ext/dvc/dataset_source.py @@ -0,0 +1,33 @@ +import contextlib + +import dvc.api +from dvc.repo import Repo + +from ebonite.core.objects.artifacts import Blob, Blobs, StreamContextManager +from ebonite.core.objects.dataset_source import DatasetSource +from ebonite.repository.dataset.artifact import ArtifactDatasetSource, DatasetReader + + +class DvcBlob(Blob): + def __init__(self, path: str, repo: str = None, rev: str = None, remote: str = None, mode: str = 'r', + encoding: str = None): + self.path = path + self.repo = repo + self.rev = rev + self.remote = remote + self.mode = mode + self.encoding = encoding + + def materialize(self, path): + Repo.get(self.repo, self.path, out=path, rev=self.rev) # TODO tests + + @contextlib.contextmanager + def bytestream(self) -> StreamContextManager: + with dvc.api.open(self.path, self.repo, self.rev, self.remote, self.mode, self.encoding) as f: + yield f + + +def create_dvc_source(path: str, reader: DatasetReader, repo, rev: str = None, remote: str = None, mode: str = 'r', + encoding: str = None) -> DatasetSource: + artifacts = Blobs.from_blobs({path: DvcBlob(path, repo, rev, remote, mode, encoding)}) + return ArtifactDatasetSource(reader, artifacts) diff --git a/src/ebonite/ext/pandas/dataset_source.py b/src/ebonite/ext/pandas/dataset_source.py index 8238220d..8de1bd0b 100644 --- a/src/ebonite/ext/pandas/dataset_source.py +++ b/src/ebonite/ext/pandas/dataset_source.py @@ -191,16 +191,17 @@ class PandasReader(DatasetReader): """DatasetReader for pandas dataframes :param format: PandasFormat instance to use - :param data_type: DataFrameType to use for aliging read data + :param dataset_type: DataFrameType to use for aligning 
read data """ - def __init__(self, format: PandasFormat, data_type: DataFrameType): - self.data_type = data_type + def __init__(self, format: PandasFormat, dataset_type: DataFrameType, path: str = None): + super(PandasReader, self).__init__(dataset_type) + self.path = path or PANDAS_DATA_FILE self.format = format def read(self, artifacts: ArtifactCollection) -> Dataset: - with artifacts.blob_dict() as blobs, blobs[PANDAS_DATA_FILE].bytestream() as b: - return Dataset.from_object(self.data_type.align(self.format.read(b))) + with artifacts.blob_dict() as blobs, blobs[self.path].bytestream() as b: + return Dataset.from_object(self.dataset_type.align(self.format.read(b))) class PandasWriter(DatasetWriter): @@ -209,12 +210,14 @@ class PandasWriter(DatasetWriter): :param format: PandasFormat instance to use """ - def __init__(self, format: PandasFormat): + def __init__(self, format: PandasFormat, path: str = None): + self.path = path or PANDAS_DATA_FILE self.format = format def write(self, dataset: Dataset) -> Tuple[DatasetReader, ArtifactCollection]: blob = LazyBlob(lambda: self.format.write(dataset.data)) - return PandasReader(self.format, dataset.dataset_type), ArtifactCollection.from_blobs({PANDAS_DATA_FILE: blob}) + return (PandasReader(self.format, dataset.dataset_type, self.path), + ArtifactCollection.from_blobs({self.path: blob})) # class PandasJdbcDatasetSource(_PandasDatasetSource): # def __init__(self, dataset_type: DatasetType, table: str, connection: str, diff --git a/src/ebonite/repository/dataset/artifact.py b/src/ebonite/repository/dataset/artifact.py index 9acd1e1e..a8ae51a1 100644 --- a/src/ebonite/repository/dataset/artifact.py +++ b/src/ebonite/repository/dataset/artifact.py @@ -13,13 +13,20 @@ @type_field('type') class DatasetReader(EboniteParams): - """ABC for reading Dataset from files (artifacts) to use with ArtifactDatasetSource""" + """ABC for reading Dataset from files (artifacts) to use with ArtifactDatasetSource + + :param dataset_type: type of the resulting dataset + """ + + def __init__(self, dataset_type: DatasetType): + self.dataset_type = dataset_type @abstractmethod def read(self, artifacts: ArtifactCollection) -> Dataset: """Method to read Dataset from artifacts - :param artifacts: artifacts to read""" + :param artifacts: artifacts to read + """ @type_field('type') @@ -56,7 +63,7 @@ def save(self, dataset_id: str, dataset: Dataset) -> DatasetSource: pushed = self.repo.push_artifact(self.ARTIFACT_TYPE, dataset_id, blobs) except ArtifactExistsError as e: raise DatasetExistsError(dataset_id, self, e) - return ArtifactDatasetSource(reader, pushed, dataset.dataset_type) + return ArtifactDatasetSource(reader, pushed) def delete(self, dataset_id: str): try: @@ -70,10 +77,10 @@ class ArtifactDatasetSource(DatasetSource): :param reader: DatasetReader for this dataset :param artifacts: ArtifactCollection with actual files - :param dataset_type: DatasetType of contained dataset""" + """ - def __init__(self, reader: DatasetReader, artifacts: ArtifactCollection, dataset_type: DatasetType): - super(ArtifactDatasetSource, self).__init__(dataset_type) + def __init__(self, reader: DatasetReader, artifacts: ArtifactCollection): + super(ArtifactDatasetSource, self).__init__(reader.dataset_type) self.reader = reader self.artifacts = artifacts diff --git a/test.requirements.txt b/test.requirements.txt index ee56552d..1b1e164a 100644 --- a/test.requirements.txt +++ b/test.requirements.txt @@ -27,3 +27,5 @@ lightgbm==2.3.1 torch==1.4.0+cpu ; sys_platform != "darwin" torch==1.4.0 ; 
sys_platform == "darwin" + +dvc==1.1.7 \ No newline at end of file diff --git a/tests/ext/test_dvc/__init__.py b/tests/ext/test_dvc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/ext/test_dvc/data1.csv b/tests/ext/test_dvc/data1.csv new file mode 100644 index 00000000..a10481fd --- /dev/null +++ b/tests/ext/test_dvc/data1.csv @@ -0,0 +1,3 @@ +col1,col2 +123,asdf +456,cvbx \ No newline at end of file diff --git a/tests/ext/test_dvc/test_dataset_source.py b/tests/ext/test_dvc/test_dataset_source.py new file mode 100644 index 00000000..19bb39cc --- /dev/null +++ b/tests/ext/test_dvc/test_dataset_source.py @@ -0,0 +1,101 @@ +import contextlib +import os +import shutil + +import pandas as pd +import pytest +from dvc.cli import parse_args +from dvc.command.add import CmdAdd +from dvc.command.data_sync import CmdDataPush +from dvc.command.init import CmdInit +from dvc.command.remote import CmdRemoteAdd, CmdRemoteModify + +from ebonite.core.analyzer.dataset import DatasetAnalyzer +from ebonite.ext.dvc.dataset_source import create_dvc_source +from ebonite.ext.pandas import DataFrameType +from ebonite.ext.pandas.dataset_source import PandasFormatCsv, PandasReader +from ebonite.ext.s3 import S3ArtifactRepository +from ebonite.utils import fs +from tests.conftest import docker_test +from tests.ext.test_s3.conftest import ACCESS_KEY, SECRET_KEY # noqa + + +@pytest.fixture +def dvc_repo_factory(tmpdir_factory): + def dvc_repo(remote, remote_cmd=None): + repo_path = str(tmpdir_factory.mktemp('repo')) + + curdir = os.path.abspath('.') + try: + os.chdir(repo_path) + + CmdInit(parse_args(['init', '--no-scm'])).run() + + CmdRemoteAdd(parse_args(['remote', 'add', '-d', 'storage', remote])).run() + if remote_cmd is not None: + remote_cmd, args = remote_cmd + remote_cmd(parse_args(args)).run() + + shutil.copy(fs.current_module_path('data1.csv'), repo_path) + + CmdAdd(parse_args(['add', 'data1.csv'])).run() + CmdDataPush(parse_args(['push'])).run() + + shutil.rmtree(os.path.join(repo_path, '.dvc', 'cache')) + finally: + os.chdir(curdir) + + return repo_path + + return dvc_repo + + +@pytest.fixture +def local_dvc_repo(tmpdir_factory, dvc_repo_factory): + storage_path = str(tmpdir_factory.mktemp('storage')) + return dvc_repo_factory(storage_path) + + +@contextlib.contextmanager +def override_env(**envs): + prev = {e: os.environ.get(e, None) for e in envs.keys()} + try: + for e, val in envs.items(): + os.environ[e] = val + yield + finally: + for e, val in prev.items(): + if val is not None: + os.environ[e] = val + + +@pytest.fixture +def s3_dvc_repo(s3server, dvc_repo_factory): + url = f'http://localhost:{s3server}' + + with override_env(AWS_ACCESS_KEY_ID=ACCESS_KEY, AWS_SECRET_ACCESS_KEY=SECRET_KEY, + S3_ACCESS_KEY=ACCESS_KEY, S3_SECRET_KEY=SECRET_KEY): + S3ArtifactRepository('dvc-bucket', url)._ensure_bucket() # noqa + return dvc_repo_factory('s3://dvc-bucket', + (CmdRemoteModify, ['remote', 'modify', 'storage', 'endpointurl', url])) + + +def test_create_dvc_source__local(local_dvc_repo): + dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], []) + ds = create_dvc_source(path='data1.csv', + reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'), + repo=local_dvc_repo) + dataset = ds.read() + assert isinstance(dataset.data, pd.DataFrame) + assert DatasetAnalyzer.analyze(dataset.data) == dt + + +@docker_test +def test_create_dvc_source_s3(s3_dvc_repo): + dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], []) + ds = create_dvc_source(path='data1.csv', + 
reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'), + repo=s3_dvc_repo) + dataset = ds.read() + assert isinstance(dataset.data, pd.DataFrame) + assert DatasetAnalyzer.analyze(dataset.data) == dt diff --git a/tests/ext/test_s3/conftest.py b/tests/ext/test_s3/conftest.py index fd6be9ff..4ba20674 100644 --- a/tests/ext/test_s3/conftest.py +++ b/tests/ext/test_s3/conftest.py @@ -25,7 +25,7 @@ def delete_bucket(repo: S3ArtifactRepository): # fake fixture that ensures that S3 server is up between tests -@pytest.fixture(scope="module") +@pytest.fixture(scope="session") def s3server(pytestconfig): if not has_docker() or 'not docker' in pytestconfig.getoption('markexpr'): pytest.skip('skipping docker tests') From ca6e05c88bea0ef1012fc03dc417b8374cd13293 Mon Sep 17 00:00:00 2001 From: mike0sv Date: Tue, 14 Jul 2020 16:20:09 +0300 Subject: [PATCH 08/20] fix tests --- tests/repository/dataset/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/repository/dataset/conftest.py b/tests/repository/dataset/conftest.py index 82545611..b55b2fe8 100644 --- a/tests/repository/dataset/conftest.py +++ b/tests/repository/dataset/conftest.py @@ -11,7 +11,7 @@ class TestDatasetWriter(DatasetWriter): def write(self, dataset: Dataset) -> Tuple[DatasetReader, ArtifactCollection]: - return TestDatasetReader(), Blobs({'data': InMemoryBlob(dataset.data.encode('utf8'))}) + return TestDatasetReader(dataset.dataset_type), Blobs({'data': InMemoryBlob(dataset.data.encode('utf8'))}) class TestDatasetReader(DatasetReader): From f73d576f86e6caa4f9717ffe2e5540d673265470 Mon Sep 17 00:00:00 2001 From: mike0sv Date: Tue, 14 Jul 2020 18:15:48 +0300 Subject: [PATCH 09/20] EBNT-384 fix tests --- src/ebonite/ext/numpy/dataset_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ebonite/ext/numpy/dataset_source.py b/src/ebonite/ext/numpy/dataset_source.py index ac59c3f9..5b64021d 100644 --- a/src/ebonite/ext/numpy/dataset_source.py +++ b/src/ebonite/ext/numpy/dataset_source.py @@ -22,7 +22,7 @@ class NumpyNdarrayWriter(DatasetWriter): """DatasetWriter implementation for numpy ndarray""" def write(self, dataset: Dataset) -> Tuple[DatasetReader, ArtifactCollection]: - return NumpyNdarrayReader(), ArtifactCollection.from_blobs( + return NumpyNdarrayReader(dataset.dataset_type), ArtifactCollection.from_blobs( {DATA_FILE: LazyBlob(lambda: save_npz(dataset.data))}) From 8bdee2630b272bc114ef40010c18129fb18fd68d Mon Sep 17 00:00:00 2001 From: mike0sv Date: Tue, 14 Jul 2020 21:52:27 +0300 Subject: [PATCH 10/20] EBNT-384 windows tests --- tests/ext/test_dvc/test_dataset_source.py | 2 +- tests/ext/test_ext_loader.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/ext/test_dvc/test_dataset_source.py b/tests/ext/test_dvc/test_dataset_source.py index 19bb39cc..74d26506 100644 --- a/tests/ext/test_dvc/test_dataset_source.py +++ b/tests/ext/test_dvc/test_dataset_source.py @@ -41,7 +41,7 @@ def dvc_repo(remote, remote_cmd=None): CmdAdd(parse_args(['add', 'data1.csv'])).run() CmdDataPush(parse_args(['push'])).run() - shutil.rmtree(os.path.join(repo_path, '.dvc', 'cache')) + shutil.rmtree(os.path.join(repo_path, '.dvc', 'cache'), ignore_errors=True) finally: os.chdir(curdir) diff --git a/tests/ext/test_ext_loader.py b/tests/ext/test_ext_loader.py index c442a300..e48bb8c6 100644 --- a/tests/ext/test_ext_loader.py +++ b/tests/ext/test_ext_loader.py @@ -92,7 +92,6 @@ def test_extension_loader__lazy(ext_loader, two_temp_modules): assert module_imported(module2) 
-@pytest.mark.kek def test_extension_loader__lazy_defered(ext_loader, two_temp_modules, temp_module_factory): module1, module2 = two_temp_modules From a70207a6bdcdf9207389c0f946147c08ab85cd25 Mon Sep 17 00:00:00 2001 From: mike0sv Date: Tue, 14 Jul 2020 22:29:45 +0300 Subject: [PATCH 11/20] EBNT-384 windows tests --- tests/ext/test_dvc/test_dataset_source.py | 38 +++++++++-------------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/tests/ext/test_dvc/test_dataset_source.py b/tests/ext/test_dvc/test_dataset_source.py index 74d26506..ffacbada 100644 --- a/tests/ext/test_dvc/test_dataset_source.py +++ b/tests/ext/test_dvc/test_dataset_source.py @@ -4,11 +4,7 @@ import pandas as pd import pytest -from dvc.cli import parse_args -from dvc.command.add import CmdAdd -from dvc.command.data_sync import CmdDataPush -from dvc.command.init import CmdInit -from dvc.command.remote import CmdRemoteAdd, CmdRemoteModify +from dvc.repo import Repo from ebonite.core.analyzer.dataset import DatasetAnalyzer from ebonite.ext.dvc.dataset_source import create_dvc_source @@ -22,28 +18,24 @@ @pytest.fixture def dvc_repo_factory(tmpdir_factory): - def dvc_repo(remote, remote_cmd=None): + def dvc_repo(remote, remote_kwargs=None): repo_path = str(tmpdir_factory.mktemp('repo')) - curdir = os.path.abspath('.') - try: - os.chdir(repo_path) + repo = Repo.init(repo_path, no_scm=True) - CmdInit(parse_args(['init', '--no-scm'])).run() + with repo.config.edit() as conf: + remote_config = {'url': remote} + if remote_kwargs is not None: + remote_config.update(remote_kwargs) + conf['remote']['storage'] = remote_config + conf['core']['remote'] = 'storage' - CmdRemoteAdd(parse_args(['remote', 'add', '-d', 'storage', remote])).run() - if remote_cmd is not None: - remote_cmd, args = remote_cmd - remote_cmd(parse_args(args)).run() + shutil.copy(fs.current_module_path('data1.csv'), repo_path) - shutil.copy(fs.current_module_path('data1.csv'), repo_path) - - CmdAdd(parse_args(['add', 'data1.csv'])).run() - CmdDataPush(parse_args(['push'])).run() - - shutil.rmtree(os.path.join(repo_path, '.dvc', 'cache'), ignore_errors=True) - finally: - os.chdir(curdir) + repo.add([os.path.join(repo_path, 'data1.csv')]) + repo.push() + os.remove(os.path.join(repo_path, 'data1.csv')) + shutil.rmtree(os.path.join(repo_path, '.dvc', 'cache'), ignore_errors=True) return repo_path @@ -77,7 +69,7 @@ def s3_dvc_repo(s3server, dvc_repo_factory): S3_ACCESS_KEY=ACCESS_KEY, S3_SECRET_KEY=SECRET_KEY): S3ArtifactRepository('dvc-bucket', url)._ensure_bucket() # noqa return dvc_repo_factory('s3://dvc-bucket', - (CmdRemoteModify, ['remote', 'modify', 'storage', 'endpointurl', url])) + {'endpointurl': url}) def test_create_dvc_source__local(local_dvc_repo): From f91f41c221b2eee2f8460573e7ddf9324c62d863 Mon Sep 17 00:00:00 2001 From: mike0sv Date: Tue, 14 Jul 2020 23:51:23 +0300 Subject: [PATCH 12/20] EBNT-384 windows tests --- tests/ext/test_dvc/test_dataset_source.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/ext/test_dvc/test_dataset_source.py b/tests/ext/test_dvc/test_dataset_source.py index ffacbada..41c79973 100644 --- a/tests/ext/test_dvc/test_dataset_source.py +++ b/tests/ext/test_dvc/test_dataset_source.py @@ -17,9 +17,9 @@ @pytest.fixture -def dvc_repo_factory(tmpdir_factory): +def dvc_repo_factory(tmpdir): def dvc_repo(remote, remote_kwargs=None): - repo_path = str(tmpdir_factory.mktemp('repo')) + repo_path = tmpdir repo = Repo.init(repo_path, no_scm=True) From 6cd09da6dcfe0ef410de3035abe1286e73964063 Mon Sep 
17 00:00:00 2001 From: mike0sv Date: Wed, 15 Jul 2020 00:06:41 +0300 Subject: [PATCH 13/20] EBNT-384 FUUUUUU --- tests/ext/test_dvc/test_dataset_source.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/ext/test_dvc/test_dataset_source.py b/tests/ext/test_dvc/test_dataset_source.py index 41c79973..64dd701f 100644 --- a/tests/ext/test_dvc/test_dataset_source.py +++ b/tests/ext/test_dvc/test_dataset_source.py @@ -72,7 +72,7 @@ def s3_dvc_repo(s3server, dvc_repo_factory): {'endpointurl': url}) -def test_create_dvc_source__local(local_dvc_repo): +def no_test_create_dvc_source__local(local_dvc_repo): dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], []) ds = create_dvc_source(path='data1.csv', reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'), @@ -83,7 +83,7 @@ def test_create_dvc_source__local(local_dvc_repo): @docker_test -def test_create_dvc_source_s3(s3_dvc_repo): +def no_test_create_dvc_source_s3(s3_dvc_repo): dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], []) ds = create_dvc_source(path='data1.csv', reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'), From 332947cb525a9b835fb76d16fe4dc124a5173aeb Mon Sep 17 00:00:00 2001 From: mike0sv Date: Wed, 15 Jul 2020 09:32:21 +0300 Subject: [PATCH 14/20] EBNT-384 add dvc ext --- src/ebonite/ext/dvc/__init__.py | 3 +++ src/ebonite/ext/ext_loader.py | 3 ++- tests/ext/test_dvc/test_dataset_source.py | 12 +++++++----- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/ebonite/ext/dvc/__init__.py b/src/ebonite/ext/dvc/__init__.py index e69de29b..7dd95579 100644 --- a/src/ebonite/ext/dvc/__init__.py +++ b/src/ebonite/ext/dvc/__init__.py @@ -0,0 +1,3 @@ +from .dataset_source import DvcBlob, create_dvc_source + +__all__ = ['DvcBlob', 'create_dvc_source'] diff --git a/src/ebonite/ext/ext_loader.py b/src/ebonite/ext/ext_loader.py index 07c6c95b..3f5bc4a2 100644 --- a/src/ebonite/ext/ext_loader.py +++ b/src/ebonite/ext/ext_loader.py @@ -77,7 +77,8 @@ class ExtensionLoader: Extension('ebonite.ext.imageio', ['imageio']), Extension('ebonite.ext.lightgbm', ['lightgbm'], False), Extension('ebonite.ext.xgboost', ['xgboost'], False), - Extension('ebonite.ext.docker', ['docker'], False) + Extension('ebonite.ext.docker', ['docker'], False), + Extension('ebonite.ext.dvc', ['dvc'], False) ) _loaded_extensions: Dict[Extension, ModuleType] = {} diff --git a/tests/ext/test_dvc/test_dataset_source.py b/tests/ext/test_dvc/test_dataset_source.py index 64dd701f..47535107 100644 --- a/tests/ext/test_dvc/test_dataset_source.py +++ b/tests/ext/test_dvc/test_dataset_source.py @@ -31,10 +31,12 @@ def dvc_repo(remote, remote_kwargs=None): conf['core']['remote'] = 'storage' shutil.copy(fs.current_module_path('data1.csv'), repo_path) - - repo.add([os.path.join(repo_path, 'data1.csv')]) + data1_path = os.path.join(repo_path, 'data1.csv') + assert os.path.exists(data1_path) + repo.add([data1_path]) + assert os.path.exists(data1_path + '.dvc') repo.push() - os.remove(os.path.join(repo_path, 'data1.csv')) + os.remove(data1_path) shutil.rmtree(os.path.join(repo_path, '.dvc', 'cache'), ignore_errors=True) return repo_path @@ -72,7 +74,7 @@ def s3_dvc_repo(s3server, dvc_repo_factory): {'endpointurl': url}) -def no_test_create_dvc_source__local(local_dvc_repo): +def test_create_dvc_source__local(local_dvc_repo): dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], []) ds = create_dvc_source(path='data1.csv', reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'), @@ -83,7 +85,7 @@ def 
no_test_create_dvc_source__local(local_dvc_repo): @docker_test -def no_test_create_dvc_source_s3(s3_dvc_repo): +def test_create_dvc_source_s3(s3_dvc_repo): dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], []) ds = create_dvc_source(path='data1.csv', reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'), From e95e6f5609d7a4dfe29547223c004bf927086c55 Mon Sep 17 00:00:00 2001 From: mike0sv Date: Wed, 15 Jul 2020 10:09:25 +0300 Subject: [PATCH 15/20] EBNT-384 no dvc --- test.requirements.txt | 2 +- tests/ext/test_dvc/test_dataset_source.py | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/test.requirements.txt b/test.requirements.txt index 1b1e164a..e3b31e00 100644 --- a/test.requirements.txt +++ b/test.requirements.txt @@ -28,4 +28,4 @@ torch==1.4.0+cpu ; sys_platform != "darwin" torch==1.4.0 ; sys_platform == "darwin" -dvc==1.1.7 \ No newline at end of file +#dvc==1.1.7 \ No newline at end of file diff --git a/tests/ext/test_dvc/test_dataset_source.py b/tests/ext/test_dvc/test_dataset_source.py index 47535107..64c41a8a 100644 --- a/tests/ext/test_dvc/test_dataset_source.py +++ b/tests/ext/test_dvc/test_dataset_source.py @@ -4,10 +4,9 @@ import pandas as pd import pytest -from dvc.repo import Repo from ebonite.core.analyzer.dataset import DatasetAnalyzer -from ebonite.ext.dvc.dataset_source import create_dvc_source +# from ebonite.ext.dvc.dataset_source import create_dvc_source # noqa from ebonite.ext.pandas import DataFrameType from ebonite.ext.pandas.dataset_source import PandasFormatCsv, PandasReader from ebonite.ext.s3 import S3ArtifactRepository @@ -16,11 +15,14 @@ from tests.ext.test_s3.conftest import ACCESS_KEY, SECRET_KEY # noqa +# from dvc.repo import Repo # noqa + + @pytest.fixture def dvc_repo_factory(tmpdir): def dvc_repo(remote, remote_kwargs=None): repo_path = tmpdir - + Repo = None repo = Repo.init(repo_path, no_scm=True) with repo.config.edit() as conf: @@ -74,7 +76,8 @@ def s3_dvc_repo(s3server, dvc_repo_factory): {'endpointurl': url}) -def test_create_dvc_source__local(local_dvc_repo): +def _test_create_dvc_source__local(local_dvc_repo): + create_dvc_source = None dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], []) ds = create_dvc_source(path='data1.csv', reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'), @@ -85,7 +88,8 @@ def test_create_dvc_source__local(local_dvc_repo): @docker_test -def test_create_dvc_source_s3(s3_dvc_repo): +def _test_create_dvc_source_s3(s3_dvc_repo): + create_dvc_source = None dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], []) ds = create_dvc_source(path='data1.csv', reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'), From a91188264670c2b3cdcae1c0cc8dd3552aaee927 Mon Sep 17 00:00:00 2001 From: mike0sv Date: Wed, 15 Jul 2020 10:15:01 +0300 Subject: [PATCH 16/20] EBNT-384 no dvc --- tests/ext/test_dvc/test_dataset_source.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/ext/test_dvc/test_dataset_source.py b/tests/ext/test_dvc/test_dataset_source.py index 64c41a8a..e4eb9308 100644 --- a/tests/ext/test_dvc/test_dataset_source.py +++ b/tests/ext/test_dvc/test_dataset_source.py @@ -6,7 +6,6 @@ import pytest from ebonite.core.analyzer.dataset import DatasetAnalyzer -# from ebonite.ext.dvc.dataset_source import create_dvc_source # noqa from ebonite.ext.pandas import DataFrameType from ebonite.ext.pandas.dataset_source import PandasFormatCsv, PandasReader from ebonite.ext.s3 import S3ArtifactRepository @@ -15,9 +14,6 @@ from tests.ext.test_s3.conftest import ACCESS_KEY, 
SECRET_KEY # noqa -# from dvc.repo import Repo # noqa - - @pytest.fixture def dvc_repo_factory(tmpdir): def dvc_repo(remote, remote_kwargs=None): From 238137976556e60fc74870c72f01f6b4bff0b5f8 Mon Sep 17 00:00:00 2001 From: mike0sv Date: Wed, 15 Jul 2020 10:38:13 +0300 Subject: [PATCH 17/20] EBNT-384 add dvc install --- test.requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test.requirements.txt b/test.requirements.txt index e3b31e00..1b1e164a 100644 --- a/test.requirements.txt +++ b/test.requirements.txt @@ -28,4 +28,4 @@ torch==1.4.0+cpu ; sys_platform != "darwin" torch==1.4.0 ; sys_platform == "darwin" -#dvc==1.1.7 \ No newline at end of file +dvc==1.1.7 \ No newline at end of file From e4564643be6342f1aca5b4aba8825f4bccbf628f Mon Sep 17 00:00:00 2001 From: mike0sv Date: Wed, 15 Jul 2020 11:00:55 +0300 Subject: [PATCH 18/20] EBNT-384 add dvc import --- tests/ext/test_dvc/test_dataset_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/ext/test_dvc/test_dataset_source.py b/tests/ext/test_dvc/test_dataset_source.py index e4eb9308..79fdb521 100644 --- a/tests/ext/test_dvc/test_dataset_source.py +++ b/tests/ext/test_dvc/test_dataset_source.py @@ -4,6 +4,7 @@ import pandas as pd import pytest +from dvc.repo import Repo from ebonite.core.analyzer.dataset import DatasetAnalyzer from ebonite.ext.pandas import DataFrameType @@ -18,7 +19,6 @@ def dvc_repo_factory(tmpdir): def dvc_repo(remote, remote_kwargs=None): repo_path = tmpdir - Repo = None repo = Repo.init(repo_path, no_scm=True) with repo.config.edit() as conf: From 9f4e6b557fc12ce349c1fd3e7af504db6970076b Mon Sep 17 00:00:00 2001 From: mike0sv Date: Wed, 15 Jul 2020 11:34:25 +0300 Subject: [PATCH 19/20] EBNT-384 no color --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 28675b16..05d0c728 100644 --- a/tox.ini +++ b/tox.ini @@ -44,7 +44,7 @@ deps = coverage==4.5.4 ; coveralls 5.x is buggy for now commands = - pytest --cov --cov-report= --cov-append --disable-warnings -m "not docker and not tf_v1" tests + pytest --color=no --cov --cov-report= --cov-append --disable-warnings -m "not docker and not tf_v1" tests pytest --cov --cov-report= --cov-append --disable-warnings tests_requirements pytest --cov --cov-report= --cov-append --disable-warnings -m docker tests pip uninstall -y tensorflow From ada049cd02c91131ea4d4907f61c44eb96b23368 Mon Sep 17 00:00:00 2001 From: mike0sv Date: Wed, 15 Jul 2020 11:46:37 +0300 Subject: [PATCH 20/20] EBNT-384 local imports --- tests/ext/test_dvc/test_dataset_source.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/ext/test_dvc/test_dataset_source.py b/tests/ext/test_dvc/test_dataset_source.py index 79fdb521..0e5350ea 100644 --- a/tests/ext/test_dvc/test_dataset_source.py +++ b/tests/ext/test_dvc/test_dataset_source.py @@ -4,7 +4,6 @@ import pandas as pd import pytest -from dvc.repo import Repo from ebonite.core.analyzer.dataset import DatasetAnalyzer from ebonite.ext.pandas import DataFrameType @@ -19,6 +18,7 @@ def dvc_repo_factory(tmpdir): def dvc_repo(remote, remote_kwargs=None): repo_path = tmpdir + from dvc.repo import Repo repo = Repo.init(repo_path, no_scm=True) with repo.config.edit() as conf: @@ -72,9 +72,9 @@ def s3_dvc_repo(s3server, dvc_repo_factory): {'endpointurl': url}) -def _test_create_dvc_source__local(local_dvc_repo): - create_dvc_source = None +def test_create_dvc_source__local(local_dvc_repo): dt = DataFrameType(['col1', 'col2'], ['int64', 
'string'], []) + from ebonite.ext.dvc import create_dvc_source ds = create_dvc_source(path='data1.csv', reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'), repo=local_dvc_repo) @@ -84,9 +84,9 @@ def _test_create_dvc_source__local(local_dvc_repo): @docker_test -def _test_create_dvc_source_s3(s3_dvc_repo): - create_dvc_source = None +def test_create_dvc_source_s3(s3_dvc_repo): dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], []) + from ebonite.ext.dvc import create_dvc_source ds = create_dvc_source(path='data1.csv', reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'), repo=s3_dvc_repo)
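
A minimal end-to-end sketch of the API this series introduces: the DVC dataset source from PATCH 07/14 combined with the task-level dataset and evaluation calls from PATCH 01. This is illustrative rather than part of the patches — REPO_URL, the project/task names, and the sklearn-based metric are placeholder assumptions (passing accuracy_score as a raw metric only works if a metric hook can wrap sklearn functions).

import pandas as pd
from sklearn.metrics import accuracy_score

import ebonite
from ebonite.ext.dvc import create_dvc_source
from ebonite.ext.pandas import DataFrameType
from ebonite.ext.pandas.dataset_source import PandasFormatCsv, PandasReader

REPO_URL = 'https://example.com/repo.git'  # placeholder: any repo URL or path dvc.api understands

# schema matching tests/ext/test_dvc/data1.csv
dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], [])

# lazily read 'data1.csv' from the DVC repo; PandasReader now takes the artifact path (PATCH 07)
source = create_dvc_source(path='data1.csv',
                           reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'),
                           repo=REPO_URL)

ebnt = ebonite.Ebonite.local()
task = ebnt.get_or_create_task('my_project', 'my_task')

# register datasets on the task; raw objects are wrapped via Dataset.from_object (PATCH 01)
task.add_dataset('eval_input', source)
task.add_dataset('eval_target', pd.DataFrame({'col2': ['asdf', 'cvbx']}))

# wire the datasets and a metric into an evalset, then run all evaluations;
# results accumulate per timestamp and a second run is a no-op unless force=True
task.add_evaluation('eval', 'eval_input', 'eval_target', accuracy_score)
task.evaluate_all()  # pass save_result=False to skip persisting results

# force=True also removes evalsets that still reference the dataset
task.delete_dataset('eval_input', force=True)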