Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/ebonite/ext/dvc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .dataset_source import DvcBlob, create_dvc_source

__all__ = ['DvcBlob', 'create_dvc_source']
33 changes: 33 additions & 0 deletions src/ebonite/ext/dvc/dataset_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import contextlib

import dvc.api
from dvc.repo import Repo

from ebonite.core.objects.artifacts import Blob, Blobs, StreamContextManager
from ebonite.core.objects.dataset_source import DatasetSource
from ebonite.repository.dataset.artifact import ArtifactDatasetSource, DatasetReader


class DvcBlob(Blob):
    """:class:`Blob` implementation backed by a file tracked in a DVC repository.

    :param path: path of the file inside the DVC repo
    :param repo: URL or local path of the DVC repo (``None`` means the current repo)
    :param rev: revision to read the file at — presumably a git commit/branch/tag
        as accepted by ``dvc.api``; TODO confirm and document
    :param remote: name of the DVC remote to fetch the data from
    :param mode: open mode used by :meth:`bytestream` (e.g. ``'r'`` or ``'rb'``)
    :param encoding: text encoding used when *mode* is textual
    """

    def __init__(self, path: str, repo: str = None, rev: str = None, remote: str = None, mode: str = 'r',
                 encoding: str = None):
        self.path = path
        self.repo = repo
        self.rev = rev
        self.remote = remote
        self.mode = mode
        self.encoding = encoding

    def materialize(self, path):
        """Download the tracked file to local *path*.

        NOTE(review): ``Repo.get``'s first argument is the repo url/path; passing
        ``self.remote`` here looks suspicious — ``self.repo`` may be intended.
        Verify against dvc's ``Repo.get`` API.
        """
        Repo.get(self.remote, self.path, out=path, rev=self.rev)  # TODO tests

    @contextlib.contextmanager
    def bytestream(self) -> StreamContextManager:
        """Stream the file contents via ``dvc.api.open`` without materializing it first."""
        with dvc.api.open(self.path, self.repo, self.rev, self.remote, self.mode, self.encoding) as f:
            yield f


def create_dvc_source(path: str, reader: DatasetReader, repo: str, rev: str = None, remote: str = None, mode: str = 'r',
                      encoding: str = None) -> DatasetSource:
    """Create a :class:`DatasetSource` reading a dataset from a single DVC-tracked file.

    :param path: path of the file inside the DVC repo; also used as the artifact name
    :param reader: :class:`DatasetReader` used to parse the file into a Dataset
    :param repo: URL or local path of the DVC repo
    :param rev: revision to read at — presumably a git commit/branch/tag; TODO confirm
    :param remote: name of the DVC remote to fetch the data from
    :param mode: open mode for reading the blob (e.g. ``'r'`` or ``'rb'``)
    :param encoding: text encoding used when *mode* is textual
    :return: :class:`ArtifactDatasetSource` wrapping a single :class:`DvcBlob`
    """
    artifacts = Blobs.from_blobs({path: DvcBlob(path, repo, rev, remote, mode, encoding)})
    return ArtifactDatasetSource(reader, artifacts)
3 changes: 2 additions & 1 deletion src/ebonite/ext/ext_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class ExtensionLoader:
Extension('ebonite.ext.imageio', ['imageio']),
Extension('ebonite.ext.lightgbm', ['lightgbm'], False),
Extension('ebonite.ext.xgboost', ['xgboost'], False),
Extension('ebonite.ext.docker', ['docker'], False)
Extension('ebonite.ext.docker', ['docker'], False),
Extension('ebonite.ext.dvc', ['dvc'], False)
)

_loaded_extensions: Dict[Extension, ModuleType] = {}
Expand Down
2 changes: 1 addition & 1 deletion src/ebonite/ext/numpy/dataset_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class NumpyNdarrayWriter(DatasetWriter):
"""DatasetWriter implementation for numpy ndarray"""

def write(self, dataset: Dataset) -> Tuple[DatasetReader, ArtifactCollection]:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does commit-mechanic of DVC works with writing datasets?

return NumpyNdarrayReader(), ArtifactCollection.from_blobs(
return NumpyNdarrayReader(dataset.dataset_type), ArtifactCollection.from_blobs(
{DATA_FILE: LazyBlob(lambda: save_npz(dataset.data))})


Expand Down
17 changes: 10 additions & 7 deletions src/ebonite/ext/pandas/dataset_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,17 @@ class PandasReader(DatasetReader):
"""DatasetReader for pandas dataframes

:param format: PandasFormat instance to use
:param data_type: DataFrameType to use for aligning read data
:param dataset_type: DataFrameType to use for aligning read data
"""

def __init__(self, format: PandasFormat, data_type: DataFrameType):
self.data_type = data_type
def __init__(self, format: PandasFormat, dataset_type: DataFrameType, path: str = None):
super(PandasReader, self).__init__(dataset_type)
self.path = path or PANDAS_DATA_FILE
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if dataset is in remote location, or in any sort of container? How that will work?

self.format = format

def read(self, artifacts: ArtifactCollection) -> Dataset:
with artifacts.blob_dict() as blobs, blobs[PANDAS_DATA_FILE].bytestream() as b:
return Dataset.from_object(self.data_type.align(self.format.read(b)))
with artifacts.blob_dict() as blobs, blobs[self.path].bytestream() as b:
return Dataset.from_object(self.dataset_type.align(self.format.read(b)))


class PandasWriter(DatasetWriter):
Expand All @@ -209,12 +210,14 @@ class PandasWriter(DatasetWriter):
:param format: PandasFormat instance to use
"""

def __init__(self, format: PandasFormat):
def __init__(self, format: PandasFormat, path: str = None):
self.path = path or PANDAS_DATA_FILE
self.format = format

def write(self, dataset: Dataset) -> Tuple[DatasetReader, ArtifactCollection]:
blob = LazyBlob(lambda: self.format.write(dataset.data))
return PandasReader(self.format, dataset.dataset_type), ArtifactCollection.from_blobs({PANDAS_DATA_FILE: blob})
return (PandasReader(self.format, dataset.dataset_type, self.path),
ArtifactCollection.from_blobs({self.path: blob}))

# class PandasJdbcDatasetSource(_PandasDatasetSource):
# def __init__(self, dataset_type: DatasetType, table: str, connection: str,
Expand Down
19 changes: 13 additions & 6 deletions src/ebonite/repository/dataset/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,20 @@

@type_field('type')
class DatasetReader(EboniteParams):
"""ABC for reading Dataset from files (artifacts) to use with ArtifactDatasetSource"""
"""ABC for reading Dataset from files (artifacts) to use with ArtifactDatasetSource

:param dataset_type: type of the resulting dataset
"""

def __init__(self, dataset_type: DatasetType):
self.dataset_type = dataset_type

@abstractmethod
def read(self, artifacts: ArtifactCollection) -> Dataset:
"""Method to read Dataset from artifacts

:param artifacts: artifacts to read"""
:param artifacts: artifacts to read
"""


@type_field('type')
Expand Down Expand Up @@ -56,7 +63,7 @@ def save(self, dataset_id: str, dataset: Dataset) -> DatasetSource:
pushed = self.repo.push_artifact(self.ARTIFACT_TYPE, dataset_id, blobs)
except ArtifactExistsError as e:
raise DatasetExistsError(dataset_id, self, e)
return ArtifactDatasetSource(reader, pushed, dataset.dataset_type)
return ArtifactDatasetSource(reader, pushed)

def delete(self, dataset_id: str):
try:
Expand All @@ -70,10 +77,10 @@ class ArtifactDatasetSource(DatasetSource):

:param reader: DatasetReader for this dataset
:param artifacts: ArtifactCollection with actual files
:param dataset_type: DatasetType of contained dataset"""
"""

def __init__(self, reader: DatasetReader, artifacts: ArtifactCollection, dataset_type: DatasetType):
super(ArtifactDatasetSource, self).__init__(dataset_type)
def __init__(self, reader: DatasetReader, artifacts: ArtifactCollection):
super(ArtifactDatasetSource, self).__init__(reader.dataset_type)
self.reader = reader
self.artifacts = artifacts

Expand Down
2 changes: 2 additions & 0 deletions test.requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ lightgbm==2.3.1
torch==1.4.0+cpu ; sys_platform != "darwin"

torch==1.4.0 ; sys_platform == "darwin"

dvc==1.1.7
Empty file added tests/ext/test_dvc/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions tests/ext/test_dvc/data1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
col1,col2
123,asdf
456,cvbx
95 changes: 95 additions & 0 deletions tests/ext/test_dvc/test_dataset_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import contextlib
import os
import shutil

import pandas as pd
import pytest

from ebonite.core.analyzer.dataset import DatasetAnalyzer
from ebonite.ext.pandas import DataFrameType
from ebonite.ext.pandas.dataset_source import PandasFormatCsv, PandasReader
from ebonite.ext.s3 import S3ArtifactRepository
from ebonite.utils import fs
from tests.conftest import docker_test
from tests.ext.test_s3.conftest import ACCESS_KEY, SECRET_KEY # noqa


@pytest.fixture
def dvc_repo_factory(tmpdir):
    """Factory fixture: builds a DVC repo in *tmpdir* with ``data1.csv`` pushed to a remote.

    The returned callable takes the remote URL (plus optional extra remote config,
    e.g. ``endpointurl`` for S3) and returns the path of the initialized repo.
    """
    def dvc_repo(remote, remote_kwargs=None):
        repo_path = tmpdir
        from dvc.repo import Repo
        # no_scm: stand-alone DVC repo, no git required for the test
        repo = Repo.init(repo_path, no_scm=True)

        # register the remote under the name 'storage' and make it the default
        with repo.config.edit() as conf:
            remote_config = {'url': remote}
            if remote_kwargs is not None:
                remote_config.update(remote_kwargs)
            conf['remote']['storage'] = remote_config
            conf['core']['remote'] = 'storage'

        # track the sample file and push it to the configured remote
        shutil.copy(fs.current_module_path('data1.csv'), repo_path)
        data1_path = os.path.join(repo_path, 'data1.csv')
        assert os.path.exists(data1_path)
        repo.add([data1_path])
        assert os.path.exists(data1_path + '.dvc')
        repo.push()
        # drop the working copy and the local cache so subsequent reads must
        # actually fetch the data from the remote
        os.remove(data1_path)
        shutil.rmtree(os.path.join(repo_path, '.dvc', 'cache'), ignore_errors=True)

        return repo_path

    return dvc_repo


@pytest.fixture
def local_dvc_repo(tmpdir_factory, dvc_repo_factory):
    """DVC repo whose default remote is a local temporary directory."""
    remote_dir = tmpdir_factory.mktemp('storage')
    return dvc_repo_factory(str(remote_dir))


@contextlib.contextmanager
def override_env(**envs):
    """Temporarily set environment variables, restoring the previous state on exit.

    Bug fix vs. the original: variables that did NOT exist before entry are now
    removed again on exit instead of being left behind with the overridden value
    (the original only restored variables that previously had a value).

    :param envs: mapping of variable name to the value to set for the duration
        of the ``with`` block
    """
    prev = {e: os.environ.get(e) for e in envs}
    try:
        os.environ.update(envs)
        yield
    finally:
        for e, val in prev.items():
            if val is None:
                # was unset before: remove it, don't leak the override
                os.environ.pop(e, None)
            else:
                os.environ[e] = val


@pytest.fixture
def s3_dvc_repo(s3server, dvc_repo_factory):
    """DVC repo whose default remote is a bucket on the dockerized S3 test server.

    Credentials are exported via ``override_env`` only while the bucket is
    created and the data is pushed; the fixture returns inside the ``with``
    block, so the environment is restored before the test body runs.
    """
    url = f'http://localhost:{s3server}'

    with override_env(AWS_ACCESS_KEY_ID=ACCESS_KEY, AWS_SECRET_ACCESS_KEY=SECRET_KEY,
                      S3_ACCESS_KEY=ACCESS_KEY, S3_SECRET_KEY=SECRET_KEY):
        # create the bucket up front so that dvc push has somewhere to write
        S3ArtifactRepository('dvc-bucket', url)._ensure_bucket()  # noqa
        return dvc_repo_factory('s3://dvc-bucket',
                                {'endpointurl': url})


def test_create_dvc_source__local(local_dvc_repo):
    """create_dvc_source should read the CSV back from a local DVC remote."""
    from ebonite.ext.dvc import create_dvc_source

    expected_type = DataFrameType(['col1', 'col2'], ['int64', 'string'], [])
    reader = PandasReader(PandasFormatCsv(), expected_type, 'data1.csv')
    source = create_dvc_source(path='data1.csv', reader=reader, repo=local_dvc_repo)

    result = source.read()
    assert isinstance(result.data, pd.DataFrame)
    assert DatasetAnalyzer.analyze(result.data) == expected_type


@docker_test
def test_create_dvc_source_s3(s3_dvc_repo):
    """create_dvc_source should read the CSV back from an S3-backed DVC remote."""
    from ebonite.ext.dvc import create_dvc_source

    expected_type = DataFrameType(['col1', 'col2'], ['int64', 'string'], [])
    reader = PandasReader(PandasFormatCsv(), expected_type, 'data1.csv')
    source = create_dvc_source(path='data1.csv', reader=reader, repo=s3_dvc_repo)

    result = source.read()
    assert isinstance(result.data, pd.DataFrame)
    assert DatasetAnalyzer.analyze(result.data) == expected_type
1 change: 0 additions & 1 deletion tests/ext/test_ext_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ def test_extension_loader__lazy(ext_loader, two_temp_modules):
assert module_imported(module2)


@pytest.mark.kek
def test_extension_loader__lazy_defered(ext_loader, two_temp_modules, temp_module_factory):
module1, module2 = two_temp_modules

Expand Down
2 changes: 1 addition & 1 deletion tests/ext/test_s3/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def delete_bucket(repo: S3ArtifactRepository):


# fake fixture that ensures that S3 server is up between tests
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def s3server(pytestconfig):
if not has_docker() or 'not docker' in pytestconfig.getoption('markexpr'):
pytest.skip('skipping docker tests')
Expand Down
2 changes: 1 addition & 1 deletion tests/repository/dataset/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

class TestDatasetWriter(DatasetWriter):
def write(self, dataset: Dataset) -> Tuple[DatasetReader, ArtifactCollection]:
return TestDatasetReader(), Blobs({'data': InMemoryBlob(dataset.data.encode('utf8'))})
return TestDatasetReader(dataset.dataset_type), Blobs({'data': InMemoryBlob(dataset.data.encode('utf8'))})


class TestDatasetReader(DatasetReader):
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ deps =
coverage==4.5.4
; coveralls 5.x is buggy for now
commands =
pytest --cov --cov-report= --cov-append --disable-warnings -m "not docker and not tf_v1" tests
pytest --color=no --cov --cov-report= --cov-append --disable-warnings -m "not docker and not tf_v1" tests
pytest --cov --cov-report= --cov-append --disable-warnings tests_requirements
pytest --cov --cov-report= --cov-append --disable-warnings -m docker tests
pip uninstall -y tensorflow
Expand Down