Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 20 additions & 74 deletions CaseStudy.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import copy
import os
import warnings
from typing import Optional, Self

Expand All @@ -20,7 +21,6 @@ def __init__(self,
power_businfo_file: str = "Power_BusInfo.xlsx", dPower_BusInfo: pd.DataFrame = None,
power_network_file: str = "Power_Network.xlsx", dPower_Network: pd.DataFrame = None,
power_thermalgen_file: str = "Power_ThermalGen.xlsx", dPower_ThermalGen: pd.DataFrame = None,
power_ror_file: str = "Power_RoR.xlsx", dPower_RoR: pd.DataFrame = None,
power_vres_file: str = "Power_VRES.xlsx", dPower_VRES: pd.DataFrame = None,
power_demand_file: str = "Power_Demand.xlsx", dPower_Demand: pd.DataFrame = None,
power_inflows_file: str = "Power_Inflows.xlsx", dPower_Inflows: pd.DataFrame = None,
Expand Down Expand Up @@ -98,19 +98,6 @@ def __init__(self,
self.power_thermalgen_file = power_thermalgen_file
self.dPower_ThermalGen = ExcelReader.get_dPower_ThermalGen(self.data_folder + self.power_thermalgen_file)

if self.dPower_Parameters["pEnableRoR"]:
if dPower_RoR is not None:
self.dPower_RoR = dPower_RoR
else:
self.power_ror_file = power_ror_file
self.dPower_RoR = self.get_dPower_RoR()

if dPower_Inflows is not None:
self.dPower_Inflows = dPower_Inflows
else:
self.power_inflows_file = power_inflows_file
self.dPower_Inflows = self.get_dPower_Inflows()

if self.dPower_Parameters["pEnableVRES"]:
if dPower_VRES is not None:
self.dPower_VRES = dPower_VRES
Expand All @@ -120,7 +107,7 @@ def __init__(self,

if dPower_VRESProfiles is not None:
self.dPower_VRESProfiles = dPower_VRESProfiles
else:
elif os.path.isfile(self.data_folder + power_vresprofiles_file):
self.power_vresprofiles_file = power_vresprofiles_file
self.dPower_VRESProfiles = ExcelReader.get_dPower_VRESProfiles(self.data_folder + self.power_vresprofiles_file)

Expand All @@ -129,7 +116,14 @@ def __init__(self,
self.dPower_Storage = dPower_Storage
else:
self.power_storage_file = power_storage_file
self.dPower_Storage = self.get_dPower_Storage()
self.dPower_Storage = ExcelReader.get_dPower_Storage(self.data_folder + self.power_storage_file)

if self.dPower_Parameters["pEnableVRES"] or self.dPower_Parameters["pEnableStorage"]:
if dPower_Inflows is not None:
self.dPower_Inflows = dPower_Inflows
elif os.path.isfile(self.data_folder + power_inflows_file):
self.power_inflows_file = power_inflows_file
self.dPower_Inflows = ExcelReader.get_dPower_Inflows(self.data_folder + self.power_inflows_file)

if self.dPower_Parameters["pEnablePowerImportExport"]:
if dPower_ImpExpHubs is not None:
Expand Down Expand Up @@ -169,8 +163,7 @@ def scale_CaseStudy(self):
if self.dPower_Parameters["pEnableThermalGen"]:
self.scale_dPower_ThermalGen()

if self.dPower_Parameters["pEnableRoR"]:
self.scale_dPower_RoR()
if self.dPower_Inflows is not None:
self.scale_dPower_Inflows()

if self.dPower_Parameters["pEnableVRES"]:
Expand Down Expand Up @@ -209,7 +202,6 @@ def scale_dPower_Demand(self):
self.dPower_Demand["value"] *= self.power_scaling_factor

def scale_dPower_ThermalGen(self):
self.dPower_ThermalGen = self.dPower_ThermalGen[self.dPower_ThermalGen["excl"].isnull()] # Only keep rows that are not excluded (i.e., have no value in the "Excl." column)
self.dPower_ThermalGen = self.dPower_ThermalGen[(self.dPower_ThermalGen["ExisUnits"] > 0) | (self.dPower_ThermalGen["EnableInvest"] > 0)] # Filter out all generators that are not existing and not investable

self.dPower_ThermalGen['EFOR'] = self.dPower_ThermalGen['EFOR'].fillna(0) # Fill NaN values with 0 for EFOR
Expand Down Expand Up @@ -243,17 +235,11 @@ def scale_dPower_ThermalGen(self):
self.dPower_ThermalGen['Qmin'] = self.dPower_ThermalGen['Qmin'].fillna(0) * self.reactive_power_scaling_factor
self.dPower_ThermalGen['Qmax'] = self.dPower_ThermalGen['Qmax'].fillna(0) * self.reactive_power_scaling_factor

def scale_dPower_RoR(self):
self.dPower_RoR['InvestCostEUR'] = self.dPower_RoR['MaxProd'] * self.power_scaling_factor * (self.dPower_RoR['InvestCostPerMW'] + self.dPower_RoR['InvestCostPerMWh'] * self.dPower_RoR['Ene2PowRatio']) * (self.cost_scaling_factor / self.power_scaling_factor)
self.dPower_RoR['MaxProd'] *= self.power_scaling_factor

self.dPower_RoR['Qmin'] = self.dPower_RoR['Qmin'].fillna(0) * self.reactive_power_scaling_factor
self.dPower_RoR['Qmax'] = self.dPower_RoR['Qmax'].fillna(0) * self.reactive_power_scaling_factor

def scale_dPower_Inflows(self):
self.dPower_Inflows["Inflow"] *= self.power_scaling_factor
self.dPower_Inflows["value"] *= self.power_scaling_factor

def scale_dPower_VRES(self):
self.dPower_VRES = self.dPower_VRES[(self.dPower_VRES["ExisUnits"] > 0) | ((self.dPower_VRES["EnableInvest"] > 0) & (self.dPower_VRES["MaxInvest"] > 0))] # Filter out all generators that are not existing and not investable
if "MinProd" not in self.dPower_VRES.columns:
self.dPower_VRES['MinProd'] = 0

Expand All @@ -265,6 +251,7 @@ def scale_dPower_VRES(self):
self.dPower_VRES['Qmax'] = self.dPower_VRES['Qmax'].fillna(0) * self.reactive_power_scaling_factor

def scale_dPower_Storage(self):
self.dPower_Storage = self.dPower_Storage[(self.dPower_Storage["ExisUnits"] > 0) | ((self.dPower_Storage["EnableInvest"] > 0) & (self.dPower_Storage["MaxInvest"] > 0))] # Filter out all generators that are not existing and not investable
self.dPower_Storage['IniReserve'] = self.dPower_Storage['IniReserve'].fillna(0)
self.dPower_Storage['MinReserve'] = self.dPower_Storage['MinReserve'].fillna(0)
self.dPower_Storage['MinProd'] = self.dPower_Storage["MinProd"].fillna(0)
Expand All @@ -276,6 +263,10 @@ def scale_dPower_Storage(self):
self.dPower_Storage['Qmin'] = self.dPower_Storage['Qmin'].fillna(0) * self.reactive_power_scaling_factor
self.dPower_Storage['Qmax'] = self.dPower_Storage['Qmax'].fillna(0) * self.reactive_power_scaling_factor

# Check if any DisEffic or ChEffic is nan, if so, raise an error
if self.dPower_Storage['DisEffic'].isna().any() or self.dPower_Storage['ChEffic'].isna().any():
raise ValueError("DisEffic and ChEffic in 'Power_Storage.xlsx' must not contain NaN values. Please check the data.")

def scale_dPower_ImpExpHubs(self):
self.dPower_ImpExpHubs["Pmax Import"] *= self.power_scaling_factor
self.dPower_ImpExpHubs["Pmax Export"] *= self.power_scaling_factor
Expand All @@ -302,7 +293,7 @@ def get_dPower_Parameters(self):
dPower_Parameters = dPower_Parameters.dropna(how="all")
dPower_Parameters = dPower_Parameters.set_index('General')

self.yesNo_to_bool(dPower_Parameters, ['pEnableChDisPower', 'pFixStInterResToIniReserve', 'pEnableSoftLineLoadLimits', 'pEnableThermalGen', 'pEnableRoR', 'pEnableVRES', 'pEnableStorage', 'pEnablePowerImportExport', 'pEnableSOCP'])
self.yesNo_to_bool(dPower_Parameters, ['pEnableChDisPower', 'pFixStInterResToIniReserve', 'pEnableSoftLineLoadLimits', 'pEnableThermalGen', 'pEnableVRES', 'pEnableStorage', 'pEnablePowerImportExport', 'pEnableSOCP'])

# Transform to make it easier to access values
dPower_Parameters = dPower_Parameters.drop(dPower_Parameters.columns[1:], axis=1) # Drop all columns but "Value" (rest is just for information in the Excel)
Expand All @@ -322,34 +313,6 @@ def yesNo_to_bool(df: pd.DataFrame, columns_to_be_changed: list[str]):
raise ValueError(f"Value for {column} must be either 'Yes' or 'No'.")
return df

def get_dPower_RoR(self):
dPower_RoR = self.read_generator_data(self.data_folder + self.power_ror_file)

# If column 'scenario' is not present, add it
if 'scenario' not in dPower_RoR.columns:
dPower_RoR['scenario'] = 'ScenarioA' # TODO: Fill this dynamically, once the Excel file is updated
return dPower_RoR

def get_dPower_Storage(self):
dPower_Storage = self.read_generator_data(self.data_folder + self.power_storage_file)

# If column 'scenario' is not present, add it
if 'scenario' not in dPower_Storage.columns:
dPower_Storage['scenario'] = 'ScenarioA' # TODO: Fill this dynamically, once the Excel file is updated
return dPower_Storage

def get_dPower_Inflows(self):
dPower_Inflows = pd.read_excel(self.data_folder + self.power_inflows_file, skiprows=[0, 1, 3, 4, 5])
dPower_Inflows = dPower_Inflows.drop(dPower_Inflows.columns[0], axis=1)
dPower_Inflows = dPower_Inflows.rename(columns={dPower_Inflows.columns[0]: "rp", dPower_Inflows.columns[1]: "g"})
dPower_Inflows = dPower_Inflows.melt(id_vars=['rp', 'g'], var_name='k', value_name='Inflow')
dPower_Inflows = dPower_Inflows.set_index(['rp', 'g', 'k'])

# If column 'scenario' is not present, add it
if 'scenario' not in dPower_Inflows.columns:
dPower_Inflows['scenario'] = 'ScenarioA' # TODO: Fill this dynamically, once the Excel file is updated
return dPower_Inflows

def get_dPower_ImpExpHubs(self):
dPower_ImpExpHubs = pd.read_excel(self.data_folder + self.power_impexphubs_file, skiprows=[0, 1, 3, 4, 5])
dPower_ImpExpHubs = dPower_ImpExpHubs.drop(dPower_ImpExpHubs.columns[0], axis=1)
Expand Down Expand Up @@ -531,12 +494,6 @@ def merge_single_node_buses(self):
row['i'] = new_bus_name
self.dPower_ThermalGen.loc[i] = row

# Adapt dPower_RoR
for i, row in self.dPower_RoR.iterrows():
if row['i'] in connected_buses:
row['i'] = new_bus_name
self.dPower_RoR.loc[i] = row

# Adapt dPower_VRES
for i, row in self.dPower_VRES.iterrows():
if row['i'] in connected_buses:
Expand Down Expand Up @@ -567,16 +524,6 @@ def merge_single_node_buses(self):
self.dPower_VRESProfiles = self.dPower_VRESProfiles.groupby(['rp', 'i', 'k', 'tec']).mean() # TODO: Aggregate using more complex method (capacity * productionCapacity * ... * / Total Production Capacity)
self.dPower_VRESProfiles.sort_index(inplace=True)

# Function to read generator data
@staticmethod
def read_generator_data(file_path):
d_generator = pd.read_excel(file_path, skiprows=[0, 1, 3, 4, 5])
d_generator = d_generator.drop(d_generator.columns[0], axis=1)
d_generator = d_generator[(d_generator["ExisUnits"] > 0) | (d_generator["EnableInvest"] > 0)] # Filter out all generators that are not existing and not investable
d_generator = d_generator.rename(columns={d_generator.columns[0]: "g", d_generator.columns[1]: "tec", d_generator.columns[2]: "i"})
d_generator = d_generator.set_index('g')
return d_generator

# Create transition matrix from Hindex
def get_rpTransitionMatrices(self):
rps = sorted(self.dPower_Hindex.index.get_level_values('rp').unique().tolist())
Expand Down Expand Up @@ -689,8 +636,7 @@ def filter_scenario(self, scenario_name) -> Self:

if hasattr(caseStudy, "dPower_ThermalGen"):
caseStudy._filter_dataframe("dPower_ThermalGen", scenario_name)
if hasattr(caseStudy, "dPower_RoR") and len(caseStudy.dPower_RoR) > 0:
caseStudy._filter_dataframe("dPower_RoR", scenario_name)
if hasattr(caseStudy, "dPower_Inflows"):
caseStudy._filter_dataframe("dPower_Inflows", scenario_name)
if hasattr(caseStudy, "dPower_VRES"):
caseStudy._filter_dataframe("dPower_VRES", scenario_name)
Expand Down
Loading
Loading