Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 106 additions & 160 deletions open_mastr/mastr.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import os
from sqlalchemy import inspect, create_engine
from pathlib import Path
from sqlalchemy import inspect, create_engine, Engine, Table
from sqlalchemy.orm import DeclarativeBase
from typing import Literal, Optional, Type, TypeVar, Union
from collections.abc import Mapping

# import xml dependencies
from open_mastr.xml_download.utils_download_bulk import (
download_documentation,
download_xml_Mastr,
delete_xml_files_not_from_given_date,
)
from open_mastr.xml_download.utils_write_to_database import (
write_mastr_xml_to_database,
)
from open_mastr.utils.xsd_tables import MastrTableDescription, read_mastr_table_descriptions_from_xsd

from open_mastr.utils.helpers import (
validate_parameter_format_for_download_method,
Expand All @@ -34,13 +40,18 @@
setup_logger,
)
import open_mastr.utils.orm as orm
from open_mastr.utils.sqlalchemy_tables import make_sqlalchemy_model_from_mastr_table_description

# constants
from open_mastr.utils.constants import TECHNOLOGIES, ADDITIONAL_TABLES

# setup logger
log = setup_logger()

# TODO: Repeating Type[DeclarativeBase_T] in function signatures is strange. There must be a better option.
DeclarativeBase_T = TypeVar("DeclarativeBase_T", bound=DeclarativeBase)
FALLBACK_DOCS_PATH = Path(__file__).parent / "resources" / "Dokumentation-MaStR-Gesamtdatenexport-20251227-Fallback.zip"


class Mastr:
"""
Expand Down Expand Up @@ -71,21 +82,23 @@ class Mastr:

"""

def __init__(self, engine="sqlite", connect_to_translated_db=False) -> None:
def __init__(
self,
engine: Union[Engine, Literal["sqlite"]] = "sqlite",
mastr_table_to_db_table_name: Optional[dict[str, str]] = None,
output_dir: Optional[Union[str, Path]] = None,
home_dir: Optional[Union[str, Path]] = None,
) -> None:
validate_parameter_format_for_mastr_init(engine)

self.output_dir = get_output_dir()
self.home_directory = get_project_home_dir()
self.output_dir = output_dir or get_output_dir()
self.home_directory = home_dir or get_project_home_dir()

self._sqlite_folder_path = os.path.join(self.output_dir, "data", "sqlite")

os.makedirs(self._sqlite_folder_path, exist_ok=True)

self.is_translated = connect_to_translated_db
if connect_to_translated_db:
self.engine = create_translated_database_engine(
engine, self._sqlite_folder_path
)
else:
self.engine = create_database_engine(engine, self._sqlite_folder_path)
self.engine = create_database_engine(engine, self._sqlite_folder_path)

log.info(
"\n==================================================\n"
Expand All @@ -97,7 +110,39 @@ def __init__(self, engine="sqlite", connect_to_translated_db=False) -> None:
"'pip install --upgrade open-mastr'\n"
)

orm.Base.metadata.create_all(self.engine)
def generate_data_model(
    self,
    data: Optional[list[str]] = None,
    catalog_value_as_str: bool = True,
    base: Optional[Type[DeclarativeBase_T]] = None,
) -> dict[str, Type[DeclarativeBase_T]]:
    """Build SQLAlchemy model classes for the requested MaStR tables.

    Downloads the official MaStR documentation zip (which contains the
    XSD table descriptions) into ``<output_dir>/data/docs_download`` and
    derives one SQLAlchemy model per MaStR table from it.  If the
    download or the processing fails for any reason, falls back to the
    documentation snapshot bundled with the package
    (``FALLBACK_DOCS_PATH``).

    Parameters
    ----------
    data : Optional[list[str]]
        Selection of datasets whose tables should be modelled; it is
        normalised via ``transform_data_parameter`` first
        (presumably ``None`` selects all datasets — confirm against
        that helper).
    catalog_value_as_str : bool
        Forwarded to the model factory; controls whether catalog values
        are represented as plain strings in the generated models.
    base : Optional[Type[DeclarativeBase_T]]
        Declarative base on which the generated models are registered.
        When ``None``, the helper creates a fresh base internally.

    Returns
    -------
    dict[str, Type[DeclarativeBase_T]]
        Mapping of MaStR table name to the generated model class.
    """
    # Normalise the user-supplied dataset selection into the canonical
    # list form used throughout the package.
    data = transform_data_parameter(data)

    docs_folder_path = os.path.join(self.output_dir, "data", "docs_download")
    os.makedirs(docs_folder_path, exist_ok=True)
    zipped_docs_file_path = os.path.join(
        docs_folder_path,
        "Dokumentation MaStR Gesamtdatenexport.zip"
    )
    try:
        download_documentation(zipped_docs_file_path)
        return _download_docs_and_generate_data_model(
            zipped_docs_file_path=zipped_docs_file_path,
            data=data,
            catalog_value_as_str=catalog_value_as_str,
            base=base,
        )
    except Exception as e:
        # Deliberately broad catch: any failure while downloading or
        # parsing the live documentation degrades to the packaged
        # fallback zip instead of aborting.  log.exception also records
        # the full traceback.
        log.exception(
            f"Encountered {e} when downloading or processing MaStR documentation."
            f" Falling back to stored docs at {FALLBACK_DOCS_PATH}"
        )
        return _download_docs_and_generate_data_model(
            zipped_docs_file_path=FALLBACK_DOCS_PATH,
            data=data,
            catalog_value_as_str=catalog_value_as_str,
            base=base
        )

def download(
self,
Expand All @@ -106,6 +151,8 @@ def download(
date=None,
bulk_cleansing=True,
keep_old_downloads: bool = False,
mastr_table_to_db_table: Optional[Mapping[str, Table]] = None,
alter_database_tables: bool = True,
**kwargs,
) -> None:
"""
Expand Down Expand Up @@ -165,13 +212,6 @@ def download(
keep_old_downloads: bool
If set to True, prior downloaded MaStR zip files will be kept.
"""

if self.is_translated:
raise TypeError(
"You are currently connected to a translated database.\n"
"A translated database cannot be further processed."
)

if method == "API":
log.warning(
"Downloading the whole registry via the MaStR SOAP-API is deprecated. "
Expand All @@ -181,6 +221,20 @@ def download(
log.warning("Attention: method='API' changed to method='bulk'.")
method = "bulk"

if not mastr_table_to_db_table:
mastr_table_to_db_model = self.generate_data_model(
data=data,
catalog_value_as_str=bulk_cleansing,
)
mastr_table_to_db_table = {
mastr_table: db_model.__table__
for mastr_table, db_model in mastr_table_to_db_model.items()
}
log.info("Ensuring database tables for MaStR are present: Dropping old tables if existing and creating new ones.")
for db_table in mastr_table_to_db_table.values():
db_table.drop(self.engine, checkfirst=True)
db_table.create(self.engine)

validate_parameter_format_for_download_method(
method=method,
data=data,
Expand All @@ -192,21 +246,20 @@ def download(

date = transform_date_parameter(self, date, **kwargs)


# Find the name of the zipped xml folder
bulk_download_date = parse_date_string(date)
xml_folder_path = os.path.join(self.output_dir, "data", "xml_download")
os.makedirs(xml_folder_path, exist_ok=True)
zipped_xml_file_path = os.path.join(
xml_folder_path,
f"Gesamtdatenexport_{bulk_download_date}.zip",
f"Gesamtdatenexport_{bulk_download_date.strftime('%Y%m%d')}.zip",
)

delete_zip_file_if_corrupted(zipped_xml_file_path)
if not keep_old_downloads:
delete_xml_files_not_from_given_date(zipped_xml_file_path, xml_folder_path)
delete_xml_files_not_from_given_date(zipped_xml_file_path, xml_folder_path)

download_xml_Mastr(zipped_xml_file_path, date, data, xml_folder_path)
download_xml_Mastr(zipped_xml_file_path, bulk_download_date, data, xml_folder_path)

log.info(
"\nWould you like to speed up the creation of your MaStR database?\n"
Expand All @@ -217,7 +270,6 @@ def download(
delete_zip_file_if_corrupted(zipped_xml_file_path)
delete_xml_files_not_from_given_date(zipped_xml_file_path, xml_folder_path)


print(
"\nWould you like to speed up the creation of your MaStR database?\n"
"Try our new parallelized processing by setting os.environ['USE_RECOMMENDED_NUMBER_OF_PROCESSES'] = True "
Expand All @@ -230,146 +282,40 @@ def download(
data=data,
bulk_cleansing=bulk_cleansing,
bulk_download_date=bulk_download_date,
mastr_table_to_db_table=mastr_table_to_db_table,
alter_database_tables=alter_database_tables,
)

def to_csv(
self, tables: list = None, chunksize: int = 500000, limit: int = None
) -> None:
"""
Save the database as csv files along with the metadata file.
If 'tables=None' all possible tables will be exported.

Parameters
------------
tables: None or list
For exporting selected tables choose from:
["wind", "solar", "biomass", "hydro", "gsgk", "combustion", "nuclear", "storage",
"balancing_area", "electricity_consumer", "gas_consumer", "gas_producer",
"gas_storage", "gas_storage_extended",
"grid_connections", "grids", "market_actors", "market_roles",
"locations_extended", "permit", "deleted_units", "storage_units"]
chunksize: int
Defines the chunksize of the tables export.
Default value is 500.000 rows to include in each chunk.
limit: None or int
Limits the number of exported data rows.
"""

if self.is_translated:
raise TypeError(
"You are currently connected to a translated database.\n"
"A translated database cannot be used for the csv export."
)

log.info("Starting csv-export")

data_path = get_data_version_dir()

create_data_dir()

# Validate and parse tables parameter
validate_parameter_data(method="csv_export", data=tables)
data = transform_data_parameter(
method="bulk", data=tables, api_data_types=None, api_location_types=None
pass
# TODO: Think about this.


def _download_docs_and_generate_data_model(
zipped_docs_file_path: Path,
data: list[str],
catalog_value_as_str: bool = True,
base: Optional[Type[DeclarativeBase_T]] = None,
):
if base is None:

class MastrBase(DeclarativeBase):
pass

base = MastrBase

mastr_table_descriptions = read_mastr_table_descriptions_from_xsd(
zipped_docs_file_path=zipped_docs_file_path, data=data
)
mastr_table_to_db_model: dict[str, DeclarativeBase_T] = {}
for mastr_table_description in mastr_table_descriptions:
sqlalchemy_model = make_sqlalchemy_model_from_mastr_table_description(
table_description=mastr_table_description,
catalog_value_as_str=catalog_value_as_str,
base=base
)
mastr_table_to_db_model[mastr_table_description.table_name] = sqlalchemy_model

# Determine tables to export
technologies_to_export = []
additional_tables_to_export = []
for table in data:
if table in TECHNOLOGIES:
technologies_to_export.append(table)
elif table in ADDITIONAL_TABLES:
additional_tables_to_export.append(table)
else:
additional_tables_to_export.extend(
data_to_include_tables([table], mapping="export_db_tables")
)

if technologies_to_export:
log.info(f"Technology tables: {technologies_to_export}")
if additional_tables_to_export:
log.info(f"Additional tables: {additional_tables_to_export}")

log.info(f"Tables are saved to: {data_path}")

reverse_fill_basic_units(technology=technologies_to_export, engine=self.engine)

# Export technologies to csv
for tech in technologies_to_export:
db_query_to_csv(
db_query=create_db_query(tech=tech, limit=limit, engine=self.engine),
data_table=tech,
chunksize=chunksize,
)
# Export additional tables to csv
for addit_table in additional_tables_to_export:
db_query_to_csv(
db_query=create_db_query(
additional_table=addit_table, limit=limit, engine=self.engine
),
data_table=addit_table,
chunksize=chunksize,
)

# FIXME: Currently metadata is only created for technology data, Fix in #386
# Configure and save data package metadata file along with data
# save_metadata(data=technologies_to_export, engine=self.engine)

def translate(self) -> None:
"""
A database can be translated only once.

Deletes translated versions of the currently connected database.

Translates currently connected database,renames it with '-translated'
suffix and updates self.engine's path accordingly.

!!! example
```python

from open_mastr import Mastr
import pandas as pd

db = Mastr()
db.download(data='biomass')
db.translate()

df = pd.read_sql(sql='biomass_extended', con=db.engine)
print(df.head(10))
```

"""

if "sqlite" not in self.engine.dialect.name:
raise ValueError("engine has to be of type 'sqlite'")
if self.is_translated:
raise TypeError("The currently connected database is already translated.")

inspector = inspect(self.engine)
old_path = r"{}".format(self.engine.url.database)
new_path = old_path[:-3] + "-translated.db"

if os.path.exists(new_path):
try:
os.remove(new_path)
except Exception as e:
log.error(
f"An error occurred while removing old translated database: {e}"
)

log.info("Replacing previous version of the translated database...")

for table in inspector.get_table_names():
rename_table(table, inspector.get_columns(table), self.engine)

self.engine.dispose()

try:
os.rename(old_path, new_path)
log.info(f"Database '{old_path}' changed to '{new_path}'")
except Exception as e:
log.error(f"An error occurred while renaming database: {e}")

self.engine = create_engine(f"sqlite:///{new_path}")
self.is_translated = True
return mastr_table_to_db_model
Binary file not shown.
Loading