diff --git a/CHANGELOG.md b/CHANGELOG.md index 09712482..23c31f1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,8 @@ and the versioning aims to respect [Semantic Versioning](http://semver.org/spec/ - Limit number of parallel CI jobs [#669](https://github.com/OpenEnergyPlatform/open-MaStR/pull/669) ### Removed +- Deprecate and remove Soap API Download and Mirror code + [#635](https://github.com/OpenEnergyPlatform/open-MaStR/pull/635) ## [v0.15.0] Turbo parsing with open-MaStR EasterEggspress - 2025-04-19 diff --git a/docs/advanced.md b/docs/advanced.md index e9471f1a..041bd66a 100644 --- a/docs/advanced.md +++ b/docs/advanced.md @@ -155,14 +155,6 @@ Moreover, the datatypes of different entries are set in the data cleansing proce If needed, the tables in the database can be obtained as csv files. Those files are created by first merging corresponding tables (e.g all tables that contain information about solar) and then dumping those tables to `.csv` files with the [`to_csv`][open_mastr.Mastr.to_csv] method. -=== "Advantages" - * No registration for an API key is needed - * Download of the whole dataset is possible - -=== "Disadvantages" - * No single tables or entries can be downloaded - * Download takes long time (you can use the partial download though, see [Getting Started](getting_started.md#bulk-download)) - **Note**: By default, existing zip files in `$HOME/.open-MaStR/data/xml_download` are deleted when a new file is downloaded. You can change this behavior by setting `keep_old_downloads`=True in [`Mastr.download()`][open_mastr.Mastr.download]. @@ -179,12 +171,8 @@ via its API a [registration](https://www.marktstammdatenregister.de/MaStRHilfe/f To download data from the MaStR API using the `open-MaStR`, the credentials (MaStR user and token) need to be provided in a certain way. Three options exist: 1. **Credentials file:** - Both, user and token, are stored in plain text in the credentials file. 
- For storing the credentials in the credentials file (plus optionally using keyring for the token) simply instantiate - [`MaStRDownload`][open_mastr.soap_api.download.MaStRDownload] once and you get asked for a user name and a token. The - information you insert will be used to create the credentials file. - - It is also possible to create the credentials file by hand, using this format: + Both, user and token, are stored in plain text in the credentials file. The file is located at + '~/.open-MaStR/config/credentials.cfg'. Fill in your user and token like this: ``` [MaStR] @@ -194,8 +182,6 @@ To download data from the MaStR API using the `open-MaStR`, the credentials (MaS The `token` should be written in one line, without line breaks. - The credentials file needs to be stored at: `$HOME/.open-MaStR/config/credentials.cfg` - 2. **Credentials file + keyring:** The user is stored in the credentials file, while the token is stored encrypted in the [keyring](https://pypi.org/project/keyring/). @@ -419,33 +405,19 @@ For API calls, models and optional parameters refer to the identifier (`EinheitMastrNummer`) and their power plant type (`Einheitentyp`). You can then sort them by power plant type and use the power plant type specific API query to retrieve information about it.
- Cumbersome?
- Luckily, `open-MaStR` has you covered and provides methods to just query for all units of a power - plant type. - ### MaStRDownload -The class `MaStRDownload` builds upon methods provided in the class `MaStRAPI`.
- -It provides methods to download power plant unit types and additional information -for each unit type, such as extended unit data, permit data, chp-specific data, location data -or eeg-specific data.
- -The class handles the querying logic and knows which additional data for each unit type is available -and which SOAP service has to be used to query it. +!!! warning "MaStRDownload has been removed" + In versions > `v0.16.0` the `MaStRDownload` class cannot be used anymore. ### MaStRMirror -The class `MaStRMirror` builds upon methods provided in the class `MaStRDownload`.
- -The aim of the class has been to mirror the Marktstammdatenregister database and keep it up-to-date. -Historically, `open-mastr` has been developed before the owner of the dataset, BNetzA, offered the `bulk` download. -The class can still be used for use-cases where only the most recent changes to a local database are of interest. -For downloading the entire MaStR database we recommend the bulk download functionalities by specifying `donwload(method="bulk")`. +!!! warning "MaStRMirror has been removed" + In versions > `v0.16.0` the `MaStRMirror` class cannot be used anymore. diff --git a/docs/conf.py b/docs/conf.py deleted file mode 100644 index 93b52ff0..00000000 --- a/docs/conf.py +++ /dev/null @@ -1,64 +0,0 @@ -# Configuration file for the Sphinx documentation builder. -# -# This file only contains a selection of the most common options. For a full -# list see the documentation: -# https://www.sphinx-doc.org/en/master/usage/configuration.html - -# -- Path setup -------------------------------------------------------------- - -# If extensions (or modules to document with autodoc) are in another directory, -# add these directories to sys.path here. If the directory is relative to the -# documentation root, use os.path.abspath to make it absolute, like shown here. -# -import os -import sys - -sys.path.insert(0, os.path.abspath("../open_mastr")) - - -# -- Project information ----------------------------------------------------- - -project = "open-MaStR" -copyright = "2024 Reiner Lemoine Institut gGmbH and fortiss GmbH and OFFIS e.V." -author = "" - - -# -- General configuration --------------------------------------------------- - -# Add any Sphinx extension module names here, as strings. They can be -# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom -# ones. 
-extensions = [ - "sphinx.ext.autosectionlabel", - "sphinx.ext.autodoc", - "sphinx.ext.napoleon", - "sphinx_tabs.tabs", - "m2r2", -] - -source_suffix = [".rst", ".md"] - -# Add any paths that contain templates here, relative to this directory. -templates_path = ["_templates"] - -# List of patterns, relative to source directory, that match files and -# directories to ignore when looking for source files. -# This pattern also affects html_static_path and html_extra_path. -exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] - - -# -- Options for HTML output ------------------------------------------------- - -# The theme to use for HTML and HTML Help pages. See the documentation for -# a list of builtin themes. -# -html_theme = "sphinx_rtd_theme" - -# Add any paths that contain custom static files (such as style sheets) here, -# relative to this directory. They are copied after the builtin static files, -# so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ["_static"] -html_css_files = ["custom.css"] - -# Autodoc config -autoclass_content = "both" diff --git a/docs/data-release-notes.md b/docs/data-release-notes.md deleted file mode 100644 index e59b599e..00000000 --- a/docs/data-release-notes.md +++ /dev/null @@ -1,31 +0,0 @@ -## Release notes and version history - -### API download versions - -#### dataversion-2022-05-16-BA -- Complete restructure of the software -- Include XML download -- Switch to CalVer for data versioning #221 -- Update to OEMetadata v1.5.1 - -#### dataversion-2.2.0 (2019-12-05) -- Code version [v0.9.0](https://github.com/OpenEnergyPlatform/open-MaStR/releases/tag/v0.9.0) -- Includes: wind; hydro; biomass -- new power-unit download: true -#### dataversion-2.1.2 -- Code version [v0.9.0](https://github.com/OpenEnergyPlatform/open-MaStR/releases/tag/v0.9.0) -- Includes: wind; hydro; biomass -#### dataversion-1.5 -- Includes: wind-permits; storages; solar; basic postprocessing -#### dataversion-1.4 - - 
Includes: permit data for wind and updated metadata -#### dataversion-1.3 -#### dataversion-1.2 -#### dataversion-1.1 -#### dataversion-1.0 -Test version - - - - - diff --git a/docs/dataset.md b/docs/dataset.md index 2063cdf0..772395bf 100644 --- a/docs/dataset.md +++ b/docs/dataset.md @@ -6,7 +6,7 @@ Most unit information is openly accessible and is published under an open data l For units with a net capacity of less than 30 kW, some location information is restricted from publication. This includes street names, house numbers, parcel designations, and exact coordinates of units. The most detailed location information accessible for all units is the postal code or the municipality. -In our paper titled [Analyzing Data Reliability in Germany's Energy System: A Validation of Unit Locations of the Marktstammdatenregister](https://arxiv.org/abs/2304.10581), we provide further insights into the content and quality of the dataset. +In our paper titled [Monitoring Germany's Core Energy System Dataset: A Data Quality Analysis of the Marktstammdatenregister](https://doi.org/10.1145/3717413.3717421), we provide further insights into the content and quality of the dataset. ## Content @@ -17,12 +17,12 @@ The German Federal Network Agency regularly updates the dataset and adds new tab ## Difference between `bulk` and `API` dataset -As you may have noticed, we distinguish between `bulk` and `API` datasets. The `bulk` dataset refers to the data obtained from the zipped XML files downloaded from [here](https://www.marktstammdatenregister.de/MaStR/Datendownload) using the [`Mastr.download`][open_mastr.Mastr.download] function. The `API` data is obtained by requesting information via the SOAP-API and the [`soap_api.download.MaStRDownload`][open_mastr.soap_api.download.MaStRDownload] module. +As you may have noticed, we distinguish between `bulk` and `API` datasets. 
The `bulk` dataset refers to the data obtained from the zipped XML files downloaded from [here](https://www.marktstammdatenregister.de/MaStR/Datendownload) using the [`Mastr.download`][open_mastr.Mastr.download] function. The `API` data is obtained by requesting information via the SOAP-API and the [`soap_api.download.MaStRAPI`][open_mastr.soap_api.download.MaStRAPI] class. ??? question "Why is the table structure in the open-mastr database as it is?" The structure of the database is historically determined by the data retrieved via API. (open-mastr existed before the XML-dump was provided). -
See [MaStR data model](#Mastr-data-model) +
See [MaStR data model](#mastr-data-model) ## Tables in the database @@ -86,7 +86,7 @@ After downloading the MaStR, you will find a database with a large number of tab ### MaStR data model -A useful overview of the MaStR data model can be found [here (in german)](https://www.marktstammdatenregister.de/MaStRHilfe/files/webdienst/Objektmodell%20-%20Fachliche%20Ansicht%20V1.2.0.pdf). A translated version using the names from the tables you can find in your local database is presented here: +A useful overview of the MaStR data model can be found at the MaStR [help page](https://www.marktstammdatenregister.de/MaStRHilfe/subpages/faq.html). A translated version using the names from the tables you can find in your local database is presented here: === "translated image (english)" ![Data model of the MaStR](images/DetailAnlagen_english.PNG) diff --git a/docs/getting_started.md b/docs/getting_started.md index 891efbbe..0c17fcb4 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -50,17 +50,9 @@ More detailed information can be found in the section [bulk download](advanced.m API download ----------------------------------- -When using `download(method="API")`, the data is retrieved from the MaStR API. For using the MaStR API, credentials -are needed (see [SOAP API download](advanced.md#soap-api-download)). +!!! warning "API download has been removed" + In versions > `v0.16.0` the `download(method="API")` cannot be used anymore. The default behaviour changes `method="API"` to `method="bulk"`. You can still use the basic SOAP API functionalities from the [`MaStRAPI`][open_mastr.soap_api.download.MaStRAPI] class. -```python -from open_mastr import Mastr - -db = Mastr() -db.download(method='API') -``` - -The default settings will save retrieved data into the sqlite database. The function can be used to mirror the open-MaStR database using the SOAP API. Note that the data provided by the bulk download and the SOAP API is similar, but not exactly the same. 
## Accessing the database diff --git a/docs/reference/advanced.md b/docs/reference/advanced.md index c06958a0..30d8040d 100644 --- a/docs/reference/advanced.md +++ b/docs/reference/advanced.md @@ -1,5 +1,3 @@ # Advanced functions to use the MaStR SOAP-API ::: open_mastr.soap_api.download.MaStRAPI -::: open_mastr.soap_api.download.MaStRDownload -::: open_mastr.soap_api.mirror.MaStRMirror diff --git a/main.py b/main.py index 88730b4e..fd4c5757 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,6 @@ open-MaStR - Main file Bulk: Download XML-Dump and fill in local SQLite database. -API: Download latest entries using the SOAP-API. SPDX-License-Identifier: AGPL-3.0-or-later """ @@ -45,34 +44,6 @@ "permit", ] -# API download -# for parameter explanation see: https://open-mastr.readthedocs.io/en/latest/advanced/#soap-api-download - -api_date = "latest" -api_chunksize = 10 -api_limit = 10 -api_processes = None - -data_api = [ - "biomass", - "combustion", - "gsgk", - "hydro", - "nuclear", - "solar", - "storage", - "wind", -] - -api_data_types = ["unit_data", "eeg_data", "kwk_data", "permit_data"] - -api_location_types = [ - "location_elec_generation", - "location_elec_consumption", - "location_gas_generation", - "location_gas_consumption", -] - if __name__ == "__main__": # instantiate Mastr class db = Mastr() @@ -81,18 +52,6 @@ # bulk download db.download(method="bulk", data=data_bulk, date=bulk_date, bulk_cleansing=True) - # API download - db.download( - method="API", - data=data_api, - date=api_date, - api_processes=api_processes, - api_limit=api_limit, - api_chunksize=api_chunksize, - api_data_types=api_data_types, - api_location_types=api_location_types, - ) - ## export to csv """ Technology-related tables are exported as joined, whereas additional tables diff --git a/open_mastr/mastr.py b/open_mastr/mastr.py index a2bbf303..23af6f5b 100644 --- a/open_mastr/mastr.py +++ b/open_mastr/mastr.py @@ -10,12 +10,7 @@ write_mastr_xml_to_database, ) -# import soap_API dependencies 
-from open_mastr.soap_api.mirror import MaStRMirror - from open_mastr.utils.helpers import ( - print_api_settings, - validate_api_credentials, validate_parameter_format_for_download_method, validate_parameter_format_for_mastr_init, validate_parameter_data, @@ -51,8 +46,8 @@ class Mastr: """ `Mastr` is used to download the MaStR database and keep it up-to-date. - A SQL database is used to mirror the MaStR database. It can be filled with - data either from the MaStR-bulk download or from the MaStR-API. + An SQL database is used to mirror the MaStR database. It is filled by + downloading and parsing the MaStR via bulk download. !!! example @@ -111,60 +106,55 @@ def download( date=None, bulk_cleansing=True, keep_old_downloads: bool = False, - api_processes=None, - api_limit=50, - api_chunksize=1000, - api_data_types=None, - api_location_types=None, **kwargs, ) -> None: """ - Download the MaStR either via the bulk download or via the MaStR API and write it to a - SQLite database. + Downloads the MaStR registry and writes it to a local database. Parameters ---------- - method : 'API' or 'bulk', optional - Either "API" or "bulk". Determines whether the data is downloaded via the - zipped bulk download or via the MaStR API. The latter requires an account - from marktstammdatenregister.de, - (see :ref:`Configuration `). Default to 'bulk'. + method : 'bulk', optional + Only "bulk" is a valid value. The download via the MaStR SOAP API has been removed. + Defaults to 'bulk'. data : str or list or None, optional - Determines which data is partially downloaded from the bulk download and written to the database. If None, all data is downloaded and written to the database. - If it is a list, possible entries are listed below with respect to the download method. Missing categories are - being developed. If only one data is of interest, this can be given as a string. Default to None, where all data is included. 
- - | Data | Bulk | API | - |-----------------------|------|------| - | "wind" | Yes | Yes | - | "solar" | Yes | Yes | - | "biomass" | Yes | Yes | - | "hydro" | Yes | Yes | - | "gsgk" | Yes | Yes | - | "combustion" | Yes | Yes | - | "nuclear" | Yes | Yes | - | "gas" | Yes | Yes | - | "storage" | Yes | Yes | - | "storage_units" | Yes | Yes | - | "electricity_consumer"| Yes | No | - | "location" | Yes | Yes | - | "market" | Yes | No | - | "grid" | Yes | No | - | "balancing_area" | Yes | No | - | "permit" | Yes | Yes | - | "deleted_units" | Yes | No | - | "deleted_market_actors"| Yes | No | - | "retrofit_units" | Yes | No | + Specifies which tables to download. + + **Possible values:** + + - "wind" + - "solar" + - "biomass" + - "hydro" + - "gsgk" + - "combustion" + - "nuclear" + - "gas" + - "storage" + - "storage_units" + - "electricity_consumer" + - "location" + - "market" + - "grid" + - "balancing_area" + - "permit" + - "deleted_units" + - "deleted_market_actors" + - "retrofit_units" + + **Usage:** + + - If `None`, all data is downloaded. + - If a string, only the specified table is downloaded (e.g., `"wind"`). + - If a list, multiple tables are downloaded (e.g., `["wind", "solar"]`). + date : None or `datetime.datetime` or str, optional - | date | Bulk | API | - |-----------------------|------|------| - | "today" | latest files are downloaded from marktstammdatenregister.de | - | - | "20230101" | If file from this date exists locally, it is used. 
Otherwise it throws an error (You can only receive todays data from the server) | - | - | "existing" | Deprecated since 0.16, see [#616](https://github.com/OpenEnergyPlatform/open-MaStR/issues/616#issuecomment-3089377062) | - | - | "latest" | - | Retrieve data that is newer than the newest data already in the table | - | datetime.datetime(2020, 11, 27) | - | Retrieve data that is newer than this time stamp | - | None | set date="today" | set date="latest" | + | date | description | + |-----------------------|------| + | "today" | latest files are downloaded from marktstammdatenregister.de | + | "20230101" | If file from this date exists locally, it is used. Otherwise it throws an error (You can only receive todays data from the server) | + | "existing" | Deprecated since 0.16, see [#616](https://github.com/OpenEnergyPlatform/open-MaStR/issues/616#issuecomment-3089377062) | + | None | set date="today" | Default to `None`. bulk_cleansing : bool, optional @@ -174,31 +164,6 @@ def download( only contain IDs. Cleansing replaces these IDs with their corresponding original entries. keep_old_downloads: bool If set to True, prior downloaded MaStR zip files will be kept. - api_processes : int or None or "max", optional - Number of parallel processes used to download additional data. - Defaults to `None`. If set to "max", the maximum number of possible processes - is used. - - !!! warning - - The implementation of parallel processes is currently under construction. - Please let the argument `api_processes` at the default value `None`. - api_limit : int or None, optional - Limit the number of units that data is downloaded for. Defaults to `None` which refers - to query data for existing data requests, for example created by - [`create_additional_data_requests`][open_mastr.soap_api.mirror.MaStRMirror.create_additional_data_requests]. Note: There is a limited number of - requests you are allowed to have per day, so setting api_limit to a value is - recommended. 
- api_chunksize : int or None, optional - Data is downloaded and inserted into the database in chunks of `chunksize`. - Defaults to 1000. - api_data_types : list or None, optional - Select the type of additional data that should be retrieved. Choose from - "unit_data", "eeg_data", "kwk_data", "permit_data". Defaults to all. - api_location_types : list or None, optional - Select the type of location that should be retrieved. Choose from - "location_elec_generation", "location_elec_consumption", "location_gas_generation", - "location_gas_consumption". Defaults to all. """ if self.is_translated: @@ -207,112 +172,65 @@ def download( "A translated database cannot be further processed." ) + if method == "API": + log.warning( + "Downloading the whole registry via the MaStR SOAP-API has been removed. " + "You can still use the open_mastr.soap_api.download.MaStRAPI class " + "to construct single calls." + ) + log.warning("Attention: method='API' changed to method='bulk'.") + method = "bulk" + validate_parameter_format_for_download_method( method=method, data=data, date=date, bulk_cleansing=bulk_cleansing, - api_processes=api_processes, - api_limit=api_limit, - api_chunksize=api_chunksize, - api_data_types=api_data_types, - api_location_types=api_location_types, **kwargs, ) - ( - data, - api_data_types, - api_location_types, - harm_log, - ) = transform_data_parameter( - method, data, api_data_types, api_location_types, **kwargs - ) + data = transform_data_parameter(data, **kwargs) - date = transform_date_parameter(self, method, date, **kwargs) + date = transform_date_parameter(self, date, **kwargs) - if method == "bulk": - # Find the name of the zipped xml folder - bulk_download_date = parse_date_string(date) - xml_folder_path = os.path.join(self.output_dir, "data", "xml_download") - os.makedirs(xml_folder_path, exist_ok=True) - zipped_xml_file_path = os.path.join( - xml_folder_path, - f"Gesamtdatenexport_{bulk_download_date}.zip", - ) - - 
delete_zip_file_if_corrupted(zipped_xml_file_path) - if not keep_old_downloads: - delete_xml_files_not_from_given_date( - zipped_xml_file_path, - xml_folder_path, - ) + # Find the name of the zipped xml folder + bulk_download_date = parse_date_string(date) + xml_folder_path = os.path.join(self.output_dir, "data", "xml_download") + os.makedirs(xml_folder_path, exist_ok=True) + zipped_xml_file_path = os.path.join( + xml_folder_path, + f"Gesamtdatenexport_{bulk_download_date}.zip", + ) - download_xml_Mastr( - zipped_xml_file_path, bulk_download_date, data, xml_folder_path - ) + delete_zip_file_if_corrupted(zipped_xml_file_path) + if not keep_old_downloads: + delete_xml_files_not_from_given_date(zipped_xml_file_path, xml_folder_path) - log.info( - "\nWould you like to speed up the creation of your MaStR database?\n" - "Try our new parallelized processing by setting os.environ['USE_RECOMMENDED_NUMBER_OF_PROCESSES'] = True " - "or configure your own number of processes via os.environ['NUMBER_OF_PROCESSES'] = your_number\n" - ) + download_xml_Mastr( + zipped_xml_file_path, bulk_download_date, data, xml_folder_path + ) - write_mastr_xml_to_database( - engine=self.engine, - zipped_xml_file_path=zipped_xml_file_path, - data=data, - bulk_cleansing=bulk_cleansing, - bulk_download_date=bulk_download_date, - ) + log.info( + "\nWould you like to speed up the creation of your MaStR database?\n" + "Try our new parallelized processing by setting os.environ['USE_RECOMMENDED_NUMBER_OF_PROCESSES'] = True " + "or configure your own number of processes via os.environ['NUMBER_OF_PROCESSES'] = your_number\n" + ) - if method == "API": - validate_api_credentials() - - # Set api_processes to None in order to avoid the malfunctioning usage - if api_processes: - api_processes = None - log.warning( - "The implementation of parallel processes " - "is currently under construction. Please let " - "the argument api_processes at the default value None." 
- ) + delete_zip_file_if_corrupted(zipped_xml_file_path) + delete_xml_files_not_from_given_date(zipped_xml_file_path, xml_folder_path) - print_api_settings( - harmonisation_log=harm_log, - data=data, - date=date, - api_data_types=api_data_types, - api_chunksize=api_chunksize, - api_limit=api_limit, - api_processes=api_processes, - api_location_types=api_location_types, - ) + print( + "\nWould you like to speed up the creation of your MaStR database?\n" + "Try our new parallelized processing by setting os.environ['USE_RECOMMENDED_NUMBER_OF_PROCESSES'] = True " + "or configure your own number of processes via os.environ['NUMBER_OF_PROCESSES'] = your_number\n" + ) - mastr_mirror = MaStRMirror( - engine=self.engine, - parallel_processes=api_processes, - restore_dump=None, - ) - # Download basic unit data - mastr_mirror.backfill_basic(data, limit=api_limit, date=date) - - # Download additional unit data - for tech in data: - # mastr_mirror.create_additional_data_requests(data) - for data_type in api_data_types: - mastr_mirror.retrieve_additional_data( - tech, data_type, chunksize=api_chunksize, limit=api_limit - ) - - # Download basic location data - mastr_mirror.backfill_locations_basic(limit=api_limit, date="latest") - - # Download extended location data - if api_location_types: - for location_type in api_location_types: - mastr_mirror.retrieve_additional_location_data( - location_type, limit=api_limit - ) + write_mastr_xml_to_database( + engine=self.engine, + zipped_xml_file_path=zipped_xml_file_path, + data=data, + bulk_cleansing=bulk_cleansing, + bulk_download_date=bulk_download_date, + ) def to_csv( self, tables: list = None, chunksize: int = 500000, limit: int = None @@ -351,14 +269,7 @@ def to_csv( # Validate and parse tables parameter validate_parameter_data(method="csv_export", data=tables) - ( - data, - api_data_types, - api_location_types, - harm_log, - ) = transform_data_parameter( - method="bulk", data=tables, api_data_types=None, api_location_types=None - 
) + data = transform_data_parameter(tables, **kwargs) # Determine tables to export technologies_to_export = [] diff --git a/open_mastr/soap_api/__init__.py b/open_mastr/soap_api/__init__.py index 139597f9..8b137891 100644 --- a/open_mastr/soap_api/__init__.py +++ b/open_mastr/soap_api/__init__.py @@ -1,2 +1 @@ - diff --git a/open_mastr/soap_api/download.py b/open_mastr/soap_api/download.py index 17765058..030ab7b2 100644 --- a/open_mastr/soap_api/download.py +++ b/open_mastr/soap_api/download.py @@ -1,24 +1,14 @@ -import json import logging -import multiprocessing -import os import time from functools import wraps -from itertools import product - -import pandas as pd import requests from open_mastr.utils import credentials as cred from open_mastr.utils.config import ( - create_data_dir, - get_data_version_dir, - get_filenames, setup_logger, ) -from tqdm import tqdm from zeep import Client, Settings from zeep.cache import SqliteCache -from zeep.exceptions import Fault, XMLParseError +from zeep.exceptions import Fault from zeep.helpers import serialize_object from zeep.transports import Transport @@ -254,1180 +244,3 @@ def filter(self, record): zplogger.filters = [ f for f in zplogger.filters if not isinstance(f, FilterExceptions) ] + [f for f in error_filters if f.name in which_errors] - - -def replace_second_level_keys_with_first_level_data(dic: dict) -> dict: - """The returned dict from the API call often contains nested dicts. The - columns where this happens are defined in flatten_rule_replace. The nested dicts - are replaced by its actual value. 
- - Example: - "WasserrechtAblaufdatum": {"Wert": None, "NichtVorhanden": False} - -> "WasserrechtAblaufdatum": None - - - Parameters - ---------- - dic : dict - Dictionary containing information on single unit - - Returns - ------- - dict - Dictionary where nested entries are replaced by data of interest - """ - flatten_rule_replace = { - "Hausnummer": "Wert", - "Kraftwerksnummer": "Wert", - "Weic": "Wert", - "WeitereBrennstoffe": "Wert", - "WeitererHauptbrennstoff": "Wert", - "AnlagenkennzifferAnlagenregister": "Wert", - "VerhaeltnisErtragsschaetzungReferenzertrag": "Wert", - "VerhaeltnisReferenzertragErtrag10Jahre": "Wert", - "VerhaeltnisReferenzertragErtrag15Jahre": "Wert", - "VerhaeltnisReferenzertragErtrag5Jahre": "Wert", - "RegistrierungsnummerPvMeldeportal": "Wert", - "BiogasGaserzeugungskapazitaet": "Wert", - "BiomethanErstmaligerEinsatz": "Wert", - "Frist": "Wert", - "WasserrechtAblaufdatum": "Wert", - } - for k, v in flatten_rule_replace.items(): - if k in dic: - dic[k] = dic[k][v] - - return dic - - -def replace_linked_units_with_unit_identifier(dic: dict) -> dict: - """If data point in 'dic' has one or more VerknuepfteEinheit or - VerknuepfteEinheiten in its respective dict, the related units (VerknuepfteEinheiten) are inserted as comma-separated strings. 
- - Parameters - ---------- - dic : dict - Dictionary containing information on single unit - - Returns - ------- - dict - Dictionary where linked units are replaced with linked unit identifier (MaStRNummer) - """ - - flatten_rule_replace_list = { - "VerknuepfteEinheit": "MaStRNummer", - "VerknuepfteEinheiten": "MaStRNummer", - "Netzanschlusspunkte": "NetzanschlusspunktMastrNummer", - } - for k, v in flatten_rule_replace_list.items(): - if k in dic: - if len(dic[k]) != 0: - mastr_nr_list = [unit[v] for unit in dic[k]] - dic[k] = ", ".join(mastr_nr_list) - else: # in case list is emtpy - dic[k] = "" - return dic - - -def replace_entries_of_type_list(dic: dict) -> dict: - """Entries that are lists are replaced: - Empty lists are replaced with None. - Lists of strings are replaced with comma seperated strings. - - Parameters - ---------- - dic : dict - Dictionary containing information on single unit - - Returns - ------- - dict - Dictionary containing information on single unit without list entries. 
- """ - flatten_rule_none_if_empty_list = [ - "ArtDerFlaeche", - "WeitereBrennstoffe", - "VerknuepfteErzeugungseinheiten", - ] - for k in flatten_rule_none_if_empty_list: - if k in dic: - dic[k] = None if dic[k] == [] else ",".join(dic[k]) - return dic - - -def flatten_dict(data: list, serialize_with_json: bool = False) -> list: - """ - Flattens MaStR data dictionary to depth of one - - Parameters - ---------- - data : list of dict - Data returned from MaStR-API query - - Returns - ------- - list of dict - Flattened data dictionary - """ - flatten_rule_serialize = ["Ertuechtigung"] - - flatten_rule_move_up_and_merge = ["Hersteller"] - - for dic in data: - dic = replace_second_level_keys_with_first_level_data(dic) - dic = replace_linked_units_with_unit_identifier(dic) - - # Serilializes dictionary entries with unknown number of sub-entries into JSON string - # This affects "Ertuechtigung" in extended unit data of hydro - if serialize_with_json: - for k in flatten_rule_serialize: - if k in dic.keys(): - dic[k] = json.dumps(dic[k], indent=4, sort_keys=True, default=str) - - # Join 'Id' with original key to new column - # and overwrite original data with 'Wert' - for k in flatten_rule_move_up_and_merge: - if k in dic.keys(): - dic.update({k + "Id": dic[k]["Id"]}) - dic.update({k: dic[k]["Wert"]}) - - dic = replace_entries_of_type_list(dic) - return data - - -def _missed_units_to_file(data, data_type, missed_units): - """ - Write IDs of missed units to file - - Parameters - ---------- - data : str - Data, see :meth:`MaStRDownload.download_power_plants` - data_type : str - Which type of additional data. 
Options: 'extended', 'eeg', 'kwk', 'permit' - missed_units : list - Unit IDs of missed data - """ - - data_path = get_data_version_dir() - filenames = get_filenames() - missed_units_file = os.path.join( - data_path, filenames["raw"][data][f"{data_type}_fail"] - ) - - with open(missed_units_file, "w") as f: - for i, error in missed_units: - f.write(f"{i},{error}\n") - - -class MaStRDownload: - """ - !!! warning - - **This class is deprecated** and will not be maintained in the future. - Instead use [`Mastr.download`][open_mastr.Mastr.download] with parameter - `method` = "bulk" to get bulk downloads of the dataset. - - Use the higher level interface for bulk download - - `MaStRDownload` builds on top of [`MaStRAPI`][open_mastr.soap_api.download.MaStRAPI] and provides - an interface for easier downloading. - Use methods documented below to retrieve specific data. On the example of - data for nuclear power plants, this looks like - - ```python - - from open_mastr.soap_api.download import MaStRDownload - - mastr_dl = MaStRDownload() - - for tech in ["nuclear", "hydro", "wind", "solar", "biomass", "combustion", "gsgk"]: - power_plants = mastr_dl.download_power_plants(tech, limit=10) - print(power_plants.head()) - ``` - - !!! warning - - Be careful with increasing `limit`. Typically, your account allows only for 10,000 API - request per day. - - """ - - def __init__(self, parallel_processes=None): - """ - - Parameters - ---------- - parallel_processes : int or bool, optional - Specify number of parallel unit data download, respectively - the number of processes you want to use for downloading. - For single-process download (avoiding the use of python - multiprocessing package) choose False. - Defaults to number of cores (including hyperthreading). - """ - log.warning( - """ - The `MaStRDownload` class is deprecated and will not be maintained in the future. - To get a full table of the Marktstammdatenregister, use the open_mastr.Mastr.download - method. 
- - If this change causes problems for you, please comment in this issue on github: - https://github.com/OpenEnergyPlatform/open-MaStR/issues/487 - - """ - ) - - # Number of parallel processes - if parallel_processes == "max": - self.parallel_processes = multiprocessing.cpu_count() - else: - self.parallel_processes = parallel_processes - - # Specify which additional data for each unit type is available - # and which SOAP service has to be used to query it - self._unit_data_specs = { - "biomass": { - "unit_data": "GetEinheitBiomasse", - "energietraeger": ["Biomasse"], - "kwk_data": "GetAnlageKwk", - "eeg_data": "GetAnlageEegBiomasse", - "permit_data": "GetEinheitGenehmigung", - }, - "combustion": { - "unit_data": "GetEinheitVerbrennung", - "energietraeger": [ - "Steinkohle", - "Braunkohle", - "Erdgas", - "AndereGase", - "Mineraloelprodukte", - "NichtBiogenerAbfall", - "Waerme", - ], - "kwk_data": "GetAnlageKwk", - "permit_data": "GetEinheitGenehmigung", - }, - "gsgk": { - "unit_data": "GetEinheitGeothermieGrubengasDruckentspannung", - "energietraeger": [ - "Geothermie", - "Solarthermie", - "Grubengas", - "Klaerschlamm", - ], - "kwk_data": "GetAnlageKwk", - "eeg_data": "GetAnlageEegGeothermieGrubengasDruckentspannung", - "permit_data": "GetEinheitGenehmigung", - }, - "nuclear": { - "unit_data": "GetEinheitKernkraft", - "energietraeger": ["Kernenergie"], - "permit_data": "GetEinheitGenehmigung", - }, - "solar": { - "unit_data": "GetEinheitSolar", - "energietraeger": ["SolareStrahlungsenergie"], - "eeg_data": "GetAnlageEegSolar", - "permit_data": "GetEinheitGenehmigung", - }, - "wind": { - "unit_data": "GetEinheitWind", - "energietraeger": ["Wind"], - "eeg_data": "GetAnlageEegWind", - "permit_data": "GetEinheitGenehmigung", - }, - "hydro": { - "unit_data": "GetEinheitWasser", - "energietraeger": ["Wasser"], - "eeg_data": "GetAnlageEegWasser", - "permit_data": "GetEinheitGenehmigung", - }, - "storage": { - "unit_data": "GetEinheitStromSpeicher", - "energietraeger": 
["Speicher"], - "eeg_data": "GetAnlageEegSpeicher", - # todo: additional data request not created for permit, create manually - "permit_data": "GetEinheitGenehmigung", - }, - "gas_storage": { - "unit_data": "GetEinheitGasSpeicher", - "energietraeger": ["Speicher"], - }, - # TODO: unsure if energietraeger Ergdas makes sense - "gas_consumer": { - "unit_data": "GetEinheitGasVerbraucher", - "energietraeger": ["Erdgas"], - }, - "electricity_consumer": { - "unit_data": "GetEinheitStromVerbraucher", - "energietraeger": ["Strom"], - }, - "gas_producer": { - "unit_data": "GetEinheitGasErzeuger", - "energietraeger": [None], - }, - "location_elec_generation": "GetLokationStromErzeuger", - "location_elec_consumption": "GetLokationStromVerbraucher", - "location_gas_generation": "GetLokationGasErzeuger", - "location_gas_consumption": "GetLokationGasVerbraucher", - } - - # Map additional data to primary key via data_fcn - self._additional_data_primary_key = { - "extended_unit_data": "EinheitMastrNummer", - "kwk_unit_data": "KwkMastrNummer", - "eeg_unit_data": "EegMastrNummer", - "permit_unit_data": "GenMastrNummer", - "location_data": "MastrNummer", - } - - # Check if MaStR credentials are available and otherwise ask - # for user input - self._mastr_api = MaStRAPI() - self._mastr_api._user = cred.check_and_set_mastr_user() - self._mastr_api._key = cred.check_and_set_mastr_token(self._mastr_api._user) - - def download_power_plants(self, data, limit=None): - """ - Download power plant unit data for one data type. - - Based on list with basic information about each unit, subsequently additional - data is retrieved: - - * Extended unit data - * EEG data is collected during support of renewable energy installations - by the Erneuerbare-Energie-Gesetz. - * KWK data is collected to the support program Kraft-Waerme-Kopplung - * Permit data is available for some installations (German: Genehmigungsdaten) - - Data is stored in CSV file format in `~/open-MaStR/data//` by - default. 
- - Parameters - ---------- - data : str - Retrieve unit data for one power system unit. Power plants are - grouped by following technologies: - - * 'nuclear' - * 'hydro' - * 'solar' - * 'wind' - * 'biomass' - * 'combustion' - * 'gsgk' - * 'storage' - limit : int - Maximum number of units to be downloaded. - - Returns - ------- - pd.DataFrame - Joined data tables. - """ - # Create data version directory - create_data_dir() - - # Check requests contingent - self.daily_contingent() - - # Retrieve basic power plant unit data - # The return value is casted into a list, because a generator gets returned - # This was introduced later, after creation of this method - units = [ - unit - for sublist in self.basic_unit_data(data=data, limit=limit) - for unit in sublist - ] - - # Prepare list of unit ID for different additional data (extended, eeg, kwk, permit) - mastr_ids = self._create_ID_list(units, "unit_data", "EinheitMastrNummer", data) - eeg_ids = self._create_ID_list(units, "eeg_data", "EegMastrNummer", data) - kwk_ids = self._create_ID_list(units, "kwk_data", "KwkMastrNummer", data) - permit_ids = self._create_ID_list(units, "permit_data", "GenMastrNummer", data) - - # Download additional data for unit - extended_data, extended_missed = self.additional_data( - data, mastr_ids, "extended_unit_data" - ) - if eeg_ids: - eeg_data, eeg_missed = self.additional_data(data, eeg_ids, "eeg_unit_data") - else: - eeg_data = eeg_missed = [] - if kwk_ids: - kwk_data, kwk_missed = self.additional_data(data, kwk_ids, "kwk_unit_data") - else: - kwk_data = kwk_missed = [] - if permit_ids: - permit_data, permit_missed = self.additional_data( - data, permit_ids, "permit_unit_data" - ) - else: - permit_data = permit_missed = [] - - # Retry missed additional unit data - if extended_missed: - ( - extended_data_retry, - extended_missed_retry, - ) = self._retry_missed_additional_data( - data, [_[0] for _ in extended_missed], "extended_unit_data" - ) - 
extended_data.extend(extended_data_retry) - _missed_units_to_file(data, "extended", extended_missed_retry) - if eeg_missed: - eeg_data_retry, eeg_missed_retry = self._retry_missed_additional_data( - data, [_[0] for _ in eeg_missed], "eeg_unit_data" - ) - eeg_data.extend(eeg_data_retry) - _missed_units_to_file(data, "eeg", eeg_missed_retry) - if kwk_missed: - kwk_data_retry, kwk_missed_retry = self._retry_missed_additional_data( - data, [_[0] for _ in kwk_missed], "kwk_unit_data" - ) - kwk_data.extend(kwk_data_retry) - _missed_units_to_file(data, "kwk", kwk_missed_retry) - if permit_missed: - permit_data_retry, permit_missed_retry = self._retry_missed_additional_data( - data, [_[0] for _ in permit_missed], "permit_unit_data" - ) - permit_data.extend(permit_data_retry) - _missed_units_to_file(data, "permit", permit_missed_retry) - - # Flatten data - extended_data = flatten_dict(extended_data, serialize_with_json=True) - eeg_data = flatten_dict(eeg_data, serialize_with_json=True) - kwk_data = flatten_dict(kwk_data, serialize_with_json=True) - permit_data = flatten_dict(permit_data, serialize_with_json=True) - - # Join data to a single dataframe - idx_cols = [ - (units, "EinheitMastrNummer", ""), - (extended_data, "EinheitMastrNummer", "_unit"), - (eeg_data, "VerknuepfteEinheit", "_eeg"), - (kwk_data, "VerknuepfteEinheiten", "_kwk"), - (permit_data, "VerknuepfteEinheiten", "_permit"), - ] - - joined_data = pd.DataFrame(idx_cols[0][0]).set_index(idx_cols[0][1]) - - for dat, idx_col, suf in idx_cols[1:]: - # Make sure at least on non-empty dict is in dat - if any(dat): - joined_data = joined_data.join( - pd.DataFrame(dat).set_index(idx_col), rsuffix=suf - ) - - # Remove duplicates - joined_data.drop_duplicates(inplace=True) - - to_csv(joined_data, data) # FIXME: reference to helpers im import - - return joined_data - - def _create_ID_list(self, units, data_descriptor, key, data): - """Extracts a list of MaStR numbers (eeg, kwk, or permit Mastr Nr) from the given 
units.""" - return ( - [basic[key] for basic in units if basic[key]] - if data_descriptor in self._unit_data_specs[data] - else [] - ) - - def basic_unit_data(self, data=None, limit=2000, date_from=None, max_retries=3): - """ - Download basic unit information for one data type. - - Retrieves basic information about units. The number of unit in - bound to `limit`. - - Parameters - ---------- - data : str, optional - Technology data is requested for. See :meth:`MaStRDownload.download_power_plants` - for options. - Data is retrieved using `MaStRAPI.GetGefilterteListeStromErzeuger`. - If not given, it defaults to `None`. This implies data for all available - technologies is retrieved using the web service function - `MaStRAPI.GetListeAlleEinheiten`. - limit : int, optional - Maximum number of units to download. - If not provided, data for all units is downloaded. - - !!! warning - Mind the daily request limit for your MaStR account. - date_from: `datetime.datetime()`, optional - If specified, only units with latest change date newer than this are queried. - Defaults to `None`. - max_retries: int, optional - Maximum number of retries in case of errors with the connection to the server. - - Yields - ------ - list of dict - A generator of dicts is returned with each dictionary containing - information about one unit. 
- """ - # Split download of basic unit data in chunks of 2000 - # Reason: the API limits retrieval of data to 2000 items - chunksize = 2000 - chunks_start = list(range(1, limit + 1, chunksize)) - limits = [ - chunksize if (x + chunksize) <= limit else limit - x + 1 - for x in chunks_start - ] - - # Deal with or w/o data type being specified - energietraeger = ( - self._unit_data_specs[data]["energietraeger"] if data else [None] - ) - - # In case multiple energy carriers (energietraeger) exist for one data type, - # loop over these and join data to one list - for et in energietraeger: - log.info( - f"Get list of units with basic information for data type {data} ({et})" - ) - yield from ( - basic_data_download( - self._mastr_api, - "GetListeAlleEinheiten", - "Einheiten", - chunks_start, - limits, - date_from, - max_retries, - data, - et=et, - ) - if et is None - else basic_data_download( - self._mastr_api, - "GetGefilterteListeStromErzeuger", - "Einheiten", - chunks_start, - limits, - date_from, - max_retries, - data, - et=et, - ) - ) - - def additional_data(self, data, unit_ids, data_fcn, timeout=10): - """ - Retrieve addtional informations about units. - - Extended information on units is available. Depending on type, additional data from EEG - and KWK subsidy program are available. - Furthermore, for some units, data about permit is retrievable. - - Parameters - ---------- - data : str - data, see :meth:`MaStRDownload.download_power_plants` - unit_ids : list - Unit identifier for additional data - data_fcn : str - Name of method from :class:`MaStRDownload` to be used for querying additional data. - Choose from - - * "extended_unit_data" (:meth:`~.extended_unit_data`): Extended information - (i.e. technical, location) - about a unit. The exact set of information depends on the data type. - * "eeg_unit_data" (:meth:`~.eeg_unit_data`): Unit Information from - EEG unit registry. The exact - set of information depends on the data. 
- * "kwk_unit_data" (:meth:`~.kwk_unit_data`): Unit information from KWK unit registry. - * "permit_unit_data" (:meth:`~.permit_unit_data`): Information about the permit - process of a unit. - timeout: int, optional - Timeout limit for data retrieval for each unit when using multiprocessing. Defaults to 10. - - Returns - ------- - tuple of list of dict and tuple - Returns additional data in dictionaries that are packed into a list. - ```python - return = ( - [additional_unit_data_dict1, additional_unit_data_dict2, ...], - [tuple("SME930865355925", "Reason for failing dowload"), ...] - ) - ``` - """ - # Prepare a list of unit IDs packed as tuple associated with data - prepared_args = list(product(unit_ids, [data])) - - # Prepare results lists - - if self.parallel_processes: - data, data_missed = self._retrieve_data_in_parallel_process( - prepared_args, data_fcn, data, timeout - ) - else: - data, data_missed = self._retrieve_data_in_single_process( - prepared_args, data_fcn, data - ) - - # Remove Nones and empty dicts - data = [dat for dat in data if dat] - data_missed = [dat for dat in data_missed if dat] - - # Add units missed due to timeout to data_missed - units_retrieved = [_[self._additional_data_primary_key[data_fcn]] for _ in data] - units_missed_timeout = [ - (u, "Timeout") - for u in unit_ids - if u not in units_retrieved + [_[0] for _ in data_missed] - ] - data_missed += units_missed_timeout - - return data, data_missed - - def _retrieve_data_in_single_process(self, prepared_args, data_fcn, data): - data_list = [] - data_missed_list = [] - for unit_specs in tqdm( - prepared_args, - total=len(prepared_args), - desc=f"Downloading {data_fcn} ({data})", - unit="unit", - ): - data_tmp, data_missed_tmp = self.__getattribute__(data_fcn)(unit_specs) - if not data_tmp: - log.debug( - f"Download for additional data for " - f"{data_missed_tmp[0]} ({data}) failed. 
" - f"Traceback of caught error:\n{data_missed_tmp[1]}" - ) - data_list.append(data_tmp) - data_missed_list.append(data_missed_tmp) - - return data_list, data_missed_list - - def _retrieve_data_in_parallel_process( - self, prepared_args, data_fcn, data, timeout - ): - data_list = [] - data_missed_list = [] - with multiprocessing.Pool( - processes=self.parallel_processes, maxtasksperchild=1 - ) as pool: - with tqdm( - total=len(prepared_args), - desc=f"Downloading {data_fcn} ({data})", - unit="unit", - ) as pbar: - unit_result = pool.imap_unordered( - self.__getattribute__(data_fcn), prepared_args, chunksize=1 - ) - while True: - try: - # Try to retrieve data from concurrent processes - data_tmp, data_missed_tmp = unit_result.next(timeout=timeout) - - if not data_tmp: - log.debug( - f"Download for additional data for " - f"{data_missed_tmp[0]} ({data}) failed. " - f"Traceback of caught error:\n{data_missed_tmp[1]}" - ) - data_list.append(data_tmp) - data_missed_list.append(data_missed_tmp) - pbar.update() - except StopIteration: - # Multiprocessing returns StropIteration when results list gets empty - break - except multiprocessing.TimeoutError: - # If retrieval time exceeds timeout of next(), pass on - log.debug(f"Data request for 1 {data} unit timed out") - return data_list, data_missed_list - - def extended_unit_data(self, unit_specs): - """ - Download extended data for a unit. - - This extended unit information is provided separately. - - Parameters - ---------- - unit_specs : tuple - *EinheitMastrNummer* and data type as tuple that for example looks like - - ```python - - tuple("SME930865355925", "hydro") - ``` - - Returns - ------- - dict - Extended information about unit, if download successful, - otherwise empty dict - tuple - *EinheitMastrNummer* and message the explains why a download failed. 
Format - - ```python - - tuple("SME930865355925", "Reason for failing dowload") - ``` - """ - - mastr_id, data = unit_specs - try: - unit_data = self._mastr_api.__getattribute__( - self._unit_data_specs[data]["unit_data"] - )(einheitMastrNummer=mastr_id) - unit_missed = None - except ( - XMLParseError, - Fault, - requests.exceptions.ConnectionError, - requests.exceptions.ReadTimeout, - ) as e: - # log.exception( - # f"Failed to download unit data for {mastr_id} because of SOAP API exception: {e}", - # exc_info=False) - unit_data = {} - unit_missed = (mastr_id, repr(e)) - - return unit_data, unit_missed - - def eeg_unit_data(self, unit_specs): - """ - Download EEG (Erneuerbare Energien Gesetz) data for a unit. - - Additional data collected during a subsidy program for supporting - installations of renewable energy power plants. - - Parameters - ---------- - unit_specs : tuple - *EegMastrnummer* and data type as tuple that for example looks like - - .. code-block:: python - - tuple("EEG961554380393", "hydro") - - Returns - ------- - dict - EEG details about unit, if download successful, - otherwise empty dict - tuple - *EegMastrNummer* and message the explains why a download failed. Format - - ```python - - tuple("EEG961554380393", "Reason for failing dowload") - ``` - """ - eeg_id, data = unit_specs - try: - eeg_data = self._mastr_api.__getattribute__( - self._unit_data_specs[data]["eeg_data"] - )(eegMastrNummer=eeg_id) - eeg_missed = None - except ( - XMLParseError, - Fault, - requests.exceptions.ConnectionError, - requests.exceptions.ReadTimeout, - ) as e: - # log.exception( - # f"Failed to download eeg data for {eeg_id} because of SOAP API exception: {e}", - # exc_info=False) - eeg_data = {} - eeg_missed = (eeg_id, repr(e)) - - return eeg_data, eeg_missed - - def kwk_unit_data(self, unit_specs): - """ - Download KWK (german: Kraft-Wärme-Kopplung, english: Combined Heat and Power, CHP) - data for a unit. 
- - Additional data collected during a subsidy program for supporting - combined heat power plants. - - Parameters - ---------- - unit_specs : tuple - *KwkMastrnummer* and data type as tuple that for example looks like - - ```python - - tuple("KWK910493229164", "biomass") - ``` - - - Returns - ------- - dict - KWK details about unit, if download successful, - otherwise empty dict - tuple - *KwkMastrNummer* and message the explains why a download failed. Format - - ```python - - tuple("KWK910493229164", "Reason for failing dowload") - ``` - """ - kwk_id, data = unit_specs - try: - kwk_data = self._mastr_api.__getattribute__( - self._unit_data_specs[data]["kwk_data"] - )(kwkMastrNummer=kwk_id) - kwk_missed = None - except ( - XMLParseError, - Fault, - requests.exceptions.ConnectionError, - requests.exceptions.ReadTimeout, - ) as e: - # log.exception( - # f"Failed to download unit data for {kwk_id} because of SOAP API exception: {e}", - # exc_info=False) - kwk_data = {} - kwk_missed = (kwk_id, repr(e)) - - return kwk_data, kwk_missed - - def permit_unit_data(self, unit_specs): - """ - Download permit data for a unit. - - Parameters - ---------- - unit_specs : tuple - *GenMastrnummer* and data type as tuple that for example looks like - - ```python - - tuple("SGE952474728808", "biomass") - ``` - - - Returns - ------- - dict - Permit details about unit, if download successful, - otherwise empty dict - tuple - *GenMastrNummer* and message the explains why a download failed. 
Format - - ```python - - tuple("GEN952474728808", "Reason for failing dowload") - ``` - """ - permit_id, data = unit_specs - try: - permit_data = self._mastr_api.__getattribute__( - self._unit_data_specs[data]["permit_data"] - )(genMastrNummer=permit_id) - permit_missed = None - except ( - XMLParseError, - Fault, - requests.exceptions.ConnectionError, - requests.exceptions.ReadTimeout, - ) as e: - # log.exception( - # f"Failed to download unit data for {permit_id} " - # f"because of SOAP API exception: {e}", - # exc_info=False) - permit_data = {} - permit_missed = (permit_id, repr(e)) - - return permit_data, permit_missed - - def location_data(self, specs): - """ - Download extended data for a location - - Allows to download additional data for different location types, see *specs*. - - Parameters - ---------- - specs : tuple - Location *Mastrnummer* and data_name as tuple that for example looks like - - ```python - - tuple("SEL927688371072", "location_elec_generation") - ``` - - - Returns - ------- - dict - Detailed information about a location, if download successful, - otherwise empty dict - tuple - Location *MastrNummer* and message the explains why a download failed. Format - - ```python - - tuple("SEL927688371072", "Reason for failing dowload") - ``` - """ - - # Unpack tuple argument to two separate variables - location_id, data_name = specs - - try: - data = self._mastr_api.__getattribute__(self._unit_data_specs[data_name])( - lokationMastrNummer=location_id - ) - missed = None - except ( - XMLParseError, - Fault, - requests.exceptions.ConnectionError, - requests.exceptions.ReadTimeout, - ) as e: - data = {} - missed = (location_id, repr(e)) - - return data, missed - - def _retry_missed_additional_data(self, data, missed_ids, data_fcn, retries=3): - """ - Retry to download extended data that was missed earlier. - - Tries three times (default) to download data. 
- - Parameters - ---------- - data : str - data, see :meth:`MaStRDownload.download_power_plants` - missed_ids : list - Unit identifiers for additional data - data_fcn : str - Name of method from :class:`MaStRDownload` to be used for querying additional data - retries : int - Number of retries (default: 3). - - Returns - ------- - tuple of lists - Queried data and still missed unit IDs are returned as :code:`(data, missed_units)`. - """ - - log.info( - f"Retrying to download additional data for {len(missed_ids)} " - f"{data} units with {retries} retries" - ) - - data = [] - - missed_ids_remaining = missed_ids - for _ in range(1, retries + 1): - data_tmp, missed_ids_tmp = self.additional_data( - data, missed_ids_remaining, data_fcn - ) - if data_tmp: - data.extend(data_tmp) - missed_ids_remaining = [_[0] for _ in missed_ids_tmp] - - if not any(missed_ids_remaining): - break - - return data, missed_ids_tmp - - def basic_location_data(self, limit=2000, date_from=None, max_retries=3): - """ - Retrieve basic location data in chunks - - Retrieves data for all types of locations at once using - `MaStRAPI.GetListeAlleLokationen`. - Locations include - - * Electricity generation location (SEL - Stromerzeugungslokation) - * Electricity consumption location (SVL - Stromverbrauchslokation) - * Gas generation location (GEL - Gaserzeugungslokation) - * Gas consumption location (GVL - Gasverbrauchslokation) - - Parameters - ---------- - limit: int, optional - Maximum number of locations to download. - - !!! warning - Mind the daily request limit for your MaStR account, usually 10,000 per day. - date_from: `datetime.datetime`, optional - If specified, only locations with latest change date newer than this are queried. - max_retries: int, optional - Maximum number of retries for each chunk in case of errors with the connection to - the server. - - Yields - ------ - generator of generators - For each chunk a separate generator is returned all wrapped into another generator. 
- Access with - - ```python - - chunks = mastr_dl.basic_location_data( - date_from=datetime.datetime(2020, 11, 7, 0, 0, 0), limit=2010 - ) - - for chunk in chunks: - for location in chunk: - print(location) # prints out one dict per location one after another - ``` - """ - # Prepare indices for chunked data retrieval - chunksize = 2000 - chunks_start = list(range(1, limit + 1, chunksize)) - limits = [ - chunksize if (x + chunksize) <= limit else limit - x + 1 - for x in chunks_start - ] - - yield from basic_data_download( - self._mastr_api, - "GetListeAlleLokationen", - "Lokationen", - chunks_start, - limits, - date_from, - max_retries, - ) - - def daily_contingent(self): - contingent = self._mastr_api.GetAktuellerStandTageskontingent() - log.info( - f"Daily requests contigent: " - f"{contingent['AktuellerStandTageskontingent']} " - f"/ {contingent['AktuellesLimitTageskontingent']}" - ) - - -def basic_data_download( - mastr_api, - fcn_name, - category, - chunks_start, - limits, - date_from, - max_retries, - data=None, - et=None, -): - """ - Helper function for downloading basic data with MaStR list query - - Helps to query data from list-returning functions like GetListeAlleEinheiten. - Automatically - - * respects limit of 2.000 rows returned by MaStR list functions - * stops, if no further data is available - * nicely integrates dynamic update of tqdm progress bar - - Parameters - ---------- - mastr_api: :class:`MaStRAPI` - MaStR API wrapper - fcn_name: str - Name of list-returning download function - category: str - Either "Einheiten" or "Lokationen" - chunks_start: list of int - Start index for each chunk. MaStR data is internally index by an integer index that can - be used to start querying data from a certain position. Here, it is used to - iterate over the available data. - limits: list of int - Limit of queried row for each chunk. 
- date_from: datetime.datetime - Date for querying only newer data than this date - max_retries: int - Number of maximum retries for each chunk - data: str, optional - Choose a subset from available technologies. Only relevant if category="Einheiten". - Defaults to all technologies. - et: str - Energietraeger of a data type. Some technologies are subdivided into a list of - energietraeger. Only relevant if category="Einheiten". Defaults to None. - - Yields - ------ - list - Basic unit or location information. Depends on category. - """ - - # Construct description string - description = f"Get basic {category} data information" - if data: - description += f" for data {data}" - if et: - description += f" ({et})" - - pbar = tqdm(desc=description, unit=" units") - - # Iterate over chunks and download data - # Results are first collected per 'et' (units_tech) for properly - # displaying download progress. - # Later, all units of a single data are collected in 'units' - for chunk_start, limit_iter in zip(chunks_start, limits): - # Use a retry loop to retry on connection errors - for try_number in range(max_retries + 1): - try: - if et is None: - response = getattr(mastr_api, fcn_name)( - startAb=chunk_start, limit=limit_iter, datumAb=date_from - ) - else: - response = getattr(mastr_api, fcn_name)( - energietraeger=et, - startAb=chunk_start, - limit=limit_iter, - datumAb=date_from, - ) - - except ( - requests.exceptions.ConnectionError, - Fault, - requests.exceptions.ReadTimeout, - ) as e: - try_number += 1 - log.debug( - f"MaStR SOAP API does not respond properly: {e}. Retry {try_number}" - ) - time.sleep(5) - else: - # If it does run into the except clause, break out of the for loop - # This also means query was successful - units_tech = response[category] - yield units_tech - pbar.update(len(units_tech)) - break - else: - log.error( - f"Finally failed to download data." - f"Basic unit data of index {chunk_start} to " - f"{chunk_start + limit_iter - 1} will be missing." 
- ) - # TODO: this has potential risk! Please change - # If the download continuously fails on the last chunk, this query will run forever - response = {"Ergebniscode": "OkWeitereDatenVorhanden"} - - # Stop querying more data, if no further data available - if response["Ergebniscode"] == "OkWeitereDatenVorhanden": - continue - - # Update progress bar and move on with next et or data type - pbar.total = pbar.n - pbar.refresh() - pbar.close() - break - - # Make sure progress bar is closed properly - pbar.close() - - -if __name__ == "__main__": - pass diff --git a/open_mastr/soap_api/metadata/create.py b/open_mastr/soap_api/metadata/create.py index fb254339..87b74233 100644 --- a/open_mastr/soap_api/metadata/create.py +++ b/open_mastr/soap_api/metadata/create.py @@ -6,7 +6,6 @@ from open_mastr.soap_api.metadata.description import DataDescription from open_mastr.utils.config import get_data_config, get_filenames, column_renaming -from open_mastr.soap_api.download import MaStRDownload # TODO: We should not describe the data in both metadata folder and orm.py @@ -178,174 +177,177 @@ def datapackag_base(reference_date, publication_date=None, statistik_flag=None): return datapackage_meta -def create_datapackage_meta_json( - reference_date, - technologies=None, - data=["raw"], - statistik_flag=None, - json_serialize=True, -): - """ - Create a frictionless data conform metadata description - - Parameters - ---------- - reference_date: datetime.datetime - Reference date for data - technologies: list - Only consider specified technologies in metadata JSON string. If not provided, metadata is created for data - of all technologies. - data: list, optional - Specify which data should be documented by this metadata. - statistik_flag: str or None - Describe if filtering is applied during CSV export of data. Read in - :meth:`~.open_mastr.soap_api.mirror.MaStRMirror.to_csv()` for more details. 
- json_serialize: bool - Toggle between returning a JSON string or a dict - - * True: return JSON string - * False: return dict - - Returns - ------- - dict or str - Set `json_serialize` to False, for returning a dict instead of JSON str. - """ - - datapackage_base_dict = datapackag_base( - reference_date, statistik_flag=statistik_flag - ) - - table_columns = DataDescription().functions_data_documentation() - mastr_dl = MaStRDownload() - - # Filter specified technologies - if technologies: - unit_data_specs = { - k: v for k, v in mastr_dl._unit_data_specs.items() if k in technologies - } - - filenames = get_filenames() - - renaming = column_renaming() - - resources_meta = {"resources": []} - - # Add resources of raw data files - for tech, specs in unit_data_specs.items(): - raw_fields = [] - specs["basic_data"] = "GetListeAlleEinheiten" - - for data_type in [ - "basic_data", - "unit_data", - "eeg_data", - "kwk_data", - "permit_data", - ]: - if data_type in specs.keys(): - for name, specification in table_columns[specs[data_type]].items(): - if name in renaming[data_type]["columns"]: - name = f"{name}_{renaming[data_type]['suffix']}" - raw_fields.append({"name": name, **specification, "unit": None}) - - if "raw" in data: - resource = { - "profile": "tabular-data-resource", - "name": f"bnetza_mastr_{tech}_raw", - "title": f"open-MaStR {tech} units (raw)", - "path": filenames["raw"][tech]["joined"], - "scheme": "file", - "encoding": "utf-8", - "mediatype": "text/csv", - "schema": { - "fields": raw_fields, - "primaryKey": ["EinheitMastrNummer"], - }, - "dialect": {"delimiter": ","}, - } - - resources_meta["resources"].append(resource) - if "cleaned" in data: - resource = { - "profile": "tabular-data-resource", - "name": f"bnetza_mastr_{tech}_cleaned", - "title": f"open-MaStR {tech} units (cleaned)", - "path": filenames["cleaned"][tech], - "scheme": "file", - "encoding": "utf-8", - "mediatype": "text/csv", - "schema": { - "fields": raw_fields, - "primaryKey": 
["EinheitMastrNummer"], - }, - "dialect": {"delimiter": ","}, - } - - resources_meta["resources"].append(resource) - if "postprocessed" in data: - processed_fields = [ - { - "name": "geom", - "unit": None, - "type": "str", - "desciption": "Standort der Anlage als Punktgeometrie im WKB Format", - "examples": "0101000020e610000071fbe59315131c40a2b437f8c20e4a40", - }, - { - "name": "comment", - "unit": None, - "type": "str", - "desciption": "Information about data post-processing", - "examples": "has_geom; outside_vg250", - }, - ] - if tech == "wind": - processed_fields.append( - { - "name": "tags", - "unit": None, - "type": "json", - "desciption": "Data insights and report about post-processing steps", - "examples": { - "plz_check": False, - "processed": True, - "inside_germany": True, - }, - } - ) - processed_fields.append( - { - "name": "geom", - "unit": None, - "type": "str", - "desciption": "Standort der Anlage als Punktgeometrie im WKB Format (EPSG 3035)", - "examples": "0101000020e610000071fbe59315131c40a2b437f8c20e4a40", - } - ) - resource = { - "profile": "tabular-data-resource", - "name": f"bnetza_mastr_{tech}", - "title": f"open-MaStR {tech} units", - "path": filenames["postprocessed"][tech], - "scheme": "file", - "encoding": "utf-8", - "mediatype": "text/csv", - "schema": { - "fields": raw_fields + processed_fields, - "primaryKey": ["EinheitMastrNummer"], - }, - "dialect": {"delimiter": ","}, - } - - resources_meta["resources"].append(resource) - - datapackage_dict = {**datapackage_base_dict, **resources_meta} - - if json_serialize: - return json.dumps(datapackage_dict, ensure_ascii=False) - else: - return datapackage_dict +# At the time of commenting this function, it was not called in the mastr.py file +# The actual call there was commented as well +# def create_datapackage_meta_json( +# reference_date, +# technologies=None, +# data=["raw"], +# statistik_flag=None, +# json_serialize=True, +# ): +# """ +# Create a frictionless data conform metadata 
description +# +# Parameters +# ---------- +# reference_date: datetime.datetime +# Reference date for data +# technologies: list +# Only consider specified technologies in metadata JSON string. If not provided, metadata is created for data +# of all technologies. +# data: list, optional +# Specify which data should be documented by this metadata. +# statistik_flag: str or None +# Describe if filtering is applied during CSV export of data. Read in +# :meth:`~.open_mastr.soap_api.mirror.MaStRMirror.to_csv()` for more details. +# json_serialize: bool +# Toggle between returning a JSON string or a dict +# +# * True: return JSON string +# * False: return dict +# +# Returns +# ------- +# dict or str +# Set `json_serialize` to False, for returning a dict instead of JSON str. +# """ +# +# datapackage_base_dict = datapackag_base( +# reference_date, statistik_flag=statistik_flag +# ) +# +# table_columns = DataDescription().functions_data_documentation() +# mastr_dl = MaStRDownload() +# +# # Filter specified technologies +# if technologies: +# unit_data_specs = { +# k: v for k, v in mastr_dl._unit_data_specs.items() if k in technologies +# } +# print(technologies) +# +# filenames = get_filenames() +# +# renaming = column_renaming() +# +# resources_meta = {"resources": []} +# +# # Add resources of raw data files +# for tech, specs in unit_data_specs.items(): +# raw_fields = [] +# specs["basic_data"] = "GetListeAlleEinheiten" +# +# for data_type in [ +# "basic_data", +# "unit_data", +# "eeg_data", +# "kwk_data", +# "permit_data", +# ]: +# if data_type in specs.keys(): +# for name, specification in table_columns[specs[data_type]].items(): +# if name in renaming[data_type]["columns"]: +# name = f"{name}_{renaming[data_type]['suffix']}" +# raw_fields.append({"name": name, **specification, "unit": None}) +# +# if "raw" in data: +# resource = { +# "profile": "tabular-data-resource", +# "name": f"bnetza_mastr_{tech}_raw", +# "title": f"open-MaStR {tech} units (raw)", +# "path": 
filenames["raw"][tech]["joined"], +# "scheme": "file", +# "encoding": "utf-8", +# "mediatype": "text/csv", +# "schema": { +# "fields": raw_fields, +# "primaryKey": ["EinheitMastrNummer"], +# }, +# "dialect": {"delimiter": ","}, +# } +# +# resources_meta["resources"].append(resource) +# if "cleaned" in data: +# resource = { +# "profile": "tabular-data-resource", +# "name": f"bnetza_mastr_{tech}_cleaned", +# "title": f"open-MaStR {tech} units (cleaned)", +# "path": filenames["cleaned"][tech], +# "scheme": "file", +# "encoding": "utf-8", +# "mediatype": "text/csv", +# "schema": { +# "fields": raw_fields, +# "primaryKey": ["EinheitMastrNummer"], +# }, +# "dialect": {"delimiter": ","}, +# } +# +# resources_meta["resources"].append(resource) +# if "postprocessed" in data: +# processed_fields = [ +# { +# "name": "geom", +# "unit": None, +# "type": "str", +# "desciption": "Standort der Anlage als Punktgeometrie im WKB Format", +# "examples": "0101000020e610000071fbe59315131c40a2b437f8c20e4a40", +# }, +# { +# "name": "comment", +# "unit": None, +# "type": "str", +# "desciption": "Information about data post-processing", +# "examples": "has_geom; outside_vg250", +# }, +# ] +# if tech == "wind": +# processed_fields.append( +# { +# "name": "tags", +# "unit": None, +# "type": "json", +# "desciption": "Data insights and report about post-processing steps", +# "examples": { +# "plz_check": False, +# "processed": True, +# "inside_germany": True, +# }, +# } +# ) +# processed_fields.append( +# { +# "name": "geom", +# "unit": None, +# "type": "str", +# "desciption": "Standort der Anlage als Punktgeometrie im WKB Format (EPSG 3035)", +# "examples": "0101000020e610000071fbe59315131c40a2b437f8c20e4a40", +# } +# ) +# resource = { +# "profile": "tabular-data-resource", +# "name": f"bnetza_mastr_{tech}", +# "title": f"open-MaStR {tech} units", +# "path": filenames["postprocessed"][tech], +# "scheme": "file", +# "encoding": "utf-8", +# "mediatype": "text/csv", +# "schema": { +# "fields": 
raw_fields + processed_fields, +# "primaryKey": ["EinheitMastrNummer"], +# }, +# "dialect": {"delimiter": ","}, +# } +# +# resources_meta["resources"].append(resource) +# +# datapackage_dict = {**datapackage_base_dict, **resources_meta} +# +# if json_serialize: +# return json.dumps(datapackage_dict, ensure_ascii=False) +# else: +# return datapackage_dict def column_docs_csv(technologies, base_path): diff --git a/open_mastr/soap_api/mirror.py b/open_mastr/soap_api/mirror.py deleted file mode 100644 index a75b5350..00000000 --- a/open_mastr/soap_api/mirror.py +++ /dev/null @@ -1,1052 +0,0 @@ -import datetime -import os -import pandas as pd -from sqlalchemy import and_, func -from sqlalchemy.sql import exists -import shlex -import subprocess -from datetime import date - -from open_mastr.utils.config import ( - setup_logger, -) -from open_mastr.soap_api.download import MaStRDownload, flatten_dict -from open_mastr.utils import orm -from open_mastr.utils.helpers import session_scope, reverse_unit_type_map - -from open_mastr.utils.constants import ORM_MAP, UNIT_TYPE_MAP - -log = setup_logger() - - -class MaStRMirror: - """ - !!! warning - - **This class is deprecated** and will not be maintained in the future. - Instead use [`Mastr.download`][open_mastr.Mastr.download] with parameter - `method` = "bulk" to mirror the MaStR dataset to a local database. - - Mirror the Marktstammdatenregister database and keep it up-to-date. - - A PostgreSQL database is used to mirror the MaStR database. It builds - on functionality for bulk data download - provided by [`MaStRDownload`][open_mastr.soap_api.download.MaStRDownload]. - - A rough overview is given by the following schema on the example of wind power units. 
- ![Schema on the example of downloading wind power units using the API](../images/MaStR_Mirror.svg){ loading=lazy width=70% align=center} - - - - Initially, basic unit data gets backfilled with `~.backfill_basic` - (downloads basic unit data for 2,000 - units of type 'solar'). - - ```python - - from open_mastr.soap_api.prototype_mastr_reflected import MaStRMirror - - mastr_mirror = MaStRMirror() - mastr_mirror.backfill_basic("solar", limit=2000) - ``` - Based on this, requests for - additional data are created. This happens during backfilling basic data. - But it is also possible to (re-)create - requests for remaining additional data using - [`MaStRMirror.create_additional_data_requests`][open_mastr.soap_api.mirror.MaStRMirror.create_additional_data_requests]. - - ```python - - mastr_mirror.create_additional_data_requests("solar") - ``` - - Additional unit data, in the case of wind power this is extended data, - EEG data and permit data, can be - retrieved subsequently by `~.retrieve_additional_data`. - - ```python - mastr_mirror.retrieve_additional_data("solar", ["unit_data"]) - ``` - - The data can be joined to one table for each data type and exported to - CSV files using [`Mastr.to_csv`][open_mastr.mastr.Mastr.to_csv]. - - Also consider to use `~.dump` and `~.restore` for specific purposes. - - !!! Note - This feature was built before the bulk download was offered at marktstammdatenregister.de. - It can still be used to compare the two datasets received from the API and the bulk download. - - """ - - def __init__( - self, - engine, - restore_dump=None, - parallel_processes=None, - ): - """ - Parameters - ---------- - engine: sqlalchemy.engine.Engine - database engine - restore_dump: str or path-like, optional - Save path of SQL dump file including filename. - The database is restored from the SQL dump. - Defaults to `None` which means nothing gets restored. - Should be used in combination with `empty_schema=True`. 
- parallel_processes: int - Number of parallel processes used to download additional data. - Defaults to `None`. - """ - log.warning( - """ - The `MaStRMirror` class is deprecated and will not be maintained in the future. - To get a full table of the Marktstammdatenregister, use the open_mastr.Mastr.download - method. - - If this change causes problems for you, please comment in this issue on github: - https://github.com/OpenEnergyPlatform/open-MaStR/issues/487 - - """ - ) - - self._engine = engine - - # Associate downloader - self.mastr_dl = MaStRDownload(parallel_processes=parallel_processes) - - # Restore database from a dump - if restore_dump: - self.restore(restore_dump) - - # Map technologies on ORMs - self.orm_map = ORM_MAP - - # Map data and MaStR unit type - # Map technologies on ORMs - self.unit_type_map = UNIT_TYPE_MAP - self.unit_type_map_reversed = reverse_unit_type_map() - - def backfill_basic(self, data=None, date=None, limit=10**8) -> None: - """Backfill basic unit data. - - Fill database table 'basic_units' with data. It allows specification - of which data should be retrieved via - the described parameter options. - - Under the hood, [`MaStRDownload.basic_unit_data`][open_mastr.soap_api.download.MaStRDownload.basic_unit_data] is used. - - Parameters - ---------- - data: list - Specify data types for which data should be backfilled. - - * ['solar']: Backfill data for a single data type. - * ['solar', 'wind'] (`list`): Backfill data for multiple technologies given in a list. - date: None, :class:`datetime.datetime`, str - Specify backfill date from which on data is retrieved - - Only data with modification time stamp greater that `date` is retrieved. - - * `datetime.datetime(2020, 11, 27)`: Retrieve data which is newer - than this time stamp - * 'latest': Retrieve data which is newer than the newest data - already in the table. - It is aware of a different 'latest date' for each data. 
- Hence, it works in combination with - `data=None` and `data=["wind", "solar"]` for example. - - !!! warning - - Don't use 'latest' in combination with `limit`. This might - lead to unexpected results. - * `None`: Complete backfill - limit: int - Maximum number of units. - Defaults to the large number of 10**8 which means - all available data is queried. Use with care! - """ - - dates = self._get_list_of_dates(date, data) - - for data_type, date in zip(data, dates): - self._write_basic_data_for_one_data_type_to_db(data_type, date, limit) - - def backfill_locations_basic( - self, limit=10**7, date=None, delete_additional_data_requests=True - ): - """ - Backfill basic location data. - - Fill database table 'locations_basic' with data. It allows specification - of which data should be retrieved via - the described parameter options. - - Under the hood, [`MaStRDownload.basic_location_data`][open_mastr.soap_api.download.MaStRDownload.basic_location_data] - is used. - - Parameters - ---------- - date: None, `datetime.datetime`, str - Specify backfill date from which on data is retrieved - - Only data with modification time stamp greater that `date` is retrieved. - - * `datetime.datetime(2020, 11, 27)`: Retrieve data which is newer than - this time stamp - * 'latest': Retrieve data which is newer than the newest data already in the table. - !!! warning - - Don't use 'latest' in combination with `limit`. This might lead to - unexpected results. - * `None`: Complete backfill - limit: int - Maximum number of locations to download. - Defaults to `None` which means no limit is set and all available data is queried. - Use with care! - delete_additional_data_requests: bool - Useful to speed up download of data. Ignores existence of already created requests - for additional data and - skips deletion these. 
- """ - - date = self._get_date(date, technology_list=None) - locations_basic = self.mastr_dl.basic_location_data(limit, date_from=date) - - for locations_chunk in locations_basic: - # Remove duplicates returned from API - locations_chunk_unique = [ - location - for n, location in enumerate(locations_chunk) - if location["LokationMastrNummer"] - not in [_["LokationMastrNummer"] for _ in locations_chunk[n + 1 :]] - ] - locations_unique_ids = [ - _["LokationMastrNummer"] for _ in locations_chunk_unique - ] - - with session_scope(engine=self._engine) as session: - # Find units that are already in the DB - common_ids = [ - _.LokationMastrNummer - for _ in session.query( - orm.LocationBasic.LokationMastrNummer - ).filter( - orm.LocationBasic.LokationMastrNummer.in_(locations_unique_ids) - ) - ] - inserted_and_updated = self._create_inserted_and_updated_list( - "locations", session, locations_chunk_unique, common_ids - ) - - # Create data requests for all newly inserted and updated locations - new_requests = [ - { - "LokationMastrNummer": location["LokationMastrNummer"], - "location_type": self.unit_type_map[location["Lokationtyp"]], - "request_date": datetime.datetime.now(tz=datetime.timezone.utc), - } - for location in inserted_and_updated - ] - - # Delete old data requests - if delete_additional_data_requests: - ids_to_delete = [ - _["LokationMastrNummer"] for _ in inserted_and_updated - ] - session.query(orm.AdditionalLocationsRequested).filter( - orm.AdditionalLocationsRequested.LokationMastrNummer.in_( - ids_to_delete - ) - ).filter( - orm.AdditionalLocationsRequested.request_date - < datetime.datetime.now(tz=datetime.timezone.utc) - ).delete( - synchronize_session="fetch" - ) - session.commit() - - # Do bulk insert of new data requests - session.bulk_insert_mappings( - orm.AdditionalLocationsRequested, new_requests - ) - - def retrieve_additional_data(self, data, data_type, limit=10**8, chunksize=1000): - """ - Retrieve additional unit data - - Execute additional 
data requests stored in - `open_mastr.soap_api.orm.AdditionalDataRequested`. - See also docs of [`MaStRDownload.additional_data`][open_mastr.soap_api.download.MaStRDownload.additional_data] - for more information on how data is downloaded. - - Parameters - ---------- - data: `str` - See list of available technologies in - `open_mastr.soap_api.download.py.MaStRDownload.download_power_plants`. - data_type: `str` - Select type of additional data that is to be retrieved. Choose from - "unit_data", "eeg_data", "kwk_data", "permit_data". - limit: int - Limit number of units that data is download for. Defaults to the very large number 10**8 - which refers to query data for existing data requests, for example created by - `~.create_additional_data_requests`. - chunksize: int - Data is downloaded and inserted into the database in chunks of `chunksize`. - Defaults to 1000. - """ - - # Mapping of download from MaStRDownload - download_functions = { - "unit_data": "extended_unit_data", - "eeg_data": "eeg_unit_data", - "kwk_data": "kwk_unit_data", - "permit_data": "permit_unit_data", - } - - if chunksize > limit: - chunksize = limit - - number_units_queried = 0 - while number_units_queried < limit: - with session_scope(engine=self._engine) as session: - ( - requested_chunk, - requested_ids, - ) = self._get_additional_data_requests_from_db( - table_identifier="additional_data", - session=session, - data_request_type=data_type, - data=data, - chunksize=chunksize, - ) - - if not requested_ids: - log.info("No further data is requested") - break - - # Retrieve data - unit_data, missed_units = self.mastr_dl.additional_data( - data, requested_ids, download_functions[data_type] - ) - - unit_data = flatten_dict(unit_data, serialize_with_json=False) - number_units_merged = 0 - - # Prepare data and add to database table - for unit_dat in unit_data: - unit = self._preprocess_additional_data_entry( - unit_dat, data, data_type - ) - session.merge(unit) - number_units_merged += 1 - 
session.commit() - - log.info( - f"Downloaded data for {len(unit_data)} units ({len(requested_ids)} requested). " - ) - self._delete_missed_data_from_request_table( - table_identifier="additional_data", - session=session, - missed_requests=missed_units, - requested_chunk=requested_chunk, - ) - # Update while iteration condition - number_units_queried += len(requested_ids) - # Emergency break out: if now new data gets inserted/update, don't retrieve any - # further data - if number_units_merged == 0: - log.info("No further data is requested") - break - - def retrieve_additional_location_data( - self, location_type, limit=10**8, chunksize=1000 - ): - """ - Retrieve extended location data - - Execute additional data requests stored in - `open_mastr.soap_api.orm.AdditionalLocationsRequested`. - See also docs of `open_mastr.soap_api.download.py.MaStRDownload.additional_data` - for more information on how data is downloaded. - - Parameters - ---------- - location_type: `str` - Select type of location that is to be retrieved. Choose from - "location_elec_generation", "location_elec_consumption", "location_gas_generation", - "location_gas_consumption". - limit: int - Limit number of locations that data is download for. Defaults large number 10**8 - which refers to query data for existing data requests. - chunksize: int - Data is downloaded and inserted into the database in chunks of `chunksize`. - Defaults to 1000. 
- """ - - # Process arguments - if chunksize > limit: - chunksize = limit - - locations_queried = 0 - while locations_queried < limit: - with session_scope(engine=self._engine) as session: - # Get a chunk - ( - requested_chunk, - requested_ids, - ) = self._get_additional_data_requests_from_db( - table_identifier="additional_location_data", - session=session, - data_request_type=location_type, - data=None, - chunksize=chunksize, - ) - - if not requested_ids: - log.info("No further data is requested") - break - - # Reset number of locations inserted or updated for this chunk - number_locations_merged = 0 - - # Retrieve data - location_data, missed_locations = self.mastr_dl.additional_data( - location_type, requested_ids, "location_data" - ) - - # Prepare data and add to database table - location_data = flatten_dict(location_data) - for location_dat in location_data: - location_dat = self._add_data_source_and_download_date(location_dat) - # Remove query status information from response - for exclude in [ - "Ergebniscode", - "AufrufVeraltet", - "AufrufVersion", - "AufrufLebenszeitEnde", - ]: - del location_dat[exclude] - - # Make data types JSON serializable - location_dat["DatumLetzteAktualisierung"] = location_dat[ - "DatumLetzteAktualisierung" - ].isoformat() - - if type(location_dat["DatumLetzteAktualisierung"]) == str: - location_dat[ - "DatumLetzteAktualisierung" - ] = datetime.datetime.strptime( - location_dat["DatumLetzteAktualisierung"], - "%Y-%m-%dT%H:%M:%S.%f", - ) - - # Create new instance and update potentially existing one - location = orm.LocationExtended(**location_dat) - session.merge(location) - number_locations_merged += 1 - - session.commit() - # Log locations where data retrieval was not successful - log.info(f"Downloaded data for {len(location_data)} ") - self._delete_missed_data_from_request_table( - table_identifier="additional_location_data", - session=session, - missed_requests=missed_locations, - requested_chunk=requested_chunk, - ) - # 
Update while iteration condition - locations_queried += len(requested_ids) - - # Emergency break out: if now new data gets inserted/update, - # don't retrieve any further data - if number_locations_merged == 0: - log.info("No further data is requested") - break - - def create_additional_data_requests( - self, - technology, - data_types=["unit_data", "eeg_data", "kwk_data", "permit_data"], - delete_existing=True, - ): - """ - Create new requests for additional unit data - - For units that exist in basic_units but not in the table for additional - data of `data_type`, a new data request - is submitted. - - Parameters - ---------- - technology: str - Specify technology additional data should be requested for. - data_types: list - Select type of additional data that is to be requested. - Defaults to all data that is available for a - technology. - delete_existing: bool - Toggle deletion of already existing requests for additional data. - Defaults to True. - """ - - data_requests = [] - - with session_scope(engine=self._engine) as session: - # Check which additional data is missing - for data_type in data_types: - if data_type_available := self.orm_map[technology].get(data_type, None): - log.info( - f"Create requests for additional data of type {data_type} for {technology}" - ) - - # Get ORM for additional data by technology and data_type - additional_data_orm = getattr(orm, data_type_available) - - # Delete prior additional data requests for this technology and data_type - if delete_existing: - session.query(orm.AdditionalDataRequested).filter( - orm.AdditionalDataRequested.technology == technology, - orm.AdditionalDataRequested.data_type == data_type, - ).delete() - session.commit() - - # Query database for missing additional data - units_for_request = self._get_units_for_request( - data_type, session, additional_data_orm, technology - ) - - # Prepare data for additional data request - for basic_unit in units_for_request: - data_request = { - "EinheitMastrNummer": 
basic_unit.EinheitMastrNummer, - "technology": self.unit_type_map[basic_unit.Einheittyp], - "data_type": data_type, - "request_date": datetime.datetime.now( - tz=datetime.timezone.utc - ), - } - if data_type == "unit_data": - data_request[ - "additional_data_id" - ] = basic_unit.EinheitMastrNummer - elif data_type == "eeg_data": - data_request[ - "additional_data_id" - ] = basic_unit.EegMastrNummer - elif data_type == "kwk_data": - data_request[ - "additional_data_id" - ] = basic_unit.KwkMastrNummer - elif data_type == "permit_data": - data_request[ - "additional_data_id" - ] = basic_unit.GenMastrNummer - data_requests.append(data_request) - - # Insert new requests for additional data into database - session.bulk_insert_mappings(orm.AdditionalDataRequested, data_requests) - - def _add_data_source_and_download_date(self, entry: dict) -> dict: - """Adds DatenQuelle = 'APT' and DatumDownload = date.today""" - entry["DatenQuelle"] = "API" - entry["DatumDownload"] = date.today() - return entry - - def _create_data_list_from_basic_units(self, session, basic_units_chunk): - # Make sure that no duplicates get inserted into database - # (would result in an error) - # Only new data gets inserted or data with newer modification date gets updated - - # Remove duplicates returned from API - basic_units_chunk_unique = [ - unit - for n, unit in enumerate(basic_units_chunk) - if unit["EinheitMastrNummer"] - not in [_["EinheitMastrNummer"] for _ in basic_units_chunk[n + 1 :]] - ] - basic_units_chunk_unique_ids = [ - _["EinheitMastrNummer"] for _ in basic_units_chunk_unique - ] - - # Find units that are already in the DB - common_ids = [ - _.EinheitMastrNummer - for _ in session.query(orm.BasicUnit.EinheitMastrNummer).filter( - orm.BasicUnit.EinheitMastrNummer.in_(basic_units_chunk_unique_ids) - ) - ] - basic_units_chunk_unique = self._correct_typo_in_column_name( - basic_units_chunk_unique - ) - - inserted_and_updated = self._create_inserted_and_updated_list( - "basic_units", 
session, basic_units_chunk_unique, common_ids - ) - - # Submit additional data requests - - extended_data = [] - eeg_data = [] - kwk_data = [] - permit_data = [] - - for basic_unit in inserted_and_updated: - extended_data = self._append_additional_data_from_basic_unit( - extended_data, basic_unit, "EinheitMastrNummer", "unit_data" - ) - eeg_data = self._append_additional_data_from_basic_unit( - eeg_data, basic_unit, "EegMastrNummer", "eeg_data" - ) - kwk_data = self._append_additional_data_from_basic_unit( - kwk_data, basic_unit, "KwkMastrNummer", "kwk_data" - ) - permit_data = self._append_additional_data_from_basic_unit( - permit_data, basic_unit, "GenMastrNummer", "permit_data" - ) - return extended_data, eeg_data, kwk_data, permit_data, inserted_and_updated - - def _correct_typo_in_column_name(self, basic_units_chunk_unique: list) -> list: - """ - Corrects the typo DatumLetzeAktualisierung -> DatumLetzteAktualisierung - (missing t in Letzte) in the column name. - """ - basic_units_chunk_unique_correct = [] - for unit in basic_units_chunk_unique: - # Rename the typo in column DatumLetzeAktualisierung - if "DatumLetzteAktualisierung" not in unit: - unit["DatumLetzteAktualisierung"] = unit.pop( - "DatumLetzeAktualisierung", None - ) - basic_units_chunk_unique_correct.append(unit) - return basic_units_chunk_unique_correct - - def _append_additional_data_from_basic_unit( - self, - data_list: list, - basic_unit: dict, - basic_unit_identifier: str, - data_type: str, - ) -> list: - """Appends a new entry from basic units to an existing list of unit IDs. 
This list is - used when requesting additional data from the MaStR API.""" - if basic_unit[basic_unit_identifier]: - data_list.append( - { - "EinheitMastrNummer": basic_unit["EinheitMastrNummer"], - "additional_data_id": basic_unit[basic_unit_identifier], - "technology": self.unit_type_map[basic_unit["Einheittyp"]], - "data_type": data_type, - "request_date": datetime.datetime.now(tz=datetime.timezone.utc), - } - ) - return data_list - - def _create_inserted_and_updated_list( - self, table_identifier, session, list_chunk_unique, common_ids - ) -> list: - """Creates the insert and update list and saves it to the BasicTable. - This method is called both in backfill_basics and backfill_location_basics.""" - if table_identifier == "locations": - mastr_number_identifier = "LokationMastrNummer" - table_class = orm.LocationBasic - elif table_identifier == "basic_units": - mastr_number_identifier = "EinheitMastrNummer" - table_class = orm.BasicUnit - - insert = [] - updated = [] - for entry in list_chunk_unique: - # In case data for the unit already exists, only update if new data is newer - if entry[mastr_number_identifier] in common_ids: - query_filter = ( - and_( - orm.BasicUnit.EinheitMastrNummer == entry["EinheitMastrNummer"], - orm.BasicUnit.DatumLetzteAktualisierung - < entry["DatumLetzteAktualisierung"], - ) - if table_identifier == "basic_units" - else orm.LocationBasic.LokationMastrNummer - == entry["LokationMastrNummer"] - ) - if session.query(exists().where(query_filter)).scalar(): - updated.append(entry) - session.merge(table_class(**entry)) - # In case of new data, just insert - else: - insert.append(entry) - session.bulk_save_objects([table_class(**u) for u in insert]) - session.commit() - return insert + updated - - def _write_basic_data_for_one_data_type_to_db(self, data, date, limit) -> None: - log.info(f"Backfill data for data type {data}") - - # Catch weird MaStR SOAP response - basic_units = self.mastr_dl.basic_unit_data(data, limit, date_from=date) - 
- with session_scope(engine=self._engine) as session: - log.info( - "Insert basic unit data into DB and submit additional data requests" - ) - for basic_units_chunk in basic_units: - # Insert basic data into database - ( - extended_data, - eeg_data, - kwk_data, - permit_data, - inserted_and_updated, - ) = self._create_data_list_from_basic_units(session, basic_units_chunk) - - # Delete old entries for additional data requests - additional_data_table = orm.AdditionalDataRequested.__table__ - ids_to_delete = [_["EinheitMastrNummer"] for _ in inserted_and_updated] - session.execute( - additional_data_table.delete() - .where( - additional_data_table.c.EinheitMastrNummer.in_(ids_to_delete) - ) - .where(additional_data_table.c.technology == "wind") - .where( - additional_data_table.c.request_date - < datetime.datetime.now(tz=datetime.timezone.utc) - ) - ) - - # Flush delete statements to database - session.commit() - - # Insert new requests for additional data - session.bulk_insert_mappings(orm.AdditionalDataRequested, extended_data) - session.bulk_insert_mappings(orm.AdditionalDataRequested, eeg_data) - session.bulk_insert_mappings(orm.AdditionalDataRequested, kwk_data) - session.bulk_insert_mappings(orm.AdditionalDataRequested, permit_data) - - log.info("Backfill successfully finished") - - def _get_date(self, date, technology_list): - """Parses 'latest' to the latest date in the database, else returns the given date.""" - if technology_list: - return self._get_list_of_dates(date, technology_list) - else: - return self._get_single_date(date) - - def _get_single_date(self, date): - if date != "latest": - return date - - with session_scope(engine=self._engine) as session: - if date_queried := ( - session.query(orm.LocationExtended.DatumLetzteAktualisierung) - .order_by(orm.LocationExtended.DatumLetzteAktualisierung.desc()) - .first() - ): - return date_queried[0] - else: - return None - - def _get_list_of_dates(self, date, technology_list) -> list: - if date != "latest": 
- return [date] * len(technology_list) - - dates = [] - for tech in technology_list: - if tech: - # In case technologies are specified, latest data date - # gets queried per data - with session_scope(engine=self._engine) as session: - newest_date = ( - session.query(orm.BasicUnit.DatumLetzteAktualisierung) - .filter( - orm.BasicUnit.Einheittyp - == self.unit_type_map_reversed[tech] - ) - .order_by(orm.BasicUnit.DatumLetzteAktualisierung.desc()) - .first() - ) - else: - # If technologies aren't defined ([None]) latest date per data - # is queried in query - # This also leads that the remainder of the loop body is skipped - with session_scope(engine=self._engine) as session: - subquery = session.query( - orm.BasicUnit.Einheittyp, - func.max(orm.BasicUnit.DatumLetzteAktualisierung).label( - "maxdate" - ), - ).group_by(orm.BasicUnit.Einheittyp) - dates = [s[1] for s in subquery] - technology_list = [self.unit_type_map[s[0]] for s in subquery] - # Break the for loop over data here, because we - # write technology_list and dates at once - break - - # Add date to dates list - if newest_date: - dates.append(newest_date[0]) - # Cover the case where no data is in the database and latest is still used - else: - dates.append(None) - - return dates - - def _delete_missed_data_from_request_table( - self, table_identifier, session, missed_requests, requested_chunk - ): - if table_identifier == "additional_data": - id_attribute = "additional_data_id" - elif table_identifier == "additional_location_data": - id_attribute = "LokationMastrNummer" - - missed_entry_ids = [e[0] for e in missed_requests] - for missed_req in missed_requests: - missed = ( - orm.MissedAdditionalData( - additional_data_id=missed_req[0], reason=missed_req[1] - ) - if table_identifier == "additional_data" - else orm.MissedExtendedLocation( - LokationMastrNummer=missed_req[0], - reason=missed_req[1], - ) - ) - session.add(missed) - session.commit() - # Remove entries from additional data request table if 
additional data - # was retrieved - deleted_entries = [] - for requested_entry in requested_chunk: - if getattr(requested_entry, id_attribute) not in missed_entry_ids: - session.delete(requested_entry) - deleted_entries.append(getattr(requested_entry, id_attribute)) - log.info( - f"Missed requests: {len(missed_requests)}. " - f"Deleted requests: {len(deleted_entries)}." - ) - session.commit() - - def _preprocess_additional_data_entry(self, unit_dat, technology, data_type): - unit_dat = self._add_data_source_and_download_date(unit_dat) - # Remove query status information from response - for exclude in [ - "Ergebniscode", - "AufrufVeraltet", - "AufrufVersion", - "AufrufLebenszeitEnde", - ]: - del unit_dat[exclude] - - # Pre-serialize dates/datetimes and decimal in hydro Ertuechtigung - # This is required because sqlalchemy does not know how serialize - # dates/decimal of a JSON - if "Ertuechtigung" in unit_dat: - for ertuechtigung in unit_dat["Ertuechtigung"]: - if ertuechtigung["DatumWiederinbetriebnahme"]: - ertuechtigung["DatumWiederinbetriebnahme"] = ertuechtigung[ - "DatumWiederinbetriebnahme" - ].isoformat() - ertuechtigung["ProzentualeErhoehungDesLv"] = float( - ertuechtigung["ProzentualeErhoehungDesLv"] - ) - # Some data (data_in_list) is handed over as type:list, hence - # non-compatible with sqlite or postgresql - # This replaces the list with the first element in the list - - data_as_list = ["NetzbetreiberMastrNummer", "Netzbetreiberzuordnungen"] - - for dat in data_as_list: - if dat in unit_dat and type(unit_dat[dat]) == list: - if len(unit_dat[dat]) > 0: - unit_dat[dat] = f"{unit_dat[dat][0]}" - else: - unit_dat[dat] = None - - # Rename the typo in column zugeordneteWirkleistungWechselrichter - if "zugeordneteWirkleistungWechselrichter" in unit_dat.keys(): - unit_dat["ZugeordneteWirkleistungWechselrichter"] = unit_dat.pop( - "zugeordneteWirkleistungWechselrichter" - ) - - # Create new instance and update potentially existing one - return getattr(orm, 
self.orm_map[technology][data_type])(**unit_dat) - - def _get_additional_data_requests_from_db( - self, table_identifier, session, data_request_type, data, chunksize - ): - """Retrieves the data that is requested from the database table AdditionalDataRequested.""" - if table_identifier == "additional_data": - requested_chunk = ( - session.query(orm.AdditionalDataRequested) - .filter( - and_( - orm.AdditionalDataRequested.data_type == data_request_type, - orm.AdditionalDataRequested.technology == data, - ) - ) - .limit(chunksize) - ) - - ids = [_.additional_data_id for _ in requested_chunk] - if table_identifier == "additional_location_data": - requested_chunk = ( - session.query(orm.AdditionalLocationsRequested) - .filter( - orm.AdditionalLocationsRequested.location_type == data_request_type - ) - .limit(chunksize) - ) - ids = [_.LokationMastrNummer for _ in requested_chunk] - return requested_chunk, ids - - def _get_units_for_request( - self, data_type, session, additional_data_orm, technology - ): - if data_type == "unit_data": - units_for_request = ( - session.query(orm.BasicUnit) - .outerjoin( - additional_data_orm, - orm.BasicUnit.EinheitMastrNummer - == additional_data_orm.EinheitMastrNummer, - ) - .filter( - orm.BasicUnit.Einheittyp == self.unit_type_map_reversed[technology] - ) - .filter(additional_data_orm.EinheitMastrNummer.is_(None)) - .filter(orm.BasicUnit.EinheitMastrNummer.isnot(None)) - ) - elif data_type == "eeg_data": - units_for_request = ( - session.query(orm.BasicUnit) - .outerjoin( - additional_data_orm, - orm.BasicUnit.EegMastrNummer == additional_data_orm.EegMastrNummer, - ) - .filter( - orm.BasicUnit.Einheittyp == self.unit_type_map_reversed[technology] - ) - .filter(additional_data_orm.EegMastrNummer.is_(None)) - .filter(orm.BasicUnit.EegMastrNummer.isnot(None)) - ) - elif data_type == "kwk_data": - units_for_request = ( - session.query(orm.BasicUnit) - .outerjoin( - additional_data_orm, - orm.BasicUnit.KwkMastrNummer == 
additional_data_orm.KwkMastrNummer, - ) - .filter( - orm.BasicUnit.Einheittyp == self.unit_type_map_reversed[technology] - ) - .filter(additional_data_orm.KwkMastrNummer.is_(None)) - .filter(orm.BasicUnit.KwkMastrNummer.isnot(None)) - ) - elif data_type == "permit_data": - units_for_request = ( - session.query(orm.BasicUnit) - .outerjoin( - additional_data_orm, - orm.BasicUnit.GenMastrNummer == additional_data_orm.GenMastrNummer, - ) - .filter( - orm.BasicUnit.Einheittyp == self.unit_type_map_reversed[technology] - ) - .filter(additional_data_orm.GenMastrNummer.is_(None)) - .filter(orm.BasicUnit.GenMastrNummer.isnot(None)) - ) - else: - raise ValueError(f"Data type {data_type} is not a valid option.") - - return units_for_request - - def dump(self, dumpfile="open-mastr-continuous-update.backup"): - """ - Dump MaStR database. - - Parameters - ---------- - dumpfile : str or path-like, optional - Save path for dump including filename. When only a filename is given, - the dump is saved to CWD. - """ - dump_cmd = ( - f"pg_dump -Fc " - f"-f {dumpfile} " - f"-n mastr_mirrored " - f"-h localhost " - f"-U open-mastr " - f"-p 55443 " - f"open-mastr" - ) - - proc = subprocess.Popen(dump_cmd, shell=True, env={"PGPASSWORD": "open-mastr"}) - proc.wait() - - def restore(self, dumpfile): - """ - Restore the MaStR database from an SQL dump. - - Parameters - ---------- - dumpfile : str or path-like, optional - Save path for dump including filename. When only a filename is given, the - dump is restored from CWD. - - - !!! warning - - If tables that are restored from the dump contain data, restore doesn't work! 
- - """ - # Interpret file name and path - dump_file_dir, dump_file = os.path.split(dumpfile) - cwd = os.path.abspath(os.path.dirname(dump_file_dir)) - - # Define import of SQL dump with pg_restore - restore_cmd = ( - f"pg_restore -h localhost -U open-mastr -p 55443 -d open-mastr {dump_file}" - ) - restore_cmd = shlex.split(restore_cmd) - - # Execute restore command - proc = subprocess.Popen( - restore_cmd, - shell=False, - env={"PGPASSWORD": "open-mastr"}, - cwd=cwd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - proc.wait() - - -def list_of_dicts_to_columns(row) -> pd.Series: # FIXME: Function not used - """ - Expand data stored in dict to spearate columns - - Parameters - ---------- - row: list of dict - Usually applied using apply on a column of a pandas DataFrame, - hence, a Series. This column of the - DataFrame should consist of a single-level dict with an - arbitrary number of columns. Each key is - transformed into a new column, while data from - each dict inside the list is concatenated by key. Such - that the data is stored into a list for each key/column. - - Returns - ------- - pd.Series - Pandas Series with keys as columns and values concatenated to a list for each key. 
- """ - columns = {k: [] for dic in row for k, _ in dic.items()} - for dic in row: - for k, v in dic.items(): - columns[k].append(v) - - return pd.Series(columns) diff --git a/open_mastr/soap_api/parallel.py b/open_mastr/soap_api/parallel.py deleted file mode 100644 index 7d6eb571..00000000 --- a/open_mastr/soap_api/parallel.py +++ /dev/null @@ -1,28 +0,0 @@ -# import datetime -# import multiprocessing -# import logging -# import tqdm -# -# from open_mastr.soap_api.utils import is_time_blacklisted -# log = logging.getLogger(__name__) -# -# last_successful_download = datetime.datetime.now() -# -# # Check if worker threads need to be killed -# -# -# def _stop_execution(time_blacklist, timeout): -# # Last successful execution was more than 10 minutes ago. Server seems -# # unresponsive, so stop execution permanently by raising an error -# if last_successful_download + datetime.timedelta(minutes=timeout) < datetime.datetime.now(): -# log.error('No response from server in the last {} minutes. Stopping execution.'.format(timeout)) -# raise ConnectionAbortedError -# # Stop execution smoothly if current system time is in blacklist by -# # returning. Calling function can decide to continue running later if -# # needed -# if time_blacklist and is_time_blacklisted(last_successful_download.time()): -# log.info('Current time is in blacklist. Halting.') -# return True -# # ... Add more checks here if needed ... 
-# return False -# diff --git a/open_mastr/soap_api/utils.py b/open_mastr/soap_api/utils.py deleted file mode 100644 index 117a08a8..00000000 --- a/open_mastr/soap_api/utils.py +++ /dev/null @@ -1,38 +0,0 @@ -# __copyright__ = "© Reiner Lemoine Institut" -# __license__ = "GNU Affero General Public License Version 3 (AGPL-3.0)" -# __url__ = "https://www.gnu.org/licenses/agpl-3.0.en.html" -# __author__ = "Ludee; Bachibouzouk; solar-c" -# __issue__ = "https://github.com/OpenEnergyPlatform/examples/issues/52" -# __version__ = "v0.10.0" -# -# import logging -# import datetime -# -# -# log = logging.getLogger(__name__) -# -# -# def is_time_blacklisted(time): -# times_blacklist = [ -# ('8:00', '18:00'), # BNetzA Business hours -# ('23:30', '00:10'), # Daily database cronjob -# # Add more if needed... -# ] -# -# # check if time is in a given interval between upper and lower -# def in_interval(lower, upper): -# # Convert str to datatime object -# def parse_time(t): -# return datetime.datetime.strptime(t, "%H:%M").time() -# lower = parse_time(lower) -# upper = parse_time(upper) -# -# # Handle interval that spans over midnight (i.e. 
23:30-0:30) -# if lower > upper: -# return (time <= upper or time >= lower) -# # Handle all other intevals -# return (lower <= time and upper >= time) -# -# # check if time is in interval for each interval in the blacklist -# in_interval = [in_interval(lower, upper) for lower, upper in times_blacklist] -# return any(in_interval) diff --git a/open_mastr/utils/constants.py b/open_mastr/utils/constants.py index e5cc476b..210d438d 100644 --- a/open_mastr/utils/constants.py +++ b/open_mastr/utils/constants.py @@ -22,19 +22,6 @@ "storage_units", ] -# Possible values for parameter 'data' with API download method -API_DATA = [ - "wind", - "solar", - "biomass", - "hydro", - "gsgk", - "combustion", - "nuclear", - "storage", - "location", - "permit", -] # Technology related values of parameter 'data' # Methods like Mastr.to_csv() must separate these from additional tables @@ -70,9 +57,6 @@ "storage_units", ] -# Possible data types for API download -API_DATA_TYPES = ["unit_data", "eeg_data", "kwk_data", "permit_data"] - # Possible location types for API download API_LOCATION_TYPES = [ "location_elec_generation", diff --git a/open_mastr/utils/helpers.py b/open_mastr/utils/helpers.py index 9ac2492b..f4361a0e 100644 --- a/open_mastr/utils/helpers.py +++ b/open_mastr/utils/helpers.py @@ -1,8 +1,7 @@ import os import json -import sys from contextlib import contextmanager -from datetime import date, datetime +from datetime import date from warnings import warn from zipfile import BadZipfile, ZipFile @@ -15,7 +14,6 @@ import pandas as pd from tqdm import tqdm -from open_mastr.soap_api.metadata.create import create_datapackage_meta_json from open_mastr.utils import orm from open_mastr.utils.config import ( get_filenames, @@ -23,13 +21,10 @@ column_renaming, ) -from open_mastr.soap_api.download import MaStRAPI, log +from open_mastr.soap_api.download import log from open_mastr.utils.constants import ( BULK_DATA, TECHNOLOGIES, - API_DATA, - API_DATA_TYPES, - API_LOCATION_TYPES, 
BULK_INCLUDE_TABLES_MAP, BULK_ADDITIONAL_TABLES_CSV_EXPORT_MAP, ORM_MAP, @@ -83,11 +78,6 @@ def validate_parameter_format_for_download_method( data, date, bulk_cleansing, - api_processes, - api_limit, - api_chunksize, - api_data_types, - api_location_types, **kwargs, ) -> None: if "technology" in kwargs: @@ -104,60 +94,11 @@ def validate_parameter_format_for_download_method( validate_parameter_data(method, data) validate_parameter_date(method, date) validate_parameter_bulk_cleansing(bulk_cleansing) - validate_parameter_api_processes(api_processes) - validate_parameter_api_limit(api_limit) - validate_parameter_api_chunksize(api_chunksize) - validate_parameter_api_data_types(api_data_types) - validate_parameter_api_location_types(api_location_types) - - raise_warning_for_invalid_parameter_combinations( - method, - bulk_cleansing, - date, - api_processes, - api_data_types, - api_location_types, - api_limit, - api_chunksize, - ) def validate_parameter_method(method) -> None: - if method not in ["bulk", "API"]: - raise ValueError("parameter method has to be either 'bulk', or 'API'.") - - -def validate_parameter_api_location_types(api_location_types) -> None: - if not isinstance(api_location_types, list) and api_location_types is not None: - raise ValueError("parameter api_location_types has to be a list or 'None'.") - - if isinstance(api_location_types, list): - if not api_location_types: # api_location_types == [] - raise ValueError("parameter api_location_types cannot be an empty list!") - for value in api_location_types: - if value not in API_LOCATION_TYPES: - raise ValueError( - f"list entries of api_data_types have to be in {API_LOCATION_TYPES}." 
- ) - - -def validate_parameter_api_data_types(api_data_types) -> None: - if not isinstance(api_data_types, list) and api_data_types is not None: - raise ValueError("parameter api_data_types has to be a list or 'None'.") - - if isinstance(api_data_types, list): - if not api_data_types: # api_data_types == [] - raise ValueError("parameter api_data_types cannot be an empty list!") - for value in api_data_types: - if value not in API_DATA_TYPES: - raise ValueError( - f"list entries of api_data_types have to be in {API_DATA_TYPES}." - ) - - -def validate_parameter_api_chunksize(api_chunksize) -> None: - if not isinstance(api_chunksize, int) and api_chunksize is not None: - raise ValueError("parameter api_chunksize has to be an integer or 'None'.") + if method != "bulk": + raise ValueError("parameter method has to be 'bulk'.") def validate_parameter_bulk_cleansing(bulk_cleansing) -> None: @@ -165,11 +106,6 @@ def validate_parameter_bulk_cleansing(bulk_cleansing) -> None: raise ValueError("parameter bulk_cleansing has to be boolean") -def validate_parameter_api_limit(api_limit) -> None: - if not isinstance(api_limit, int) and api_limit is not None: - raise ValueError("parameter api_limit has to be an integer or 'None'.") - - def validate_parameter_date(method, date) -> None: if date is None: # default return @@ -182,26 +118,6 @@ def validate_parameter_date(method, date) -> None: "Parameter date has to be a proper date in the format yyyymmdd" "or 'today' for bulk method." ) from e - elif method == "API": - if not isinstance(date, datetime) and date != "latest": - raise ValueError( - "parameter api_date has to be 'latest' or a datetime object or 'None'" - " for API method." 
- ) - - -def validate_parameter_api_processes(api_processes) -> None: - if api_processes not in ["max", None] and not isinstance(api_processes, int): - raise ValueError( - "parameter api_processes has to be 'max' or an integer or 'None'" - ) - if api_processes == "max" or isinstance(api_processes, int): - system = sys.platform - if system not in ["linux2", "linux"]: - raise ValueError( - "The functionality of multiprocessing only works on Linux based systems. " - "On your system, the parameter api_processes has to be 'None'." - ) def validate_parameter_data(method, data) -> None: @@ -217,10 +133,6 @@ def validate_parameter_data(method, data) -> None: raise ValueError( f"Allowed values for parameter data with bulk method are {BULK_DATA}" ) - if method == "API" and value not in API_DATA: - raise ValueError( - f"Allowed values for parameter data with API method are {API_DATA}" - ) if method == "csv_export" and value not in TECHNOLOGIES + ADDITIONAL_TABLES: raise ValueError( "Allowed values for CSV export are " @@ -228,92 +140,39 @@ def validate_parameter_data(method, data) -> None: ) -def raise_warning_for_invalid_parameter_combinations( - method, - bulk_cleansing, - date, - api_processes, - api_data_types, - api_location_types, - api_limit, - api_chunksize, -): - if method == "API" and bulk_cleansing is not True: - warn( - "For method = 'API', bulk download related parameters " - "(with prefix bulk_) are ignored." - ) - - if method == "bulk" and ( - any( - parameter is not None - for parameter in [ - api_processes, - api_data_types, - api_location_types, - ] - ) - or api_limit != 50 - or api_chunksize != 1000 - ): - warn( - "For method = 'bulk', API related parameters (with prefix api_) are ignored." - ) - - -def transform_data_parameter( - method, data, api_data_types, api_location_types, **kwargs -): +def transform_data_parameter(data, **kwargs): """ Parse input parameters related to data as lists. Harmonize variables for later use. 
Data output depends on the possible data types of chosen method. """ + + # data was named technology in an early version of open-mastr data = kwargs.get("technology", data) + # parse parameters as list if isinstance(data, str): data = [data] elif data is None: - data = BULK_DATA if method == "bulk" else API_DATA - if api_data_types is None: - api_data_types = API_DATA_TYPES - if api_location_types is None: - api_location_types = API_LOCATION_TYPES - - # data input harmonisation - harmonisation_log = [] - if method == "API" and "permit" in data: - data.remove("permit") - api_data_types.append( - "permit_data" - ) if "permit_data" not in api_data_types else api_data_types - harmonisation_log.append("permit") - - if method == "API" and "location" in data: - data.remove("location") - api_location_types = API_LOCATION_TYPES - harmonisation_log.append("location") - - return data, api_data_types, api_location_types, harmonisation_log - - -def transform_date_parameter(self, method, date, **kwargs): - if method == "bulk": - date = kwargs.get("bulk_date", date) - date = "today" if date is None else date - if date == "existing": - log.warning( - """ - The date parameter 'existing' is deprecated and will be removed in the future. - The date parameter is set to `today`. + data = BULK_DATA + + return data - If this change causes problems for you, please comment in this issue on github: - https://github.com/OpenEnergyPlatform/open-MaStR/issues/616#issuecomment-3089377062 +def transform_date_parameter(self, date, **kwargs): + date = kwargs.get("bulk_date", date) + date = "today" if date is None else date + if date == "existing": + log.warning( """ - ) - date = "today" - elif method == "API": - date = kwargs.get("api_date", date) + The date parameter 'existing' is deprecated and will be removed in the future. + The date parameter is set to `today`. 
+ + If this change causes problems for you, please comment in this issue on github: + https://github.com/OpenEnergyPlatform/open-MaStR/issues/616#issuecomment-3089377062 + + """ + ) + date = "today" return date @@ -333,52 +192,6 @@ def session_scope(engine): session.close() -def print_api_settings( - harmonisation_log, - data, - date, - api_data_types, - api_chunksize, - api_limit, - api_processes, - api_location_types, -): - log.info( - f"Downloading with soap_API.\n\n -- API settings -- \nunits after date: " - f"{date}\nunit download limit per data: " - f"{api_limit}\nparallel_processes: {api_processes}\nchunksize: " - f"{api_chunksize}\ndata_api: {data}" - ) - if "permit" in harmonisation_log: - log.warning( - f"data_types: {api_data_types} - " - "Attention, 'permit_data' was automatically set in api_data_types, " - "as you defined 'permit' in parameter data_api." - ) - - else: - log.info(f"data_types: {api_data_types}") - - if "location" in harmonisation_log: - log.warning( - f"location_types: {api_location_types} - " - "Attention, 'location' is in parameter data. location_types are set accordingly. " - "If you want to change location_types, please remove 'location' " - "from data_api and specify api_location_types." - ) - - else: - log.info( - f"location_types: {api_location_types}", - "\n ------------------ \n", - ) - - -def validate_api_credentials() -> None: - mastr_api = MaStRAPI() - assert mastr_api.GetAktuellerStandTageskontingent()["Ergebniscode"] == "OK" - - def data_to_include_tables(data: list, mapping: str = None) -> list: """ Convert user input 'data' to the list 'include_tables'. @@ -553,42 +366,45 @@ def create_db_query( return query_additional_tables -def save_metadata(data: list = None, engine=None) -> None: - """ - Save metadata during csv export. - - Parameters - ---------- - data: list - List of exported technologies for which metadata is needed. - engine: - User-defined database engine. 
- - Returns - ------- - - """ - data_path = get_data_version_dir() - filenames = get_filenames() - metadata_file = os.path.join(data_path, filenames["metadata"]) - unit_type_map_reversed = reverse_unit_type_map() - - with session_scope(engine=engine) as session: - # check for latest db entry for exported technologies - mastr_technologies = [unit_type_map_reversed[tech] for tech in data] - newest_date = ( - session.query(orm.BasicUnit.DatumLetzteAktualisierung) - .filter(orm.BasicUnit.Einheittyp.in_(mastr_technologies)) - .order_by(orm.BasicUnit.DatumLetzteAktualisierung.desc()) - .first()[0] - ) - - metadata = create_datapackage_meta_json(newest_date, data, json_serialize=False) - - with open(metadata_file, "w", encoding="utf-8") as f: - json.dump(metadata, f, ensure_ascii=False, indent=4) - - log.info("Saved metadata") +# At the time of commenting this, the call of this function in mastr.py was already +# commented out for more than a year + +# def save_metadata(data: list = None, engine=None) -> None: +# """ +# Save metadata during csv export. +# +# Parameters +# ---------- +# data: list +# List of exported technologies for which metadata is needed. +# engine: +# User-defined database engine. 
+# +# Returns +# ------- +# +# """ +# data_path = get_data_version_dir() +# filenames = get_filenames() +# metadata_file = os.path.join(data_path, filenames["metadata"]) +# unit_type_map_reversed = reverse_unit_type_map() +# +# with session_scope(engine=engine) as session: +# # check for latest db entry for exported technologies +# mastr_technologies = [unit_type_map_reversed[tech] for tech in data] +# newest_date = ( +# session.query(orm.BasicUnit.DatumLetzteAktualisierung) +# .filter(orm.BasicUnit.Einheittyp.in_(mastr_technologies)) +# .order_by(orm.BasicUnit.DatumLetzteAktualisierung.desc()) +# .first()[0] +# ) +# +# metadata = create_datapackage_meta_json(newest_date, data, json_serialize=False) +# +# with open(metadata_file, "w", encoding="utf-8") as f: +# json.dump(metadata, f, ensure_ascii=False, indent=4) +# +# log.info("Saved metadata") def reverse_fill_basic_units(technology=None, engine=None): diff --git a/open_mastr/utils/orm.py b/open_mastr/utils/orm.py index 667b415e..70890ecb 100644 --- a/open_mastr/utils/orm.py +++ b/open_mastr/utils/orm.py @@ -44,34 +44,6 @@ class BasicUnit(Base): EinheitSystemstatus = Column(String) -class AdditionalDataRequested(Base): - __tablename__ = "additional_data_requested" - - id = Column( - Integer, - Sequence("additional_data_requested_id_seq"), - primary_key=True, - ) - EinheitMastrNummer = Column(String) - additional_data_id = Column(String) - technology = Column(String) - data_type = Column(String) - request_date = Column(DateTime(timezone=True), default=func.now()) - - -class MissedAdditionalData(Base): - __tablename__ = "missed_additional_data" - - id = Column( - Integer, - Sequence("additional_data_missed_id_seq"), - primary_key=True, - ) - additional_data_id = Column(String) - reason = Column(String) - download_date = Column(DateTime(timezone=True), default=func.now()) - - class Extended(object): NetzbetreiberMastrNummer = Column(String) Registrierungsdatum = Column(Date) @@ -436,32 +408,6 @@ class 
LocationExtended(ParentAllTables, Base): Lokationtyp = Column(String) -class AdditionalLocationsRequested(Base): - __tablename__ = "additional_locations_requested" - - id = Column( - Integer, - Sequence("additional_locations_requested_id_seq"), - primary_key=True, - ) - LokationMastrNummer = Column(String) - location_type = Column(String) - request_date = Column(DateTime(timezone=True), default=func.now()) - - -class MissedExtendedLocation(ParentAllTables, Base): - __tablename__ = "missed_extended_location_data" - - id = Column( - Integer, - Sequence("additional_location_data_missed_id_seq"), - primary_key=True, - ) - LokationMastrNummer = Column(String) - reason = Column(String) - download_date = Column(DateTime(timezone=True), default=func.now()) - - class GasStorage(ParentAllTables, Base): __tablename__ = "gas_storage" diff --git a/scripts/mirror_mastr_csv_export.py b/scripts/mirror_mastr_csv_export.py deleted file mode 100644 index 00cf6812..00000000 --- a/scripts/mirror_mastr_csv_export.py +++ /dev/null @@ -1,27 +0,0 @@ -from open_mastr.utils.helpers import reverse_fill_basic_units, create_db_query - - -technology = [ - "solar", - "wind", - "biomass", - "combustion", - "gsgk", - "hydro", - "nuclear", - "storage", -] - -data_types = ["unit_data", "eeg_data", "kwk_data", "permit_data"] -location_types = [ - "location_elec_generation", - "location_elec_consumption", - "location_gas_generation", - "location_gas_consumption", -] - -# Fill the basic_units table from extended tables -reverse_fill_basic_units() - -# to csv per tech -create_db_query(technology=technology, additional_data=data_types, limit=None) diff --git a/scripts/mirror_mastr_dump.py b/scripts/mirror_mastr_dump.py deleted file mode 100644 index 69a3a3d2..00000000 --- a/scripts/mirror_mastr_dump.py +++ /dev/null @@ -1,9 +0,0 @@ -from open_mastr.soap_api.mirror import MaStRMirror -import datetime - -# Dump data -now = datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S") -dump_file = 
f"{now}_open-mastr-mirror.backup" - -mastr_refl = MaStRMirror() -mastr_refl.dump(dump_file) diff --git a/scripts/mirror_mastr_update_latest.py b/scripts/mirror_mastr_update_latest.py deleted file mode 100644 index 40c61681..00000000 --- a/scripts/mirror_mastr_update_latest.py +++ /dev/null @@ -1,44 +0,0 @@ -from open_mastr.soap_api.mirror import MaStRMirror -import datetime - -limit = None -technology = [ - "wind", - "biomass", - "combustion", - "gsgk", - "hydro", - "nuclear", - "storage", - "solar", -] -data_types = ["unit_data", "eeg_data", "kwk_data", "permit_data"] -location_types = [ - "location_elec_generation", - "location_elec_consumption", - "location_gas_generation", - "location_gas_consumption", -] -processes = 12 - -mastr_mirror = MaStRMirror( - empty_schema=False, parallel_processes=processes, restore_dump=None -) - -# Download basic unit data -mastr_mirror.backfill_basic(technology, limit=limit, date="latest") - -# Download additional unit data -for tech in technology: - # mastr_mirror.create_additional_data_requests(tech) - for data_type in data_types: - mastr_mirror.retrieve_additional_data( - tech, data_type, chunksize=1000, limit=limit - ) - -# Download basic location data -mastr_mirror.backfill_locations_basic(limit=limit, date="latest") - -# Download extended location data -for location_type in location_types: - mastr_mirror.retrieve_additional_location_data(location_type, limit=limit) diff --git a/tests/soap_api/test_download.py b/tests/soap_api/test_download.py index e4dc4c0d..2ff42097 100644 --- a/tests/soap_api/test_download.py +++ b/tests/soap_api/test_download.py @@ -1,6 +1,5 @@ -from open_mastr.soap_api.download import MaStRAPI, MaStRDownload, flatten_dict +from open_mastr.soap_api.download import MaStRAPI import pytest -import datetime @pytest.fixture @@ -13,11 +12,6 @@ def mastr_api(): return MaStRAPI() -@pytest.fixture -def mastr_download(): - return MaStRDownload() - - @pytest.mark.dependency(name="db_reachable") def 
test_soap_wrapper_connection(mastr_api_fake_credentials): mastr_api = mastr_api_fake_credentials @@ -59,159 +53,3 @@ def test_soap_wrapper_power_plant_list(mastr_api): assert key in einheit assert response["Ergebniscode"] == "OkWeitereDatenVorhanden" - - -def test_basic_unit_data(mastr_download): - data = [ - unit - for sublist in mastr_download.basic_unit_data(data="nuclear", limit=1) - for unit in sublist - ] - - assert len(data) == 1 - assert data[0]["Einheittyp"] == "Kernenergie" - - -def test_additional_data_nuclear(mastr_download): - - data_fcns = [ - ("SME963513379837", "extended_unit_data"), - ("SGE951929415553", "permit_unit_data"), - ] - - for mastr_nummer, data_fcn in data_fcns: - units_downloaded, units_missed = mastr_download.additional_data( - "nuclear", [mastr_nummer], data_fcn - ) - - assert len(units_downloaded) + len(units_missed) == 1 - - -def test_additional_data_biomass(mastr_download): - - data_fcns = [ - ("SEE936595511945", "extended_unit_data"), - ("EEG929630520224", "extended_unit_data"), - ("KWK998480117397", "kwk_unit_data"), - ("SGE974254715891", "permit_unit_data"), - ] - - for mastr_nummer, data_fcn in data_fcns: - units_downloaded, units_missed = mastr_download.additional_data( - "biomass", [mastr_nummer], data_fcn - ) - - assert len(units_downloaded) + len(units_missed) == 1 - - -def test_flatten_dict(): - data_before_flatten = [ - { - "Frist": {"Wert": datetime.date(2017, 6, 15), "NichtVorhanden": False}, - "WasserrechtsNummer": None, - "WasserrechtAblaufdatum": {"Wert": None, "NichtVorhanden": False}, - "Registrierungsdatum": datetime.date(2019, 9, 11), - "VerknuepfteEinheiten": [ - { - "MaStRNummer": "SEE900290686291", - "Einheittyp": "Windeinheit", - "Einheitart": "Stromerzeugungseinheit", - } - ], - }, - { - "Frist": {"Wert": datetime.date(2018, 3, 27), "NichtVorhanden": False}, - "WasserrechtsNummer": None, - "WasserrechtAblaufdatum": {"Wert": None, "NichtVorhanden": False}, - "Registrierungsdatum": datetime.date(2019, 2, 4), - 
"VerknuepfteEinheiten": [ - { - "MaStRNummer": "SEE908999761141", - "Einheittyp": "Windeinheit", - "Einheitart": "Stromerzeugungseinheit", - }, - { - "MaStRNummer": "SEE978389475514", - "Einheittyp": "Windeinheit", - "Einheitart": "Stromerzeugungseinheit", - }, - { - "MaStRNummer": "SEE960287756734", - "Einheittyp": "Windeinheit", - "Einheitart": "Stromerzeugungseinheit", - }, - ], - }, - { - "Frist": {"Wert": datetime.date(2018, 3, 23), "NichtVorhanden": False}, - "WasserrechtsNummer": None, - "WasserrechtAblaufdatum": {"Wert": None, "NichtVorhanden": False}, - "Registrierungsdatum": datetime.date(2019, 2, 15), - "VerknuepfteEinheiten": [ - { - "MaStRNummer": "SEE902566900605", - "Einheittyp": "Windeinheit", - "Einheitart": "Stromerzeugungseinheit", - }, - { - "MaStRNummer": "SEE945851284479", - "Einheittyp": "Windeinheit", - "Einheitart": "Stromerzeugungseinheit", - }, - { - "MaStRNummer": "SEE975973981666", - "Einheittyp": "Windeinheit", - "Einheitart": "Stromerzeugungseinheit", - }, - ], - }, - { - "Frist": {"Wert": datetime.date(2016, 1, 23), "NichtVorhanden": False}, - "WasserrechtsNummer": None, - "WasserrechtAblaufdatum": {"Wert": None, "NichtVorhanden": False}, - "Registrierungsdatum": datetime.date(2019, 2, 7), - "VerknuepfteEinheiten": [ - { - "MaStRNummer": "SEE969499157391", - "Einheittyp": "Windeinheit", - "Einheitart": "Stromerzeugungseinheit", - }, - { - "MaStRNummer": "SEE949984955732", - "Einheittyp": "Windeinheit", - "Einheitart": "Stromerzeugungseinheit", - }, - ], - }, - ] - data_after_flatten = [ - { - "Frist": datetime.date(2017, 6, 15), - "WasserrechtsNummer": None, - "WasserrechtAblaufdatum": None, - "Registrierungsdatum": datetime.date(2019, 9, 11), - "VerknuepfteEinheiten": "SEE900290686291", - }, - { - "Frist": datetime.date(2018, 3, 27), - "WasserrechtsNummer": None, - "WasserrechtAblaufdatum": None, - "Registrierungsdatum": datetime.date(2019, 2, 4), - "VerknuepfteEinheiten": "SEE908999761141, SEE978389475514, SEE960287756734", - }, - 
{ - "Frist": datetime.date(2018, 3, 23), - "WasserrechtsNummer": None, - "WasserrechtAblaufdatum": None, - "Registrierungsdatum": datetime.date(2019, 2, 15), - "VerknuepfteEinheiten": "SEE902566900605, SEE945851284479, SEE975973981666", - }, - { - "Frist": datetime.date(2016, 1, 23), - "WasserrechtsNummer": None, - "WasserrechtAblaufdatum": None, - "Registrierungsdatum": datetime.date(2019, 2, 7), - "VerknuepfteEinheiten": "SEE969499157391, SEE949984955732", - }, - ] - assert flatten_dict(data_before_flatten) == data_after_flatten diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 7779a9c8..5f8cfa81 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -1,25 +1,21 @@ import pytest import os from os.path import expanduser -import sys +import itertools + import random from os.path import join -from datetime import datetime + import pandas as pd from open_mastr import Mastr from zipfile import ZipFile from open_mastr.utils import orm -from open_mastr.utils.constants import ( - API_LOCATION_TYPES, - TECHNOLOGIES, - ADDITIONAL_TABLES, -) +from open_mastr.utils.constants import TECHNOLOGIES, ADDITIONAL_TABLES, BULK_DATA from open_mastr.utils.config import get_data_version_dir, create_data_dir from open_mastr.utils.helpers import ( validate_parameter_format_for_download_method, validate_parameter_format_for_mastr_init, - validate_api_credentials, transform_data_parameter, data_to_include_tables, session_scope, @@ -47,9 +43,8 @@ def db(): return Mastr() -@pytest.fixture -def parameter_dict_working_list(): - parameter_dict_bulk = { +def test_Mastr_validate_working_parameter(): + valid_params = { "method": ["bulk"], "data": [ "wind", @@ -75,171 +70,55 @@ def parameter_dict_working_list(): ], "date": ["today", "20200108", "existing"], "bulk_cleansing": [True, False], - "api_processes": [None], - "api_limit": [50], - "api_chunksize": [1000], - "api_data_types": [None], - "api_location_types": [None], } + method_vals = valid_params["method"] + 
data_vals = valid_params["data"] + date_vals = valid_params["date"] + bulk_cleansing_vals = valid_params["bulk_cleansing"] + combinations = list( + itertools.product(method_vals, data_vals, date_vals, bulk_cleansing_vals) + ) - parameter_dict_API = { - "method": ["API"], - "data": [ - "wind", - "solar", - "biomass", - "hydro", - "gsgk", - "combustion", - "nuclear", - "storage", - "location", - "permit", - None, - ["wind", "solar"], - ], - "date": [None, datetime(2022, 2, 2), "latest"], - "bulk_cleansing": [True], - "api_processes": [None] - if sys.platform not in ["linux2", "linux"] - else [2, 20, None, "max"], - "api_limit": [15, None], - "api_chunksize": [20], - "api_data_types": [ - ["unit_data", "eeg_data", "kwk_data", "permit_data"], - ["unit_data"], - None, - ], - "api_location_types": [ - ["location_elec_generation", "location_elec_consumption"], - [ - "location_elec_generation", - "location_elec_consumption", - "location_gas_generation", - "location_gas_consumption", - ], - None, - ], - } - return [parameter_dict_bulk, parameter_dict_API] + # working parameters + for method, data, date, bulk_cleansing in combinations: + assert ( + validate_parameter_format_for_download_method( + method, + data, + date, + bulk_cleansing, + ) + is None + ) -@pytest.fixture -def parameter_dict_not_working(): - parameter_dict = { +def test_Mastr_validate_not_working_parameter(): + invalid_params = { "method": [5, "BULK", "api"], "data": ["wint", "Solar", "biomasse", 5, []], "date": [124, "heute", 123], "bulk_cleansing": ["cleansing", 4, None], - "api_processes": ["20", "None"], - "api_limit": ["15", "None"], - "api_chunksize": ["20"], - "api_data_types": ["unite_data", 5, []], - "api_location_types": ["locatione_elec_generation", 5, []], } - return parameter_dict - - -def test_Mastr_validate_working_parameter(parameter_dict_working_list): - for parameter_dict_working in parameter_dict_working_list: - parameter_dict = { - key: parameter_dict_working[key][0] for key in 
parameter_dict_working - } - - # working parameters - for key in list(parameter_dict_working.keys()): - for value in parameter_dict_working[key]: - parameter_dict[key] = value - ( - method, - data, - date, - bulk_cleansing, - api_processes, - api_limit, - api_chunksize, - api_data_types, - api_location_types, - ) = get_parameters_from_parameter_dict(parameter_dict) - assert ( - validate_parameter_format_for_download_method( - method, - data, - date, - bulk_cleansing, - api_processes, - api_limit, - api_chunksize, - api_data_types, - api_location_types, - ) - is None + for key, invalid_values in invalid_params.items(): + for invalid_value in invalid_values: + params = { + "method": "bulk", + "data": "wind", + "date": "today", + "bulk_cleansing": True, + } + params[key] = invalid_value + + with pytest.raises(ValueError): + validate_parameter_format_for_download_method( + params["method"], + params["data"], + params["date"], + params["bulk_cleansing"], ) -def test_Mastr_validate_not_working_parameter( - parameter_dict_working_list, parameter_dict_not_working -): - for parameter_dict_working in parameter_dict_working_list: - parameter_dict_initial = { - key: parameter_dict_working[key][0] for key in parameter_dict_working - } - - # not working parameters - for key in list(parameter_dict_not_working.keys()): - for value in parameter_dict_not_working[key]: - # reset parameter_dict so that all parameters are working except one - parameter_dict = parameter_dict_initial.copy() - parameter_dict[key] = value - ( - method, - data, - date, - bulk_cleansing, - api_processes, - api_limit, - api_chunksize, - api_data_types, - api_location_types, - ) = get_parameters_from_parameter_dict(parameter_dict) - with pytest.raises(ValueError): - validate_parameter_format_for_download_method( - method, - data, - date, - bulk_cleansing, - api_processes, - api_limit, - api_chunksize, - api_data_types, - api_location_types, - ) - - -def get_parameters_from_parameter_dict(parameter_dict): - 
method = parameter_dict["method"] - data = parameter_dict["data"] - date = parameter_dict["date"] - bulk_cleansing = parameter_dict["bulk_cleansing"] - api_processes = parameter_dict["api_processes"] - api_limit = parameter_dict["api_limit"] - api_chunksize = parameter_dict["api_chunksize"] - api_data_types = parameter_dict["api_data_types"] - api_location_types = parameter_dict["api_location_types"] - return ( - method, - data, - date, - bulk_cleansing, - api_processes, - api_limit, - api_chunksize, - api_data_types, - api_location_types, - ) - - def test_validate_parameter_format_for_mastr_init(db): engine_list_working = ["sqlite", db.engine] engine_list_failing = ["HI", 12] @@ -253,21 +132,15 @@ def test_validate_parameter_format_for_mastr_init(db): def test_transform_data_parameter(): - (data, api_data_types, api_location_types, harm_log,) = transform_data_parameter( - method="API", - data=["wind", "location"], - api_data_types=["eeg_data"], - api_location_types=None, + data_first = transform_data_parameter( + data="wind", ) + assert data_first == ["wind"] - assert data == ["wind"] - assert api_data_types == ["eeg_data"] - assert api_location_types == API_LOCATION_TYPES - assert harm_log == ["location"] - - -def test_validate_api_credentials(): - validate_api_credentials() + data_second = transform_data_parameter( + data=None, + ) + assert data_second == BULK_DATA def test_data_to_include_tables(): @@ -308,98 +181,6 @@ def test_data_to_include_tables_error(): data_to_include_tables(data=["wind", "hydro"], mapping="X32J_22") -@pytest.mark.skipif( - not _db_exists, - reason="The database is smaller than 1 MB, thus suspected to be empty or non-existent.", -) -def test_db_query_to_csv(tmpdir, engine): - """ - The test checks for 2 random tech and 2 random additional tables: - 1. If csv's have been created and encoded in 'utf-8' and are not empty - 2. For techs, if limited (limit=60) EinheitMastrNummer in basic_units are included in CSV file - 3. 
For additional_tables, if csv is not empty - - Parameters - ---------- - tmpdir - temporary directory to test export - engine - sqlite engine from conftest.py - - Returns - ------- - - """ - unit_type_map_reversed = reverse_unit_type_map() - - # FIXME: Define path to tmpdir (pytest internal temporary dir) - # to not delete actual data export, when test is run locally - # Use the parameter that will be implemented in #394 - # create data dir - create_data_dir() - - techs = random.sample(TECHNOLOGIES, k=2) - addit_tables = random.sample(ADDITIONAL_TABLES, k=2) - - with session_scope(engine=engine) as session: - for tech in techs: - db_query_to_csv( - db_query=create_db_query(tech=tech, limit=60, engine=engine), - data_table=tech, - chunksize=20, - ) - - # Test if LIMITED EinheitMastrNummer in basic_units are included in CSV file - csv_path = join( - get_data_version_dir(), - f"bnetza_mastr_{tech}_raw.csv", - ) - # check if table has been created and encoding is correct - df_tech = pd.read_csv( - csv_path, index_col="EinheitMastrNummer", encoding="utf-8" - ) - - # check whether table is empty (returns True if it is) - assert False == df_tech.empty - - units = session.query(orm.BasicUnit.EinheitMastrNummer).filter( - orm.BasicUnit.Einheittyp == unit_type_map_reversed[tech] - ) - set_MastrNummer = {unit.EinheitMastrNummer for unit in units} - for idx in df_tech.index: - assert idx in set_MastrNummer - - # FIXME: delete when tmpdir is implemented - # remove file in data folder - os.remove(csv_path) - - for addit_table in addit_tables: - csv_path = join( - get_data_version_dir(), - f"bnetza_mastr_{addit_table}_raw.csv", - ) - - db_query_to_csv( - db_query=create_db_query( - additional_table=addit_table, limit=60, engine=engine - ), - data_table=addit_table, - chunksize=20, - ) - - # check if table has been created and encoding is correct - df_at = pd.read_csv(csv_path, encoding="utf-8") - - # check if table is empty (returns True if it is) - assert False == df_at.empty - 
- # FIXME: delete when tmpdir is implemented - # remove file in data folder - os.remove(csv_path) - - # FIXME: delete when tmpdir is implemented - # delete empty data dir - os.rmdir(get_data_version_dir()) - - def test_delete_zip_file_if_corrupted(): test_zip_path = os.path.join("tests", "test.zip") with ZipFile(test_zip_path, "w") as zf: @@ -410,8 +191,3 @@ def test_delete_zip_file_if_corrupted(): delete_zip_file_if_corrupted(test_zip_path) assert not os.path.exists(test_zip_path) - - -def test_save_metadata(): - # FIXME: implement in #386 - pass diff --git a/tests/test_mastr.py b/tests/test_mastr.py index 16f7c1f6..ce7cd6fa 100644 --- a/tests/test_mastr.py +++ b/tests/test_mastr.py @@ -55,7 +55,6 @@ def test_Mastr_init(db): # test if folder structure exists assert os.path.exists(db.home_directory) assert os.path.exists(db._sqlite_folder_path) - # test if engine and connection were created assert type(db.engine) == sqlalchemy.engine.Engine diff --git a/tests/xml_download/test_utils_write_to_database.py b/tests/xml_download/test_utils_write_to_database.py index 75243b1a..bf54f16d 100644 --- a/tests/xml_download/test_utils_write_to_database.py +++ b/tests/xml_download/test_utils_write_to_database.py @@ -12,7 +12,7 @@ from sqlalchemy.sql import text from open_mastr.utils import orm -from open_mastr.utils.orm import RetrofitUnits, NuclearExtended, tablename_mapping +from open_mastr.utils.orm import RetrofitUnits, ElectricityConsumer, tablename_mapping from open_mastr.xml_download.utils_write_to_database import ( add_missing_columns_to_table, add_zero_as_first_character_for_too_short_string, @@ -220,19 +220,20 @@ def test_correct_ordering_of_filelist(): not _xml_file_exists, reason="The zipped xml file could not be found." 
) def test_read_xml_file(zipped_xml_file_path): + file_name = "EinheitenStromVerbraucher" with ZipFile(zipped_xml_file_path, "r") as f: - df = read_xml_file(f, "EinheitenKernkraft.xml") + df = read_xml_file(f, f"{file_name}.xml") assert df.shape[0] > 0 # Since the file is from the latest download, its content can vary over time. To make sure that the table is # correctly created, we check that all of its columns are associated are included in our mapping. for column in df.columns: - if column in tablename_mapping["einheitenkernkraft"]["replace_column_names"]: - column = tablename_mapping["einheitenkernkraft"]["replace_column_names"][ + if column in tablename_mapping[file_name.lower()]["replace_column_names"]: + column = tablename_mapping[file_name.lower()]["replace_column_names"][ column ] - assert column in NuclearExtended.__table__.columns.keys() + assert column in ElectricityConsumer.__table__.columns.keys() def test_add_zero_as_first_character_for_too_short_string():