-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathUtilities.py
More file actions
618 lines (483 loc) · 25.5 KB
/
Utilities.py
File metadata and controls
618 lines (483 loc) · 25.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
from __future__ import annotations
import typing
from typing import TYPE_CHECKING, Literal, Dict
import numpy as np
import pandas as pd
import tsam.timeseriesaggregation as tsam
from InOutModule.printer import Printer
if TYPE_CHECKING:
from InOutModule.CaseStudy import CaseStudy
printer = Printer.getInstance()
def inflowsToCapacityFactors(inflows_df: pd.DataFrame, vres_df: pd.DataFrame, vresProfiles_df: pd.DataFrame) -> pd.DataFrame:
    """
    Convert inflow values into capacity factors and append them to the VRES profiles.

    :param inflows_df: inflow data per generator (g) and representative period (rp).
    :param vres_df: generator technical data, must contain a 'MaxProd' column.
    :param vresProfiles_df: existing VRES profiles (indexed by rp, k, g).
    :return: vresProfiles_df with the converted inflow rows concatenated.
    :raises ValueError: if Power_VRES contains a duplicated generator.
    """
    vres_flat = vres_df.reset_index()[['g', 'MaxProd']]
    if vres_flat['g'].duplicated().any():
        raise ValueError("Duplicated generator found in Power_VRES.")
    max_prod = vres_flat.set_index('g')['MaxProd'].astype(float)

    # Attach each generator's MaxProd to its inflow rows.
    converted = inflows_df.copy().join(max_prod, on='g', how='left')

    # Generators missing from Power_VRES (or with MaxProd=0) cannot be normalized; drop them.
    invalid = converted['MaxProd'].isna() | (converted['MaxProd'] == 0)
    if invalid.any():
        printer.warning(f"Some inflows correspond to generators which are not in Power_VRES (or have MaxProd=0). They will be ignored: {converted[invalid].index.get_level_values('g').unique()}")
        converted = converted.dropna(subset=['MaxProd'])
        converted = converted[converted['MaxProd'] != 0]

    # Capacity factor = inflow / MaxProd.
    converted['value'] = converted['value'] / converted['MaxProd']
    converted = converted.drop(columns=['MaxProd'])
    return pd.concat([vresProfiles_df, converted], axis=0)
def capacityFactorsToInflows(vresProfiles_df: pd.DataFrame, vres_df: pd.DataFrame, inflows_df: pd.DataFrame, remove_Inflows_from_VRESProfiles_inplace: bool = False) -> pd.DataFrame:
    """
    Convert capacity factors in vresProfiles_df back into inflow values.

    :param vresProfiles_df: capacity factors (indexed by rp, k, g).
    :param vres_df: generator technical data, must contain a 'MaxProd' column.
    :param inflows_df: template inflows, used only to identify which generators are inflow-based.
    :param remove_Inflows_from_VRESProfiles_inplace: if True, drop the inflow generators from the
        original vresProfiles_df (mutates the caller's DataFrame).
    :return: inflows DataFrame indexed by (rp, k, g), sorted on the 'k' level.
    :raises ValueError: on duplicated generators, or when MaxProd is missing/zero for an inflow generator.
    """
    # Generators that are inflow-based according to the template.
    inflow_generators = inflows_df.reset_index()['g'].unique()

    vres_flat = vres_df.reset_index()[['g', 'MaxProd']]
    if vres_flat['g'].duplicated().any():
        raise ValueError("Duplicated generator found in Power_VRES.")
    max_prod = vres_flat.set_index('g')['MaxProd'].astype(float)

    # Keep only inflow generators and attach their MaxProd.
    result = vresProfiles_df.reset_index()
    result = result[result['g'].isin(inflow_generators)].join(max_prod, on='g', how='left')
    if result['MaxProd'].isna().any() or (result['MaxProd'] == 0).any():
        raise ValueError("MaxProd is missing or zero for some generators in inflows.")

    # Inflow = capacity factor * MaxProd.
    result['value'] = result['value'] * result['MaxProd']
    result = result.drop(columns=['MaxProd'])

    # Optionally strip the inflow generators from the source profiles after conversion.
    if remove_Inflows_from_VRESProfiles_inplace:
        inflow_rows = vresProfiles_df.index.get_level_values('g').isin(inflow_generators)
        vresProfiles_df.drop(vresProfiles_df.index[inflow_rows], inplace=True)

    return result.set_index(['rp', 'k', 'g']).sort_index(level="k")
def _extract_scenario_data(case_study, scenario: str, capacity_normalization_strategy: str) -> pd.DataFrame:
    """Extract and combine demand, VRES, and inflows data for a single scenario.

    Builds one row per (scenario, rp, k, i) with a 'demand' column plus one
    column per technology ('tec') holding capacity-weighted values. Nodes that
    have demand but no technology data keep zeros in the technology columns.

    :param case_study: CaseStudy-like object exposing dPower_Demand and,
        optionally, dPower_VRES / dPower_VRESProfiles / dPower_Inflows /
        dPower_Storage DataFrames.
    :param scenario: scenario identifier used to filter every table.
    :param capacity_normalization_strategy: "installed" uses only existing
        units; any other value is treated as "maxInvestment" (see helper).
    :raises ValueError: if no demand data exists for the scenario.
    """
    def _apply_capacity_normalization_strategy(df, capacity_normalization_strategy):
        """Apply capacity normalization strategy to a dataframe with technology data."""
        if capacity_normalization_strategy == "installed":
            # Weight only by currently installed units.
            return df['ExisUnits'].fillna(0)
        else:  # maxInvestment
            # Weight by the larger of installed units and the investable capacity.
            return np.maximum(
                df['ExisUnits'].fillna(0),
                df['EnableInvest'].fillna(0) * df['MaxInvest'].fillna(0)
            )
    def _pivot_technologies(df, value_column, index_cols=None):
        """Pivot technologies as columns and drop 'g' column."""
        if index_cols is None:
            index_cols = ['scenario', 'rp', 'k', 'g', 'i']
        return df.pivot_table(
            index=index_cols,
            columns='tec',
            values=value_column,
            fill_value=0
        ).reset_index().drop(columns=['g'])
    # Extract demand data for this scenario
    demand_df = case_study.dPower_Demand.reset_index()
    demand_df = demand_df[demand_df['scenario'] == scenario].copy()
    if len(demand_df) == 0:
        raise ValueError(f"No demand data found for scenario {scenario}")
    # Initialize the result with demand data; technology columns are merged in later.
    scenario_df = demand_df[['scenario', 'rp', 'i', 'k', 'value']].rename(columns={'value': 'demand'})
    vres_with_profiles = None
    # Process VRES data if available (both the technical table and the profiles are required)
    if (hasattr(case_study, 'dPower_VRES') and case_study.dPower_VRES is not None and
            hasattr(case_study, 'dPower_VRESProfiles') and case_study.dPower_VRESProfiles is not None):
        # Get VRES data for this scenario
        vres_df = case_study.dPower_VRES.reset_index()
        vres_df = vres_df[vres_df['scenario'] == scenario].copy()
        # Get VRES profiles for this scenario
        vres_profiles_df = case_study.dPower_VRESProfiles.reset_index()
        vres_profiles_df = vres_profiles_df[vres_profiles_df['scenario'] == scenario].copy()
        if len(vres_df) > 0 and len(vres_profiles_df) > 0:
            # Merge of VRES with VRESProfiles (left join keeps every profile row)
            vres_with_profiles = pd.merge(
                vres_profiles_df,
                vres_df[['g', 'tec', 'i', 'ExisUnits', 'MaxProd', 'EnableInvest', 'MaxInvest']],
                on='g',
                how='left'
            )
            # Apply capacity normalization and calculate weighted capacity factor
            normalization_factor = _apply_capacity_normalization_strategy(vres_with_profiles, capacity_normalization_strategy)
            vres_with_profiles['weighted_cf'] = (
                vres_with_profiles['value'].fillna(0) *
                vres_with_profiles['MaxProd'].fillna(0) *
                normalization_factor
            )
            # Pivot technologies as columns
            vres_with_profiles = _pivot_technologies(vres_with_profiles, 'weighted_cf')
    inflows_with_tech = None
    # Process Inflows data if available
    if hasattr(case_study, 'dPower_Inflows') and case_study.dPower_Inflows is not None:
        # Get Inflows data for this scenario
        inflows_df = case_study.dPower_Inflows.reset_index()
        inflows_df = inflows_df[inflows_df['scenario'] == scenario].copy()
        if len(inflows_df) > 0:
            # Collect all inflows data from different sources
            inflows_parts = []
            # Try to merge with Power_VRES data
            # NOTE: vres_df is only bound when the VRES branch above ran; the
            # `vres_with_profiles is not None` check guarantees that here.
            if (hasattr(case_study, 'dPower_VRES') and case_study.dPower_VRES is not None and
                    vres_with_profiles is not None and len(vres_df) > 0):
                inflows_with_vres = pd.merge(
                    inflows_df,
                    vres_df[['g', 'tec', 'i', 'ExisUnits', 'EnableInvest', 'MaxInvest']],
                    on='g',
                    how='left'
                )
                inflows_parts.append(inflows_with_vres)
            # Try to merge with Power_Storage data (inner join: only storage generators)
            if hasattr(case_study, 'dPower_Storage') and case_study.dPower_Storage is not None:
                storage_df = case_study.dPower_Storage.reset_index()
                storage_df = storage_df[storage_df['scenario'] == scenario].copy()
                if len(storage_df) > 0:
                    inflows_with_storage = pd.merge(
                        inflows_df,
                        storage_df[['g', 'tec', 'i', 'ExisUnits', 'EnableInvest', 'MaxInvest']],
                        on='g',
                        how='inner'
                    )
                    inflows_parts.append(inflows_with_storage)
            # Combine all inflows parts
            if inflows_parts:
                inflows_with_tech = pd.concat(inflows_parts, ignore_index=True)
                # Apply capacity normalization
                normalization_factor = _apply_capacity_normalization_strategy(inflows_with_tech, capacity_normalization_strategy)
                inflows_with_tech['value'] = inflows_with_tech['value'].fillna(0) * normalization_factor
                # Pivot technologies as columns
                inflows_with_tech = _pivot_technologies(inflows_with_tech, 'value')
    # Combine VRES and inflows data (either may be absent)
    combined_tech_data = None
    if vres_with_profiles is not None and inflows_with_tech is not None:
        combined_tech_data = pd.concat([vres_with_profiles, inflows_with_tech],
                                       ignore_index=True, sort=False)
    elif vres_with_profiles is not None:
        combined_tech_data = vres_with_profiles
    elif inflows_with_tech is not None:
        combined_tech_data = inflows_with_tech
    # Merge the combined technology data with scenario_df
    if combined_tech_data is not None:
        # Use right join to keep ALL demand data (even nodes without technology data)
        # Replicates demand for nodes with technology, and preserves demand-only nodes
        scenario_df = pd.merge(
            combined_tech_data,
            scenario_df,
            on=['scenario', 'rp', 'k', 'i'],
            how='right'
        )
        # Fill NaN values in technology columns with 0 for demand-only nodes
        tech_columns = [col for col in scenario_df.columns
                        if col not in ['scenario', 'rp', 'k', 'i', 'demand']]
        if tech_columns:
            scenario_df[tech_columns] = scenario_df[tech_columns].fillna(0)
    return scenario_df
def _prepare_disaggregated_data(scenario_df: pd.DataFrame, sum_production: bool) -> pd.DataFrame:
"""Prepare data for disaggregated clustering (keeps buses separate)."""
result_df = scenario_df.copy()
if sum_production:
result_df = _sum_technology_columns(result_df)
return result_df
def _prepare_aggregated_data(scenario_df: pd.DataFrame, sum_production: bool) -> pd.DataFrame:
"""Prepare data for aggregated clustering (sum across buses)."""
grouping_cols = ['scenario', 'rp', 'k']
exclude_cols = grouping_cols + ['i']
value_cols = [col for col in scenario_df.columns if col not in exclude_cols]
# Aggregate across buses
aggregated_df = scenario_df.groupby(grouping_cols)[value_cols].sum().reset_index()
if sum_production:
aggregated_df = _sum_technology_columns(aggregated_df)
return aggregated_df
def _sum_technology_columns(df: pd.DataFrame) -> pd.DataFrame:
"""Sum all technology columns into a single 'production' column."""
result_df = df.copy()
exclude_cols = {'scenario', 'rp', 'i', 'k', 'demand'}
tech_cols = [col for col in df.columns if col not in exclude_cols]
if tech_cols:
result_df['production'] = df[tech_cols].sum(axis=1)
result_df = result_df.drop(columns=tech_cols)
return result_df
def _run_kmedoids_clustering(pivot_df: pd.DataFrame, k: int, rp_length: int, solver: str = None, verbose: bool = False):
    """Run k-medoids clustering on the prepared time series using tsam.

    :param pivot_df: one row per hour with 'scenario'/'rp'/'k' keys plus feature columns.
    :param k: number of typical (representative) periods to create.
    :param rp_length: hours per representative period.
    :param solver: optional solver name forwarded to tsam (e.g. "gurobi").
    :param verbose: if True, print progress information.
    :return: the fitted tsam TimeSeriesAggregation object.
    """
    printer = Printer.getInstance()
    ordered = pivot_df.sort_values('k')
    # tsam expects a datetime-indexed frame; the concrete start date is arbitrary.
    ordered['datetime'] = pd.date_range(start='2010-01-01', periods=len(ordered), freq='h')
    features = ordered.drop(columns=['scenario', 'rp', 'k']).set_index('datetime')
    if verbose:
        printer.information(f" Running k-medoids with {k} clusters, {rp_length} hours/period, {len(features)} total hours")
    kwargs = {
        'noTypicalPeriods': k,
        'hoursPerPeriod': rp_length,
        'clusterMethod': 'k_medoids',
        'rescaleClusterPeriods': False,
    }
    if solver is not None:
        kwargs['solver'] = solver
    aggregation = tsam.TimeSeriesAggregation(features, **kwargs)
    typical_periods = aggregation.createTypicalPeriods()
    if verbose:
        printer.information(f" Clustering completed. Created {len(typical_periods)} typical periods.")
        printer.information(f" Cluster center indices (medoids): {aggregation.clusterCenterIndices}")
    return aggregation
def _build_representative_periods(case_study, scenario: str, aggregation, rp_length: int):
    """Build demand, VRES profile, and inflows data for representative periods.

    For each medoid (cluster center) chosen by tsam, copies the original rows
    of that period and relabels them with sequential new rp/k names
    (rp01/k0001, ...).

    :param case_study: CaseStudy-like object exposing dPower_Demand and,
        optionally, dPower_VRESProfiles / dPower_Inflows DataFrames.
    :param scenario: scenario identifier used to filter every table.
    :param aggregation: tsam aggregation result; only clusterCenterIndices is used.
    :param rp_length: hours per representative period.
    :return: dict mapping table name -> list of relabelled rows (pd.Series).
    """
    def _extract_numeric_and_calc_p(df, rp_length):
        """Extract numeric values from rp/k strings and calculate absolute hour."""
        # Assumes labels look like 'rpNN' / 'kNNNN' — TODO confirm upstream label format.
        df['rp_num'] = df['rp'].str[2:].astype(int)
        df['k_num'] = df['k'].str[1:].astype(int)
        # Absolute hour index across the full, non-aggregated horizon (1-based).
        df['p'] = (df['rp_num'] - 1) * rp_length + df['k_num']
        return df
    # Only include the optional tables that actually exist on the CaseStudy.
    time_series_tables = [("Power_Demand", case_study.dPower_Demand)]
    if hasattr(case_study, 'dPower_VRESProfiles') and case_study.dPower_VRESProfiles is not None:
        time_series_tables.append(("Power_VRESProfiles", case_study.dPower_VRESProfiles))
    if hasattr(case_study, 'dPower_Inflows') and case_study.dPower_Inflows is not None:
        time_series_tables.append(("Power_Inflows", case_study.dPower_Inflows))
    data = {name: [] for name, _ in time_series_tables}
    for name, df in time_series_tables:
        df_original = df.reset_index()
        df_original = df_original[df_original['scenario'] == scenario].copy()
        df_original = _extract_numeric_and_calc_p(df_original, rp_length)
        for cluster_idx, medoid_period in enumerate(aggregation.clusterCenterIndices):
            # Representative periods are renumbered sequentially: rp01, rp02, ...
            rp_new = f'rp{cluster_idx + 1:02d}'
            # Absolute hours covered by this medoid period (1-based, inclusive).
            medoid_hours = range(medoid_period * rp_length + 1, (medoid_period + 1) * rp_length + 1)
            medoid_data = df_original[df_original['p'].isin(medoid_hours)]
            for k_offset, abs_hour in enumerate(medoid_hours, start=1):
                k_new = f'k{k_offset:04d}'
                hour_data = medoid_data[medoid_data['p'] == abs_hour]
                for _, row in hour_data.iterrows():
                    # iterrows() yields copies, so mutating 'row' does not touch df_original.
                    # NOTE(review): per-row iteration is slow for large tables — candidate
                    # for vectorization if this becomes a bottleneck.
                    row['rp'] = rp_new
                    row['k'] = k_new
                    data[name].append(row)
    return data
def _build_scenario_weights_and_indices(aggregation, scenario: str, rp_length: int):
"""Build representative period weights and hour indices for a single scenario."""
# RP weights
weights_rp = []
for rp_idx, weight in aggregation._clusterPeriodNoOccur.items():
weights_rp.append({
'rp': f'rp{rp_idx + 1:02d}',
'scenario': scenario,
'pWeight_rp': int(weight),
'id': None,
"dataPackage": None,
"dataSource": None,
})
# K weights (all 1 for hourly resolution)
weights_k = []
for k in range(1, rp_length + 1):
weights_k.append({
'k': f'k{k:04d}',
'scenario': scenario,
'pWeight_k': 1,
'id': None,
"dataPackage": None,
"dataSource": None,
})
# Hindex mapping
hindex = []
for orig_p, cluster_id in enumerate(aggregation._clusterOrder):
for k in range(1, rp_length + 1):
hindex.append({
'p': f'h{orig_p * rp_length + k:04d}',
'rp': f'rp{cluster_id + 1:02d}',
'k': f'k{k:04d}',
'scenario': scenario,
'id': None,
"dataPackage": None,
"dataSource": None,
})
return weights_rp, weights_k, hindex
def _update_casestudy_with_scenarios(case_study, all_processed_data: Dict, verbose: bool = False):
    """Update CaseStudy with aggregated data, maintaining original index structures.

    Concatenates the per-scenario record lists and rebuilds each dPower_* table
    with its original MultiIndex; tables with no collected rows are left untouched.

    :param case_study: CaseStudy-like object whose dPower_* attributes are replaced.
    :param all_processed_data: per-scenario dicts as built by apply_representative_periods.
    :param verbose: if True, print progress information.
    """
    printer = Printer.getInstance()
    # (key in per-scenario dict, CaseStudy attribute, index columns, log label)
    tables = [
        ('Power_Demand', 'dPower_Demand', ['rp', 'k', 'i'], 'demand'),
        ('Power_VRESProfiles', 'dPower_VRESProfiles', ['rp', 'k', 'g'], 'VRES profiles'),
        ('Power_Inflows', 'dPower_Inflows', ['rp', 'k', 'g'], 'inflows'),
        ('weights_rp', 'dPower_WeightsRP', ['rp'], 'RP weights'),
        ('weights_k', 'dPower_WeightsK', ['k'], 'K weights'),
        ('hindex', 'dPower_Hindex', ['p', 'rp', 'k'], 'Hindex'),
    ]
    # Collect all data across scenarios.
    collected = {key: [] for key, _, _, _ in tables}
    for scenario_data in all_processed_data.values():
        for key, _, _, _ in tables:
            collected[key].extend(scenario_data[key])
    if verbose:
        printer.information(f"Updating CaseStudy with combined data:")
    for key, attr, index_cols, label in tables:
        rows = collected[key]
        if rows:
            setattr(case_study, attr, pd.DataFrame(rows).set_index(index_cols))
            if verbose:
                printer.information(f" - Updated {label}: {len(rows)} entries")
    if verbose:
        printer.information("CaseStudy update completed successfully!")
def get_kmedoids_representative_periods(case_study, number_rps: int, rp_length: int = 24,
                                        cluster_strategy: Literal["aggregated", "disaggregated"] = "aggregated",
                                        capacity_normalization: Literal["installed", "maxInvestment"] = "maxInvestment",
                                        sum_production: bool = False, solver: str = "gurobi",
                                        verbose: bool = False) -> dict[str, tsam.TimeSeriesAggregation]:
    """
    Get the representative periods using k-medoids temporal aggregation. Does not modify the original CaseStudy.
    Each scenario from dGlobal_Scenarios is processed independently.
    :param case_study: The CaseStudy object to aggregate
    :param number_rps: Number of representative periods to create
    :param rp_length: Hours per representative period (e.g., 24, 48)
    :param cluster_strategy: "aggregated" (sum across buses) or "disaggregated" (keep buses separate)
    :param capacity_normalization: "installed" or "maxInvestment" for VRES capacity factor weighting
    :param sum_production: If True, sum all technologies into single production column
    :param solver: Solver to use for k-medoids clustering (e.g. "gurobi", "glpk"). Defaults to "gurobi".
    :param verbose: If True, print detailed processing information
    :raises ValueError: if a scenario has no data to cluster
    :return: dict mapping scenario name -> fitted tsam TimeSeriesAggregation object
    """
    # Get scenario names
    scenario_names = case_study.dGlobal_Scenarios.index.values
    # Process each scenario independently
    all_scenario_results = {}
    for scenario in scenario_names:
        # Use plain `if verbose:` blocks instead of `expr if verbose else None`
        # statements — conditional expressions should not be used for side effects.
        if verbose:
            printer.information(f"Scenario: {scenario}")
            printer.information(f"Extracting data for scenario {scenario}")
        scenario_clustering_data = _extract_scenario_data(case_study, scenario, capacity_normalization)
        if len(scenario_clustering_data) == 0:
            raise ValueError(f"No data found for scenario {scenario}")
        if verbose:
            printer.information(f"Found {len(scenario_clustering_data)} data points for clustering")
            printer.information(f"Preparing data using {cluster_strategy} strategy")
        if cluster_strategy == "disaggregated":
            pivot_df = _prepare_disaggregated_data(scenario_clustering_data, sum_production)
        else:
            pivot_df = _prepare_aggregated_data(scenario_clustering_data, sum_production)
        if verbose:
            printer.information(f"Prepared {len(pivot_df)} time periods for clustering")
            printer.information(f"Running k-medoids clustering (k={number_rps}, rp_length={rp_length})")
        aggregation_result = _run_kmedoids_clustering(pivot_df, number_rps, rp_length, solver=solver, verbose=verbose)
        if verbose:
            printer.information(f"Aggregation result for scenario {scenario} received after {aggregation_result.clusteringDuration} seconds")
        all_scenario_results[scenario] = aggregation_result
    return all_scenario_results
def apply_representative_periods(
        case_study,
        aggregation: dict[str, tsam.TimeSeriesAggregation],
        rp_length: int = 24,
        inplace: bool = False,
        verbose: bool = False) -> typing.Optional[CaseStudy]:
    """
    Apply precomputed representative periods to a CaseStudy object.
    Each scenario from dGlobal_Scenarios is processed independently.
    :param case_study: The CaseStudy object to aggregate
    :param aggregation: dict mapping scenario name -> precomputed TimeSeriesAggregation
        (as returned by get_kmedoids_representative_periods)
    :param rp_length: Hours per representative period (e.g., 24, 48)
    :param inplace: If True, modify the original CaseStudy; otherwise, return a new one
    :param verbose: If True, print detailed processing information
    :returns: New clustered CaseStudy object if inplace is False; otherwise, None
    """
    # Create a deep copy to avoid modifying the original unless requested
    aggregated_case_study = case_study.copy() if not inplace else case_study
    scenario_names = aggregated_case_study.dGlobal_Scenarios.index.values
    # Process each scenario independently
    all_processed_data = {}
    for scenario in scenario_names:
        # Use plain `if verbose:` blocks instead of `expr if verbose else None`
        # statements — conditional expressions should not be used for side effects.
        if verbose:
            printer.information(f"Scenario: {scenario}")
            printer.information(f"Building representative period data")
        data = _build_representative_periods(
            case_study, scenario, aggregation[scenario], rp_length
        )
        if verbose:
            printer.information(f"Building weights and hour indices")
        weights_rp, weights_k, hindex = _build_scenario_weights_and_indices(
            aggregation[scenario], scenario, rp_length
        )
        all_processed_data[scenario] = {
            'Power_Demand': data["Power_Demand"],
            # Optional tables may be absent from the build result
            'Power_VRESProfiles': data.get("Power_VRESProfiles", []),
            'Power_Inflows': data.get("Power_Inflows", []),
            'weights_rp': weights_rp,
            'weights_k': weights_k,
            'hindex': hindex
        }
        if verbose:
            printer.information(f"Scenario {scenario} completed successfully")
    # Update CaseStudy with aggregated data
    _update_casestudy_with_scenarios(aggregated_case_study, all_processed_data, verbose=verbose)
    if verbose:
        printer.information(f"\nAll scenarios have been processed and combined successfully!")
    if not inplace:
        return aggregated_case_study
    return None
def apply_kmedoids_aggregation(
        case_study,
        k: int,
        rp_length: int = 24,
        cluster_strategy: Literal["aggregated", "disaggregated"] = "aggregated",
        capacity_normalization: Literal["installed", "maxInvestment"] = "maxInvestment",
        sum_production: bool = False,
        solver: str = "gurobi",
        inplace: bool = False,
        verbose: bool = False):
    """
    Apply k-medoids temporal aggregation to a CaseStudy object.
    Convenience wrapper: computes the per-scenario representative periods and
    then applies them, in one call. Each scenario from dGlobal_Scenarios is
    processed independently.
    :param case_study: The CaseStudy object to aggregate
    :param k: Number of representative periods to create
    :param rp_length: Hours per representative period (e.g., 24, 48)
    :param cluster_strategy: "aggregated" (sum across buses) or "disaggregated" (keep buses separate)
    :param capacity_normalization: "installed" or "maxInvestment" for VRES capacity factor weighting
    :param sum_production: If True, sum all technologies into single production column
    :param solver: Solver to use for k-medoids clustering (e.g. "gurobi", "glpk"). Defaults to "gurobi".
    :param inplace: If True, modify the original CaseStudy; otherwise, return a new one
    :param verbose: If True, print detailed processing information
    :return:
        CaseStudy: New clustered CaseStudy object if inplace is False; otherwise, None
    """
    # Step 1: cluster each scenario's time series into representative periods.
    per_scenario_aggregations = get_kmedoids_representative_periods(
        case_study,
        number_rps=k,
        rp_length=rp_length,
        cluster_strategy=cluster_strategy,
        capacity_normalization=capacity_normalization,
        sum_production=sum_production,
        solver=solver,
        verbose=verbose,
    )
    # Step 2: rebuild the CaseStudy tables from those representative periods.
    return apply_representative_periods(
        case_study,
        aggregation=per_scenario_aggregations,
        rp_length=rp_length,
        inplace=inplace,
        verbose=verbose,
    )