diff --git a/CaseStudy.py b/CaseStudy.py index fca9eb9..4fba134 100644 --- a/CaseStudy.py +++ b/CaseStudy.py @@ -56,8 +56,9 @@ def __init__(self, power_weightsk_file: str = "Power_WeightsK.xlsx", dPower_WeightsK: pd.DataFrame = None, power_hindex_file: str = "Power_Hindex.xlsx", dPower_Hindex: pd.DataFrame = None, power_impexphubs_file: str = "Power_ImpExpHubs.xlsx", dPower_ImpExpHubs: pd.DataFrame = None, - power_impexpprofiles_file: str = "Power_ImpExpProfiles.xlsx", dPower_ImpExpProfiles: pd.DataFrame = None): - self.data_folder = str(data_folder) if str(data_folder).endswith("/") else str(data_folder) + "/" + power_impexpprofiles_file: str = "Power_ImpExpProfiles.xlsx", dPower_ImpExpProfiles: pd.DataFrame = None, + clip_method: str = "none", clip_value: float = 0): + self.data_folder = data_folder if data_folder.endswith("/") else data_folder + "/" self.do_not_scale_units = do_not_scale_units self.do_not_merge_single_node_buses = do_not_merge_single_node_buses @@ -106,11 +107,43 @@ def __init__(self, self.power_demand_file = power_demand_file self.dPower_Demand = ExcelReader.get_Power_Demand(self.data_folder + self.power_demand_file) + if dPower_Hindex is not None: + self.dPower_Hindex = dPower_Hindex + else: + self.power_hindex_file = power_hindex_file + self.dPower_Hindex = ExcelReader.get_Power_Hindex(self.data_folder + self.power_hindex_file) + if dPower_WeightsRP is not None: self.dPower_WeightsRP = dPower_WeightsRP else: self.power_weightsrp_file = power_weightsrp_file - self.dPower_WeightsRP = ExcelReader.get_Power_WeightsRP(self.data_folder + self.power_weightsrp_file) + # Calculate dPower_WeightsRP from Hindex + dPower_WeightsRPs = [] + for scenario in self.dPower_Hindex['scenario'].unique().tolist(): + # Count occurrences of each value in column 'rp' of dPower_Hindex + dPower_WeightsRP_scenario = pd.DataFrame(self.dPower_Hindex[self.dPower_Hindex['scenario'] == scenario].reset_index()['rp'].value_counts().sort_index()) + dPower_WeightsRP_scenario = 
dPower_WeightsRP_scenario.rename(columns={'count': 'pWeight_rp'}) + dPower_WeightsRP_scenario['scenario'] = scenario # Add scenario ID + + # Add other columns with default values + dPower_WeightsRP_scenario['id'] = np.nan + dPower_WeightsRP_scenario['dataPackage'] = np.nan + dPower_WeightsRP_scenario['dataSource'] = np.nan + + dPower_WeightsRPs.append(dPower_WeightsRP_scenario) + + dPower_WeightsRP = pd.concat(dPower_WeightsRPs, ignore_index=False) + + if os.path.exists(self.data_folder + self.power_weightsrp_file): # Compare with given file if it exists + self.dPower_WeightsRP = ExcelReader.get_Power_WeightsRP(self.data_folder + self.power_weightsrp_file) + + calculated = dPower_WeightsRP.reset_index().set_index(["rp", "scenario"]) + fromFile = self.dPower_WeightsRP.reset_index().set_index(["rp", "scenario"]) + if not (calculated['pWeight_rp'] / calculated['pWeight_rp'].sum()).equals(fromFile['pWeight_rp'] / fromFile['pWeight_rp'].sum()): + printer.warning(f"Values for 'pWeight_rp' in '{self.data_folder + self.power_weightsrp_file}' do not match the calculated values based on '{self.power_hindex_file}'. 
Please check if this is intended, using the file '{self.data_folder + self.power_weightsrp_file}' instead of the calculated values.") + else: # Use calculated dPower_WeightsRP otherwise + printer.warning(f"Executing without 'Power_WeightsRP' (since no file was found at '{self.data_folder + self.power_weightsrp_file}').") + self.dPower_WeightsRP = dPower_WeightsRP if dPower_WeightsK is not None: self.dPower_WeightsK = dPower_WeightsK @@ -124,7 +157,7 @@ def __init__(self, self.power_hindex_file = power_hindex_file self.dPower_Hindex = ExcelReader.get_Power_Hindex(self.data_folder + self.power_hindex_file) - self.rpTransitionMatrixAbsolute, self.rpTransitionMatrixRelativeTo, self.rpTransitionMatrixRelativeFrom = self.get_rpTransitionMatrices() + self.rpTransitionMatrixAbsolute, self.rpTransitionMatrixRelativeTo, self.rpTransitionMatrixRelativeFrom = self.get_rpTransitionMatrices(clip_method=clip_method, clip_value=clip_value) if self.dPower_Parameters["pEnableThermalGen"]: if dPower_ThermalGen is not None: @@ -560,7 +593,7 @@ def merge_single_node_buses(self): self.dPower_VRESProfiles.sort_index(inplace=True) # Create transition matrix from Hindex - def get_rpTransitionMatrices(self): + def get_rpTransitionMatrices(self, clip_method: str = "none", clip_value: float = 0) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: rps = sorted(self.dPower_Hindex.index.get_level_values('rp').unique().tolist()) ks = sorted(self.dPower_Hindex.index.get_level_values('k').unique().tolist()) rpTransitionMatrixAbsolute = pd.DataFrame(0, index=rps, columns=rps) # Initialize with zeros @@ -574,6 +607,27 @@ def get_rpTransitionMatrices(self): rpTransitionMatrixAbsolute.at[previous_rp, rp] += 1 previous_rp = rp + # Clip according to selected method + match clip_method: + case "none": + pass + case "absolute_count": # Get 'clip_value' highest values of each row of the transition matrix, set all others to 0 + if int(clip_value) != clip_value or clip_value < 0: + raise ValueError(f"For 
'absolute_count', clip_value must be a non-negative integer, not {clip_value}.") + for rp in rps: + threshold = rpTransitionMatrixAbsolute.loc[rp].nlargest(int(clip_value)).min() + if (rpTransitionMatrixAbsolute.loc[rp] == threshold).sum() > 1: + printer.warning(f"For rp {rp}, there are multiple values with the same value as the threshold ({threshold}). This means that more than {clip_value} values are kept.") + rpTransitionMatrixAbsolute.loc[rp, rpTransitionMatrixAbsolute.loc[rp] < threshold] = 0 + case "relative_to_highest": # Get all values that are at least 'clip_value' * 100 % of the highest value of each row of the transition matrix, set all others to 0 + if clip_value < 0 or clip_value > 1: + raise ValueError(f"For 'relative_to_highest', clip_value must be between 0 and 1, not {clip_value}.") + for rp in rps: + threshold = rpTransitionMatrixAbsolute.loc[rp].max() * clip_value + rpTransitionMatrixAbsolute.loc[rp][rpTransitionMatrixAbsolute.loc[rp] < threshold] = 0 + case _: + raise ValueError(f"clip_method must be either 'none', 'absolute_count' or 'relative_to_highest', not {clip_method}.") + # Calculate relative transition matrix (nerd info: for the sum, the axis is irrelevant, as there are the same number of transitions to an rp as there are transitions from an rp away. 
For the division however, the axis matters) rpTransitionMatrixRelativeTo = rpTransitionMatrixAbsolute.div(rpTransitionMatrixAbsolute.sum(axis=1), axis=0) # Sum of probabilities is 1 for r -> all others rpTransitionMatrixRelativeFrom = rpTransitionMatrixAbsolute.div(rpTransitionMatrixAbsolute.sum(axis=0), axis=1) # Sum of probabilities is 1 for all others -> r @@ -591,13 +645,19 @@ def to_full_hourly_model(self, inplace: bool) -> Optional['CaseStudy']: """ caseStudy = self.copy() if not inplace else self + # First Adjustment of Hindex (important if the case study was filtered before, to get a coherent p-index) + caseStudy.dPower_Hindex = caseStudy.dPower_Hindex.reset_index() + for i in caseStudy.dPower_Hindex.index: + caseStudy.dPower_Hindex.loc[i, "p"] = f"h{i + 1:0>4}" + caseStudy.dPower_Hindex = caseStudy.dPower_Hindex.set_index(["p", "rp", "k"]) + # Adjust Demand adjusted_demand = [] - for i, _ in caseStudy.dPower_BusInfo.iterrows(): - for h, row in caseStudy.dPower_Hindex.iterrows(): - adjusted_demand.append(["rp01", h[0].replace("h", "k"), i, caseStudy.dPower_Demand.loc[(h[1], h[2], i), "Demand"]]) + for i in caseStudy.dPower_BusInfo.index: + for h in caseStudy.dPower_Hindex.index: + adjusted_demand.append(["rp01", h[0].replace("h", "k"), i, caseStudy.dPower_Demand.loc[(h[1], h[2], i), "value"], "ScenarioA", None, None, None]) - caseStudy.dPower_Demand = pd.DataFrame(adjusted_demand, columns=["rp", "k", "i", "Demand"]) + caseStudy.dPower_Demand = pd.DataFrame(adjusted_demand, columns=["rp", "k", "i", "value", "scenario", "id", "dataPackage", "dataSource"]) caseStudy.dPower_Demand = caseStudy.dPower_Demand.set_index(["rp", "k", "i"]) # Adjust VRESProfiles @@ -605,29 +665,38 @@ def to_full_hourly_model(self, inplace: bool) -> Optional['CaseStudy']: adjusted_vresprofiles = [] caseStudy.dPower_VRESProfiles.sort_index(inplace=True) for g in caseStudy.dPower_VRESProfiles.index.get_level_values('g').unique().tolist(): - if len(caseStudy.dPower_VRESProfiles.loc[:, :, 
g]) > 0: # Check if VRESProfiles has entries for g - for h, row in caseStudy.dPower_Hindex.iterrows(): - adjusted_vresprofiles.append(["rp01", h[0].replace("h", "k"), g, caseStudy.dPower_VRESProfiles.loc[(h[1], h[2], g), "Capacity"]]) + for h in caseStudy.dPower_Hindex.index: + adjusted_vresprofiles.append(["rp01", h[0].replace("h", "k"), g, caseStudy.dPower_VRESProfiles.loc[(h[1], h[2], g), "value"], "ScenarioA", None, None, None]) - caseStudy.dPower_VRESProfiles = pd.DataFrame(adjusted_vresprofiles, columns=["rp", "k", "g", "Capacity"]) + caseStudy.dPower_VRESProfiles = pd.DataFrame(adjusted_vresprofiles, columns=["rp", "k", "g", "value", "scenario", "id", "dataPackage", "dataSource"]) caseStudy.dPower_VRESProfiles = caseStudy.dPower_VRESProfiles.set_index(["rp", "k", "g"]) + # Adjust Inflows + if hasattr(caseStudy, "dPower_Inflows"): + adjusted_inflows = [] + caseStudy.dPower_Inflows.sort_index(inplace=True) + for g in caseStudy.dPower_Inflows.index.get_level_values('g').unique().tolist(): + for h in caseStudy.dPower_Hindex.index: + adjusted_inflows.append(["rp01", h[0].replace("h", "k"), g, caseStudy.dPower_Inflows.loc[(h[1], h[2], g), "value"], "ScenarioA", None, None, None]) + caseStudy.dPower_Inflows = pd.DataFrame(adjusted_inflows, columns=["rp", "k", "g", "value", "scenario", "id", "dataPackage", "dataSource"]) + caseStudy.dPower_Inflows = caseStudy.dPower_Inflows.set_index(["rp", "k", "g"]) + # Adjust Hindex caseStudy.dPower_Hindex = caseStudy.dPower_Hindex.reset_index() - for i, row in caseStudy.dPower_Hindex.iterrows(): - caseStudy.dPower_Hindex.loc[i] = f"h{i + 1:0>4}", f"rp01", f"k{i + 1:0>4}", None, None, None + for i in caseStudy.dPower_Hindex.index: + caseStudy.dPower_Hindex.loc[i] = f"h{i + 1:0>4}", f"rp01", f"k{i + 1:0>4}", None, None, None, "ScenarioA" caseStudy.dPower_Hindex = caseStudy.dPower_Hindex.set_index(["p", "rp", "k"]) # Adjust WeightsK caseStudy.dPower_WeightsK = caseStudy.dPower_WeightsK.reset_index() caseStudy.dPower_WeightsK = 
caseStudy.dPower_WeightsK.drop(caseStudy.dPower_WeightsK.index) for i in range(len(caseStudy.dPower_Hindex)): - caseStudy.dPower_WeightsK.loc[i] = f"k{i + 1:0>4}", None, 1, None, None + caseStudy.dPower_WeightsK.loc[i] = f"{caseStudy.dPower_Hindex.index[i][2]}", None, 1, None, None, "ScenarioA" caseStudy.dPower_WeightsK = caseStudy.dPower_WeightsK.set_index("k") # Adjust WeightsRP caseStudy.dPower_WeightsRP = caseStudy.dPower_WeightsRP.drop(caseStudy.dPower_WeightsRP.index) - caseStudy.dPower_WeightsRP.loc["rp01"] = 1 + caseStudy.dPower_WeightsRP.loc["rp01"] = None, 1, None, None, "ScenarioA" if not inplace: return caseStudy @@ -669,7 +738,7 @@ def filter_timesteps(self, start: str, end: str, inplace: bool = False) -> Optio case_study = self if inplace else self.copy() for df_name in CaseStudy.k_dependent_dataframes: - if hasattr(case_study, df_name): + if hasattr(case_study, df_name) and getattr(case_study, df_name) is not None: df = getattr(case_study, df_name) index = df.index.names @@ -706,3 +775,43 @@ def filter_representative_periods(self, rp: str, inplace: bool = False) -> Optio setattr(case_study, df_name, filtered_df) return None if inplace else case_study + + def shift_ks(self, shift: int, inplace: bool = False) -> Optional[Self]: + """ + Shifts all k indices by the given amount, i.e., if shift is 4, then the first 4 + timesteps are moved to the back of the time series. + + :param shift: The amount to shift the k indices by. + :param inplace: If True, modifies the current instance. If False, returns a new instance. + :return: None if inplace is True, otherwise a new CaseStudy instance. 
+ """ + case_study = self if inplace else self.copy() + + for df_name in CaseStudy.k_dependent_dataframes: + if df_name in ["dPower_WeightsK", "dPower_Hindex"]: + continue # These dataframes are not shifted, as they are not time series + + if hasattr(case_study, df_name): + df = getattr(case_study, df_name) + if df is None or df.empty: + continue + + index = df.index.names + df = df.reset_index() + + df["k_int"] = df["k"].str.replace("k", "").astype(int) + k_int_max = df["k_int"].max() + k_int_min = df["k_int"].min() + + df["k_int_new"] = ((df["k_int"] - k_int_min + shift) % (k_int_max - k_int_min + 1)) + k_int_min + + df["k"] = "k" + df["k_int_new"].astype(str).str.zfill(4) + df = df.drop(columns=["k_int", "k_int_new"]) + df = df.set_index(index) + + # Sort by index to ensure that the order of the indices is correct after shifting + df = df.sort_index() + + setattr(case_study, df_name, df) + + return None if inplace else case_study diff --git a/ExcelReader.py b/ExcelReader.py index c46af00..f9012ef 100644 --- a/ExcelReader.py +++ b/ExcelReader.py @@ -21,6 +21,8 @@ def check_LEGOExcel_version(excel_file_path: str, version_specifier: str, fail_o # Check if the file has the correct version specifier wb = openpyxl.load_workbook(excel_file_path) for sheet in wb.sheetnames: + if sheet.startswith("~"): # Skip sheets that start with '~' + continue if wb[sheet].cell(row=2, column=3).value != version_specifier: if fail_on_wrong_version: raise ValueError(f"Excel file '{excel_file_path}' does not have the correct version specifier. 
Expected '{version_specifier}' but got '{wb[sheet].cell(row=2, column=3).value}'.") @@ -47,6 +49,9 @@ def __read_non_pivoted_file(excel_file_path: str, version_specifier: str, indice data = pd.DataFrame() for scenario in xls.sheet_names: # Iterate through all sheets, i.e., through all scenarios + if scenario.startswith("~"): + printer.warning(f"Skipping sheet '{scenario}' from '{excel_file_path}' because it starts with '~'.") + continue df = pd.read_excel(excel_file_path, skiprows=[0, 1, 2, 4, 5, 6], sheet_name=scenario) if has_excl_column: if not keep_excl_columns: @@ -106,7 +111,7 @@ def get_Data_Sources(excel_file_path: str, keep_excluded_entries: bool = False, :param fail_on_wrong_version: If True, raise an error if the version of the Excel file does not match the expected version :return: dData_Sources """ - dData_Sources = __read_non_pivoted_file(excel_file_path, "v0.1.0", ["dataSource"], False, False, fail_on_wrong_version) + dData_Sources = __read_non_pivoted_file(excel_file_path, "v0.2.0", ["dataSource"], False, False, fail_on_wrong_version) if keep_excluded_entries: printer.warning("'keep_excluded_entries' is set for 'get_Data_Sources', although nothing is excluded anyway - please check if this is intended.") @@ -354,12 +359,13 @@ def get_Power_Wind_TechnicalDetails(excel_file_path: str, keep_excluded_entries: return dPower_Wind_TechnicalDetails -def compare_Excels(source_path: str, target_path: str, dont_check_formatting: bool = False) -> bool: +def compare_Excels(source_path: str, target_path: str, dont_check_formatting: bool = False, precision: float = 1e-6) -> bool: """ Compare two Excel files for differences in formatting and values. 
:param source_path: Path to the source Excel file :param target_path: Path to the target Excel file :param dont_check_formatting: If True, skip formatting checks + :param precision: Precision for floating point comparison :return: True if the files are equal, False otherwise """ start_time = time.time() @@ -376,22 +382,29 @@ def compare_Excels(source_path: str, target_path: str, dont_check_formatting: bo continue target_sheet = target[sheet] - for row in range(1, source_sheet.max_row + 1): + for row in range(1, min(source_sheet.max_row, target_sheet.max_row) + 1): if not dont_check_formatting: if source_sheet.row_dimensions[row].height != target_sheet.row_dimensions[row].height: printer.error(f"Mismatch in row height at {sheet}/row {row}: {source_sheet.row_dimensions[row].height} != {target_sheet.row_dimensions[row].height}") equal = False - for col in range(1, source_sheet.max_column + 1): + for col in range(1, min(source_sheet.max_column, target_sheet.max_column) + 1): source_cell = source_sheet.cell(row=row, column=col) target_cell = target_sheet.cell(row=row, column=col) # Value if source_cell.value != target_cell.value: - source_value = str(source_cell.value).replace("[", r"\[") # Required to prevent rich from interpreting brackets as style definitions - target_value = str(target_cell.value).replace("[", r"\[") - printer.error(f"Mismatch in value at {sheet}/{source_cell.coordinate}: {source_value} != {target_value}") - equal = False + if (isinstance(source_cell.value, float) or isinstance(source_cell.value, int)) and (isinstance(target_cell.value, float) or isinstance(target_cell.value, int)): + if abs(source_cell.value - target_cell.value) / (source_cell.value if source_cell.value != 0 else 1) >= precision: + source_value = str(source_cell.value).replace("[", r"\[") # Required to prevent rich from interpreting brackets as style definitions + target_value = str(target_cell.value).replace("[", r"\[") + printer.error(f"Mismatch in value at 
{sheet}/{source_cell.coordinate}: {source_value} != {target_value}") + equal = False + else: + source_value = str(source_cell.value).replace("[", r"\[") # Required to prevent rich from interpreting brackets as style definitions + target_value = str(target_cell.value).replace("[", r"\[") + printer.error(f"Mismatch in value at {sheet}/{source_cell.coordinate}: {source_value} != {target_value}") + equal = False if not dont_check_formatting: # Font @@ -434,9 +447,32 @@ def compare_Excels(source_path: str, target_path: str, dont_check_formatting: bo # Column width if row == 1: # Only need to check column width for the first row - if source_sheet.column_dimensions[openpyxl.utils.get_column_letter(col)].width != target_sheet.column_dimensions[openpyxl.utils.get_column_letter(col)].width: - printer.error(f"Mismatch in column width at {sheet}/column {col}: {source_sheet.column_dimensions[openpyxl.utils.get_column_letter(col)].width} != {target_sheet.column_dimensions[openpyxl.utils.get_column_letter(col)].width}") + source_columnwidth = source_sheet.column_dimensions[openpyxl.utils.get_column_letter(col)].width + for group in source_sheet.column_groups: + start, end = group.split(":") + start = openpyxl.utils.column_index_from_string(start) + end = openpyxl.utils.column_index_from_string(end) + if start < col <= end: + source_columnwidth = source_sheet.column_dimensions[openpyxl.utils.get_column_letter(start)].width + break + + target_columnwidth = target_sheet.column_dimensions[openpyxl.utils.get_column_letter(col)].width + for group in target_sheet.column_groups: + start, end = group.split(":") + start = openpyxl.utils.column_index_from_string(start) + end = openpyxl.utils.column_index_from_string(end) + if start < col <= end: + target_columnwidth = target_sheet.column_dimensions[openpyxl.utils.get_column_letter(start)].width + break + if source_columnwidth != target_columnwidth: + printer.error(f"Mismatch in column width at {sheet}/column {col}: {source_columnwidth} != 
{target_columnwidth}") equal = False + if source_sheet.max_column != target_sheet.max_column: + printer.error(f"Target sheet '{sheet}' has {abs(source_sheet.max_column - target_sheet.max_column)} {"more" if source_sheet.max_column > target_sheet.max_column else "less"} columns ({target_sheet.max_column} in total) than source sheet ({source_sheet.max_column} in total)") + equal = False + if source_sheet.max_row != target_sheet.max_row: + printer.error(f"Target sheet '{sheet}' has {abs(source_sheet.max_row - target_sheet.max_row)} {"more" if source_sheet.max_row > target_sheet.max_row else "less"} rows ({target_sheet.max_row} in total) than source sheet ({source_sheet.max_row} in total)") + equal = False printer.information(f"Compared Excel file '{source_path}' to '{target_path}' in {time.time() - start_time:.2f} seconds") return equal diff --git a/ExcelWriter.py b/ExcelWriter.py index cd88198..0807ffa 100644 --- a/ExcelWriter.py +++ b/ExcelWriter.py @@ -400,65 +400,65 @@ def write_Power_Wind_TechnicalDetails(self, dPower_Wind_TechnicalDetails: pd.Dat """ self._write_Excel_from_definition(dPower_Wind_TechnicalDetails, folder_path, "Power_Wind_TechnicalDetails") + @staticmethod + def model_to_excel(model: pyomo.core.Model, target_path: str) -> None: + """ + Write all variables of the given Pyomo model to an Excel file. -def model_to_excel(model: pyomo.core.Model, target_path: str) -> None: - """ - Write all variables of the given Pyomo model to an Excel file. - - :param model: The Pyomo model to be written to Excel. - :param target_path: Path to the target Excel file. 
- :return: None - """ - printer.information(f"Writing model to '{target_path}'") - wb = openpyxl.Workbook() - ws = wb.active - - for i, var in enumerate(model.component_objects(pyomo.core.Var, active=True)): - if i == 0: # Use the automatically existing sheet for the first variable - ws.title = str(var) - else: # Create a sheet for each (other) variable - ws = wb.create_sheet(title=str(var)) - - # Prepare the data from the model - data = [(j, v.value if not v.stale else None) for j, v in var.items()] - - # Extract parameter names from the variable's index structure - param_names = [] - - if var.is_indexed(): - index_set = var.index_set() - - try: - # Get names from the index set - if hasattr(index_set, 'subsets') and index_set.subsets(): - for idx, subset in enumerate(index_set.subsets()): - if subset.domain.dimen is not None: - for i, domain in enumerate(subset.domain.subsets()): - param_names.append(f"{subset.name}[{i}]: {domain.name}") - else: - param_names.append(subset.name) - param_names.append(str(var)) - except (AttributeError, TypeError): - if len(data) > 0: - # Determine from actual data structure - col_number = len(data[0][0]) if not isinstance(data[0][0], str) else 1 - param_names = [f"index_{j}" for j in range(col_number)] + [str(var)] - else: - param_names = [] - - # Create header row with parameter names - ws.append(param_names) - - # Handle data writing - if len(data) == 0: - # Create a row showing "No entries" for each parameter - ws.append(["No entries"] * len(param_names)) - else: - # Write data to the sheet - for j, v in data: - ws.append(([j_index for j_index in j] if not isinstance(j, str) else [j]) + [v]) + :param model: The Pyomo model to be written to Excel. + :param target_path: Path to the target Excel file. 
+ :return: None + """ + printer.information(f"Writing model to '{target_path}'") + wb = openpyxl.Workbook() + ws = wb.active + + for i, var in enumerate(model.component_objects(pyomo.core.Var, active=True)): + if i == 0: # Use the automatically existing sheet for the first variable + ws.title = str(var) + else: # Create a sheet for each (other) variable + ws = wb.create_sheet(title=str(var)) + + # Prepare the data from the model + data = [(j, v.value if not v.stale else None) for j, v in var.items()] + + # Extract parameter names from the variable's index structure + param_names = [] + + if var.is_indexed(): + index_set = var.index_set() + + try: + # Get names from the index set + if hasattr(index_set, 'subsets') and index_set.subsets(): + for idx, subset in enumerate(index_set.subsets()): + if subset.domain.dimen is not None: + for j, domain in enumerate(subset.domain.subsets()): + param_names.append(f"{subset.name}[{j}]: {domain.name}") + else: + param_names.append(subset.name) + param_names.append(str(var)) + except (AttributeError, TypeError): + if len(data) > 0: + # Determine from actual data structure + col_number = len(data[0][0]) if not isinstance(data[0][0], str) else 1 + param_names = [f"index_{j}" for j in range(col_number)] + [str(var)] + else: + param_names = [] + + # Create header row with parameter names + ws.append(param_names) + + # Handle data writing + if len(data) == 0: + # Create a row showing "No entries" for each parameter + ws.append(["No entries"] * len(param_names)) + else: + # Write data to the sheet + for j, v in data: + ws.append(([j_index for j_index in j] if not isinstance(j, str) else [j]) + [v]) - wb.save(target_path) + wb.save(target_path) if __name__ == "__main__": @@ -466,16 +466,17 @@ def model_to_excel(model: pyomo.core.Model, target_path: str) -> None: from rich_argparse import RichHelpFormatter parser = argparse.ArgumentParser(description="Re-write all files in given folder and compare against source", 
formatter_class=RichHelpFormatter) - parser.add_argument("caseStudyFolder", type=str, default="data/example/", help="Path to folder containing data for LEGO model.", nargs="?") + parser.add_argument("caseStudyFolder", type=str, help="Path to folder containing data for LEGO model.") parser.add_argument("excelDefinitionsPath", type=str, help="Path to the Excel definitions XML file. Uses default if none is supplied.", nargs="?") parser.add_argument("--dontCheckFormatting", action="store_true", help="Do not check formatting of the Excel files. Only check if they are equal.") + parser.add_argument("--dontFailOnWrongVersion", action="store_true", help="Do not fail if the version in the Excel file does not match the version in the XML definitions file.") + parser.add_argument("--precision", type=float, default=1e-6, help="Precision for comparing floating point values, default is 1e-6") args = parser.parse_args() printer.set_width(300) if not args.caseStudyFolder.endswith("/"): args.caseStudyFolder += "/" - printer.information(f"Loading case study from '{args.caseStudyFolder}'") if args.excelDefinitionsPath is None: @@ -508,14 +509,17 @@ def model_to_excel(model: pyomo.core.Model, target_path: str) -> None: for excel_definition_id, file_path, read, write in combinations: printer.information(f"Writing '{excel_definition_id}', read from '{file_path}'") - data = read(file_path, True, True) + data = read(file_path, True, not args.dontFailOnWrongVersion) write(data, f"{args.caseStudyFolder}output") printer.information(f"Comparing '{args.caseStudyFolder}output/{excel_definition_id}.xlsx' against source file '{file_path}'") - filesEqual = ExcelReader.compare_Excels(file_path, f"{args.caseStudyFolder}output/{excel_definition_id}.xlsx", args.dontCheckFormatting) - if filesEqual: - printer.success(f"Excel files are equal") + if not os.path.exists(file_path): + printer.warning(f"Input file '{file_path}' does not exist - skipping comparison") else: - printer.error(f"Excel files are 
NOT equal - see above for details") + filesEqual = ExcelReader.compare_Excels(file_path, f"{args.caseStudyFolder}output/{excel_definition_id}.xlsx", args.dontCheckFormatting, args.precision) + if filesEqual: + printer.success(f"Excel files are equal") + else: + printer.error(f"Excel files are NOT equal - see above for details") printer.separator() diff --git a/TableDefinition.py b/TableDefinition.py index 80b79f1..b2120de 100644 --- a/TableDefinition.py +++ b/TableDefinition.py @@ -159,7 +159,15 @@ def __init__(self, readable_name: str, db_name: str, description: str, unit: str self.db_name = db_name self.description = description self.unit = unit - self.column_width = column_width + 0.7109375 # Difference between Excel's default font and the shown column width (see https://foss.heptapod.net/openpyxl/openpyxl/-/issues/293) + match column_width: # This is required since Excel saves some widths with a slightly different value than what is shown in the GUI (please add more cases if you find any) + case 19.5: + self.column_width = column_width + 0.640625 + case 4.86 | 16.86 | 23.86: + self.column_width = column_width + 0.7103125 + case 10.57: + self.column_width = column_width + 0.71515625 + case _: + self.column_width = column_width + 0.7109375 # Difference between Excel's default font and the shown column width (see https://foss.heptapod.net/openpyxl/openpyxl/-/issues/293) self.cell_style = cell_style self.scenario_dependent = scenario_dependent self.pivoted = pivoted diff --git a/TableDefinitions.xml b/TableDefinitions.xml index f3499a6..82a6c69 100644 --- a/TableDefinitions.xml +++ b/TableDefinitions.xml @@ -13,7 +13,7 @@ - v0.1.0 + v0.2.0 Data Sources 45.0 @@ -22,6 +22,7 @@ + @@ -1142,14 +1143,14 @@ - 008080 - CCFFCC - B8CCE4 - DAEEF3 - D9D9D9 - F2F2F2 - 0000FF - FFFFFF + FF008080 + FFCCFFCC + FFB8CCE4 + FFDAEEF3 + FFD9D9D9 + FFF2F2F2 + FF0000FF + FFFFFFFF diff --git a/Utilities.py b/Utilities.py index fc2aba9..4964338 100644 --- a/Utilities.py +++ b/Utilities.py 
@@ -144,7 +144,7 @@ def apply_kmedoids_aggregation( aggregation_result = _run_kmedoids_clustering(pivot_df, k, rp_length) print(f" \nStep 4: Building representative period data") - demand_data, vres_data = _build_representative_periods( + data = _build_representative_periods( case_study, scenario, aggregation_result, rp_length ) @@ -154,8 +154,9 @@ def apply_kmedoids_aggregation( ) all_processed_data[scenario] = { - 'demand': demand_data, - 'vres_profiles': vres_data, + 'Power_Demand': data["Power_Demand"], + 'Power_VRESProfiles': data["Power_VRESProfiles"] if "Power_VRESProfiles" in data else [], + 'Power_Inflows': data["Power_Inflows"] if "Power_Inflows" in data else [], 'weights_rp': weights_rp, 'weights_k': weights_k, 'hindex': hindex @@ -169,8 +170,30 @@ def apply_kmedoids_aggregation( return aggregated_case_study -def _extract_scenario_data(case_study, scenario: str, capacity_normalization: str) -> pd.DataFrame: - """Extract and combine demand and VRES data for a single scenario - OPTIMIZED.""" +def _extract_scenario_data(case_study, scenario: str, capacity_normalization_strategy: str) -> pd.DataFrame: + """Extract and combine demand, VRES, and inflows data for a single scenario.""" + + def _apply_capacity_normalization_strategy(df, capacity_normalization_strategy): + """Apply capacity normalization strategy to a dataframe with technology data.""" + if capacity_normalization_strategy == "installed": + return df['ExisUnits'].fillna(0) + else: # maxInvestment + return np.maximum( + df['ExisUnits'].fillna(0), + df['EnableInvest'].fillna(0) * df['MaxInvest'].fillna(0) + ) + + def _pivot_technologies(df, value_column, index_cols=None): + """Pivot technologies as columns and drop 'g' column.""" + if index_cols is None: + index_cols = ['scenario', 'rp', 'k', 'g', 'i'] + + return df.pivot_table( + index=index_cols, + columns='tec', + values=value_column, + fill_value=0 + ).reset_index().drop(columns=['g']) # Extract demand data for this scenario demand_df = 
case_study.dPower_Demand.reset_index() @@ -182,6 +205,7 @@ def _extract_scenario_data(case_study, scenario: str, capacity_normalization: st # Initialize with demand data scenario_df = demand_df[['scenario', 'rp', 'i', 'k', 'value']].rename(columns={'value': 'demand'}) + vres_with_profiles = None # Process VRES data if available if (hasattr(case_study, 'dPower_VRES') and case_study.dPower_VRES is not None and hasattr(case_study, 'dPower_VRESProfiles') and case_study.dPower_VRESProfiles is not None): @@ -203,16 +227,8 @@ def _extract_scenario_data(case_study, scenario: str, capacity_normalization: st how='left' ) - # Apply capacity normalization (vectorized) - if capacity_normalization == "installed": - normalization_factor = vres_with_profiles['ExisUnits'].fillna(0) - else: # maxInvestment - normalization_factor = np.maximum( - vres_with_profiles['ExisUnits'].fillna(0), - vres_with_profiles['EnableInvest'].fillna(0) * vres_with_profiles['MaxInvest'].fillna(0) - ) - - # Calculate weighted capacity factor + # Apply capacity normalization and calculate weighted capacity factor + normalization_factor = _apply_capacity_normalization_strategy(vres_with_profiles, capacity_normalization_strategy) vres_with_profiles['weighted_cf'] = ( vres_with_profiles['value'].fillna(0) * vres_with_profiles['MaxProd'].fillna(0) * @@ -220,20 +236,81 @@ def _extract_scenario_data(case_study, scenario: str, capacity_normalization: st ) # Pivot technologies as columns - vres_with_profiles = vres_with_profiles.pivot_table( - index=['scenario', 'rp', 'k', 'g', 'i'], - columns='tec', - values='weighted_cf', - fill_value=0 - ).reset_index().drop(columns=['g']) - - # Merge with demand data - scenario_df = pd.merge( - scenario_df, - vres_with_profiles, - on=['scenario', 'rp', 'k', 'i'], - how='left' - ) + vres_with_profiles = _pivot_technologies(vres_with_profiles, 'weighted_cf') + + inflows_with_tech = None + # Process Inflows data if available + if hasattr(case_study, 'dPower_Inflows') and 
case_study.dPower_Inflows is not None: + # Get Inflows data for this scenario + inflows_df = case_study.dPower_Inflows.reset_index() + inflows_df = inflows_df[inflows_df['scenario'] == scenario].copy() + + if len(inflows_df) > 0: + # Collect all inflows data from different sources + inflows_parts = [] + + # Try to merge with Power_VRES data + if (hasattr(case_study, 'dPower_VRES') and case_study.dPower_VRES is not None and + 'vres_df' in locals() and len(vres_df) > 0): + inflows_with_vres = pd.merge( + inflows_df, + vres_df[['g', 'tec', 'i', 'ExisUnits', 'EnableInvest', 'MaxInvest']], + on='g', + how='left' + ) + inflows_parts.append(inflows_with_vres) + + # Try to merge with Power_Storage data + if hasattr(case_study, 'dPower_Storage') and case_study.dPower_Storage is not None: + storage_df = case_study.dPower_Storage.reset_index() + storage_df = storage_df[storage_df['scenario'] == scenario].copy() + + if len(storage_df) > 0: + inflows_with_storage = pd.merge( + inflows_df, + storage_df[['g', 'tec', 'i', 'ExisUnits', 'EnableInvest', 'MaxInvest']], + on='g', + how='inner' + ) + inflows_parts.append(inflows_with_storage) + + # Combine all inflows parts + if inflows_parts: + inflows_with_tech = pd.concat(inflows_parts, ignore_index=True) + + # Apply capacity normalization + normalization_factor = _apply_capacity_normalization_strategy(inflows_with_tech, capacity_normalization_strategy) + inflows_with_tech['value'] = inflows_with_tech['value'].fillna(0) * normalization_factor + + # Pivot technologies as columns + inflows_with_tech = _pivot_technologies(inflows_with_tech, 'value') + + # Combine VRES and inflows data + combined_tech_data = None + if vres_with_profiles is not None and inflows_with_tech is not None: + combined_tech_data = pd.concat([vres_with_profiles, inflows_with_tech], + ignore_index=True, sort=False) + elif vres_with_profiles is not None: + combined_tech_data = vres_with_profiles + elif inflows_with_tech is not None: + combined_tech_data = 
inflows_with_tech + + # Merge the combined technology data with scenario_df + if combined_tech_data is not None: + # Use right join to keep ALL demand data (even nodes without technology data) + # Replicates demand for nodes with technology, and preserves demand-only nodes + scenario_df = pd.merge( + combined_tech_data, + scenario_df, + on=['scenario', 'rp', 'k', 'i'], + how='right' + ) + + # Fill NaN values in technology columns with 0 for demand-only nodes + tech_columns = [col for col in scenario_df.columns + if col not in ['scenario', 'rp', 'k', 'i', 'demand']] + if tech_columns: + scenario_df[tech_columns] = scenario_df[tech_columns].fillna(0) return scenario_df @@ -296,7 +373,8 @@ def _run_kmedoids_clustering(pivot_df: pd.DataFrame, k: int, rp_length: int): noTypicalPeriods=k, hoursPerPeriod=rp_length, clusterMethod='k_medoids', - rescaleClusterPeriods=False + rescaleClusterPeriods=False, + solver="gurobi" ) typical_periods = aggregation.createTypicalPeriods() @@ -307,65 +385,42 @@ def _run_kmedoids_clustering(pivot_df: pd.DataFrame, k: int, rp_length: int): def _build_representative_periods(case_study, scenario: str, aggregation, rp_length: int): - """Build demand and VRES profile data for representative periods.""" + """Build demand, VRES profile, and inflows data for representative periods.""" def _extract_numeric_and_calc_p(df, rp_length): """Extract numeric values from rp/k strings and calculate absolute hour.""" - df['rp_num'] = df['rp'].str.extract(r'(\d+)').astype(int) - df['k_num'] = df['k'].str.extract(r'(\d+)').astype(int) + df['rp_num'] = df['rp'].str[2:].astype(int) + df['k_num'] = df['k'].str[1:].astype(int) df['p'] = (df['rp_num'] - 1) * rp_length + df['k_num'] return df - # Process demand data - demand_original = case_study.dPower_Demand.reset_index() - demand_original = demand_original[demand_original['scenario'] == scenario].copy() - demand_original = _extract_numeric_and_calc_p(demand_original, rp_length) - - demand_data = [] - for 
cluster_idx, medoid_period in enumerate(aggregation.clusterCenterIndices): - rp_new = f'rp{cluster_idx + 1:02d}' - medoid_hours = range(medoid_period * rp_length + 1, (medoid_period + 1) * rp_length + 1) - medoid_demand_data = demand_original[demand_original['p'].isin(medoid_hours)] - - for k_offset, abs_hour in enumerate(medoid_hours, start=1): - k_new = f'k{k_offset:02d}' - hour_demand = medoid_demand_data[medoid_demand_data['p'] == abs_hour] - - for _, row in hour_demand.iterrows(): - demand_data.append({ - 'rp': rp_new, - 'i': row['i'], - 'k': k_new, - 'scenario': scenario, - 'value': row['value'] - }) - - # Process VRES data if available - vres_data = [] + time_series_tables = [("Power_Demand", case_study.dPower_Demand)] if hasattr(case_study, 'dPower_VRESProfiles') and case_study.dPower_VRESProfiles is not None: - vres_original = case_study.dPower_VRESProfiles.reset_index() - vres_original = vres_original[vres_original['scenario'] == scenario].copy() - vres_original = _extract_numeric_and_calc_p(vres_original, rp_length) + time_series_tables.append(("Power_VRESProfiles", case_study.dPower_VRESProfiles)) + if hasattr(case_study, 'dPower_Inflows') and case_study.dPower_Inflows is not None: + time_series_tables.append(("Power_Inflows", case_study.dPower_Inflows)) + + data = {name: [] for name, _ in time_series_tables} + for name, df in time_series_tables: + df_original = df.reset_index() + df_original = df_original[df_original['scenario'] == scenario].copy() + df_original = _extract_numeric_and_calc_p(df_original, rp_length) for cluster_idx, medoid_period in enumerate(aggregation.clusterCenterIndices): rp_new = f'rp{cluster_idx + 1:02d}' medoid_hours = range(medoid_period * rp_length + 1, (medoid_period + 1) * rp_length + 1) - medoid_vres_data = vres_original[vres_original['p'].isin(medoid_hours)] + medoid_data = df_original[df_original['p'].isin(medoid_hours)] for k_offset, abs_hour in enumerate(medoid_hours, start=1): - k_new = f'k{k_offset:02d}' - hour_vres = 
medoid_vres_data[medoid_vres_data['p'] == abs_hour] + k_new = f'k{k_offset:04d}' + hour_data = medoid_data[medoid_data['p'] == abs_hour] - for _, row in hour_vres.iterrows(): - vres_data.append({ - 'rp': rp_new, - 'k': k_new, - 'g': row['g'], - 'scenario': scenario, - 'value': row['value'] - }) + for _, row in hour_data.iterrows(): + row['rp'] = rp_new + row['k'] = k_new + data[name].append(row) - return demand_data, vres_data + return data def _build_scenario_weights_and_indices(aggregation, scenario: str, rp_length: int): @@ -377,16 +432,22 @@ def _build_scenario_weights_and_indices(aggregation, scenario: str, rp_length: i weights_rp.append({ 'rp': f'rp{rp_idx + 1:02d}', 'scenario': scenario, - 'pWeight_rp': int(weight) + 'pWeight_rp': int(weight), + 'id': None, + "dataPackage": None, + "dataSource": None, }) # K weights (all 1 for hourly resolution) weights_k = [] for k in range(1, rp_length + 1): weights_k.append({ - 'k': f'k{k:02d}', + 'k': f'k{k:04d}', 'scenario': scenario, - 'pWeight_k': 1 + 'pWeight_k': 1, + 'id': None, + "dataPackage": None, + "dataSource": None, }) # Hindex mapping @@ -396,8 +457,11 @@ def _build_scenario_weights_and_indices(aggregation, scenario: str, rp_length: i hindex.append({ 'p': f'h{orig_p * rp_length + k:04d}', 'rp': f'rp{cluster_id + 1:02d}', - 'k': f'k{k:02d}', - 'scenario': scenario + 'k': f'k{k:04d}', + 'scenario': scenario, + 'id': None, + "dataPackage": None, + "dataSource": None, }) return weights_rp, weights_k, hindex @@ -409,13 +473,15 @@ def _update_casestudy_with_scenarios(case_study, all_processed_data: Dict): # Collect all data across scenarios all_demand_data = [] all_vres_data = [] + all_inflows_data = [] all_weights_rp_data = [] all_weights_k_data = [] all_hindex_data = [] for scenario, scenario_data in all_processed_data.items(): - all_demand_data.extend(scenario_data['demand']) - all_vres_data.extend(scenario_data['vres_profiles']) + all_demand_data.extend(scenario_data['Power_Demand']) + 
all_vres_data.extend(scenario_data['Power_VRESProfiles']) + all_inflows_data.extend(scenario_data['Power_Inflows']) all_weights_rp_data.extend(scenario_data['weights_rp']) all_weights_k_data.extend(scenario_data['weights_k']) all_hindex_data.extend(scenario_data['hindex']) @@ -432,6 +498,11 @@ def _update_casestudy_with_scenarios(case_study, all_processed_data: Dict): case_study.dPower_VRESProfiles = vres_df.set_index(['rp', 'k', 'g']) print(f" - Updated VRES profiles: {len(all_vres_data)} entries") + if all_inflows_data: + inflows_df = pd.DataFrame(all_inflows_data) + case_study.dPower_Inflows = inflows_df.set_index(['rp', 'k', 'g']) + print(f" - Updated inflows: {len(all_inflows_data)} entries") + if all_weights_rp_data: weights_rp_df = pd.DataFrame(all_weights_rp_data) case_study.dPower_WeightsRP = weights_rp_df.set_index(['rp']) diff --git a/data/example/Data_Packages.xlsx b/data/example/Data_Packages.xlsx index 5256c4c..525e12b 100644 Binary files a/data/example/Data_Packages.xlsx and b/data/example/Data_Packages.xlsx differ diff --git a/data/example/Data_Sources.xlsx b/data/example/Data_Sources.xlsx index e771983..1279441 100644 Binary files a/data/example/Data_Sources.xlsx and b/data/example/Data_Sources.xlsx differ diff --git a/data/example/Global_Scenarios.xlsx b/data/example/Global_Scenarios.xlsx index 0c0d807..951f6ea 100644 Binary files a/data/example/Global_Scenarios.xlsx and b/data/example/Global_Scenarios.xlsx differ diff --git a/data/example/Power_BusInfo.xlsx b/data/example/Power_BusInfo.xlsx index a942a25..41bcb37 100644 Binary files a/data/example/Power_BusInfo.xlsx and b/data/example/Power_BusInfo.xlsx differ diff --git a/data/example/Power_Demand.xlsx b/data/example/Power_Demand.xlsx index c055d7d..2cff11c 100644 Binary files a/data/example/Power_Demand.xlsx and b/data/example/Power_Demand.xlsx differ diff --git a/data/example/Power_Demand_KInRows.xlsx b/data/example/Power_Demand_KInRows.xlsx index 9ad5a44..d02bbca 100644 Binary files 
a/data/example/Power_Demand_KInRows.xlsx and b/data/example/Power_Demand_KInRows.xlsx differ diff --git a/data/example/Power_Hindex.xlsx b/data/example/Power_Hindex.xlsx index c9d355d..50f40d2 100644 Binary files a/data/example/Power_Hindex.xlsx and b/data/example/Power_Hindex.xlsx differ diff --git a/data/example/Power_Inflows.xlsx b/data/example/Power_Inflows.xlsx index a5d2ebd..3e3f17a 100644 Binary files a/data/example/Power_Inflows.xlsx and b/data/example/Power_Inflows.xlsx differ diff --git a/data/example/Power_Inflows_KInRows.xlsx b/data/example/Power_Inflows_KInRows.xlsx index a1368b7..2cc6df6 100644 Binary files a/data/example/Power_Inflows_KInRows.xlsx and b/data/example/Power_Inflows_KInRows.xlsx differ diff --git a/data/example/Power_Network.xlsx b/data/example/Power_Network.xlsx index af21c57..77c44bc 100644 Binary files a/data/example/Power_Network.xlsx and b/data/example/Power_Network.xlsx differ diff --git a/data/example/Power_Storage.xlsx b/data/example/Power_Storage.xlsx index b67e3c7..7c4c476 100644 Binary files a/data/example/Power_Storage.xlsx and b/data/example/Power_Storage.xlsx differ diff --git a/data/example/Power_ThermalGen.xlsx b/data/example/Power_ThermalGen.xlsx index 62ddc10..6db9a70 100644 Binary files a/data/example/Power_ThermalGen.xlsx and b/data/example/Power_ThermalGen.xlsx differ diff --git a/data/example/Power_VRES.xlsx b/data/example/Power_VRES.xlsx index adc8368..1201d1f 100644 Binary files a/data/example/Power_VRES.xlsx and b/data/example/Power_VRES.xlsx differ diff --git a/data/example/Power_VRESProfiles.xlsx b/data/example/Power_VRESProfiles.xlsx index 33740f4..0e7bdb1 100644 Binary files a/data/example/Power_VRESProfiles.xlsx and b/data/example/Power_VRESProfiles.xlsx differ diff --git a/data/example/Power_VRESProfiles_KInRows.xlsx b/data/example/Power_VRESProfiles_KInRows.xlsx index b68fc05..73abf38 100644 Binary files a/data/example/Power_VRESProfiles_KInRows.xlsx and b/data/example/Power_VRESProfiles_KInRows.xlsx 
differ diff --git a/data/example/Power_WeightsK.xlsx b/data/example/Power_WeightsK.xlsx index 4cf10f4..bba1675 100644 Binary files a/data/example/Power_WeightsK.xlsx and b/data/example/Power_WeightsK.xlsx differ diff --git a/data/example/Power_WeightsRP.xlsx b/data/example/Power_WeightsRP.xlsx index 40ea572..78d645d 100644 Binary files a/data/example/Power_WeightsRP.xlsx and b/data/example/Power_WeightsRP.xlsx differ diff --git a/data/example/Power_Wind_TechnicalDetails.xlsx b/data/example/Power_Wind_TechnicalDetails.xlsx index 8e056ac..2e3072e 100644 Binary files a/data/example/Power_Wind_TechnicalDetails.xlsx and b/data/example/Power_Wind_TechnicalDetails.xlsx differ diff --git a/printer.py b/printer.py index c753835..5ddc0d0 100644 --- a/printer.py +++ b/printer.py @@ -116,7 +116,10 @@ def error(self, text: str, prefix: str = "Error: ", hard_wrap_chars: str = None) """ text = self.handle_hard_wrap_chars(text, prefix, hard_wrap_chars) - self.console.print(f"[red]{prefix}{text}[/red]") + if len(prefix) > 0: + self.console.print(f"[red]{prefix}[/red]{text}") # Only have prefix in color if it is set + else: + self.console.print(f"[red]{text}[/red]") self._log(f"{prefix}{text}") return None @@ -134,7 +137,10 @@ def warning(self, text: str, prefix: str = "Warning: ", hard_wrap_chars: str = N """ text = self.handle_hard_wrap_chars(text, prefix, hard_wrap_chars) - self.console.print(f"[yellow]{prefix}{text}[/yellow]") + if len(prefix) > 0: + self.console.print(f"[yellow]{prefix}[/yellow]{text}") # Only have prefix in color if it is set + else: + self.console.print(f"[yellow]{text}[/yellow]") self._log(f"{prefix}{text}") return None @@ -152,7 +158,10 @@ def success(self, text: str, prefix: str = "", hard_wrap_chars: str = None): """ text = self.handle_hard_wrap_chars(text, prefix, hard_wrap_chars) - self.console.print(f"[green]{prefix}{text}[/green]") + if len(prefix) > 0: + self.console.print(f"[green]{prefix}[/green]{text}") # Only have prefix in color if it is set + 
else: + self.console.print(f"[green]{text}[/green]") self._log(f"{prefix}{text}") return None