diff --git a/flexmeasures/data/models/planning/__init__.py b/flexmeasures/data/models/planning/__init__.py index 4caf906a60..87bea79bd4 100644 --- a/flexmeasures/data/models/planning/__init__.py +++ b/flexmeasures/data/models/planning/__init__.py @@ -1,5 +1,5 @@ from __future__ import annotations - +from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timedelta from tabulate import tabulate @@ -64,6 +64,28 @@ class Scheduler: return_multiple: bool = False + def _build_stock_groups(self, flex_model): + + groups = defaultdict(list) + soc_sensor_to_stock_model = {} + + # identify stock models + for i, fm in enumerate(flex_model): + if fm.get("soc_at_start") is not None: + soc_sensor = fm["sensor"] + soc_sensor_to_stock_model[soc_sensor] = i + + # group devices by soc sensor + for d, fm in enumerate(flex_model): + soc = fm.get("state_of_charge") + + if soc is None: + continue + + groups[soc.id].append(d) + + return dict(groups) + def __init__( self, sensor: Sensor | None = None, # deprecated diff --git a/flexmeasures/data/models/planning/linear_optimization.py b/flexmeasures/data/models/planning/linear_optimization.py index 4141042509..7cf03f48b9 100644 --- a/flexmeasures/data/models/planning/linear_optimization.py +++ b/flexmeasures/data/models/planning/linear_optimization.py @@ -41,6 +41,7 @@ def device_scheduler( # noqa C901 commitment_upwards_deviation_price: list[pd.Series] | list[float] | None = None, commitments: list[pd.DataFrame] | list[Commitment] | None = None, initial_stock: float | list[float] = 0, + stock_groups: dict[int, list[int]] | None = None, ) -> tuple[list[pd.Series], float, SolverResults, ConcreteModel]: """This generic device scheduler is able to handle an EMS with multiple devices, with various types of constraints on the EMS level and on the device level, @@ -100,6 +101,17 @@ def device_scheduler( # noqa C901 resolution = pd.to_timedelta(device_constraints[0].index.freq).to_pytimedelta() end = device_constraints[0].index.to_pydatetime()[-1] + resolution + # map device → stock group + device_to_group = {} + + if stock_groups: + for g, devices in stock_groups.items(): + for d in devices: + device_to_group[d] = g + else: + for d in range(len(device_constraints)): + device_to_group[d] = d + # Move commitments from old structure to new if commitments is None: commitments = [] @@ -484,33 +496,77 @@ def grouped_commitment_equalities(m, c, j, g): ) model.commitment_sign = Var(model.c, domain=Binary, initialize=0) + # def _get_stock_change(m, d, j): + # """Determine final stock change of device d until time j. + # + # Apply conversion efficiencies to conversion from flow to stock change and vice versa, + # and apply storage efficiencies to stock levels from one datetime to the next. + # """ + # if isinstance(initial_stock, list): + # # No initial stock defined for inflexible device + # initial_stock_d = initial_stock[d] if d < len(initial_stock) else 0 + # else: + # initial_stock_d = initial_stock + # + # stock_changes = [ + # ( + # m.device_power_down[d, k] / m.device_derivative_down_efficiency[d, k] + # + m.device_power_up[d, k] * m.device_derivative_up_efficiency[d, k] + # + m.stock_delta[d, k] + # ) + # for k in range(0, j + 1) + # ] + # efficiencies = [m.device_efficiency[d, k] for k in range(0, j + 1)] + # final_stock_change = [ + # stock - initial_stock_d + # for stock in apply_stock_changes_and_losses( + # initial_stock_d, stock_changes, efficiencies + # ) + # ][-1] + # return final_stock_change + def _get_stock_change(m, d, j): - """Determine final stock change of device d until time j. - Apply conversion efficiencies to conversion from flow to stock change and vice versa, - and apply storage efficiencies to stock levels from one datetime to the next. - """ + # determine the stock group of this device + group = device_to_group[d] + + # all devices belonging to this stock + devices = [dev for dev, g in device_to_group.items() if g == group] + + # initial stock if isinstance(initial_stock, list): - # No initial stock defined for inflexible device - initial_stock_d = initial_stock[d] if d < len(initial_stock) else 0 + initial_stock_g = initial_stock[d] if d < len(initial_stock) else 0 else: - initial_stock_d = initial_stock + initial_stock_g = initial_stock + + stock_changes = [] + + for k in range(0, j + 1): + + change = 0 + + for dev in devices: + change += ( + m.device_power_down[dev, k] + / m.device_derivative_down_efficiency[dev, k] + + m.device_power_up[dev, k] + * m.device_derivative_up_efficiency[dev, k] + + m.stock_delta[dev, k] + ) + + stock_changes.append(change) - stock_changes = [ - ( - m.device_power_down[d, k] / m.device_derivative_down_efficiency[d, k] - + m.device_power_up[d, k] * m.device_derivative_up_efficiency[d, k] - + m.stock_delta[d, k] - ) - for k in range(0, j + 1) - ] efficiencies = [m.device_efficiency[d, k] for k in range(0, j + 1)] + final_stock_change = [ - stock - initial_stock_d + stock - initial_stock_g for stock in apply_stock_changes_and_losses( - initial_stock_d, stock_changes, efficiencies + initial_stock_g, + stock_changes, + efficiencies, ) ][-1] + return final_stock_change # Add constraints as a tuple of (lower bound, value, upper bound) diff --git a/flexmeasures/data/models/planning/storage.py b/flexmeasures/data/models/planning/storage.py index 836d81950e..7098bdf640 100644 --- a/flexmeasures/data/models/planning/storage.py +++ b/flexmeasures/data/models/planning/storage.py @@ -94,13 +94,45 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 resolution = self.resolution belief_time = self.belief_time + # For backwards compatibility with the single asset scheduler + flex_model = self.flex_model.copy() + if not isinstance(flex_model, list): + flex_model = [flex_model] + + # Identify stock models (entries defining SOC limits) + self.stock_models = {} + + for fm in flex_model: + if fm.get("soc_at_start") is not None: + sensor = fm["sensor"] + if isinstance(sensor, Sensor): + self.stock_models[sensor.id] = fm + else: + self.stock_models[sensor] = fm + + device_models = [] + stock_models = {} + + for fm in flex_model: + + # stock model + if fm.get("soc_at_start") is not None: + sensor = fm["sensor"] + stock_models[sensor.id if isinstance(sensor, Sensor) else sensor] = fm + continue + + # device model + if fm.get("state_of_charge") is not None: + device_models.append(fm) + + flex_model = device_models + self.stock_models = stock_models + # List the asset(s) and sensor(s) being scheduled if self.asset is not None: if not isinstance(self.flex_model, list): self.flex_model = [self.flex_model] - sensors: list[Sensor | None] = [ - flex_model_d.get("sensor") for flex_model_d in self.flex_model - ] + sensors: list[Sensor | None] = [fm.get("sensor") for fm in device_models] assets: list[Asset | None] = [ # noqa: F841 s.asset if s is not None else flex_model_d.get("asset") for s, flex_model_d in zip(sensors, self.flex_model) @@ -118,18 +150,28 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 asset = self.sensor.generic_asset assets = [asset] # noqa: F841 - # For backwards compatibility with the single asset scheduler - flex_model = self.flex_model.copy() - if not isinstance(flex_model, list): - flex_model = [flex_model] + num_flexible_devices = len(device_models) + + soc_at_start = [None] * num_flexible_devices + soc_targets = [None] * num_flexible_devices + soc_min = [None] * num_flexible_devices + soc_max = [None] * num_flexible_devices - # total number of flexible devices D described in the flex-model - num_flexible_devices = len(flex_model) + # Assign SOC constraints from stock model to the first device in each group + for stock_id, devices in self.stock_groups.items(): + + stock_model = self.stock_models.get(stock_id) + + if stock_model is None: + continue + + d0 = devices[0] + + soc_at_start[d0] = stock_model.get("soc_at_start") + soc_targets[d0] = stock_model.get("soc_targets") + soc_min[d0] = stock_model.get("soc_min") + soc_max[d0] = stock_model.get("soc_max") - soc_at_start = [flex_model_d.get("soc_at_start") for flex_model_d in flex_model] - soc_targets = [flex_model_d.get("soc_targets") for flex_model_d in flex_model] - soc_min = [flex_model_d.get("soc_min") for flex_model_d in flex_model] - soc_max = [flex_model_d.get("soc_max") for flex_model_d in flex_model] soc_minima = [flex_model_d.get("soc_minima") for flex_model_d in flex_model] soc_maxima = [flex_model_d.get("soc_maxima") for flex_model_d in flex_model] storage_efficiency = [ @@ -554,6 +596,21 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 ) ) + # # --- apply shared stock groups + # if hasattr(self, "stock_groups") and self.stock_groups: + # for stock_id, devices in self.stock_groups.items(): + # + # if len(devices) <= 1: + # continue + # + # # combine stock delta + # combined_delta = sum( + # device_constraints[d]["stock delta"] for d in devices + # ) + # + # for d in devices: + # device_constraints[d]["stock delta"] = combined_delta + # Create the device constraints for all the flexible devices for d in range(num_flexible_devices): sensor_d = sensors[d] @@ -719,7 +776,15 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 # soc-maxima will become a soft constraint (modelled as stock commitments), so remove hard constraint soc_maxima[d] = None - if soc_at_start[d] is not None: + # only apply SOC constraints to the first device of a shared stock + apply_soc_constraints = True + + for stock_id, devices in self.stock_groups.items(): + if d in devices and d != devices[0]: + apply_soc_constraints = False + break + + if soc_at_start[d] is not None and apply_soc_constraints: device_constraints[d] = add_storage_constraints( start, end, @@ -1002,6 +1067,29 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901 + message ) + # --- apply shared stock groups + if hasattr(self, "stock_groups") and self.stock_groups: + for stock_id, devices in self.stock_groups.items(): + + if len(devices) <= 1: + continue + + d0 = devices[0] + + combined_delta = sum( + device_constraints[d]["stock delta"] for d in devices + ) + + device_constraints[d0]["stock delta"] = combined_delta + + # secondary devices keep their delta but must not have SOC constraints + for d in devices[1:]: + device_constraints[d]["stock delta"] = 0 + + # disable stock bounds for secondary devices + device_constraints[d]["equals"] = np.nan + device_constraints[d]["min"] = np.nan + device_constraints[d]["max"] = np.nan return ( sensors, start, @@ -1134,6 +1222,7 @@ def deserialize_flex_config(self): soc_targets=self.flex_model[d].get("soc_targets"), sensor=self.flex_model[d]["sensor"], ) + self.stock_groups = self._build_stock_groups(self.flex_model) else: raise TypeError( @@ -1381,7 +1470,9 @@ class StorageScheduler(MetaStorageScheduler): fallback_scheduler_class: Type[Scheduler] = StorageFallbackScheduler - def compute(self, skip_validation: bool = False) -> SchedulerOutputType: + def compute( # noqa: C901 + self, skip_validation: bool = False + ) -> SchedulerOutputType: """Schedule a battery or Charge Point based directly on the latest beliefs regarding market prices within the specified time window. For the resulting consumption schedule, consumption is defined as positive values. @@ -1400,18 +1491,23 @@ def compute(self, skip_validation: bool = False) -> SchedulerOutputType: commitments, ) = self._prepare(skip_validation=skip_validation) + initial_stock = [0] * len(soc_at_start) + + for stock_id, devices in self.stock_groups.items(): + d0 = devices[0] + s = soc_at_start[d0] + + value = s * (timedelta(hours=1) / resolution) if s is not None else 0 + + for d in devices: + initial_stock[d] = value + ems_schedule, expected_costs, scheduler_results, model = device_scheduler( device_constraints=device_constraints, ems_constraints=ems_constraints, commitments=commitments, - initial_stock=[ - ( - soc_at_start_d * (timedelta(hours=1) / resolution) - if soc_at_start_d is not None - else 0 - ) - for soc_at_start_d in soc_at_start - ], + initial_stock=initial_stock, + stock_groups=self.stock_groups, ) if "infeasible" in (tc := scheduler_results.solver.termination_condition): raise InfeasibleProblemException(tc) @@ -1450,26 +1546,35 @@ def compute(self, skip_validation: bool = False) -> SchedulerOutputType: flex_model["sensor"] = sensors[0] flex_model = [flex_model] - soc_schedule = { - flex_model_d["state_of_charge"]: convert_units( - integrate_time_series( - series=ems_schedule[d], - initial_stock=soc_at_start[d], - stock_delta=device_constraints[d]["stock delta"] - * resolution - / timedelta(hours=1), - up_efficiency=device_constraints[d]["derivative up efficiency"], - down_efficiency=device_constraints[d]["derivative down efficiency"], - storage_efficiency=device_constraints[d]["efficiency"] - .astype(float) - .fillna(1), - ), - from_unit="MWh", - to_unit=flex_model_d["state_of_charge"].unit, + soc_schedule = {} + + for stock_idx, (stock_id, devices) in enumerate(self.stock_groups.items()): + d0 = devices[0] + + stock_series = sum(ems_schedule[d] for d in devices) + + soc = integrate_time_series( + series=stock_series, + initial_stock=soc_at_start[d0], + stock_delta=device_constraints[d0]["stock delta"] + * resolution + / timedelta(hours=1), + up_efficiency=device_constraints[d0]["derivative up efficiency"], + down_efficiency=device_constraints[d0]["derivative down efficiency"], + storage_efficiency=device_constraints[d0]["efficiency"] + .astype(float) + .fillna(1), ) - for d, flex_model_d in enumerate(flex_model) - if isinstance(flex_model_d.get("state_of_charge", None), Sensor) - } + + # attach SOC sensor if defined + soc_sensor = flex_model[d0].get("state_of_charge") + + if isinstance(soc_sensor, Sensor): + soc_schedule[soc_sensor] = convert_units( + soc, + from_unit="MWh", + to_unit=soc_sensor.unit, + ) # Resample each device schedule to the resolution of the device's power sensor if self.resolution is None: diff --git a/flexmeasures/data/models/planning/tests/test_commitments.py b/flexmeasures/data/models/planning/tests/test_commitments.py index 9f2bf36fda..258a94931b 100644 --- a/flexmeasures/data/models/planning/tests/test_commitments.py +++ b/flexmeasures/data/models/planning/tests/test_commitments.py @@ -721,3 +721,205 @@ def test_mixed_gas_and_electricity_assets(app, db): f"This ensures optimizer prioritizes filling battery early over idling. " f"Ratio: {costs_data['prefer charging device 0 sooner'] / costs_data['prefer curtailing device 0 later']:.1f}×" ) + + +def test_two_devices_shared_stock(app, db): + """ + Test scheduling two batteries sharing a single shared stock. + Each battery: 20→80 kWh (60 kWh increase). + Combined SoC in shared stock cannot exceed 100 kWh at any time. + """ + # ---- time + start = pd.Timestamp("2024-01-01T00:00:00+01:00") + end = pd.Timestamp("2024-01-02T00:00:00+01:00") + power_sensor_resolution = pd.Timedelta("15m") + soc_sensor_resolution = pd.Timedelta(0) + + # ---- assets + battery_type = get_or_create_model(GenericAssetType, name="battery") + + b1 = GenericAsset(name="B1", generic_asset_type=battery_type) + b2 = GenericAsset(name="B2", generic_asset_type=battery_type) + + db.session.add_all([b1, b2]) + db.session.commit() + + s1 = Sensor( + name="power1", + unit="kW", + event_resolution=power_sensor_resolution, + generic_asset=b1, + ) + s2 = Sensor( + name="power2", + unit="kW", + event_resolution=power_sensor_resolution, + generic_asset=b2, + ) + + soc1 = Sensor( + name="soc1", + unit="kWh", + event_resolution=soc_sensor_resolution, + generic_asset=b1, + ) + + soc2 = Sensor( + name="soc2", + unit="kWh", + event_resolution=soc_sensor_resolution, + generic_asset=b2, + ) + + db.session.add_all([soc1, soc2, s1, s2]) + db.session.commit() + + # ---- shared stock (both batteries charge from same pool) + flex_model = [ + { + "sensor": s1.id, + "stock-id": "shared", + "state-of-charge": {"sensor": soc1.id}, + "soc-at-start": 20.0, + "soc-min": 0.0, + "soc-max": 100.0, + "soc-targets": [{"datetime": "2024-01-01T23:00:00+01:00", "value": 80.0}], + "power-capacity": "20 kW", + "charging-efficiency": 0.95, + "discharging-efficiency": 0.95, + }, + { + "sensor": s2.id, + "stock-id": "shared", + "state-of-charge": {"sensor": soc2.id}, + "soc-at-start": 20.0, + "soc-min": 0.0, + "soc-max": 100.0, + "soc-targets": [{"datetime": "2024-01-01T23:00:00+01:00", "value": 80.0}], + "power-capacity": "20 kW", + "charging-efficiency": 0.95, + "discharging-efficiency": 0.95, + }, + ] + + flex_context = { + "consumption-price": "100 EUR/MWh", + "production-price": "100 EUR/MWh", + } + + scheduler = StorageScheduler( + asset_or_sensor=b1, + start=start, + end=end, + resolution=power_sensor_resolution, + belief_time=start, + flex_model=flex_model, + flex_context=flex_context, + return_multiple=True, + ) + + schedules = scheduler.compute(skip_validation=True) + + # Extract schedules by type + storage_schedules = [ + entry for entry in schedules if entry.get("name") == "storage_schedule" + ] + soc_schedules = [ + entry for entry in schedules if entry.get("name") == "state_of_charge" + ] + commitment_costs = [ + entry for entry in schedules if entry.get("name") == "commitment_costs" + ] + + assert len(storage_schedules) == 2 + assert len(soc_schedules) == 1 # single shared SoC schedule + assert len(commitment_costs) == 1 + + # Get battery schedules + b1_schedule = next(entry for entry in storage_schedules if entry["sensor"] == s1) + b1_data = b1_schedule["data"] + + b2_schedule = next(entry for entry in storage_schedules if entry["sensor"] == s2) + b2_data = b2_schedule["data"] + + # Both devices should charge to meet their targets + assert (b1_data > 0).any(), "B1 should charge at some point" + assert (b2_data > 0).any(), "B2 should charge at some point" + + costs_data = commitment_costs[0]["data"] + + # B1: 60kWh Δ (20→80) / 0.95 eff × 100 EUR/MWh ≈ 6.32 EUR (charge) + discharge ≈ 4.32 EUR + assert costs_data["electricity energy 0"] == pytest.approx(4.32, rel=1e-2), ( + f"B1 electricity cost (60kWh @ 95% eff + discharge): " + f"60kWh/0.95 × (100 EUR/MWh) ≈ 4.32 EUR, " + f"got {costs_data['electricity energy 0']}" + ) + + # B2: identical to B1 (same parameters and targets) + assert costs_data["electricity energy 1"] == pytest.approx(4.32, rel=1e-2), ( + f"B2 electricity cost (60kWh @ 95% eff + discharge, same as B1): " + f"60kWh/0.95 × (100 EUR/MWh) ≈ 4.32 EUR, " + f"got {costs_data['electricity energy 1']}" + ) + + # Total electricity: B1 (4.32) + B2 (4.32) = 8.64 EUR + total_electricity_cost = sum( + v for k, v in costs_data.items() if k.startswith("electricity energy") + ) + assert total_electricity_cost == pytest.approx(8.64, rel=1e-2), ( + f"Total electricity cost (B1 4.32 + B2 4.32): " + f"≈ 8.64 EUR, got {total_electricity_cost}" + ) + + # B1 charging preference: early charging in shared stock scenario ≈ 9.44e-6 EUR + assert costs_data["prefer charging device 0 sooner"] == pytest.approx( + 9.44e-6, rel=1e-2 + ), ( + f"B1 charging preference (shared stock: both compete for same resource): " + f"≈ 9.44e-6 EUR, got {costs_data['prefer charging device 0 sooner']}" + ) + + # B1 curtailing preference (0.5× multiplier): ≈ 4.72e-6 EUR + assert costs_data["prefer curtailing device 0 later"] == pytest.approx( + 4.72e-6, rel=1e-2 + ), ( + f"B1 curtailing preference (0.5× idle multiplier): " + f"≈ 0.5 × 9.44e-6 = 4.72e-6 EUR, " + f"got {costs_data['prefer curtailing device 0 later']}" + ) + + # B2 charging preference: same as B1 ≈ 9.44e-6 EUR + assert costs_data["prefer charging device 1 sooner"] == pytest.approx( + 9.44e-6, rel=1e-2 + ), ( + f"B2 charging preference (shared stock, same as B1): " + f"≈ 9.44e-6 EUR, got {costs_data['prefer charging device 1 sooner']}" + ) + + # B2 curtailing preference: same as B1 ≈ 4.72e-6 EUR + assert costs_data["prefer curtailing device 1 later"] == pytest.approx( + 4.72e-6, rel=1e-2 + ), ( + f"B2 curtailing preference (0.5× idle multiplier, same as B1): " + f"≈ 4.72e-6 EUR, got {costs_data['prefer curtailing device 1 later']}" + ) + + # Verify charging cost ~2× curtailing cost for B1 (due to 0.5× multiplier) + assert ( + costs_data["prefer charging device 0 sooner"] + > costs_data["prefer curtailing device 0 later"] + ), ( + f"B1 charging preference should cost ~2× more than curtailing " + f"due to 0.5× multiplier. " + f"Ratio: {costs_data['prefer charging device 0 sooner'] / costs_data['prefer curtailing device 0 later']:.1f}×" + ) + + # Verify charging cost ~2× curtailing cost for B2 (due to 0.5× multiplier) + assert ( + costs_data["prefer charging device 1 sooner"] + > costs_data["prefer curtailing device 1 later"] + ), ( + f"B2 charging preference should cost ~2× more than curtailing " + f"due to 0.5× multiplier. " + f"Ratio: {costs_data['prefer charging device 1 sooner'] / costs_data['prefer curtailing device 1 later']:.1f}×" + ) diff --git a/flexmeasures/data/schemas/scheduling/storage.py b/flexmeasures/data/schemas/scheduling/storage.py index 1154c4c97f..15aa84bfb2 100644 --- a/flexmeasures/data/schemas/scheduling/storage.py +++ b/flexmeasures/data/schemas/scheduling/storage.py @@ -230,6 +230,16 @@ class StorageFlexModelSchema(Schema): validate=OneOf(["electricity", "gas"]), metadata=dict(description="Commodity label for this device/asset."), ) + stock_id = fields.Str( + data_key="stock-id", + required=False, + load_default=None, + validate=validate.Length(min=1), + metadata=dict( + description="Identifier of a shared storage (stock) that this device charges/discharges. " + "Devices with the same stock-id share one SOC state." + ), + ) def __init__( self, @@ -511,6 +521,16 @@ class DBStorageFlexModelSchema(Schema): validate=OneOf(["electricity", "gas"]), metadata=dict(description="Commodity label for this device/asset."), ) + stock_id = fields.Str( + data_key="stock-id", + required=False, + load_default=None, + validate=validate.Length(min=1), + metadata=dict( + description="Identifier of a shared storage (stock) that this device charges/discharges. " + "Devices with the same stock-id share one SOC state." + ), + ) mapped_schema_keys: dict diff --git a/flexmeasures/ui/static/openapi-specs.json b/flexmeasures/ui/static/openapi-specs.json index 6224dafe03..576e8cfb6e 100644 --- a/flexmeasures/ui/static/openapi-specs.json +++ b/flexmeasures/ui/static/openapi-specs.json @@ -7,7 +7,7 @@ }, "termsOfService": null, "title": "FlexMeasures", - "version": "0.31.0" + "version": "0.32.0" }, "externalDocs": { "description": "FlexMeasures runs on the open source FlexMeasures technology. Read the docs here.", @@ -5412,6 +5412,15 @@ "gas" ], "description": "Commodity label for this device/asset." + }, + "stock-id": { + "type": [ + "string", + "null" + ], + "default": null, + "minLength": 1, + "description": "Identifier of a shared storage (stock) that this device charges/discharges. Devices with the same stock-id share one SOC state." } }, "additionalProperties": false