Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions csv_detective/detection/formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def detect_formats(
# Perform testing on fields
if not in_chunks:
# table is small enough to be tested in one go
scores_table_fields = test_col(
scores_table_fields, all_meta = test_col(
table=table,
formats=formats,
limited_output=limited_output,
Expand All @@ -59,7 +59,7 @@ def detect_formats(
analysis["categorical"] = res_categorical
col_values = None
else:
scores_table_fields, analysis, col_values = test_col_chunks(
scores_table_fields, analysis, col_values, all_meta = test_col_chunks(
table=table,
file_path=file_path,
analysis=analysis,
Expand Down Expand Up @@ -128,4 +128,22 @@ def detect_formats(
for header, col_metadata in analysis["columns"].items():
analysis["formats"][col_metadata["format"]].append(header)

# enrich date/datetime columns with date_format from meta collected during detection
for col_name, detection in analysis["columns"].items():
if isinstance(detection, list):
detection = next(
(d for d in detection if d.get("python_type") in ("date", "datetime")),
None,
)
if detection is None:
continue
if detection.get("python_type") not in ("date", "datetime"):
continue
col_meta = all_meta.get(col_name, {}).get(detection["format"], {})
date_formats = col_meta.get("date_format", set())
if len(date_formats) == 1:
detection["date_format"] = list(date_formats)
else:
detection["date_format"] = None

return analysis, col_values
78 changes: 70 additions & 8 deletions csv_detective/formats/date.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,32 @@ def date_casting(val: str) -> datetime | None:
).replace("SEP", seps + "?")


def _is(val) -> bool:
def _is(val, meta=None) -> bool:
# many early stops, to cut processing time
# and avoid the costly use of date_casting as much as possible
# /!\ timestamps are considered ints, not dates
if not isinstance(val, str) or len(val) > 20 or len(val) < 8:
return False
# if it's a usual date pattern
if (
# with this syntax, if any of the first value is True, the next ones are not computed
bool(re.match(jjmmaaaa_pattern, val))
or bool(re.match(aaaammjj_pattern, val))
or bool(re.match(string_month_pattern, val, re.IGNORECASE))
):
if re.match(jjmmaaaa_pattern, val):
if meta is not None:
fmt = detect_strptime_format(val)
if fmt:
meta.setdefault("date_format", set()).add(fmt)
return True
if re.match(aaaammjj_pattern, val):
if meta is not None:
fmt = detect_strptime_format(val)
if fmt:
meta.setdefault("date_format", set()).add(fmt)
return True
if re.match(string_month_pattern, val, re.IGNORECASE):
return True
if re.match(r"^-?\d+[\.|,]\d+$", val):
# regular floats are excluded
return False
# not enough digits => not a date (slightly arbitrary)
if sum([char.isdigit() for char in val]) / len(val) < threshold:
if sum(char.isdigit() for char in val) / len(val) < threshold:
return False
# last resort
res = date_casting(val)
Expand All @@ -84,6 +91,61 @@ def _is(val) -> bool:
return True


def detect_strptime_format(val: str) -> str | None:
    """Return the strptime format string matching a date value, or None when it cannot be inferred."""
    # same length window as the date `_is` check: anything outside cannot be a plain date
    if not isinstance(val, str) or not (8 <= len(val) <= 20):
        return None

    if re.match(jjmmaaaa_pattern, val):
        # day-first layout: DD<sep>MM<sep>YYYY — both separators must agree
        sep = val[2]
        return f"%d{sep}%m{sep}%Y" if val[5] == sep else None

    if re.match(aaaammjj_pattern, val):
        # year-first layout: compact YYYYMMDD, or YYYY<sep>MM<sep>DD with matching separators
        if len(val) == 8:
            return "%Y%m%d"
        sep = val[4]
        return f"%Y{sep}%m{sep}%d" if val[7] == sep else None

    return None


def detect_strptime_format_datetime(val: str) -> str | None:
    """Return the strptime format string matching a datetime value, or None when it cannot be inferred."""
    # local imports to avoid a circular dependency between the format modules
    from csv_detective.formats.datetime_aware import pat as aware_pat
    from csv_detective.formats.datetime_naive import pat as naive_pat

    # 15 chars is the shortest plausible datetime (YYMMDDTHH:MM:SS)
    if not isinstance(val, str) or len(val) < 15:
        return None

    # try the naive pattern first, then the timezone-aware one (which adds a %z suffix)
    for pattern, tz_suffix in ((naive_pat, ""), (aware_pat, "%z")):
        if not re.match(pattern, val):
            continue
        # date separator: empty for compact YYYYMMDD, otherwise both must agree
        date_sep = "" if val[4].isdigit() else val[4]
        if date_sep and val[7] != date_sep:
            return None

        end_of_date = 10 if date_sep else 8
        # character joining date and time parts (e.g. "T" or a space)
        time_sep = val[end_of_date]

        fmt = f"%Y{date_sep}%m{date_sep}%d{time_sep}%H:%M:%S"
        # a dot after the date marks fractional seconds
        if "." in val[end_of_date + 1 :]:
            fmt += ".%f"
        return fmt + tz_suffix

    return None


_test_values = {
True: [
"1960-08-07",
Expand Down
10 changes: 8 additions & 2 deletions csv_detective/formats/datetime_aware.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@
prefix = r"^\d{2}[-/:]?\d{2}"


def _is(val) -> bool:
def _is(val, meta=None) -> bool:
# early stops, to cut processing time
# 16 is the minimal length of a datetime format YYMMDDTHH:MM:SSZ
# 32 is the maximal length of an ISO datetime format YYYY-MM-DDTHH:MM:SS.dddddd+HH:MM, keeping some slack
if not isinstance(val, str) or len(val) > 35 or len(val) < 16 or not re.match(prefix, val):
return False
# if usual format, no need to parse
if bool(re.match(pat, val)):
if meta is not None:
from csv_detective.formats.date import detect_strptime_format_datetime

fmt = detect_strptime_format_datetime(val)
if fmt:
meta.setdefault("date_format", set()).add(fmt)
return True
if sum([char.isdigit() or char in {"-", "/", ":", " "} for char in val]) / len(val) < threshold:
if sum(char.isdigit() or char in {"-", "/", ":", " "} for char in val) / len(val) < threshold:
return False
res = date_casting(val)
return (
Expand Down
10 changes: 8 additions & 2 deletions csv_detective/formats/datetime_naive.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,22 @@
)


def _is(val: Any | None) -> bool:
def _is(val: Any | None, meta=None) -> bool:
# early stops, to cut processing time
# 15 is the minimal length of a datetime format YYMMDDTHH:MM:SS
# 26 is the maximal length of an ISO datetime format YYYY-MM-DDTHH:MM:SS.dddddd, keeping some slack
if not isinstance(val, str) or len(val) > 30 or len(val) < 15 or not re.match(prefix, val):
return False
# if usual format, no need to parse
if bool(re.match(pat, val)):
if meta is not None:
from csv_detective.formats.date import detect_strptime_format_datetime

fmt = detect_strptime_format_datetime(val)
if fmt:
meta.setdefault("date_format", set()).add(fmt)
return True
if sum([char.isdigit() or char in {"-", "/", ":", " "} for char in val]) / len(val) < threshold:
if sum(char.isdigit() or char in {"-", "/", ":", " "} for char in val) / len(val) < threshold:
return False
res = date_casting(val)
return res is not None and not bool(res.tzinfo)
Expand Down
27 changes: 23 additions & 4 deletions csv_detective/output/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,21 @@
from csv_detective.utils import display_logs_depending_process_time


def cast(value: str, _type: str) -> str | int | float | bool | date | datetime | bytes | None:
def fast_date_casting(val: str, date_formats: list[str] | None) -> datetime | None:
if date_formats:
for fmt in date_formats:
try:
return datetime.strptime(val, fmt)
except (ValueError, TypeError):
continue
return date_casting(val)


def cast(
value: str,
_type: str,
date_format: list[str] | None = None,
) -> str | int | float | bool | date | datetime | bytes | None:
if not isinstance(value, str) or value in pd._libs.parsers.STR_NA_VALUES:
# STR_NA_VALUES are directly ingested as NaN by pandas, we avoid trying to cast them (into int for instance)
return None
Expand All @@ -31,10 +45,10 @@ def cast(value: str, _type: str) -> str | int | float | bool | date | datetime |
# in hydra json are given to postgres as strings, conversion is done by postgres
return json.loads(value)
case "date":
_date = date_casting(value)
_date = fast_date_casting(value, date_format)
return _date.date() if _date else None
case "datetime":
return date_casting(value)
return fast_date_casting(value, date_format)
case "binary":
return binary_casting(value)
case _:
Expand All @@ -57,7 +71,12 @@ def cast_df(
# to allow having ints and NaN in the same column
df[col_name] = df[col_name].astype(pd.Int64Dtype())
else:
df[col_name] = df[col_name].apply(lambda col: cast(col, _type=detection["python_type"]))
date_format = detection.get("date_format")
df[col_name] = df[col_name].apply(
lambda col, _type=detection["python_type"], _df=date_format: cast(
col, _type=_type, date_format=_df
)
)
if verbose:
display_logs_depending_process_time(
f"Casting columns completed in {round(time() - start, 3)}s",
Expand Down
34 changes: 22 additions & 12 deletions csv_detective/parsing/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,24 @@ def test_col_val(
skipna: bool = True,
limited_output: bool = False,
verbose: bool = False,
meta: dict | None = None,
) -> float:
"""Tests values of the serie using test_func.
- skipna : if True indicates that NaNs are considered True
for the serie to be detected as a certain format
- meta : if provided, passed to test_func to collect extra info (e.g. date_format)
"""
if verbose:
start = time()

if meta is not None and format.name in ("date", "datetime_naive", "datetime_aware"):
test_func = lambda v: format.func(v, meta)
else:
test_func = format.func

# TODO : change for a cleaner method and only test columns in modules labels
def apply_test_func(serie: pd.Series, test_func: Callable, _range: int):
return serie.sample(n=_range).apply(test_func)
def apply_test_func(serie: pd.Series, _test_func: Callable, _range: int):
return serie.sample(n=_range).apply(_test_func)

try:
if skipna:
Expand All @@ -48,7 +55,7 @@ def apply_test_func(serie: pd.Series, test_func: Callable, _range: int):
if not limited_output or format.proportion < 1:
# we want or have to go through the whole column to have the proportion
value_counts = serie.value_counts()
unique_results = value_counts.index.to_series().apply(format.func)
unique_results = value_counts.index.to_series().apply(test_func)
result: float = (unique_results * value_counts.values).sum() / ser_len
return result if result >= format.proportion else 0.0
else:
Expand All @@ -58,9 +65,9 @@ def apply_test_func(serie: pd.Series, test_func: Callable, _range: int):
min(1, ser_len),
min(5, ser_len),
]:
if not all(apply_test_func(serie, format.func, _range)):
if not all(apply_test_func(serie, test_func, _range)):
return 0.0
return float(all(format.func(v) for v in serie.unique()))
return float(all(test_func(v) for v in serie.unique()))
finally:
if verbose and time() - start > 3:
display_logs_depending_process_time(
Expand All @@ -75,25 +82,28 @@ def test_col(
limited_output: bool,
skipna: bool = True,
verbose: bool = False,
):
) -> tuple[pd.DataFrame, dict[str, dict[str, dict]]]:
if verbose:
start = time()
logging.info("Testing columns to get formats")
return_table = pd.DataFrame(columns=table.columns)
all_meta: dict[str, dict[str, dict]] = {}
for idx, (label, format) in enumerate(formats.items()):
if verbose:
start_type = time()
logging.info(f"\t- Starting with format '{label}'")
# improvement lead : put the longest tests behind and make them only if previous tests not satisfactory
# => the following needs to change, "apply" means all columns are tested for one type at once
for col in table.columns:
meta: dict = {}
return_table.loc[label, col] = test_col_val(
table[col],
format,
skipna=skipna,
limited_output=limited_output,
verbose=verbose,
meta=meta,
)
if meta:
all_meta.setdefault(col, {})[label] = meta
if verbose:
display_logs_depending_process_time(
f'\t> Done with format "{label}" in {round(time() - start_type, 3)}s ({idx + 1}/{len(formats)})',
Expand All @@ -103,7 +113,7 @@ def test_col(
display_logs_depending_process_time(
f"Done testing columns in {round(time() - start, 3)}s", time() - start
)
return return_table
return return_table, all_meta


def test_label(
Expand Down Expand Up @@ -138,7 +148,7 @@ def test_col_chunks(
limited_output: bool,
skipna: bool = True,
verbose: bool = False,
) -> tuple[pd.DataFrame, dict, dict[str, pd.Series]]:
) -> tuple[pd.DataFrame, dict, dict[str, pd.Series], dict[str, dict[str, dict]]]:
def build_remaining_tests_per_col(return_table: pd.DataFrame) -> dict[str, list[str]]:
# returns a dict with the table's columns as keys and the list of remaining format labels to apply
return {
Expand All @@ -156,7 +166,7 @@ def build_remaining_tests_per_col(return_table: pd.DataFrame) -> dict[str, list[
logging.info("Testing columns to get formats on chunks")

# analysing the sample to get a first guess
return_table = test_col(table, formats, limited_output, skipna=skipna, verbose=verbose)
return_table, all_meta = test_col(table, formats, limited_output, skipna=skipna, verbose=verbose)
# mandatory_label formats are zeroed out at the end if the label doesn't match,
# so there's no point running the expensive field tests on those columns
mandatory_label_skip: dict[str, set[str]] = {
Expand Down Expand Up @@ -257,4 +267,4 @@ def build_remaining_tests_per_col(return_table: pd.DataFrame) -> dict[str, list[
display_logs_depending_process_time(
f"Done testing chunks in {round(time() - start, 3)}s", time() - start
)
return return_table, analysis, col_values
return return_table, analysis, col_values, all_meta
16 changes: 15 additions & 1 deletion csv_detective/validate.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from collections import defaultdict
from datetime import datetime

import pandas as pd

Expand Down Expand Up @@ -123,7 +124,20 @@ def validate(
if to_check.empty:
continue
value_counts = to_check.value_counts()
unique_results = value_counts.index.to_series().apply(formats[detected["format"]].func)
date_formats = detected.get("date_format")
if date_formats:
def _fast_is(val, _fmts=date_formats):
for fmt in _fmts:
try:
datetime.strptime(val, fmt)
return True
except (ValueError, TypeError):
continue
return False

unique_results = value_counts.index.to_series().apply(_fast_is)
else:
unique_results = value_counts.index.to_series().apply(formats[detected["format"]].func)
chunk_valid_values = (unique_results * value_counts.values).sum()
if formats[detected["format"]].proportion == 1 and chunk_valid_values < len(to_check):
# we can early stop in this case, not all values are valid while we want 100%
Expand Down
Loading