From a5561e397cd6cc0c6335ab75ff256441165d9f51 Mon Sep 17 00:00:00 2001 From: Moritz Lell Date: Sun, 4 Jan 2026 10:12:40 +0100 Subject: [PATCH] Add open() to CSVReader interface This allows subclasses to customize raw file reading. --- beangulp/importers/csvbase.py | 77 +++++++++++++++++++----------- beangulp/importers/csvbase_test.py | 26 ++++++++++ 2 files changed, 75 insertions(+), 28 deletions(-) diff --git a/beangulp/importers/csvbase.py b/beangulp/importers/csvbase.py index bb92f07..54f827b 100644 --- a/beangulp/importers/csvbase.py +++ b/beangulp/importers/csvbase.py @@ -276,6 +276,26 @@ def __init__(self): # warnings.warn('skiplines is deprecated, use header instead', DeprecationWarning) self.header = self.skiplines + def open(self, filepath): + """Open the CSV file for reading. + + This method can be overridden in subclasses to customize raw file reading, + for example to pre-process text lines before import. Note that to skip + a fixed number of lines at the file beginning or end, setting the class + members "header" or "footer" is the easier approach. + + This method uses the class member 'encoding'. Overriding this method causes + that member to be ignored unless the overriding method explicitly uses it. + + Args: + filepath: Filesystem path to the input file. + + Returns: + An iterable providing lines of CSV-formatted text. + """ + with open(filepath, encoding = self.encoding) as fd: + yield from fd + def read(self, filepath): """Read CSV file according to class defined columns specification. @@ -292,34 +312,35 @@ def read(self, filepath): """ - with open(filepath, encoding=self.encoding) as fd: - # Skip header and footer lines. - lines = _chomp(fd, self.header, self.footer) - - # Filter out comment lines. - if self.comments: - lines = filter(lambda x: not x.startswith(self.comments), lines) - - reader = csv.reader(lines, dialect=self.dialect) - - # Map column names to column indices. 
- names = None - if self.names: - headers = next(reader, None) - if headers is None: - raise IndexError("The input file does not contain an header line") - names = {name.strip(): index for index, name in enumerate(headers)} - - # Construct a class with attribute accessors for the - # configured columns that works similarly to a namedtuple. - attrs = {} - for name, column in self.columns.items(): - attrs[name] = property(column.getter(names)) - row = type("Row", (tuple,), attrs) - - # Return data rows. - for x in reader: - yield row(x) + lines = self.open(filepath) + + # Skip header and footer lines. + lines = _chomp(lines, self.header, self.footer) + + # Filter out comment lines. + if self.comments: + lines = filter(lambda x: not x.startswith(self.comments), lines) + + reader = csv.reader(lines, dialect=self.dialect) + + # Map column names to column indices. + names = None + if self.names: + headers = next(reader, None) + if headers is None: + raise IndexError("The input file does not contain an header line") + names = {name.strip(): index for index, name in enumerate(headers)} + + # Construct a class with attribute accessors for the + # configured columns that works similarly to a namedtuple. + attrs = {} + for name, column in self.columns.items(): + attrs[name] = property(column.getter(names)) + row = type("Row", (tuple,), attrs) + + # Return data rows. 
+ for x in reader: + yield row(x) class Importer(beangulp.Importer, CSVReader): diff --git a/beangulp/importers/csvbase_test.py b/beangulp/importers/csvbase_test.py index 5734902..dbc3818 100644 --- a/beangulp/importers/csvbase_test.py +++ b/beangulp/importers/csvbase_test.py @@ -2,6 +2,7 @@ import decimal import re import unittest +from itertools import dropwhile from beancount.core import data from beancount.parser import cmptest @@ -451,6 +452,31 @@ class Reader(CSVReader): self.assertEqual(len(rows), 1) self.assertEqual(rows[0][0], "a") + @docfile + def test_custom_open(self, filename): + """\ + Skip this line + Skip this too + First, Second + a, b + c, d + """ + + class Reader(CSVReader): + first = Column("First") + second = Column("Second") + + def open(self, filepath): + """Skip lines until we find the column headers.""" + lines = super().open(filepath) + return dropwhile(lambda line: "First" not in line, lines) + + reader = Reader() + rows = list(reader.read(filename)) + self.assertEqual(len(rows), 2) + self.assertEqual(rows[0].first, "a") + self.assertEqual(rows[1].second, "d") + class Base(Importer): def identify(self, filepath):