Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7913a17
Rename isLDS to isLDES, adjust unit of demand (to MW)
FelixCAAuer Aug 27, 2025
d4c7c4f
Merge remote-tracking branch 'origin/main' into feature/storageAndRoR
FelixCAAuer Sep 4, 2025
f38be25
Add ImpExpProfiles and Inflows to filter functions
FelixCAAuer Sep 16, 2025
a5e2868
Rewrite scenario_filtering for CaseStudy (hopefully safer)
FelixCAAuer Sep 16, 2025
01e8661
Adjust description of Power_Demand (-> v0.1.4)
FelixCAAuer Sep 16, 2025
2216cae
Implement Power_Inflows_KInRows
FelixCAAuer Sep 16, 2025
60286a3
Adjust comments for VRESProfiles (-> v0.1.1), add RoR1 to VRES
FelixCAAuer Sep 16, 2025
0a95233
Adjust comment for Power_WeightsK (-> v0.1.4)
FelixCAAuer Sep 16, 2025
20192cd
Adjust comment in Power_Hindex (-> v0.1.3)
FelixCAAuer Sep 16, 2025
63d41f3
Update VRES with perfect formatting
FelixCAAuer Sep 16, 2025
85e6320
Merge remote-tracking branch 'origin/main' into feature/storageAndRoR
FelixCAAuer Sep 25, 2025
e990ba1
Remove automatic deletion of non-investable & -existing generators
FelixCAAuer Sep 26, 2025
ddb49b4
Fix filtering of scenarios
FelixCAAuer Sep 26, 2025
3c3e877
Scale Power_Inflows only if they exist
FelixCAAuer Sep 26, 2025
4cde4a5
Fix naming of write Power_VRES/-Profiles
FelixCAAuer Sep 26, 2025
c171e55
Implement write_caseStudy
FelixCAAuer Oct 2, 2025
6e5caae
Fix double-writing of pivoted files, enable writing empty files
FelixCAAuer Oct 2, 2025
738b409
Add option to execute caseStudy without Global_Scenarios
FelixCAAuer Oct 2, 2025
be29a48
Replace ValueError with warning if some Inflows correspond to Storage
FelixCAAuer Oct 3, 2025
8461ac3
Implement scaling for import export price
MarcoAnarmo Oct 3, 2025
c36496f
Merge pull request #16 from IEE-TUGraz/feature/scalingImpExpPrice
FelixCAAuer Oct 3, 2025
03c7c5e
Update data/example to newest format
FelixCAAuer Oct 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 91 additions & 72 deletions CaseStudy.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,44 @@
import copy
import os
import warnings
from pathlib import Path
from typing import Optional, Self

import numpy as np
import pandas as pd

import ExcelReader
from printer import Printer

printer = Printer.getInstance()


class CaseStudy:
# Lists of dataframes based on their dependencies - every table should only be present in one of these lists
rpk_dependent_dataframes: list[str] = ["dPower_Demand",
"dPower_Hindex",
"dPower_ImpExpProfiles",
"dPower_Inflows",
"dPower_VRESProfiles"]
rp_only_dependent_dataframes: list[str] = ["dPower_WeightsRP"]
k_only_dependent_dataframes: list[str] = ["dPower_WeightsK"]
non_time_dependent_dataframes: list[str] = ["dPower_BusInfo",
"dPower_ImpExpHubs",
"dPower_Network",
"dPower_Storage",
"dPower_ThermalGen",
"dPower_VRES"]
non_dependent_dataframes: list[str] = ["dGlobal_Parameters",
"dGlobal_Scenarios",
"dPower_Parameters"]

# Subsets and supersets of the above lists
rp_dependent_dataframes: list[str] = rpk_dependent_dataframes + rp_only_dependent_dataframes
k_dependent_dataframes: list[str] = rpk_dependent_dataframes + k_only_dependent_dataframes
scenario_dependent_dataframes: list[str] = rpk_dependent_dataframes + rp_only_dependent_dataframes + k_only_dependent_dataframes + non_time_dependent_dataframes

def __init__(self,
data_folder: str,
data_folder: str | Path,
do_not_scale_units: bool = False,
do_not_merge_single_node_buses: bool = False,
global_parameters_file: str = "Global_Parameters.xlsx", dGlobal_Parameters: pd.DataFrame = None,
Expand All @@ -31,7 +57,7 @@ def __init__(self,
power_hindex_file: str = "Power_Hindex.xlsx", dPower_Hindex: pd.DataFrame = None,
power_impexphubs_file: str = "Power_ImpExpHubs.xlsx", dPower_ImpExpHubs: pd.DataFrame = None,
power_impexpprofiles_file: str = "Power_ImpExpProfiles.xlsx", dPower_ImpExpProfiles: pd.DataFrame = None):
self.data_folder = data_folder if data_folder.endswith("/") else data_folder + "/"
self.data_folder = str(data_folder) if str(data_folder).endswith("/") else str(data_folder) + "/"
self.do_not_scale_units = do_not_scale_units
self.do_not_merge_single_node_buses = do_not_merge_single_node_buses

Expand All @@ -45,7 +71,16 @@ def __init__(self,
self.dGlobal_Scenarios = dGlobal_Scenarios
else:
self.global_scenarios_file = global_scenarios_file
self.dGlobal_Scenarios = ExcelReader.get_Global_Scenarios(self.data_folder + self.global_scenarios_file)
if not os.path.exists(self.data_folder + self.global_scenarios_file):
printer.warning(f"Executing without 'Global_Scenarios' (since no file was found at '{self.data_folder + self.global_scenarios_file}').")

# Create dataframe for only one Scenario
dGlobal_Scenarios = pd.DataFrame({"excl": np.nan, "id": np.nan, "scenarioID": ["ScenarioA"], "relativeWeight": [1], "comments": np.nan, "scenario": ["Scenarios"]})
dGlobal_Scenarios = dGlobal_Scenarios.set_index("scenarioID")

self.dGlobal_Scenarios = dGlobal_Scenarios
else:
self.dGlobal_Scenarios = ExcelReader.get_Global_Scenarios(self.data_folder + self.global_scenarios_file)

if dPower_Parameters is not None:
self.dPower_Parameters = dPower_Parameters
Expand Down Expand Up @@ -163,7 +198,7 @@ def scale_CaseStudy(self):
if self.dPower_Parameters["pEnableThermalGen"]:
self.scale_dPower_ThermalGen()

if self.dPower_Inflows is not None:
if hasattr(self, "dPower_Inflows") and self.dPower_Inflows is not None:
self.scale_dPower_Inflows()

if self.dPower_Parameters["pEnableVRES"]:
Expand Down Expand Up @@ -202,8 +237,6 @@ def scale_dPower_Demand(self):
self.dPower_Demand["value"] *= self.power_scaling_factor

def scale_dPower_ThermalGen(self):
self.dPower_ThermalGen = self.dPower_ThermalGen[(self.dPower_ThermalGen["ExisUnits"] > 0) | (self.dPower_ThermalGen["EnableInvest"] > 0)] # Filter out all generators that are not existing and not investable

self.dPower_ThermalGen['EFOR'] = self.dPower_ThermalGen['EFOR'].fillna(0) # Fill NaN values with 0 for EFOR

# Only FuelCost is adjusted by efficiency (OMVarCost is not), then both are scaled by the cost_scaling_factor / power_scaling_factor
Expand Down Expand Up @@ -239,7 +272,6 @@ def scale_dPower_Inflows(self):
self.dPower_Inflows["value"] *= self.power_scaling_factor

def scale_dPower_VRES(self):
self.dPower_VRES = self.dPower_VRES[(self.dPower_VRES["ExisUnits"] > 0) | ((self.dPower_VRES["EnableInvest"] > 0) & (self.dPower_VRES["MaxInvest"] > 0))] # Filter out all generators that are not existing and not investable
if "MinProd" not in self.dPower_VRES.columns:
self.dPower_VRES['MinProd'] = 0

Expand All @@ -251,7 +283,6 @@ def scale_dPower_VRES(self):
self.dPower_VRES['Qmax'] = self.dPower_VRES['Qmax'].fillna(0) * self.reactive_power_scaling_factor

def scale_dPower_Storage(self):
self.dPower_Storage = self.dPower_Storage[(self.dPower_Storage["ExisUnits"] > 0) | ((self.dPower_Storage["EnableInvest"] > 0) & (self.dPower_Storage["MaxInvest"] > 0))] # Filter out all generators that are not existing and not investable
self.dPower_Storage['IniReserve'] = self.dPower_Storage['IniReserve'].fillna(0)
self.dPower_Storage['MinReserve'] = self.dPower_Storage['MinReserve'].fillna(0)
self.dPower_Storage['MinProd'] = self.dPower_Storage["MinProd"].fillna(0)
Expand All @@ -273,8 +304,10 @@ def scale_dPower_ImpExpHubs(self):

def scale_dPower_ImpExpProfiles(self):
self.dPower_ImpExpProfiles["ImpExp"] *= self.power_scaling_factor
self.dPower_ImpExpProfiles["Price"] *= self.cost_scaling_factor / self.power_scaling_factor

def get_dGlobal_Parameters(self):
ExcelReader.check_LEGOExcel_version(self.data_folder + self.global_parameters_file, "v0.1.0", False)
dGlobal_Parameters = pd.read_excel(self.data_folder + self.global_parameters_file, skiprows=[0, 1])
dGlobal_Parameters = dGlobal_Parameters.drop(dGlobal_Parameters.columns[0], axis=1)
dGlobal_Parameters = dGlobal_Parameters.set_index('Solver Options')
Expand All @@ -288,6 +321,7 @@ def get_dGlobal_Parameters(self):
return dGlobal_Parameters

def get_dPower_Parameters(self):
ExcelReader.check_LEGOExcel_version(self.data_folder + self.power_parameters_file, "v0.1.0", False)
dPower_Parameters = pd.read_excel(self.data_folder + self.power_parameters_file, skiprows=[0, 1])
dPower_Parameters = dPower_Parameters.drop(dPower_Parameters.columns[0], axis=1)
dPower_Parameters = dPower_Parameters.dropna(how="all")
Expand Down Expand Up @@ -600,90 +634,75 @@ def to_full_hourly_model(self, inplace: bool) -> Optional['CaseStudy']:
else:
return None

def _filter_dataframe(self, df_name: str, scenario_name: str) -> None:
"""
Filters the dataframe with the given name to only include the scenario with the given name.
:param df_name: The name of the dataframe to filter.
:param scenario_name: The name of the scenario to filter for.
:return: None
"""
if not hasattr(self, df_name):
raise ValueError(f"Dataframe '{df_name}' not found in the case study. Please check the input data.")
df = getattr(self, df_name)

filtered_df = df.loc[df['scenario'] == scenario_name]

if len(df) > 0 and len(filtered_df) == 0:
raise ValueError(f"Scenario '{scenario_name}' not found in '{df_name}'. Please check the input data.")

setattr(self, df_name, filtered_df)

def filter_scenario(self, scenario_name) -> Self:
def filter_scenario(self, scenario_name, inplace: bool = False) -> Optional[Self]:
"""
Filters each (relevant) dataframe in the case study to only include the scenario with the given name.
:param scenario_name: The name of the scenario to filter for.
:return: Copy of the case study with the filtered dataframes.
:param inplace: If True, modifies the current instance. If False, returns a new instance.
:return: None if inplace is True, otherwise a new CaseStudy instance.
"""
caseStudy = self.copy()
caseStudy = self if inplace else self.copy()

# dGlobal_Parameters is not filtered, as it is the same for all scenarios
# dPower_Parameters is not filtered, as it is the same for all scenarios
caseStudy._filter_dataframe("dPower_BusInfo", scenario_name)
caseStudy._filter_dataframe("dPower_Network", scenario_name)
caseStudy._filter_dataframe("dPower_Demand", scenario_name)
caseStudy._filter_dataframe("dPower_WeightsRP", scenario_name)
caseStudy._filter_dataframe("dPower_WeightsK", scenario_name)
caseStudy._filter_dataframe("dPower_Hindex", scenario_name)
for df_name in CaseStudy.scenario_dependent_dataframes:
if hasattr(caseStudy, df_name):
df = getattr(caseStudy, df_name)
if df is None:
continue

if hasattr(caseStudy, "dPower_ThermalGen"):
caseStudy._filter_dataframe("dPower_ThermalGen", scenario_name)
if hasattr(caseStudy, "dPower_Inflows"):
caseStudy._filter_dataframe("dPower_Inflows", scenario_name)
if hasattr(caseStudy, "dPower_VRES"):
caseStudy._filter_dataframe("dPower_VRES", scenario_name)
caseStudy._filter_dataframe("dPower_VRESProfiles", scenario_name)
if hasattr(caseStudy, "dPower_Storage"):
caseStudy._filter_dataframe("dPower_Storage", scenario_name)
if hasattr(caseStudy, "dPower_ImpExpHubs") and caseStudy.dPower_ImpExpHubs is not None:
caseStudy._filter_dataframe("dPower_ImpExpHubs", scenario_name)
caseStudy._filter_dataframe("dPower_ImpExpProfiles", scenario_name)
filtered_df = df.loc[df['scenario'] == scenario_name]

return caseStudy
if len(df) > 0 and len(filtered_df) == 0:
raise ValueError(f"Scenario '{scenario_name}' not found in '{df_name}'. Please check the input data.")

def filter_timesteps(self, start: str, end: str) -> Self:
case_study = self.copy()
setattr(caseStudy, df_name, filtered_df)

df_names = ["dPower_Demand", "dPower_VRESProfiles", "dPower_WeightsK", "dPower_Hindex"]
return None if inplace else caseStudy

for df_name in df_names:
df = getattr(case_study, df_name)
def filter_timesteps(self, start: str, end: str, inplace: bool = False) -> Optional[Self]:
"""
Filters each (relevant) dataframe in the case study to only include the timesteps between start and end (both inclusive).
:param start: Start timestep (inclusive).
:param end: End timestep (inclusive).
:param inplace: If True, modifies the current instance. If False, returns a new instance.
:return: None if inplace is True, otherwise a new CaseStudy instance.
"""
case_study = self if inplace else self.copy()

index = df.index.names
df_reset = df.reset_index()
for df_name in CaseStudy.k_dependent_dataframes:
if hasattr(case_study, df_name):
df = getattr(case_study, df_name)

filtered_df_reset = df_reset.loc[(df_reset['k'] >= start) & (df_reset['k'] <= end)]
index = df.index.names
df_reset = df.reset_index()

filtered_df = filtered_df_reset.set_index(index)
filtered_df_reset = df_reset.loc[(df_reset['k'] >= start) & (df_reset['k'] <= end)]

setattr(case_study, df_name, filtered_df)
filtered_df = filtered_df_reset.set_index(index)

return case_study
setattr(case_study, df_name, filtered_df)

def filter_representative_periods(self, rp: str) -> Self:
case_study = self.copy()
return None if inplace else case_study

df_names = ["dPower_Demand", "dPower_VRESProfiles", "dPower_WeightsRP", "dPower_Hindex"]
def filter_representative_periods(self, rp: str, inplace: bool = False) -> Optional[Self]:
"""
Filters each (relevant) dataframe in the case study to only include the representative period with the given name.
:param rp: Name of the representative period to filter for.
:param inplace: If True, modifies the current instance. If False, returns a new instance.
:return: None if inplace is True, otherwise a new CaseStudy instance.
"""
case_study = self if inplace else self.copy()

for df_name in df_names:
df = getattr(case_study, df_name)
for df_name in CaseStudy.rp_dependent_dataframes:
if hasattr(case_study, df_name):
df = getattr(case_study, df_name)

index = df.index.names
df_reset = df.reset_index()
index = df.index.names
df_reset = df.reset_index()

filtered_df_reset = df_reset.loc[(df_reset['rp'] == rp)]
filtered_df_reset = df_reset.loc[(df_reset['rp'] == rp)]

filtered_df = filtered_df_reset.set_index(index)
filtered_df = filtered_df_reset.set_index(index)

setattr(case_study, df_name, filtered_df)
setattr(case_study, df_name, filtered_df)

return case_study
return None if inplace else case_study
Loading
Loading