diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fc77404 --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +# IDE +.idea/ +*.xml +*.iml + +# Python +__pycache__/ +*.py[cod] +*$py.class + +# Environments +.env +.venv/ +venv/ +env/ + +# Distribution / Packaging +dist/ +build/ +*.egg-info/ diff --git a/handelsregister.py b/handelsregister.py index a17aed3..03ccc1a 100755 --- a/handelsregister.py +++ b/handelsregister.py @@ -5,11 +5,13 @@ """ import argparse +import tempfile import mechanize import re import pathlib import sys from bs4 import BeautifulSoup +import urllib.parse # Dictionaries to map arguments to values schlagwortOptionen = { @@ -48,15 +50,13 @@ def __init__(self, args): ( "Connection", "keep-alive" ), ] - self.cachedir = pathlib.Path("cache") + self.cachedir = pathlib.Path(tempfile.gettempdir()) / "handelsregister_cache" self.cachedir.mkdir(parents=True, exist_ok=True) def open_startpage(self): self.browser.open("https://www.handelsregister.de", timeout=10) def companyname2cachename(self, companyname): - # map a companyname to a filename, that caches the downloaded HTML, so re-running this script touches the - # webserver less often. return self.cachedir / companyname def search_company(self): @@ -64,11 +64,15 @@ def search_company(self): if self.args.force==False and cachename.exists(): with open(cachename, "r") as f: html = f.read() - print("return cached content for %s" % self.args.schlagwoerter) + if not self.args.json: + print("return cached content for %s" % self.args.schlagwoerter) else: # TODO implement token bucket to abide by rate limit # Use an atomic counter: https://gist.github.com/benhoyt/8c8a8d62debe8e5aa5340373f9c509c7 - response_search = self.browser.follow_link(text="Advanced search") + self.browser.select_form(name="naviForm") + self.browser.form.new_control('hidden', 'naviForm:erweiterteSucheLink', {'value': 'naviForm:erweiterteSucheLink'}) + self.browser.form.new_control('hidden', 'target', {'value': 'erweiterteSucheLink'}) + response_search = self.browser.submit() if self.args.debug == True: print(self.browser.title()) @@ -95,28 +99,49 @@ def search_company(self): return get_companies_in_searchresults(html) + def parse_result(result): cells = [] for cellnum, cell in enumerate(result.find_all('td')): - #print('[%d]: %s [%s]' % (cellnum, cell.text, cell)) cells.append(cell.text.strip()) - #assert cells[7] == 'History' d = {} d['court'] = cells[1] + + # Extract register number: HRB, HRA, VR, GnR followed by numbers (e.g. HRB 12345, VR 6789) + # Also capture suffix letter if present (e.g. HRB 12345 B), but avoid matching start of words (e.g. " Formerly") + reg_match = re.search(r'(HRA|HRB|GnR|VR|PR)\s*\d+(\s+[A-Z])?(?!\w)', d['court']) + d['register_num'] = reg_match.group(0) if reg_match else None + d['name'] = cells[2] d['state'] = cells[3] - d['status'] = cells[4] + d['status'] = cells[4].strip() # Original value for backward compatibility + d['statusCurrent'] = cells[4].strip().upper().replace(' ', '_') # Transformed value + + # Ensure consistent register number suffixes (e.g. ' B' for Berlin HRB, ' HB' for Bremen) which might be implicit + if d['register_num']: + suffix_map = { + 'Berlin': {'HRB': ' B'}, + 'Bremen': {'HRA': ' HB', 'HRB': ' HB', 'GnR': ' HB', 'VR': ' HB', 'PR': ' HB'} + } + reg_type = d['register_num'].split()[0] + suffix = suffix_map.get(d['state'], {}).get(reg_type) + if suffix and not d['register_num'].endswith(suffix): + d['register_num'] += suffix d['documents'] = cells[5] # todo: get the document links d['history'] = [] hist_start = 8 - hist_cnt = (len(cells)-hist_start)/3 + for i in range(hist_start, len(cells), 3): + if i + 1 >= len(cells): + break + if "Branches" in cells[i] or "Niederlassungen" in cells[i]: + break d['history'].append((cells[i], cells[i+1])) # (name, location) - #print('d:',d) + return d def pr_company_info(c): - for tag in ('name', 'court', 'state', 'status'): + for tag in ('name', 'court', 'register_num', 'district', 'state', 'statusCurrent'): print('%s: %s' % (tag, c.get(tag, '-'))) print('history:') for name, loc in c.get('history'): @@ -125,20 +150,18 @@ def pr_company_info(c): def get_companies_in_searchresults(html): soup = BeautifulSoup(html, 'html.parser') grid = soup.find('table', role='grid') - #print('grid: %s', grid) results = [] for result in grid.find_all('tr'): a = result.get('data-ri') if a is not None: index = int(a) - #print('r[%d] %s' % (index, result)) + d = parse_result(result) results.append(d) return results def parse_args(): -# Parse arguments parser = argparse.ArgumentParser(description='A handelsregister CLI') parser.add_argument( "-d", @@ -166,6 +189,12 @@ def parse_args(): choices=["all", "min", "exact"], default="all" ) + parser.add_argument( + "-j", + "--json", + help="Return response as JSON", + action="store_true" + ) args = parser.parse_args() @@ -179,10 +208,14 @@ def parse_args(): return args if __name__ == "__main__": + import json args = parse_args() h = HandelsRegister(args) h.open_startpage() companies = h.search_company() if companies is not None: - for c in companies: - pr_company_info(c) + if args.json: + print(json.dumps(companies)) + else: + for c in companies: + pr_company_info(c) diff --git a/test_handelsregister.py b/test_handelsregister.py index ccb2847..fa1951a 100644 --- a/test_handelsregister.py +++ b/test_handelsregister.py @@ -3,22 +3,55 @@ import argparse def test_parse_search_result(): - # simplified html from a real search html = '%s' % """
Berlin District court Berlin (Charlottenburg) HRB 44343
GASAG AGBerlincurrently registered
History
1.) Gasag Berliner Gaswerke Aktiengesellschaft1.) Berlin
""" res = get_companies_in_searchresults(html) assert res == [{ 'court':'Berlin District court Berlin (Charlottenburg) HRB 44343', + 'register_num': 'HRB 44343 B', 'name':'GASAG AG', 'state':'Berlin', - 'status':'currently registered', + 'status':'currently registered', # Original value for backward compatibility + 'statusCurrent':'CURRENTLY_REGISTERED', # Transformed value 'documents': 'ADCDHDDKUTVÖSI', 'history':[('1.) Gasag Berliner Gaswerke Aktiengesellschaft', '1.) Berlin')] },] -def test_get_results(): - args = argparse.Namespace(debug=False, force=False, schlagwoerter='deutsche bahn', schlagwortOptionen='all') +@pytest.mark.parametrize("company, state_id", [ + ("Hafen Hamburg", "Hamburg"), + ("Bayerische Motoren Werke", "Bayern"), + ("Daimler Truck", "Baden-Württemberg"), + ("Volkswagen", "Niedersachsen"), + ("RWE", "Nordrhein-Westfalen"), + ("Fraport", "Hessen"), + ("Saarstahl", "Saarland"), + ("Mainz", "Rheinland-Pfalz"), + ("Nordex", "Mecklenburg-Vorpommern"), + ("Jenoptik", "Thüringen"), + ("Vattenfall", "Berlin"), + ("Bremen", "Bremen"), + ("Sachsen", "Sachsen"), + ("Magdeburg", "Sachsen-Anhalt"), + ("Kiel", "Schleswig-Holstein"), + ("Potsdam", "Brandenburg") +]) +def test_search_by_state_company(company, state_id): + + args = argparse.Namespace(debug=False, force=True, schlagwoerter=company, schlagwortOptionen='all', json=False) + h = HandelsRegister(args) + h.open_startpage() + companies = h.search_company() + assert companies is not None + assert len(companies) > 0 + +def test_haus_anker_b_suffix(): + args = argparse.Namespace(debug=False, force=True, schlagwoerter='Haus-Anker Verwaltungs GmbH', schlagwortOptionen='exact', json=False) h = HandelsRegister(args) h.open_startpage() companies = h.search_company() - assert len(companies) > 0 \ No newline at end of file + assert companies is not None + + target_company = next((c for c in companies if '138434' in c['register_num']), None) + + assert target_company is not None, "Haus-Anker Verwaltungs GmbH with expected number not found" + assert target_company['register_num'] == 'HRB 138434 B' \ No newline at end of file