diff --git a/src/ebonite/ext/dvc/__init__.py b/src/ebonite/ext/dvc/__init__.py
new file mode 100644
index 00000000..7dd95579
--- /dev/null
+++ b/src/ebonite/ext/dvc/__init__.py
@@ -0,0 +1,3 @@
+from .dataset_source import DvcBlob, create_dvc_source
+
+__all__ = ['DvcBlob', 'create_dvc_source']
diff --git a/src/ebonite/ext/dvc/dataset_source.py b/src/ebonite/ext/dvc/dataset_source.py
new file mode 100644
index 00000000..4af0384b
--- /dev/null
+++ b/src/ebonite/ext/dvc/dataset_source.py
@@ -0,0 +1,33 @@
+import contextlib
+
+import dvc.api
+from dvc.repo import Repo
+
+from ebonite.core.objects.artifacts import Blob, Blobs, StreamContextManager
+from ebonite.core.objects.dataset_source import DatasetSource
+from ebonite.repository.dataset.artifact import ArtifactDatasetSource, DatasetReader
+
+
+class DvcBlob(Blob):
+    def __init__(self, path: str, repo: str = None, rev: str = None, remote: str = None, mode: str = 'r',
+                 encoding: str = None):
+        self.path = path
+        self.repo = repo
+        self.rev = rev
+        self.remote = remote
+        self.mode = mode
+        self.encoding = encoding
+
+    def materialize(self, path):
+        Repo.get(self.repo, self.path, out=path, rev=self.rev)  # TODO tests
+
+    @contextlib.contextmanager
+    def bytestream(self) -> StreamContextManager:
+        with dvc.api.open(self.path, self.repo, self.rev, self.remote, self.mode, self.encoding) as f:
+            yield f
+
+
+def create_dvc_source(path: str, reader: DatasetReader, repo: str = None, rev: str = None, remote: str = None,
+                      mode: str = 'r', encoding: str = None) -> DatasetSource:
+    artifacts = Blobs.from_blobs({path: DvcBlob(path, repo, rev, remote, mode, encoding)})
+    return ArtifactDatasetSource(reader, artifacts)
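For context, a minimal usage sketch of the new extension (it mirrors the tests added below; the repo path is illustrative, and `PandasReader`/`PandasFormatCsv` come from the pandas extension touched later in this diff):

    from ebonite.ext.dvc import create_dvc_source
    from ebonite.ext.pandas import DataFrameType
    from ebonite.ext.pandas.dataset_source import PandasFormatCsv, PandasReader

    # Describe the expected frame, then point the source at a file tracked by DVC.
    dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], [])
    source = create_dvc_source(path='data1.csv',
                               reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'),
                               repo='/path/to/dvc/repo')  # illustrative repo path
    dataset = source.read()  # pandas DataFrame aligned to dt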
diff --git a/src/ebonite/ext/ext_loader.py b/src/ebonite/ext/ext_loader.py
index 07c6c95b..3f5bc4a2 100644
--- a/src/ebonite/ext/ext_loader.py
+++ b/src/ebonite/ext/ext_loader.py
@@ -77,7 +77,8 @@ class ExtensionLoader:
         Extension('ebonite.ext.imageio', ['imageio']),
         Extension('ebonite.ext.lightgbm', ['lightgbm'], False),
         Extension('ebonite.ext.xgboost', ['xgboost'], False),
-        Extension('ebonite.ext.docker', ['docker'], False)
+        Extension('ebonite.ext.docker', ['docker'], False),
+        Extension('ebonite.ext.dvc', ['dvc'], False)
     )
 
     _loaded_extensions: Dict[Extension, ModuleType] = {}
diff --git a/src/ebonite/ext/numpy/dataset_source.py b/src/ebonite/ext/numpy/dataset_source.py
index ac59c3f9..5b64021d 100644
--- a/src/ebonite/ext/numpy/dataset_source.py
+++ b/src/ebonite/ext/numpy/dataset_source.py
@@ -22,7 +22,7 @@ class NumpyNdarrayWriter(DatasetWriter):
     """DatasetWriter implementation for numpy ndarray"""
 
     def write(self, dataset: Dataset) -> Tuple[DatasetReader, ArtifactCollection]:
-        return NumpyNdarrayReader(), ArtifactCollection.from_blobs(
+        return NumpyNdarrayReader(dataset.dataset_type), ArtifactCollection.from_blobs(
             {DATA_FILE: LazyBlob(lambda: save_npz(dataset.data))})
 
 
diff --git a/src/ebonite/ext/pandas/dataset_source.py b/src/ebonite/ext/pandas/dataset_source.py
index 8238220d..8de1bd0b 100644
--- a/src/ebonite/ext/pandas/dataset_source.py
+++ b/src/ebonite/ext/pandas/dataset_source.py
@@ -191,16 +191,17 @@ class PandasReader(DatasetReader):
     """DatasetReader for pandas dataframes
 
     :param format: PandasFormat instance to use
-    :param data_type: DataFrameType to use for aliging read data
+    :param dataset_type: DataFrameType to use for aligning read data
     """
 
-    def __init__(self, format: PandasFormat, data_type: DataFrameType):
-        self.data_type = data_type
+    def __init__(self, format: PandasFormat, dataset_type: DataFrameType, path: str = None):
+        super(PandasReader, self).__init__(dataset_type)
+        self.path = path or PANDAS_DATA_FILE
         self.format = format
 
     def read(self, artifacts: ArtifactCollection) -> Dataset:
-        with artifacts.blob_dict() as blobs, blobs[PANDAS_DATA_FILE].bytestream() as b:
-            return Dataset.from_object(self.data_type.align(self.format.read(b)))
+        with artifacts.blob_dict() as blobs, blobs[self.path].bytestream() as b:
+            return Dataset.from_object(self.dataset_type.align(self.format.read(b)))
 
 
 class PandasWriter(DatasetWriter):
@@ -209,12 +210,14 @@ class PandasWriter(DatasetWriter):
     :param format: PandasFormat instance to use
     """
 
-    def __init__(self, format: PandasFormat):
+    def __init__(self, format: PandasFormat, path: str = None):
+        self.path = path or PANDAS_DATA_FILE
         self.format = format
 
     def write(self, dataset: Dataset) -> Tuple[DatasetReader, ArtifactCollection]:
         blob = LazyBlob(lambda: self.format.write(dataset.data))
-        return PandasReader(self.format, dataset.dataset_type), ArtifactCollection.from_blobs({PANDAS_DATA_FILE: blob})
+        return (PandasReader(self.format, dataset.dataset_type, self.path),
+                ArtifactCollection.from_blobs({self.path: blob}))
 
 # class PandasJdbcDatasetSource(_PandasDatasetSource):
 #     def __init__(self, dataset_type: DatasetType, table: str, connection: str,
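The new `path` argument lets several pandas datasets coexist in one artifact collection instead of all claiming `PANDAS_DATA_FILE`. A round-trip sketch (assuming `df` is an existing DataFrame; the import location of `Dataset` is an assumption):

    from ebonite.core.objects.dataset_source import Dataset  # assumed location
    from ebonite.ext.pandas.dataset_source import PandasFormatCsv, PandasWriter

    # Write under a custom artifact name; the returned reader remembers both
    # the dataset type and the path it should read back from.
    writer = PandasWriter(PandasFormatCsv(), path='train.csv')
    reader, artifacts = writer.write(Dataset.from_object(df))
    restored = reader.read(artifacts)  # aligned via reader.dataset_type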
diff --git a/src/ebonite/repository/dataset/artifact.py b/src/ebonite/repository/dataset/artifact.py
index 9acd1e1e..a8ae51a1 100644
--- a/src/ebonite/repository/dataset/artifact.py
+++ b/src/ebonite/repository/dataset/artifact.py
@@ -13,13 +13,20 @@
 
 @type_field('type')
 class DatasetReader(EboniteParams):
-    """ABC for reading Dataset from files (artifacts) to use with ArtifactDatasetSource"""
+    """ABC for reading Dataset from files (artifacts) to use with ArtifactDatasetSource
+
+    :param dataset_type: type of the resulting dataset
+    """
+
+    def __init__(self, dataset_type: DatasetType):
+        self.dataset_type = dataset_type
 
     @abstractmethod
     def read(self, artifacts: ArtifactCollection) -> Dataset:
         """Method to read Dataset from artifacts
 
-        :param artifacts: artifacts to read"""
+        :param artifacts: artifacts to read
+        """
 
 
 @type_field('type')
@@ -56,7 +63,7 @@ def save(self, dataset_id: str, dataset: Dataset) -> DatasetSource:
             pushed = self.repo.push_artifact(self.ARTIFACT_TYPE, dataset_id, blobs)
         except ArtifactExistsError as e:
             raise DatasetExistsError(dataset_id, self, e)
-        return ArtifactDatasetSource(reader, pushed, dataset.dataset_type)
+        return ArtifactDatasetSource(reader, pushed)
 
     def delete(self, dataset_id: str):
         try:
@@ -70,10 +77,10 @@ class ArtifactDatasetSource(DatasetSource):
 
     :param reader: DatasetReader for this dataset
     :param artifacts: ArtifactCollection with actual files
-    :param dataset_type: DatasetType of contained dataset"""
+    """
 
-    def __init__(self, reader: DatasetReader, artifacts: ArtifactCollection, dataset_type: DatasetType):
-        super(ArtifactDatasetSource, self).__init__(dataset_type)
+    def __init__(self, reader: DatasetReader, artifacts: ArtifactCollection):
+        super(ArtifactDatasetSource, self).__init__(reader.dataset_type)
         self.reader = reader
         self.artifacts = artifacts
 
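With `dataset_type` moved onto the reader, `ArtifactDatasetSource` now derives its type from the reader, and custom readers only need to pass the type up the chain. A hypothetical minimal reader (modeled on the updated test conftest below; the class name and the `Dataset` import location are assumptions):

    from ebonite.core.objects.dataset_source import Dataset  # assumed location
    from ebonite.repository.dataset.artifact import DatasetReader


    class TextDatasetReader(DatasetReader):  # hypothetical example
        def read(self, artifacts):
            # self.dataset_type was stored by DatasetReader.__init__
            with artifacts.blob_dict() as blobs, blobs['data'].bytestream() as b:
                return Dataset.from_object(b.read().decode('utf8'))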
diff --git a/test.requirements.txt b/test.requirements.txt
index ee56552d..1b1e164a 100644
--- a/test.requirements.txt
+++ b/test.requirements.txt
@@ -27,3 +27,5 @@ lightgbm==2.3.1
 
 torch==1.4.0+cpu ; sys_platform != "darwin"
 torch==1.4.0 ; sys_platform == "darwin"
+
+dvc==1.1.7
\ No newline at end of file
diff --git a/tests/ext/test_dvc/__init__.py b/tests/ext/test_dvc/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/ext/test_dvc/data1.csv b/tests/ext/test_dvc/data1.csv
new file mode 100644
index 00000000..a10481fd
--- /dev/null
+++ b/tests/ext/test_dvc/data1.csv
@@ -0,0 +1,3 @@
+col1,col2
+123,asdf
+456,cvbx
\ No newline at end of file
diff --git a/tests/ext/test_dvc/test_dataset_source.py b/tests/ext/test_dvc/test_dataset_source.py
new file mode 100644
index 00000000..0e5350ea
--- /dev/null
+++ b/tests/ext/test_dvc/test_dataset_source.py
@@ -0,0 +1,95 @@
+import contextlib
+import os
+import shutil
+
+import pandas as pd
+import pytest
+
+from ebonite.core.analyzer.dataset import DatasetAnalyzer
+from ebonite.ext.pandas import DataFrameType
+from ebonite.ext.pandas.dataset_source import PandasFormatCsv, PandasReader
+from ebonite.ext.s3 import S3ArtifactRepository
+from ebonite.utils import fs
+from tests.conftest import docker_test
+from tests.ext.test_s3.conftest import ACCESS_KEY, SECRET_KEY  # noqa
+
+
+@pytest.fixture
+def dvc_repo_factory(tmpdir):
+    def dvc_repo(remote, remote_kwargs=None):
+        repo_path = tmpdir
+        from dvc.repo import Repo
+        repo = Repo.init(repo_path, no_scm=True)
+
+        with repo.config.edit() as conf:
+            remote_config = {'url': remote}
+            if remote_kwargs is not None:
+                remote_config.update(remote_kwargs)
+            conf['remote']['storage'] = remote_config
+            conf['core']['remote'] = 'storage'
+
+        shutil.copy(fs.current_module_path('data1.csv'), repo_path)
+        data1_path = os.path.join(repo_path, 'data1.csv')
+        assert os.path.exists(data1_path)
+        repo.add([data1_path])
+        assert os.path.exists(data1_path + '.dvc')
+        repo.push()
+        os.remove(data1_path)
+        shutil.rmtree(os.path.join(repo_path, '.dvc', 'cache'), ignore_errors=True)
+
+        return repo_path
+
+    return dvc_repo
+
+
+@pytest.fixture
+def local_dvc_repo(tmpdir_factory, dvc_repo_factory):
+    storage_path = str(tmpdir_factory.mktemp('storage'))
+    return dvc_repo_factory(storage_path)
+
+
+@contextlib.contextmanager
+def override_env(**envs):
+    prev = {e: os.environ.get(e, None) for e in envs.keys()}
+    try:
+        for e, val in envs.items():
+            os.environ[e] = val
+        yield
+    finally:
+        for e, val in prev.items():
+            if val is not None:
+                os.environ[e] = val
+
+
+@pytest.fixture
+def s3_dvc_repo(s3server, dvc_repo_factory):
+    url = f'http://localhost:{s3server}'
+
+    with override_env(AWS_ACCESS_KEY_ID=ACCESS_KEY, AWS_SECRET_ACCESS_KEY=SECRET_KEY,
+                      S3_ACCESS_KEY=ACCESS_KEY, S3_SECRET_KEY=SECRET_KEY):
+        S3ArtifactRepository('dvc-bucket', url)._ensure_bucket()  # noqa
+        return dvc_repo_factory('s3://dvc-bucket',
+                                {'endpointurl': url})
+
+
+def test_create_dvc_source__local(local_dvc_repo):
+    dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], [])
+    from ebonite.ext.dvc import create_dvc_source
+    ds = create_dvc_source(path='data1.csv',
+                           reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'),
+                           repo=local_dvc_repo)
+    dataset = ds.read()
+    assert isinstance(dataset.data, pd.DataFrame)
+    assert DatasetAnalyzer.analyze(dataset.data) == dt
+
+
+@docker_test
+def test_create_dvc_source__s3(s3_dvc_repo):
+    dt = DataFrameType(['col1', 'col2'], ['int64', 'string'], [])
+    from ebonite.ext.dvc import create_dvc_source
+    ds = create_dvc_source(path='data1.csv',
+                           reader=PandasReader(PandasFormatCsv(), dt, 'data1.csv'),
+                           repo=s3_dvc_repo)
+    dataset = ds.read()
+    assert isinstance(dataset.data, pd.DataFrame)
+    assert DatasetAnalyzer.analyze(dataset.data) == dt
diff --git a/tests/ext/test_ext_loader.py b/tests/ext/test_ext_loader.py
index c442a300..e48bb8c6 100644
--- a/tests/ext/test_ext_loader.py
+++ b/tests/ext/test_ext_loader.py
@@ -92,7 +92,6 @@ def test_extension_loader__lazy(ext_loader, two_temp_modules):
     assert module_imported(module2)
 
 
-@pytest.mark.kek
 def test_extension_loader__lazy_defered(ext_loader, two_temp_modules, temp_module_factory):
     module1, module2 = two_temp_modules
 
diff --git a/tests/ext/test_s3/conftest.py b/tests/ext/test_s3/conftest.py
index fd6be9ff..4ba20674 100644
--- a/tests/ext/test_s3/conftest.py
+++ b/tests/ext/test_s3/conftest.py
@@ -25,7 +25,7 @@ def delete_bucket(repo: S3ArtifactRepository):
 
 
 # fake fixture that ensures that S3 server is up between tests
-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
 def s3server(pytestconfig):
     if not has_docker() or 'not docker' in pytestconfig.getoption('markexpr'):
         pytest.skip('skipping docker tests')
diff --git a/tests/repository/dataset/conftest.py b/tests/repository/dataset/conftest.py
index 82545611..b55b2fe8 100644
--- a/tests/repository/dataset/conftest.py
+++ b/tests/repository/dataset/conftest.py
@@ -11,7 +11,7 @@ class TestDatasetWriter(DatasetWriter):
 
     def write(self, dataset: Dataset) -> Tuple[DatasetReader, ArtifactCollection]:
-        return TestDatasetReader(), Blobs({'data': InMemoryBlob(dataset.data.encode('utf8'))})
+        return TestDatasetReader(dataset.dataset_type), Blobs({'data': InMemoryBlob(dataset.data.encode('utf8'))})
 
 
 class TestDatasetReader(DatasetReader):
diff --git a/tox.ini b/tox.ini
index 28675b16..05d0c728 100644
--- a/tox.ini
+++ b/tox.ini
@@ -44,7 +44,7 @@ deps =
     coverage==4.5.4 ; coveralls 5.x is buggy for now
 
 commands =
-    pytest --cov --cov-report= --cov-append --disable-warnings -m "not docker and not tf_v1" tests
+    pytest --color=no --cov --cov-report= --cov-append --disable-warnings -m "not docker and not tf_v1" tests
     pytest --cov --cov-report= --cov-append --disable-warnings tests_requirements
     pytest --cov --cov-report= --cov-append --disable-warnings -m docker tests
     pip uninstall -y tensorflow