Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 257 additions & 0 deletions src/pystatsv1/trackd/adapters/gnucash_gl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
# SPDX-License-Identifier: MIT
"""GnuCash CSV export adapter (GL splits -> core_gl normalized tables).

This adapter consumes the output of:

File -> Export -> Export Transactions to CSV

with "Simple Layout" unchecked (the multi-line/complex export).

It then produces the Track D "core_gl" normalized tables:

normalized/chart_of_accounts.csv
normalized/gl_journal.csv

We infer `account_type` and `normal_side` from the top-level account group
(Assets/Liabilities/Equity/Income/Expenses). This is intentionally pragmatic:
it keeps the adapter free and cross-platform without requiring GnuCash APIs.
"""

from __future__ import annotations

import csv
from dataclasses import dataclass
from decimal import Decimal, InvalidOperation
from typing import Any

from .base import NormalizeContext
from .mapping import clean_cell, normalize_col_name, parse_money
from .._errors import TrackDDataError


@dataclass(frozen=True)
class _AcctMeta:
account_type: str
normal_side: str


_ROOT_META = {
"assets": _AcctMeta("Asset", "Debit"),
"liabilities": _AcctMeta("Liability", "Credit"),
"equity": _AcctMeta("Equity", "Credit"),
"income": _AcctMeta("Revenue", "Credit"),
"expenses": _AcctMeta("Expense", "Debit"),
}


def _acct_meta_from_full_name(full_name: str) -> _AcctMeta:
root = (full_name.split(":", 1)[0] if full_name else "").strip().lower()
meta = _ROOT_META.get(root)
if not meta:
raise TrackDDataError(
"GnuCash export uses unexpected top-level account group: "
f"{root!r}. Expected one of: Assets, Liabilities, Equity, Income, Expenses."
)
return meta


def _to_decimal_money(value: str) -> Decimal:
"""Parse an amount into a Decimal, keeping sign."""

cleaned = parse_money(value)
if cleaned == "":
return Decimal("0")
try:
return Decimal(cleaned)
except InvalidOperation as exc: # pragma: no cover
raise TrackDDataError(f"Invalid money amount: {value!r}") from exc


def _fmt_2dp(x: Decimal) -> str:
"""Format with 2 decimals, but use blank for zero.

Track D templates typically leave the non-side empty (rather than "0.00").
"""

q = x.quantize(Decimal("0.01"))
if q == Decimal("0.00"):
return ""
return f"{q:.2f}"


class GnuCashGLAdapter:
name = "gnucash_gl"

def normalize(
self,
ctx: NormalizeContext,
) -> dict[str, Any]:
"""Normalize a GnuCash transactions export to the core_gl contract.

Notes
-----
Users should export from:
File -> Export -> Export Transactions to CSV
with "Simple Layout" unchecked (complex/multi-line).
"""

# This adapter currently targets the minimal core_gl contract.
if ctx.profile != "core_gl":
raise TrackDDataError(
f"gnucash_gl adapter currently supports profile 'core_gl' only (got {ctx.profile!r})."
)

tables_dir = ctx.tables_dir
normalized_dir = ctx.normalized_dir

# We expect the GnuCash export to be placed at tables/gl_journal.csv.
# (BYOD init creates both required files; users overwrite gl_journal.csv.)
src_path = tables_dir / "gl_journal.csv"
if not src_path.exists():
raise TrackDDataError(
"Missing tables/gl_journal.csv. Put the GnuCash export CSV here and re-run normalize."
)

with src_path.open("r", encoding="utf-8", newline="", errors="replace") as f:
reader = csv.DictReader(f)
fieldnames = reader.fieldnames or []

norm_to_src: dict[str, str] = {normalize_col_name(h): h for h in fieldnames}

def _col(*aliases: str) -> str | None:
for a in aliases:
if a in norm_to_src:
return norm_to_src[a]
return None

col_date = _col("date")
col_txn_id = _col("transaction_id")
col_number = _col("number")
col_desc = _col("description")
col_full_acct = _col("full_account_name")
col_acct = _col("account_name")
col_amt = _col("amount_num")

missing = [
k
for k, v in {
"Date": col_date,
"Transaction ID": col_txn_id,
"Number": col_number,
"Description": col_desc,
"Full Account Name": col_full_acct,
"Amount Num.": col_amt,
}.items()
if v is None
]
if missing:
raise TrackDDataError(
"GnuCash export is missing required columns: "
+ ", ".join(missing)
+ ". Make sure you exported 'Transactions to CSV' with Simple Layout unchecked."
)

normalized_dir.mkdir(parents=True, exist_ok=True)

# Collect splits and a derived chart of accounts.
splits: list[dict[str, str]] = []
coa: dict[str, _AcctMeta] = {}

for row in reader:
date = clean_cell(row[col_date])
txn_id = clean_cell(row[col_txn_id])
doc_id = clean_cell(row[col_number])
desc = clean_cell(row[col_desc])

full_acct = clean_cell(row[col_full_acct])
if not full_acct and col_acct:
full_acct = clean_cell(row[col_acct])
if not full_acct:
# Skip empty rows.
continue

meta = _acct_meta_from_full_name(full_acct)
coa.setdefault(full_acct, meta)

amt = _to_decimal_money(clean_cell(row[col_amt]))

debit = Decimal("0")
credit = Decimal("0")
if meta.normal_side == "Debit":
if amt >= 0:
debit = amt
else:
credit = -amt
else: # Credit-normal
if amt >= 0:
credit = amt
else:
debit = -amt

splits.append(
{
"txn_id": txn_id,
"date": date,
"doc_id": doc_id,
"description": desc,
"account_id": full_acct,
"debit": _fmt_2dp(debit) if debit != 0 else "",
"credit": _fmt_2dp(credit) if credit != 0 else "",
}
)

# Write normalized gl_journal.csv
gl_path = normalized_dir / "gl_journal.csv"
with gl_path.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(
f,
fieldnames=[
"txn_id",
"date",
"doc_id",
"description",
"account_id",
"debit",
"credit",
],
)
writer.writeheader()
for r in splits:
writer.writerow(r)

# Write normalized chart_of_accounts.csv
coa_path = normalized_dir / "chart_of_accounts.csv"
with coa_path.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(
f,
fieldnames=[
"account_id",
"account_name",
"account_type",
"normal_side",
],
)
writer.writeheader()
for account_id in sorted(coa.keys()):
meta = coa[account_id]
leaf = account_id.split(":")[-1].strip()
writer.writerow(
{
"account_id": account_id,
"account_name": leaf,
"account_type": meta.account_type,
"normal_side": meta.normal_side,
}
)

return {
"adapter": self.name,
"profile": ctx.profile,
"project": str(ctx.project_root),
"tables_dir": str(ctx.tables_dir),
"normalized_dir": str(ctx.normalized_dir),
"files": [
{"dst": str(coa_path), "rows": len(coa)},
{"dst": str(gl_path), "rows": len(splits)},
],
}
6 changes: 5 additions & 1 deletion src/pystatsv1/trackd/byod.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,12 @@ def _get_adapter(name: str | None) -> TrackDAdapter:
from .adapters.core_gl import CoreGLAdapter

return CoreGLAdapter()
if n == "gnucash_gl":
from .adapters.gnucash_gl import GnuCashGLAdapter

return GnuCashGLAdapter()
raise TrackDDataError(
f"Unknown adapter: {name}.\n" "Use one of: passthrough, core_gl"
f"Unknown adapter: {name}.\n" "Use one of: passthrough, core_gl, gnucash_gl"
)


Expand Down
1 change: 1 addition & 0 deletions tests/test_trackd_byod_adapter_selection_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ def test_trackd_byod_normalize_uses_adapter_from_config(tmp_path: Path, capsys)
assert "unknown adapter" in out
assert "passthrough" in out
assert "core_gl" in out
assert "gnucash_gl" in out
63 changes: 62 additions & 1 deletion tests/test_trackd_byod_normalize_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,65 @@ def test_trackd_byod_normalize_core_gl_adapter_allows_noncanonical_headers_and_c
assert rows[0]["debit"] == "1234.00"
assert rows[1]["debit"] == "-200.00"
assert rows[2]["credit"] == "2000.00"
assert rows[0]["Memo"] == "hi"


def test_trackd_byod_normalize_gnucash_gl_adapter_consumes_export_and_emits_core_gl_contract(
tmp_path: Path,
capsys,
) -> None:
proj = tmp_path / "byod"

rc_init = main(["trackd", "byod", "init", "--dest", str(proj), "--profile", "core_gl"])
assert rc_init == 0

# Switch adapter to gnucash_gl.
cfg_path = proj / "config.toml"
cfg = cfg_path.read_text(encoding="utf-8")
cfg_path.write_text(cfg.replace('adapter = "passthrough"', 'adapter = "gnucash_gl"'), encoding="utf-8")

# A tiny GnuCash "Export Transactions to CSV" example (complex layout; one transaction, two splits).
(proj / "tables" / "gl_journal.csv").write_text(
"Date,Transaction ID,Number,Description,Notes,Commodity/Currency,Void Reason,Action,Memo,Full Account Name,Account Name,Amount With Sym.,Amount Num.,Value With Sym.,Value Num.,Reconcile,Reconcile Date,Rate/Price\n"
"2026-01-20,6d0b834e77b16d5a0eeb0f92f0dd1681,000001,Test,,CURRENCY::CAD,,,,Expenses:Auto:Gas,Gas,\"$10.00\",10.00,\"$10.00\",10.00,n,,1\n"
"2026-01-20,6d0b834e77b16d5a0eeb0f92f0dd1681,000001,Test,,CURRENCY::CAD,,,,Assets:Current Assets:Cash in Wallet,Cash in Wallet,\"-$10.00\",-10.00,\"-$10.00\",-10.00,n,,1\n",
encoding="utf-8",
)

rc = main(["trackd", "byod", "normalize", "--project", str(proj)])
out = capsys.readouterr().out.lower()

assert rc == 0
assert "adapter: gnucash_gl" in out

# Ensure outputs exist and are parseable.
import csv

gl_path = proj / "normalized" / "gl_journal.csv"
coa_path = proj / "normalized" / "chart_of_accounts.csv"

assert gl_path.exists()
assert coa_path.exists()

with gl_path.open("r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
rows = list(reader)

assert len(rows) == 2

# Expense increases => debit; asset decreases => credit.
exp = next(r for r in rows if r["account_id"].startswith("Expenses"))
cash = next(r for r in rows if r["account_id"].startswith("Assets"))

assert exp["debit"] == "10.00"
assert exp["credit"] in ("", "0.00")

assert cash["credit"] == "10.00"
assert cash["debit"] in ("", "0.00")

with coa_path.open("r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
coa_rows = list(reader)

ids = {r["account_id"] for r in coa_rows}
assert "Expenses:Auto:Gas" in ids
assert "Assets:Current Assets:Cash in Wallet" in ids