175 changes: 175 additions & 0 deletions src/pystatsv1/trackd/adapters/core_gl.py
@@ -0,0 +1,175 @@
# SPDX-License-Identifier: MIT
"""Generic 'core_gl' adapter for Track D BYOD normalization.

This adapter bridges the gap between "perfect" template exports (which already
match the contract) and "slightly messy" Sheets/Excel exports.

Features (v1):
- Header matching that tolerates case/spacing/punctuation (e.g., "Account ID")
- Whitespace trimming across all cells
- Money cleanup for debit/credit (commas, $, parentheses-as-negative)
- Canonical output column order (required first, then passthrough extras)

Inputs
------
Reads contract-named files from ``tables/``:
- chart_of_accounts.csv
- gl_journal.csv

Outputs
-------
Writes contract-named files to ``normalized/`` with contract column names.
"""

from __future__ import annotations

import csv
from pathlib import Path
from typing import Any

from .._errors import TrackDDataError, TrackDSchemaError
from ..contracts import schemas_for_profile
from .base import NormalizeContext
from .mapping import (
    build_rename_map,
    clean_cell,
    detect_duplicate_destinations,
    parse_money,
)


_COA_ALIASES: dict[str, tuple[str, ...]] = {
    "account_id": ("acct_id", "acct", "account", "account number", "account_no"),
    "account_name": ("acct_name", "name"),
    "account_type": ("type",),
    "normal_side": ("normal", "side"),
}

_GL_ALIASES: dict[str, tuple[str, ...]] = {
    "txn_id": ("txnid", "transaction_id", "transaction id", "id"),
    "doc_id": ("doc", "document", "document_id", "document id"),
    "description": ("desc", "memo", "narrative"),
    "account_id": ("acct_id", "acct", "account", "account number", "account_no"),
    "debit": ("dr", "debits"),
    "credit": ("cr", "credits"),
}


def _write_normalized_csv(
    src: Path,
    dst: Path,
    *,
    required_columns: tuple[str, ...],
    aliases: dict[str, tuple[str, ...]] | None = None,
    money_columns: tuple[str, ...] = (),
) -> dict[str, Any]:
    with src.open("r", newline="", encoding="utf-8-sig") as f_in:
        reader = csv.DictReader(f_in)
        if not reader.fieldnames:
            raise TrackDDataError(f"CSV appears to have no header row: {src.name}")

        fieldnames = [str(c) for c in reader.fieldnames if c is not None]
        rename_map = build_rename_map(fieldnames, required_columns=required_columns, aliases=aliases)

        dups = detect_duplicate_destinations(rename_map)
        if dups:
            pieces = [f"{dst}: {', '.join(srcs)}" for dst, srcs in sorted(dups.items())]
            raise TrackDSchemaError(
                "Ambiguous column mapping (multiple source columns map to the same required column).\n"
                + "\n".join(pieces)
            )

        # Determine output fields: required columns first, then passthrough extras.
        required_set = set(required_columns)
        extras: list[str] = []
        for c in fieldnames:
            dest = rename_map.get(c, c)
            if dest in required_set:
                continue
            # Preserve original extra column names (trimmed).
            extras.append(c.strip())

        out_fields = list(required_columns) + extras

        dst.parent.mkdir(parents=True, exist_ok=True)
        with dst.open("w", newline="", encoding="utf-8") as f_out:
            writer = csv.DictWriter(f_out, fieldnames=out_fields)
            writer.writeheader()
            n_rows = 0
            for row in reader:
                out_row: dict[str, str] = {k: "" for k in out_fields}

                # Map + clean required columns
                for src_col in fieldnames:
                    raw_val = row.get(src_col)
                    val = clean_cell(raw_val)
                    dest = rename_map.get(src_col, src_col).strip()

                    # Extra columns: keep under original header (trimmed).
                    if dest not in required_set:
                        dest = src_col.strip()

                    if dest not in out_row:
                        # If an extra column name collides with required, prefer required slot.
                        continue

                    if dest in money_columns:
                        val = parse_money(val)

                    out_row[dest] = val

                writer.writerow(out_row)
                n_rows += 1

    return {
        "src": str(src),
        "dst": str(dst),
        "written_rows": n_rows,
        "written_columns": out_fields,
    }


class CoreGLAdapter:
    name = "core_gl"

    def normalize(self, ctx: NormalizeContext) -> dict[str, Any]:
        schemas = schemas_for_profile(ctx.profile)

        ctx.normalized_dir.mkdir(parents=True, exist_ok=True)

        files: list[dict[str, Any]] = []
        for schema in schemas:
            src = ctx.tables_dir / schema.name
            dst = ctx.normalized_dir / schema.name
            if not src.exists():
                raise TrackDDataError(f"Missing required input file for adapter '{self.name}': {src}")

            if schema.name == "chart_of_accounts.csv":
                aliases = _COA_ALIASES
                money_cols: tuple[str, ...] = ()
            elif schema.name == "gl_journal.csv":
                aliases = _GL_ALIASES
                money_cols = ("debit", "credit")
            else:
                aliases = None
                money_cols = ()

            files.append(
                _write_normalized_csv(
                    src,
                    dst,
                    required_columns=schema.required_columns,
                    aliases=aliases,
                    money_columns=money_cols,
                )
            )

        return {
            "ok": True,
            "adapter": self.name,
            "profile": ctx.profile,
            "project": str(ctx.project_root),
            "tables_dir": str(ctx.tables_dir),
            "normalized_dir": str(ctx.normalized_dir),
            "files": files,
        }
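For reference, a minimal sketch (not part of the PR) of what this adapter's CSV helper does to a messy chart-of-accounts export. It calls the module-private _write_normalized_csv directly; the required_columns tuple below is assumed from the _COA_ALIASES keys, since the real values come from schemas_for_profile().

# Minimal sketch, assuming the src/ layout above is installed as pystatsv1.
from pathlib import Path

from pystatsv1.trackd.adapters.core_gl import _COA_ALIASES, _write_normalized_csv

# Messy headers and a padded cell, as a Sheets export might produce.
src = Path("tables/chart_of_accounts.csv")
src.parent.mkdir(parents=True, exist_ok=True)
src.write_text(
    "Account ID,Acct Name,Type,Normal\n"
    " 1000 ,Cash,asset,debit\n",
    encoding="utf-8",
)

report = _write_normalized_csv(
    src,
    Path("normalized/chart_of_accounts.csv"),
    # Assumed required columns; in the adapter these come from schema.required_columns.
    required_columns=("account_id", "account_name", "account_type", "normal_side"),
    aliases=_COA_ALIASES,
)
print(report["written_rows"], report["written_columns"])
# The output header becomes: account_id,account_name,account_type,normal_side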
142 changes: 142 additions & 0 deletions src/pystatsv1/trackd/adapters/mapping.py
@@ -0,0 +1,142 @@
# SPDX-License-Identifier: MIT
"""Small mapping/cleaning utilities for Track D BYOD adapters.

Design goals (Phase 3.1):
- Keep utilities tiny and dependency-light (csv-first).
- Support *boring* transformations that come up in Sheets/Excel exports:
- column-name normalization / rename matching
- whitespace trimming
- simple money parsing (commas, $, parentheses-as-negative)

These helpers are intentionally not a full ETL framework. They exist to keep
individual adapters readable and consistent.
"""

from __future__ import annotations

import re
from typing import Iterable


_RE_NON_ALNUM = re.compile(r"[^a-z0-9_]+")
_RE_UNDERSCORES = re.compile(r"_+")
_RE_MONEY = re.compile(r"^\(?\s*(?P<body>.*)\s*\)?$")


def normalize_col_name(name: str) -> str:
    """Normalize a column header for matching purposes.

    Examples
    --------
    "Account ID" -> "account_id"
    " normal-side " -> "normal_side"
    "DOC-ID" -> "doc_id"
    """

    s = (name or "").strip().lower()
    s = s.replace("-", "_").replace(" ", "_")
    s = _RE_NON_ALNUM.sub("_", s)
    s = _RE_UNDERSCORES.sub("_", s).strip("_")
    return s


def build_rename_map(
    fieldnames: Iterable[str],
    *,
    required_columns: tuple[str, ...],
    aliases: dict[str, tuple[str, ...]] | None = None,
) -> dict[str, str]:
    """Build a mapping from source fieldnames to required/normalized names.

    Strategy:
    1) direct normalized match (case/spacing/punct insensitivity)
    2) optional aliases (also normalized), used only as a *fallback*

    Why fallback-only?
    - Many exports include both a canonical column (e.g., "Description") and a
      near-synonym (e.g., "Memo"). We don't want aliases to create ambiguous
      mappings when a direct match already exists.

    Returns a dict that maps *source column name* -> *destination column name*.
    """

    src = list(fieldnames)
    required_norm = {normalize_col_name(c): c for c in required_columns}

    alias_norm: dict[str, str] = {}
    if aliases:
        for dest, alts in aliases.items():
            for a in alts:
                alias_norm[normalize_col_name(a)] = dest

    out: dict[str, str] = {}
    claimed: set[str] = set()

    # Pass 1: exact required matches
    for col in src:
        n = normalize_col_name(col)
        if n in required_norm:
            dest = required_norm[n]
            out[col] = dest
            claimed.add(dest)

    # Pass 2: alias fallback (only if dest not already claimed)
    for col in src:
        if col in out:
            continue
        n = normalize_col_name(col)
        dest = alias_norm.get(n)
        if dest and dest not in claimed:
            out[col] = dest
            claimed.add(dest)

    return out


def detect_duplicate_destinations(rename_map: dict[str, str]) -> dict[str, list[str]]:
    """Return destinations that are mapped from multiple sources."""

    rev: dict[str, list[str]] = {}
    for src, dst in rename_map.items():
        rev.setdefault(dst, []).append(src)
    return {dst: srcs for dst, srcs in rev.items() if len(srcs) > 1}


def clean_cell(value: object) -> str:
    """Trim whitespace and coerce missing values to empty string."""
    if value is None:
        return ""
    s = str(value)
    return s.strip()


def parse_money(value: str) -> str:
    """Parse common spreadsheet money formats into a simple numeric string.

    Supported patterns:
    - "$1,234.00" -> "1234.00"
    - "(1,234.00)" or "($1,234.00)" -> "-1234.00"
    - "-1,234" -> "-1234"

    If the string is blank, returns blank.
    """

    s = (value or "").strip()
    if not s:
        return ""

    neg = False
    if s.startswith("(") and s.endswith(")"):
        neg = True
        s = s[1:-1].strip()

    # Strip currency and grouping separators.
    s = s.replace("$", "").replace(",", "").strip()
    if not s:
        return ""

    # If it already has a leading minus, keep it.
    if s.startswith("-"):
        return s

    return f"-{s}" if neg else s
33 changes: 28 additions & 5 deletions src/pystatsv1/trackd/byod.py
@@ -17,7 +17,7 @@
from pathlib import Path
from typing import Any

from ._errors import TrackDDataError
from ._errors import TrackDDataError, TrackDSchemaError
from ._types import PathLike
from .adapters.base import NormalizeContext, TrackDAdapter
from .contracts import ALLOWED_PROFILES, schemas_for_profile
@@ -244,11 +244,16 @@ def _get_adapter(name: str | None) -> TrackDAdapter:
    n = (name or "").strip().lower() or "passthrough"
    if n == "passthrough":
        return _PassthroughAdapter()
    if n == "core_gl":
        from .adapters.core_gl import CoreGLAdapter

        return CoreGLAdapter()
    raise TrackDDataError(
        f"Unknown adapter: {name}.\n" "Use one of: passthrough"
        f"Unknown adapter: {name}.\n" "Use one of: passthrough, core_gl"
    )



def normalize_byod_project(project: PathLike, *, profile: str | None = None) -> dict[str, Any]:
"""Normalize BYOD project tables into ``normalized/`` outputs.

@@ -293,8 +298,20 @@ def normalize_byod_project(project: PathLike, *, profile: str | None = None) ->

    adapter = _get_adapter(cfg.get("adapter"))

    # Validate required schema issues first, so adapters can assume headers exist.
    validate_dataset(tables_dir, profile=p)
    # Validation strategy:
    # - passthrough expects contract-shaped inputs under tables/
    # - other adapters may accept non-canonical headers, so we validate after normalize
    if getattr(adapter, "name", "") == "passthrough":
        # Validate required schema issues first, so passthrough can assume headers exist.
        validate_dataset(tables_dir, profile=p)
    else:
        # Light check: required files must exist; detailed schema validation runs on normalized outputs.
        schemas = schemas_for_profile(p)
        missing = [s.name for s in schemas if not (tables_dir / s.name).exists()]
        if missing:
            raise TrackDSchemaError(
                "Missing required files in tables/: " + ", ".join(missing)
            )

    ctx = NormalizeContext(
        project_root=root,
@@ -303,4 +320,10 @@
raw_dir=(root / "raw"),
normalized_dir=(root / "normalized"),
)
return adapter.normalize(ctx)
report = adapter.normalize(ctx)

if getattr(adapter, "name", "") != "passthrough":
# Ensure adapter output conforms to the Track D contract.
validate_dataset(ctx.normalized_dir, profile=p)

return report
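A minimal sketch (not part of the PR) of the public entry point after this change. The project config that selects the adapter is not shown in this diff, so assume the project directory is already configured to use core_gl; the report keys come from CoreGLAdapter.normalize().

# Minimal sketch, assuming a BYOD project with tables/ and an adapter config.
from pystatsv1.trackd.byod import normalize_byod_project

report = normalize_byod_project("path/to/project")
assert report["ok"] is True
assert report["adapter"] == "core_gl"
for f in report["files"]:
    print(f["dst"], f["written_rows"])
# For non-passthrough adapters, validate_dataset() runs on normalized/ afterwards,
# so a report is only returned when the outputs satisfy the Track D contract.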
1 change: 1 addition & 0 deletions tests/test_trackd_byod_adapter_selection_cli.py
@@ -21,3 +21,4 @@ def test_trackd_byod_normalize_uses_adapter_from_config(tmp_path: Path, capsys)
    assert rc == 1
    assert "unknown adapter" in out
    assert "passthrough" in out
    assert "core_gl" in out