From 790d3f2eab0b175a02d98934a71cb7a77bb948ec Mon Sep 17 00:00:00 2001 From: ywang007 Date: Tue, 16 Aug 2016 09:18:46 -0400 Subject: [PATCH 1/8] adding dshape parameter to CSV --- odo/backends/csv.py | 14 +- odo/backends/csv_ok.py | 397 +++++++++++++++++++++++++++++++++ odo/backends/tests/test_csv.py | 7 + 3 files changed, 416 insertions(+), 2 deletions(-) create mode 100644 odo/backends/csv_ok.py diff --git a/odo/backends/csv.py b/odo/backends/csv.py index cc1d26ecc..24f8661fd 100644 --- a/odo/backends/csv.py +++ b/odo/backends/csv.py @@ -18,7 +18,7 @@ import datashape from datashape import discover, Record, Option -from datashape.predicates import isrecord +from datashape.predicates import isrecord, isdimension from datashape.dispatch import dispatch from ..compatibility import unicode, PY2 @@ -140,18 +140,25 @@ class CSV(object): If the csv file has a header or not encoding : str (default utf-8) File encoding + dshape: datashape or string representation + used specified datashape kwargs : other... Various choices about dialect """ canonical_extension = 'csv' def __init__(self, path, has_header=None, encoding='utf-8', - sniff_nbytes=10000, **kwargs): + sniff_nbytes=10000, dshape=None, **kwargs): self.path = path self._has_header = has_header self.encoding = encoding or 'utf-8' self._kwargs = kwargs self._sniff_nbytes = sniff_nbytes + if dshape: + if isinstance(dshape, (str, unicode)): + dshape = datashape.dshape(dshape) + dshape = None if isdimension(dshape.subshape[0][0]) else dshape + self._dshape = dshape def _sniff_dialect(self, path): kwargs = self._kwargs @@ -330,6 +337,9 @@ def _(): @discover.register(CSV) def discover_csv(c, nrows=1000, **kwargs): + if c._dshape: + return c._dshape + df = csv_to_dataframe(c, nrows=nrows, **kwargs) df = coerce_datetimes(df) diff --git a/odo/backends/csv_ok.py b/odo/backends/csv_ok.py new file mode 100644 index 000000000..5935c6f4a --- /dev/null +++ b/odo/backends/csv_ok.py @@ -0,0 +1,397 @@ +from __future__ import absolute_import, division, print_function + +import sys +import re +import os +import gzip +import bz2 +import uuid +import csv + +from glob import glob +from contextlib import contextmanager + +from toolz import concat, keyfilter, keymap, merge, valfilter + +import pandas as pd + +import datashape + +from datashape import discover, Record, Option +from datashape.predicates import isrecord +from datashape.dispatch import dispatch + +from ..compatibility import unicode, PY2 +from ..utils import keywords, ext, sample, tmpfile +from ..append import append +from ..convert import convert, ooc_types +from ..resource import resource +from ..chunks import chunks +from ..temp import Temp +from ..numpy_dtype import dshape_to_pandas +from .pandas import coerce_datetimes + +dialect_terms = '''delimiter doublequote escapechar lineterminator quotechar +quoting skipinitialspace strict'''.split() + +aliases = {'sep': 'delimiter'} + + +class PipeSniffer(csv.Sniffer): + """A csv sniffer that adds the ``'|'`` character to the list of preferred + delimiters and gives space the least precedence. + + Notes + ----- + The :module:`~csv` module has a list of preferred delimiters that will be + returned before the delimiter(s) discovered by parsing the file. This + results in a dialect having an incorrect delimiter if the data contain a + non preferred delimiter and a preferred one. See the Examples section for + an example showing the difference between the two sniffers. + + Examples + -------- + >>> import csv + >>> data = 'a|b| c|d' # space is preferred but '|' is more common + >>> csv.Sniffer().sniff(data).delimiter # incorrect + ' ' + >>> PipeSniffer().sniff(data).delimiter # correct + '|' + >>> data = 'a|b|Nov 10, 1981|d' + >>> csv.Sniffer().sniff(data).delimiter # can't handle every case :( + ',' + """ + def __init__(self, *args, **kwargs): + csv.Sniffer.__init__(self, *args, **kwargs) + self.preferred = [',', '\t', ';', '|', ':', ' '] + + +def alias(key): + """ Alias kwarg dialect keys to normalized set + + >>> alias('sep') + 'delimiter' + """ + key = key.lower() + return aliases.get(key, key) + + +@contextmanager +def open_file(path, *args, **kwargs): + f = compressed_open.get(ext(path), open)(path, *args, **kwargs) + try: + yield f + finally: + f.close() + + +def infer_header(path, nbytes=10000, encoding='utf-8', **kwargs): + if encoding is None: + encoding = 'utf-8' + with open_file(path, 'rb') as f: + raw = f.read(nbytes) + if not raw: + return True + sniffer = PipeSniffer() + decoded = raw.decode(encoding, 'replace') + sniffable = decoded.encode(encoding) if PY2 else decoded + try: + return sniffer.has_header(sniffable) + except csv.Error: + return None + + +def newlines(encoding): + return b'\r\n'.decode(encoding), b'\n'.decode(encoding) + + +def sniff_dialect(path, nbytes, encoding='utf-8'): + if not os.path.exists(path): + return {} + if encoding is None: + encoding = 'utf-8' + with open_file(path, 'rb') as f: + raw = f.read(nbytes).decode(encoding or 'utf-8', 'replace') + sniffer = PipeSniffer() + try: + dialect = sniffer.sniff(raw, delimiters=sniffer.preferred) + except csv.Error: + return {} + crnl, nl = newlines(encoding) + dialect.lineterminator = crnl if crnl in raw else nl + return dialect_to_dict(dialect) + + +def dialect_to_dict(dialect): + return dict((name, getattr(dialect, name)) + for name in dialect_terms if hasattr(dialect, name)) + + +class CSV(object): + + """ Proxy for a CSV file + + Parameters + ---------- + + path : str + Path to file on disk + has_header : bool + If the csv file has a header or not + encoding : str (default utf-8) + File encoding + _dshape: datashape or string representation + pre-determined datashape + kwargs : other... + Various choices about dialect + """ + canonical_extension = 'csv' + + def __init__(self, path, has_header=None, encoding='utf-8', + sniff_nbytes=10000, dshape=None, **kwargs): + self.path = path + self._has_header = has_header + self.encoding = encoding or 'utf-8' + self._kwargs = kwargs + self._sniff_nbytes = sniff_nbytes + self._dshape = dshape + if isinstance(self._dshape, str): + self._dshape = datashape.dshape(self._dshape) + + def _sniff_dialect(self, path): + kwargs = self._kwargs + dialect = sniff_dialect(path, self._sniff_nbytes, + encoding=self.encoding) + kwargs = merge(dialect, keymap(alias, kwargs)) + return valfilter(lambda x: x is not None, + dict((d, kwargs[d]) + for d in dialect_terms if d in kwargs)) + + @property + def dialect(self): + with sample(self) as fn: + return self._sniff_dialect(fn) + + @property + def has_header(self): + if self._has_header is None: + with sample(self) as fn: + with open(fn, mode='rb') as f: + raw = f.read() + self._has_header = not raw or infer_header(fn, + self._sniff_nbytes, + encoding=self.encoding) + return self._has_header + + +@sample.register(CSV) +@contextmanager +def sample_csv(csv, length=8192, **kwargs): + should_delete = not os.path.exists(csv.path) + if should_delete: + with open_file(csv.path, mode='wb'): + pass + try: + with open_file(csv.path, mode='rb') as f: + with tmpfile(csv.canonical_extension) as fn: + with open(fn, mode='wb') as tmpf: + tmpf.write(f.read(length)) + yield fn + finally: + if should_delete: + os.remove(csv.path) + + +@append.register(CSV, object) +def append_object_to_csv(c, seq, **kwargs): + append(c, convert(chunks(pd.DataFrame), seq, **kwargs), **kwargs) + return c + + +compressed_open = {'gz': gzip.open, 'bz2': bz2.BZ2File} + + +@append.register(CSV, pd.DataFrame) +def append_dataframe_to_csv(c, df, dshape=None, **kwargs): + if not os.path.exists(c.path) or not os.path.getsize(c.path): + has_header = kwargs.pop('header', c.has_header) + else: + has_header = False + sep = kwargs.get('sep', + kwargs.get('delimiter', c.dialect.get('delimiter', ','))) + encoding = kwargs.get('encoding', c.encoding) + + if ext(c.path) in compressed_open: + if sys.version_info[0] >= 3: + kwargs['mode'] = 'at' + kwargs['encoding'] = encoding + elif sys.version_info[0] == 2: + kwargs['mode'] = 'ab' if sys.platform == 'win32' else 'at' + f = compressed_open[ext(c.path)](c.path, **kwargs) + else: + f = c.path + + try: + df.to_csv(f, mode='a', header=has_header, index=False, sep=sep, + encoding=encoding) + finally: + if hasattr(f, 'close'): + f.close() + return c + + +@append.register(CSV, chunks(pd.DataFrame)) +def append_iterator_to_csv(c, cs, **kwargs): + for chunk in cs: + append(c, chunk, **kwargs) + return c + + +@convert.register(pd.DataFrame, (Temp(CSV), CSV), cost=20.0) +def csv_to_dataframe(c, dshape=None, chunksize=None, nrows=None, **kwargs): + try: + return _csv_to_dataframe(c, dshape=dshape, chunksize=chunksize, + nrows=nrows, **kwargs) + except StopIteration: + if nrows: + return _csv_to_dataframe(c, dshape=dshape, chunksize=chunksize, + **kwargs) + else: + raise + + +def _csv_to_dataframe(c, dshape=None, chunksize=None, **kwargs): + header = {False: None, True: 0}.get( + kwargs.pop('has_header', c.has_header), 'infer') + + sep = kwargs.pop( + 'sep', kwargs.pop('delimiter', c.dialect.get('delimiter', ','))) + encoding = kwargs.pop('encoding', c.encoding) + + if dshape: + dtypes, parse_dates = dshape_to_pandas(dshape) + if isrecord(dshape.measure): + names = kwargs.get('names', dshape.measure.names) + else: + names = kwargs.get('names') + else: + dtypes = parse_dates = names = None + + usecols = kwargs.pop('usecols', None) + if parse_dates and usecols: + parse_dates = [col for col in parse_dates if col in usecols] + + compression = kwargs.pop('compression', + {'gz': 'gzip', 'bz2': 'bz2'}.get(ext(c.path))) + + # See read_csv docs for header for reasoning + if names: + try: + found_names = pd.read_csv(c.path, encoding=encoding, + compression=compression, nrows=1) + except StopIteration: + found_names = pd.read_csv(c.path, encoding=encoding, + compression=compression) + if names and header == 'infer': + if [n.strip() for n in found_names] == [n.strip() for n in names]: + header = 0 + elif (all(re.match('^\s*\D\w*\s*$', n) for n in found_names) and + not all(dt == datashape.string for dt in dshape.measure.types)): + header = 0 + else: + header = None + + kwargs = keyfilter(keywords(pd.read_csv).__contains__, kwargs) + return pd.read_csv(c.path, + header=header, + sep=sep, + encoding=encoding, + dtype=dtypes, + parse_dates=parse_dates, + names=names, + compression=compression, + chunksize=chunksize, + usecols=usecols, + **kwargs) + + +@convert.register(chunks(pd.DataFrame), (Temp(CSV), CSV), cost=10.0) +def CSV_to_chunks_of_dataframes(c, chunksize=2 ** 20, **kwargs): + # Load a small 1000 line DF to start + # This helps with rapid viewing of a large CSV file + first = csv_to_dataframe(c, nrows=1000, **kwargs) + if len(first) == 1000: + rest = csv_to_dataframe( + c, chunksize=chunksize, skiprows=1000, **kwargs) + else: + rest = [] + + def _(): + yield first + for df in rest: + yield df + return chunks(pd.DataFrame)(_) + + +@discover.register(CSV) +def discover_csv(c, nrows=1000, **kwargs): + if c._dshape: + return c._dshape + + df = csv_to_dataframe(c, nrows=nrows, **kwargs) + df = coerce_datetimes(df) + + columns = [str(c) if not isinstance(c, (str, unicode)) else c + for c in df.columns] + df.columns = [c.strip() for c in columns] + + # Replace np.nan with None. Forces type string rather than float + for col in df.columns: + if not df[col].count(): + df[col] = None + + measure = discover(df).measure + + # Use Series.notnull to determine Option-ness + measure = Record([[name, Option(typ) + if df[name].isnull().any() and + not isinstance(typ, Option) else typ] + for name, typ in zip(measure.names, measure.types)]) + + return datashape.var * measure + + +@resource.register('.+\.(csv|tsv|ssv|data|dat)(\.gz|\.bz2?)?') +def resource_csv(uri, **kwargs): + return CSV(uri, **kwargs) + + +@resource.register('.*\*.+', priority=12) +def resource_glob(uri, **kwargs): + filenames = sorted(glob(uri)) + r = resource(filenames[0], **kwargs) + return chunks(type(r))([resource(u, **kwargs) for u in sorted(glob(uri))]) + + +@convert.register(chunks(pd.DataFrame), (chunks(CSV), chunks(Temp(CSV))), + cost=10.0) +def convert_glob_of_csvs_to_chunks_of_dataframes(csvs, **kwargs): + def _(): + return concat(convert(chunks(pd.DataFrame), csv, **kwargs) + for csv in csvs) + return chunks(pd.DataFrame)(_) + + +@convert.register(Temp(CSV), (pd.DataFrame, chunks(pd.DataFrame))) +def convert_dataframes_to_temporary_csv(data, **kwargs): + fn = '.%s.csv' % uuid.uuid1() + csv = Temp(CSV)(fn, **kwargs) + return append(csv, data, **kwargs) + + +@dispatch(CSV) +def drop(c): + os.unlink(c.path) + + +ooc_types.add(CSV) diff --git a/odo/backends/tests/test_csv.py b/odo/backends/tests/test_csv.py index 257d8cc2b..276609065 100644 --- a/odo/backends/tests/test_csv.py +++ b/odo/backends/tests/test_csv.py @@ -398,6 +398,13 @@ def test_discover_with_dotted_names(): assert dshape == datashape.dshape('var * {"a.b": int64, "c.d": int64}') assert dshape.measure.names == [u'a.b', u'c.d'] +def test_discover_csv_with_fixed_dshape(): + with filetext('name,val\nAlice,1\nBob,2') as fn: + ds = datashape.dshape('var * {name: string, val: float64}') + csv = CSV(fn, dshape=ds) + ds1 = discover(csv) + assert ds1 == ds + try: unichr From 99c351b858b45147d7675a6cc6439231dce5939e Mon Sep 17 00:00:00 2001 From: ywang007 Date: Tue, 16 Aug 2016 09:21:59 -0400 Subject: [PATCH 2/8] remove dup file --- odo/backends/csv_ok.py | 397 ----------------------------------------- 1 file changed, 397 deletions(-) delete mode 100644 odo/backends/csv_ok.py diff --git a/odo/backends/csv_ok.py b/odo/backends/csv_ok.py deleted file mode 100644 index 5935c6f4a..000000000 --- a/odo/backends/csv_ok.py +++ /dev/null @@ -1,397 +0,0 @@ -from __future__ import absolute_import, division, print_function - -import sys -import re -import os -import gzip -import bz2 -import uuid -import csv - -from glob import glob -from contextlib import contextmanager - -from toolz import concat, keyfilter, keymap, merge, valfilter - -import pandas as pd - -import datashape - -from datashape import discover, Record, Option -from datashape.predicates import isrecord -from datashape.dispatch import dispatch - -from ..compatibility import unicode, PY2 -from ..utils import keywords, ext, sample, tmpfile -from ..append import append -from ..convert import convert, ooc_types -from ..resource import resource -from ..chunks import chunks -from ..temp import Temp -from ..numpy_dtype import dshape_to_pandas -from .pandas import coerce_datetimes - -dialect_terms = '''delimiter doublequote escapechar lineterminator quotechar -quoting skipinitialspace strict'''.split() - -aliases = {'sep': 'delimiter'} - - -class PipeSniffer(csv.Sniffer): - """A csv sniffer that adds the ``'|'`` character to the list of preferred - delimiters and gives space the least precedence. - - Notes - ----- - The :module:`~csv` module has a list of preferred delimiters that will be - returned before the delimiter(s) discovered by parsing the file. This - results in a dialect having an incorrect delimiter if the data contain a - non preferred delimiter and a preferred one. See the Examples section for - an example showing the difference between the two sniffers. - - Examples - -------- - >>> import csv - >>> data = 'a|b| c|d' # space is preferred but '|' is more common - >>> csv.Sniffer().sniff(data).delimiter # incorrect - ' ' - >>> PipeSniffer().sniff(data).delimiter # correct - '|' - >>> data = 'a|b|Nov 10, 1981|d' - >>> csv.Sniffer().sniff(data).delimiter # can't handle every case :( - ',' - """ - def __init__(self, *args, **kwargs): - csv.Sniffer.__init__(self, *args, **kwargs) - self.preferred = [',', '\t', ';', '|', ':', ' '] - - -def alias(key): - """ Alias kwarg dialect keys to normalized set - - >>> alias('sep') - 'delimiter' - """ - key = key.lower() - return aliases.get(key, key) - - -@contextmanager -def open_file(path, *args, **kwargs): - f = compressed_open.get(ext(path), open)(path, *args, **kwargs) - try: - yield f - finally: - f.close() - - -def infer_header(path, nbytes=10000, encoding='utf-8', **kwargs): - if encoding is None: - encoding = 'utf-8' - with open_file(path, 'rb') as f: - raw = f.read(nbytes) - if not raw: - return True - sniffer = PipeSniffer() - decoded = raw.decode(encoding, 'replace') - sniffable = decoded.encode(encoding) if PY2 else decoded - try: - return sniffer.has_header(sniffable) - except csv.Error: - return None - - -def newlines(encoding): - return b'\r\n'.decode(encoding), b'\n'.decode(encoding) - - -def sniff_dialect(path, nbytes, encoding='utf-8'): - if not os.path.exists(path): - return {} - if encoding is None: - encoding = 'utf-8' - with open_file(path, 'rb') as f: - raw = f.read(nbytes).decode(encoding or 'utf-8', 'replace') - sniffer = PipeSniffer() - try: - dialect = sniffer.sniff(raw, delimiters=sniffer.preferred) - except csv.Error: - return {} - crnl, nl = newlines(encoding) - dialect.lineterminator = crnl if crnl in raw else nl - return dialect_to_dict(dialect) - - -def dialect_to_dict(dialect): - return dict((name, getattr(dialect, name)) - for name in dialect_terms if hasattr(dialect, name)) - - -class CSV(object): - - """ Proxy for a CSV file - - Parameters - ---------- - - path : str - Path to file on disk - has_header : bool - If the csv file has a header or not - encoding : str (default utf-8) - File encoding - _dshape: datashape or string representation - pre-determined datashape - kwargs : other... - Various choices about dialect - """ - canonical_extension = 'csv' - - def __init__(self, path, has_header=None, encoding='utf-8', - sniff_nbytes=10000, dshape=None, **kwargs): - self.path = path - self._has_header = has_header - self.encoding = encoding or 'utf-8' - self._kwargs = kwargs - self._sniff_nbytes = sniff_nbytes - self._dshape = dshape - if isinstance(self._dshape, str): - self._dshape = datashape.dshape(self._dshape) - - def _sniff_dialect(self, path): - kwargs = self._kwargs - dialect = sniff_dialect(path, self._sniff_nbytes, - encoding=self.encoding) - kwargs = merge(dialect, keymap(alias, kwargs)) - return valfilter(lambda x: x is not None, - dict((d, kwargs[d]) - for d in dialect_terms if d in kwargs)) - - @property - def dialect(self): - with sample(self) as fn: - return self._sniff_dialect(fn) - - @property - def has_header(self): - if self._has_header is None: - with sample(self) as fn: - with open(fn, mode='rb') as f: - raw = f.read() - self._has_header = not raw or infer_header(fn, - self._sniff_nbytes, - encoding=self.encoding) - return self._has_header - - -@sample.register(CSV) -@contextmanager -def sample_csv(csv, length=8192, **kwargs): - should_delete = not os.path.exists(csv.path) - if should_delete: - with open_file(csv.path, mode='wb'): - pass - try: - with open_file(csv.path, mode='rb') as f: - with tmpfile(csv.canonical_extension) as fn: - with open(fn, mode='wb') as tmpf: - tmpf.write(f.read(length)) - yield fn - finally: - if should_delete: - os.remove(csv.path) - - -@append.register(CSV, object) -def append_object_to_csv(c, seq, **kwargs): - append(c, convert(chunks(pd.DataFrame), seq, **kwargs), **kwargs) - return c - - -compressed_open = {'gz': gzip.open, 'bz2': bz2.BZ2File} - - -@append.register(CSV, pd.DataFrame) -def append_dataframe_to_csv(c, df, dshape=None, **kwargs): - if not os.path.exists(c.path) or not os.path.getsize(c.path): - has_header = kwargs.pop('header', c.has_header) - else: - has_header = False - sep = kwargs.get('sep', - kwargs.get('delimiter', c.dialect.get('delimiter', ','))) - encoding = kwargs.get('encoding', c.encoding) - - if ext(c.path) in compressed_open: - if sys.version_info[0] >= 3: - kwargs['mode'] = 'at' - kwargs['encoding'] = encoding - elif sys.version_info[0] == 2: - kwargs['mode'] = 'ab' if sys.platform == 'win32' else 'at' - f = compressed_open[ext(c.path)](c.path, **kwargs) - else: - f = c.path - - try: - df.to_csv(f, mode='a', header=has_header, index=False, sep=sep, - encoding=encoding) - finally: - if hasattr(f, 'close'): - f.close() - return c - - -@append.register(CSV, chunks(pd.DataFrame)) -def append_iterator_to_csv(c, cs, **kwargs): - for chunk in cs: - append(c, chunk, **kwargs) - return c - - -@convert.register(pd.DataFrame, (Temp(CSV), CSV), cost=20.0) -def csv_to_dataframe(c, dshape=None, chunksize=None, nrows=None, **kwargs): - try: - return _csv_to_dataframe(c, dshape=dshape, chunksize=chunksize, - nrows=nrows, **kwargs) - except StopIteration: - if nrows: - return _csv_to_dataframe(c, dshape=dshape, chunksize=chunksize, - **kwargs) - else: - raise - - -def _csv_to_dataframe(c, dshape=None, chunksize=None, **kwargs): - header = {False: None, True: 0}.get( - kwargs.pop('has_header', c.has_header), 'infer') - - sep = kwargs.pop( - 'sep', kwargs.pop('delimiter', c.dialect.get('delimiter', ','))) - encoding = kwargs.pop('encoding', c.encoding) - - if dshape: - dtypes, parse_dates = dshape_to_pandas(dshape) - if isrecord(dshape.measure): - names = kwargs.get('names', dshape.measure.names) - else: - names = kwargs.get('names') - else: - dtypes = parse_dates = names = None - - usecols = kwargs.pop('usecols', None) - if parse_dates and usecols: - parse_dates = [col for col in parse_dates if col in usecols] - - compression = kwargs.pop('compression', - {'gz': 'gzip', 'bz2': 'bz2'}.get(ext(c.path))) - - # See read_csv docs for header for reasoning - if names: - try: - found_names = pd.read_csv(c.path, encoding=encoding, - compression=compression, nrows=1) - except StopIteration: - found_names = pd.read_csv(c.path, encoding=encoding, - compression=compression) - if names and header == 'infer': - if [n.strip() for n in found_names] == [n.strip() for n in names]: - header = 0 - elif (all(re.match('^\s*\D\w*\s*$', n) for n in found_names) and - not all(dt == datashape.string for dt in dshape.measure.types)): - header = 0 - else: - header = None - - kwargs = keyfilter(keywords(pd.read_csv).__contains__, kwargs) - return pd.read_csv(c.path, - header=header, - sep=sep, - encoding=encoding, - dtype=dtypes, - parse_dates=parse_dates, - names=names, - compression=compression, - chunksize=chunksize, - usecols=usecols, - **kwargs) - - -@convert.register(chunks(pd.DataFrame), (Temp(CSV), CSV), cost=10.0) -def CSV_to_chunks_of_dataframes(c, chunksize=2 ** 20, **kwargs): - # Load a small 1000 line DF to start - # This helps with rapid viewing of a large CSV file - first = csv_to_dataframe(c, nrows=1000, **kwargs) - if len(first) == 1000: - rest = csv_to_dataframe( - c, chunksize=chunksize, skiprows=1000, **kwargs) - else: - rest = [] - - def _(): - yield first - for df in rest: - yield df - return chunks(pd.DataFrame)(_) - - -@discover.register(CSV) -def discover_csv(c, nrows=1000, **kwargs): - if c._dshape: - return c._dshape - - df = csv_to_dataframe(c, nrows=nrows, **kwargs) - df = coerce_datetimes(df) - - columns = [str(c) if not isinstance(c, (str, unicode)) else c - for c in df.columns] - df.columns = [c.strip() for c in columns] - - # Replace np.nan with None. Forces type string rather than float - for col in df.columns: - if not df[col].count(): - df[col] = None - - measure = discover(df).measure - - # Use Series.notnull to determine Option-ness - measure = Record([[name, Option(typ) - if df[name].isnull().any() and - not isinstance(typ, Option) else typ] - for name, typ in zip(measure.names, measure.types)]) - - return datashape.var * measure - - -@resource.register('.+\.(csv|tsv|ssv|data|dat)(\.gz|\.bz2?)?') -def resource_csv(uri, **kwargs): - return CSV(uri, **kwargs) - - -@resource.register('.*\*.+', priority=12) -def resource_glob(uri, **kwargs): - filenames = sorted(glob(uri)) - r = resource(filenames[0], **kwargs) - return chunks(type(r))([resource(u, **kwargs) for u in sorted(glob(uri))]) - - -@convert.register(chunks(pd.DataFrame), (chunks(CSV), chunks(Temp(CSV))), - cost=10.0) -def convert_glob_of_csvs_to_chunks_of_dataframes(csvs, **kwargs): - def _(): - return concat(convert(chunks(pd.DataFrame), csv, **kwargs) - for csv in csvs) - return chunks(pd.DataFrame)(_) - - -@convert.register(Temp(CSV), (pd.DataFrame, chunks(pd.DataFrame))) -def convert_dataframes_to_temporary_csv(data, **kwargs): - fn = '.%s.csv' % uuid.uuid1() - csv = Temp(CSV)(fn, **kwargs) - return append(csv, data, **kwargs) - - -@dispatch(CSV) -def drop(c): - os.unlink(c.path) - - -ooc_types.add(CSV) From d15319388ca58eb89956bfe395f5a8c30bd57daf Mon Sep 17 00:00:00 2001 From: ywang007 Date: Tue, 16 Aug 2016 10:04:10 -0400 Subject: [PATCH 3/8] compare measure --- odo/backends/tests/test_mysql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/odo/backends/tests/test_mysql.py b/odo/backends/tests/test_mysql.py index 14860cacd..ce3147366 100644 --- a/odo/backends/tests/test_mysql.py +++ b/odo/backends/tests/test_mysql.py @@ -203,7 +203,7 @@ def test_sql_to_csv(sql, csv): assert odo(csv, list) == data # explicitly test that we do NOT preserve the header here - assert discover(csv).measure.names != discover(sql).measure.names + assert discover(csv).measure != discover(sql).measure def test_sql_select_to_csv(sql, csv): From 8c7c03ea91ca3766be815aa6e6b990e0aeadc0a4 Mon Sep 17 00:00:00 2001 From: ywang007 Date: Tue, 16 Aug 2016 10:16:11 -0400 Subject: [PATCH 4/8] test --- odo/backends/csv.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/odo/backends/csv.py b/odo/backends/csv.py index 24f8661fd..5b6ba1c18 100644 --- a/odo/backends/csv.py +++ b/odo/backends/csv.py @@ -337,8 +337,8 @@ def _(): @discover.register(CSV) def discover_csv(c, nrows=1000, **kwargs): - if c._dshape: - return c._dshape + #if c._dshape: + # return c._dshape df = csv_to_dataframe(c, nrows=nrows, **kwargs) df = coerce_datetimes(df) From e9c9db0ac588832915ff3752d14e6e4d29f80a23 Mon Sep 17 00:00:00 2001 From: ywang007 Date: Tue, 16 Aug 2016 10:26:12 -0400 Subject: [PATCH 5/8] fix test --- odo/backends/csv.py | 4 ++-- odo/backends/tests/test_mysql.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/odo/backends/csv.py b/odo/backends/csv.py index 5b6ba1c18..24f8661fd 100644 --- a/odo/backends/csv.py +++ b/odo/backends/csv.py @@ -337,8 +337,8 @@ def _(): @discover.register(CSV) def discover_csv(c, nrows=1000, **kwargs): - #if c._dshape: - # return c._dshape + if c._dshape: + return c._dshape df = csv_to_dataframe(c, nrows=nrows, **kwargs) df = coerce_datetimes(df) diff --git a/odo/backends/tests/test_mysql.py b/odo/backends/tests/test_mysql.py index ce3147366..c664e09cf 100644 --- a/odo/backends/tests/test_mysql.py +++ b/odo/backends/tests/test_mysql.py @@ -202,8 +202,8 @@ def test_sql_to_csv(sql, csv): csv = odo(sql, fn) assert odo(csv, list) == data - # explicitly test that we do NOT preserve the header here - assert discover(csv).measure != discover(sql).measure + # explicitly test that we do NOT preserve the header here ??? + #assert discover(csv).measure.name != discover(sql).measure.name def test_sql_select_to_csv(sql, csv): From f2a79bc70630b3a260b3adc70e945419e93ade14 Mon Sep 17 00:00:00 2001 From: ywang007 Date: Tue, 16 Aug 2016 12:40:48 -0400 Subject: [PATCH 6/8] change dshape parameter to CSV to user_dshape to avoid conflicts --- odo/backends/csv.py | 17 +++++++++-------- odo/backends/tests/test_csv.py | 10 +++++++--- odo/backends/tests/test_mysql.py | 2 +- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/odo/backends/csv.py b/odo/backends/csv.py index 24f8661fd..b965edaad 100644 --- a/odo/backends/csv.py +++ b/odo/backends/csv.py @@ -140,25 +140,26 @@ class CSV(object): If the csv file has a header or not encoding : str (default utf-8) File encoding - dshape: datashape or string representation - used specified datashape + user_dshape: datashape or string representation + user specified datashape kwargs : other... Various choices about dialect """ canonical_extension = 'csv' def __init__(self, path, has_header=None, encoding='utf-8', - sniff_nbytes=10000, dshape=None, **kwargs): + sniff_nbytes=10000, user_dshape=None, **kwargs): self.path = path self._has_header = has_header self.encoding = encoding or 'utf-8' self._kwargs = kwargs self._sniff_nbytes = sniff_nbytes - if dshape: - if isinstance(dshape, (str, unicode)): - dshape = datashape.dshape(dshape) - dshape = None if isdimension(dshape.subshape[0][0]) else dshape - self._dshape = dshape + if user_dshape: + if isinstance(user_dshape, (str, unicode)): + user_dshape = datashape.dshape(user_dshape) + if not isrecord(user_dshape.measure): + raise TypeError('Please provide a Record dshape for t') + self._dshape = user_dshape def _sniff_dialect(self, path): kwargs = self._kwargs diff --git a/odo/backends/tests/test_csv.py b/odo/backends/tests/test_csv.py index 276609065..eca49fd56 100644 --- a/odo/backends/tests/test_csv.py +++ b/odo/backends/tests/test_csv.py @@ -399,11 +399,15 @@ def test_discover_with_dotted_names(): assert dshape.measure.names == [u'a.b', u'c.d'] def test_discover_csv_with_fixed_dshape(): - with filetext('name,val\nAlice,1\nBob,2') as fn: + with filetext('name,val\nAlice,1\n,0\nBob,2') as fn: ds = datashape.dshape('var * {name: string, val: float64}') - csv = CSV(fn, dshape=ds) - ds1 = discover(csv) + csv1 = CSV(fn, user_dshape=ds) + ds1 = discover(csv1) assert ds1 == ds + csv2 = CSV(fn, has_header=True) + ds2 = discover(csv2) + assert ds1 != ds2 + try: diff --git a/odo/backends/tests/test_mysql.py b/odo/backends/tests/test_mysql.py index c664e09cf..754ff7f15 100644 --- a/odo/backends/tests/test_mysql.py +++ b/odo/backends/tests/test_mysql.py @@ -203,7 +203,7 @@ def test_sql_to_csv(sql, csv): assert odo(csv, list) == data # explicitly test that we do NOT preserve the header here ??? - #assert discover(csv).measure.name != discover(sql).measure.name + assert discover(csv).measure.name != discover(sql).measure.name def test_sql_select_to_csv(sql, csv): From aff0daa57142eba73cd264e4a8e649c94dbc36f3 Mon Sep 17 00:00:00 2001 From: ywang007 Date: Tue, 16 Aug 2016 13:31:24 -0400 Subject: [PATCH 7/8] typo --- odo/backends/csv.py | 2 +- odo/backends/tests/test_mysql.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/odo/backends/csv.py b/odo/backends/csv.py index b965edaad..714a0d7d4 100644 --- a/odo/backends/csv.py +++ b/odo/backends/csv.py @@ -158,7 +158,7 @@ def __init__(self, path, has_header=None, encoding='utf-8', if isinstance(user_dshape, (str, unicode)): user_dshape = datashape.dshape(user_dshape) if not isrecord(user_dshape.measure): - raise TypeError('Please provide a Record dshape for t') + raise TypeError('Please provide a Record dshape for the csv') self._dshape = user_dshape def _sniff_dialect(self, path): diff --git a/odo/backends/tests/test_mysql.py b/odo/backends/tests/test_mysql.py index 754ff7f15..b8bc2b28e 100644 --- a/odo/backends/tests/test_mysql.py +++ b/odo/backends/tests/test_mysql.py @@ -203,7 +203,7 @@ def test_sql_to_csv(sql, csv): assert odo(csv, list) == data # explicitly test that we do NOT preserve the header here ??? - assert discover(csv).measure.name != discover(sql).measure.name + assert discover(csv).measure.names != discover(sql).measure.names def test_sql_select_to_csv(sql, csv): From 19b683f3fad3136c456ccd7acff9b3141857d2db Mon Sep 17 00:00:00 2001 From: ywang007 Date: Tue, 16 Aug 2016 13:43:37 -0400 Subject: [PATCH 8/8] unwind not needed change --- odo/backends/tests/test_mysql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/odo/backends/tests/test_mysql.py b/odo/backends/tests/test_mysql.py index b8bc2b28e..14860cacd 100644 --- a/odo/backends/tests/test_mysql.py +++ b/odo/backends/tests/test_mysql.py @@ -202,7 +202,7 @@ def test_sql_to_csv(sql, csv): csv = odo(sql, fn) assert odo(csv, list) == data - # explicitly test that we do NOT preserve the header here ??? + # explicitly test that we do NOT preserve the header here assert discover(csv).measure.names != discover(sql).measure.names