diff --git a/CHANGELOG.md b/CHANGELOG.md index 23c31f1e..67f9e607 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and the versioning aims to respect [Semantic Versioning](http://semver.org/spec/ ## [v0.XX.X] unreleased - 202X-XX-XX ### Added +- Add interactive download functionality for MaStR date selection + [#696](https://github.com/OpenEnergyPlatform/open-MaStR/pull/696) - Add trusted publishing for test releases [#713](https://github.com/OpenEnergyPlatform/open-MaStR/pull/713) ### Changed diff --git a/open_mastr/mastr.py b/open_mastr/mastr.py index 23af6f5b..c45ad4be 100644 --- a/open_mastr/mastr.py +++ b/open_mastr/mastr.py @@ -4,6 +4,7 @@ # import xml dependencies from open_mastr.xml_download.utils_download_bulk import ( download_xml_Mastr, + select_download_date, delete_xml_files_not_from_given_date, ) from open_mastr.xml_download.utils_write_to_database import ( @@ -106,6 +107,7 @@ def download( date=None, bulk_cleansing=True, keep_old_downloads: bool = False, + select_date_interactively: bool = False, **kwargs, ) -> None: """ @@ -157,6 +159,11 @@ def download( | None | set date="today" | Default to `None`. + select_date_interactively : bool, optional + If set to True, the user will be presented with a list of available download dates + from the MaStR website and can interactively select which date to download. + This allows downloading historical data instead of just the latest available data. + When True, the `date` parameter is ignored. Defaults to False. bulk_cleansing : bool, optional If set to True, data cleansing is applied after the download (which is recommended). In its original format, many entries in the MaStR are encoded with IDs. 
def browse_available_downloads(self):
    """
    List the MaStR bulk downloads offered on the website without downloading any.

    The listing is delegated to ``list_available_downloads`` from
    ``open_mastr.xml_download.utils_download_bulk``, which prints a table of
    the exports it found and returns the same entries.

    Returns
    -------
    list of dict
        One entry per available download, as returned by
        ``list_available_downloads``.

    Examples
    --------
    >>> from open_mastr import Mastr
    >>> db = Mastr()
    >>> available_downloads = db.browse_available_downloads()
    >>> # To actually fetch one of the listed exports afterwards:
    >>> # db.download(select_date_interactively=True)
    """
    # Resolved at call time, so the name is looked up on the module on each call.
    from open_mastr.xml_download.utils_download_bulk import (
        list_available_downloads as _list_downloads,
    )

    log.info("Browsing available MaStR downloads...")
    available = _list_downloads()
    return available
""" log.info("Starting the Download from marktstammdatenregister.de.") - url_time = dt.strptime(bulk_date_string, "%Y%m%d").date().timetuple() - url = gen_url(url_time) + # Helper function to convert date string to time.struct_time + def _parse_date_string(date_str): + """Convert YYYYMMDD string to time.struct_time object.""" + try: + # Use datetime.strptime for robust date parsing + parsed_date = dt.strptime(date_str, "%Y%m%d") + # Convert to time.struct_time using timetuple() + return parsed_date.timetuple() + except (ValueError, IndexError) as e: + log.warning(f"Invalid date format '{date_str}': {e}. Using current date.") + return time.localtime() + + # Parse the date string to time.struct_time (needed for both cases) + url_time = _parse_date_string(bulk_date_string) + + # Determine the URL to use + if url is None: + # Generate URL from date string if no custom URL provided + url = gen_url(url_time) + # else: custom URL is already provided, use it as-is time_a = time.perf_counter() r = requests.get(url, stream=True, headers={"User-Agent": USER_AGENT}) @@ -353,3 +380,172 @@ def full_download_without_unzip_http( else: # remove warning bar.set_postfix_str(s="") + + +def get_available_download_links( + url="https://www.marktstammdatenregister.de/MaStR/Datendownload", +): + """ + Fetch all available download links from the MaStR website. + + This function retrieves all available Gesamtdatenexport files from the MaStR + download page, including both current and historical exports. + + Parameters + ---------- + url : str, optional + The URL of the MaStR download page. Defaults to the official download page. + + Returns + ------- + list of dict + A list of dictionaries containing information about available downloads. 
def get_available_download_links(
    url="https://www.marktstammdatenregister.de/MaStR/Datendownload",
):
    """
    Fetch all available download links from the MaStR website.

    The download page is requested once and scanned for links to
    ``Gesamtdatenexport_<date>_<version>.zip`` files, both directly below the
    download host (current exports) and below ``Stichtag/`` (historical
    exports). A URL that occurs several times in the page is reported once.

    Parameters
    ----------
    url : str, optional
        The URL of the MaStR download page. Defaults to the official download page.

    Returns
    -------
    list of dict
        One dictionary per distinct download URL, sorted by date (newest
        first); for equal dates, current exports come before Stichtag exports.
        Each dictionary contains:

        - 'url': The download URL exactly as found in the page
        - 'date': The date of the export (YYYYMMDD format)
        - 'version': The MaStR version (e.g., '24.1', '24.2')
        - 'type': 'current' for current exports, 'stichtag' for historical exports

        An empty list is returned if the page cannot be fetched or decoded.

    Examples
    --------
    >>> links = get_available_download_links()
    >>> for link in links[:3]:
    ...     print(f"Date: {link['date']}, Version: {link['version']}, Type: {link['type']}")
    Date: 20250103, Version: 24.2, Type: current
    Date: 20241231, Version: 24.2, Type: current
    Date: 20241230, Version: 24.2, Type: current
    """
    import http.client

    log.info("Fetching available download links from MaStR website...")

    request = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    try:
        # Without a timeout an unresponsive server would block the caller forever.
        with urllib.request.urlopen(request, timeout=30) as response:
            html = response.read().decode("utf-8")
    except (OSError, ValueError, http.client.HTTPException) as e:
        # OSError covers URLError/HTTPError/timeouts, ValueError covers malformed
        # URLs and decode errors; this lookup is best-effort, so report and bail out.
        log.error(f"Failed to fetch download page: {e}")
        return []

    # One pattern for both link kinds: the optional "Stichtag/" path segment is
    # what distinguishes a historical export from a current one.
    link_pattern = re.compile(
        r"https://download\.marktstammdatenregister\.de/(Stichtag/)?"
        r"Gesamtdatenexport_([0-9]{8})_([0-9]+\.[0-9]+)\.zip"
    )

    # Keyed by URL so that a link repeated in the page yields a single entry;
    # dicts keep insertion order, i.e. the order of first appearance.
    links_by_url = {}
    for match in link_pattern.finditer(html):
        stichtag_segment, date, version = match.groups()
        links_by_url.setdefault(
            match.group(0),
            {
                "url": match.group(0),
                "date": date,
                "version": version,
                "type": "stichtag" if stichtag_segment else "current",
            },
        )

    # Newest first; on equal dates True ("current") sorts ahead of False.
    all_links = sorted(
        links_by_url.values(),
        key=lambda link: (link["date"], link["type"] == "current"),
        reverse=True,
    )

    log.info(f"Found {len(all_links)} available download links")
    return all_links
def list_available_downloads():
    """
    Print the available downloads as a table and return them.

    Returns
    -------
    list of dict
        The entries obtained from ``get_available_download_links``. An empty
        list is returned (after printing a hint) when no links were found.
    """
    links = get_available_download_links()

    if not links:
        print("No download links found. Please check your internet connection.")
        return []

    separator = "=" * 80
    print("\n" + separator)
    print("AVAILABLE MAStR DOWNLOADS")
    print(separator)
    print(f"{'#':<4} {'Date':<12} {'Version':<10} {'Type':<12} {'URL'}")
    print("-" * 80)

    for i, link in enumerate(links, 1):
        # Show the YYYYMMDD date as YYYY-MM-DD for readability.
        date = link["date"]
        date_formatted = f"{date[:4]}-{date[4:6]}-{date[6:]}"
        print(
            f"{i:<4} {date_formatted:<12} {link['version']:<10} {link['type']:<12} {link['url']}"
        )

    print(separator)
    print(f"Total: {len(links)} downloads available")
    print(separator)

    return links


def select_download_date():
    """
    Interactively let the user pick one of the available downloads.

    The available downloads are printed via ``list_available_downloads``; the
    user then either selects an entry by its number or cancels. Invalid input
    is rejected with a message and the prompt is repeated.

    Returns
    -------
    tuple
        (date_string, url) where date_string is in YYYYMMDD format and url is
        the download URL of the chosen entry. Returns (None, None) if no
        downloads are available, if the user cancels, or if standard input is
        exhausted (e.g. when running non-interactively).
    """
    links = list_available_downloads()

    if not links:
        return None, None

    print("\nOptions:")
    print("1. Select from the list above (enter the number)")
    print("2. Cancel")

    try:
        while True:
            choice = input("\nPlease enter your choice (1-2): ").strip()

            if choice == "2":
                print("Download selection cancelled.")
                return None, None

            if choice != "1":
                print("Invalid choice. Please enter 1, or 2.")
                continue

            # Option 1: keep asking until a valid list position is entered.
            while True:
                raw_index = input(f"Enter a number (1-{len(links)}): ").strip()
                try:
                    index = int(raw_index)
                except ValueError:
                    print("Please enter a valid number")
                    continue

                if not 1 <= index <= len(links):
                    print(f"Please enter a number between 1 and {len(links)}")
                    continue

                selected = links[index - 1]
                print(
                    f"\nSelected: {selected['date']} (Version {selected['version']}, Type: {selected['type']})"
                )
                return selected["date"], selected["url"]
    except EOFError:
        # input() raises EOFError when stdin is closed or not interactive;
        # treat that as a cancellation instead of crashing the caller.
        print("\nNo input available. Download selection cancelled.")
        return None, None
"""Tests for the interactive MaStR download-date selection."""
import pytest
from unittest.mock import patch, MagicMock
from open_mastr.xml_download.utils_download_bulk import (
    get_available_download_links,
    list_available_downloads,
    select_download_date,
)
from open_mastr.mastr import Mastr

# Sample HTML content for mocking urlopen. It must contain exactly the three
# links asserted below (two current exports, one Stichtag export); the link
# texts deliberately do not repeat the URLs so each link is found once.
SAMPLE_HTML = """
<html>
<body>
<a href="https://download.marktstammdatenregister.de/Gesamtdatenexport_20250103_24.2.zip">Gesamtdatenexport 03.01.2025</a>
<a href="https://download.marktstammdatenregister.de/Gesamtdatenexport_20241231_24.2.zip">Gesamtdatenexport 31.12.2024</a>
<a href="https://download.marktstammdatenregister.de/Stichtag/Gesamtdatenexport_20241130_24.1.zip">Stichtag 30.11.2024</a>
</body>
</html>
"""

# Sample download links for mocking
SAMPLE_LINKS = [
    {
        "url": "https://download.marktstammdatenregister.de/Gesamtdatenexport_20250103_24.2.zip",
        "date": "20250103",
        "version": "24.2",
        "type": "current",
    },
    {
        "url": "https://download.marktstammdatenregister.de/Gesamtdatenexport_20241231_24.2.zip",
        "date": "20241231",
        "version": "24.2",
        "type": "current",
    },
    {
        "url": "https://download.marktstammdatenregister.de/Stichtag/Gesamtdatenexport_20241130_24.1.zip",
        "date": "20241130",
        "version": "24.1",
        "type": "stichtag",
    },
]


@patch("urllib.request.urlopen")
def test_get_available_download_links(mock_urlopen):
    """Test fetching and parsing of download links."""
    # The function uses urlopen() as a context manager and reads the body once.
    mock_response = MagicMock()
    mock_response.read.return_value = SAMPLE_HTML.encode("utf-8")
    mock_response.__enter__.return_value = mock_response
    mock_urlopen.return_value = mock_response

    links = get_available_download_links()

    assert len(links) == 3
    # Newest export first ...
    assert links[0]["date"] == "20250103"
    assert links[0]["version"] == "24.2"
    assert links[0]["type"] == "current"
    # ... oldest (the Stichtag export) last.
    assert links[2]["date"] == "20241130"
    assert links[2]["version"] == "24.1"
    assert links[2]["type"] == "stichtag"


@patch("open_mastr.xml_download.utils_download_bulk.get_available_download_links")
@patch("builtins.print")
def test_list_available_downloads(mock_print, mock_get_links):
    """Test the formatted output of available downloads."""
    mock_get_links.return_value = SAMPLE_LINKS

    result = list_available_downloads()

    assert result == SAMPLE_LINKS
    # Check that print was called with the expected header
    mock_print.assert_any_call("=" * 80)
    mock_print.assert_any_call("AVAILABLE MAStR DOWNLOADS")
    mock_print.assert_any_call(
        f"{'#':<4} {'Date':<12} {'Version':<10} {'Type':<12} {'URL'}"
    )


@patch("open_mastr.xml_download.utils_download_bulk.list_available_downloads")
def test_select_download_date_valid_selection(mock_list_downloads):
    """Test interactive date selection with valid user input."""
    mock_list_downloads.return_value = SAMPLE_LINKS

    # Simulate user choosing option 1, then selecting the 2nd item
    with patch("builtins.input", side_effect=["1", "2"]):
        date, url = select_download_date()
        assert date == "20241231"
        assert url == SAMPLE_LINKS[1]["url"]


@patch("open_mastr.xml_download.utils_download_bulk.list_available_downloads")
def test_select_download_date_cancel(mock_list_downloads):
    """Test interactive date selection when the user cancels."""
    mock_list_downloads.return_value = SAMPLE_LINKS

    # Simulate user choosing option 2 (Cancel)
    with patch("builtins.input", side_effect=["2"]):
        date, url = select_download_date()
        assert date is None
        assert url is None


@patch("open_mastr.mastr.write_mastr_xml_to_database")
@patch("open_mastr.mastr.select_download_date")
@patch("open_mastr.mastr.download_xml_Mastr")
def test_mastr_download_interactive(mock_download, mock_select_date, mock_write_db):
    """Test the main download method with interactive selection."""
    mock_select_date.return_value = ("20241231", "http://example.com/file.zip")
    db = Mastr()
    db.download(select_date_interactively=True)

    # Assert that select_download_date was called
    mock_select_date.assert_called_once()

    # Assert that download_xml_Mastr was called with the selected date and URL,
    # which download() passes positionally.
    mock_download.assert_called_once()
    args, _ = mock_download.call_args
    assert args[4] == "http://example.com/file.zip"
    assert args[1] == "20241231"  # date argument


@patch("open_mastr.mastr.select_download_date")
@patch("open_mastr.mastr.download_xml_Mastr")
def test_mastr_download_interactive_cancel(mock_download, mock_select_date):
    """Test the main download method when interactive selection is cancelled."""
    mock_select_date.return_value = (None, None)
    db = Mastr()
    db.download(select_date_interactively=True)

    # Assert that select_download_date was called
    mock_select_date.assert_called_once()

    # Assert that download_xml_Mastr was NOT called
    mock_download.assert_not_called()


@patch("open_mastr.xml_download.utils_download_bulk.list_available_downloads")
def test_mastr_browse_available_downloads(mock_list_downloads):
    """Test the browse_available_downloads method."""
    mock_list_downloads.return_value = SAMPLE_LINKS
    db = Mastr()
    result = db.browse_available_downloads()

    mock_list_downloads.assert_called_once()
    assert result == SAMPLE_LINKS