diff --git a/documentation/tut/forecasting_scheduling.rst b/documentation/tut/forecasting_scheduling.rst index 1cc24d4cc6..9ad91e72fe 100644 --- a/documentation/tut/forecasting_scheduling.rst +++ b/documentation/tut/forecasting_scheduling.rst @@ -301,7 +301,8 @@ Schedules can be queried by their UUID for up to 1 week after they were triggere Afterwards, the exact schedule can still be retrieved through the `[GET] /sensors//data <../api/v3_0.html#get--api-v3_0-sensors-id-data>`_, using precise filter values for ``start``, ``prior`` and ``source``. Besides the UUID, the endpoint for retrieving schedules takes a sensor ID, which is the sensor ID of one of the power sensors that was referenced in the flex model. -.. note:: If a ``state-of-charge`` sensor was referenced in the flex model (like in the example below), the scheduled state of charge can be retrieved using the same endpoint and UUID, but then using the state-of-charge sensor ID. +.. note:: If a ``state-of-charge`` sensor was referenced in the flex model (like in the example below), FlexMeasures can both persist the scheduled state of charge on that sensor and, when ``soc-at-start`` is omitted, infer the starting state of charge from a recent sensor value near the schedule start. + The scheduled state of charge can then be retrieved using the same endpoint and UUID, but with the state-of-charge sensor ID. .. code-block:: json :emphasize-lines: 3 diff --git a/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py b/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py index c7450e9a81..134374589e 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py @@ -202,6 +202,71 @@ def test_trigger_and_get_schedule( assert sensor.generic_asset.get_attribute("soc_in_mwh") == start_soc +@pytest.mark.parametrize( + "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True +) +def test_trigger_schedule_uses_state_of_charge_sensor_for_soc_at_start( + app, + fresh_db, + add_market_prices_fresh_db, + add_battery_assets_fresh_db, + battery_soc_sensor_fresh_db, + setup_sources_fresh_db, + keep_scheduling_queue_empty, + requesting_user, +): + message = message_for_trigger_schedule(resolution="PT1H") + message["flex-context"] = { + "consumption-price": {"sensor": add_market_prices_fresh_db["epex_da"].id}, + "production-price": {"sensor": add_market_prices_fresh_db["epex_da"].id}, + "site-power-capacity": "1 TW", + } + message["flex-model"]["state-of-charge"] = {"sensor": battery_soc_sensor_fresh_db.id} + message["flex-model"].pop("soc-at-start") + + fresh_db.session.add( + TimedBelief( + sensor=battery_soc_sensor_fresh_db, + source=setup_sources_fresh_db["Seita"], + event_start=parse_datetime(message["start"]), + belief_horizon=timedelta(0), + event_value=50, + ) + ) + fresh_db.session.commit() + + sensor = ( + Sensor.query.filter(Sensor.name == "power") + .join(GenericAsset, GenericAsset.id == Sensor.generic_asset_id) + .filter(GenericAsset.name == "Test battery") + .one_or_none() + ) + + with app.test_client() as client: + trigger_schedule_response = client.post( + url_for("SensorAPI:trigger_schedule", id=sensor.id), + json=message, + ) + assert trigger_schedule_response.status_code == 200 + job_id = trigger_schedule_response.json["schedule"] + + work_on_rq(app.queues["scheduling"], exc_handler=handle_scheduling_exception) + + with app.test_client() as client: + get_soc_schedule_response = client.get( + url_for( + "SensorAPI:get_schedule", id=battery_soc_sensor_fresh_db.id, uuid=job_id + ), + query_string={"duration": "PT24H"}, + ) + assert get_soc_schedule_response.status_code == 200 + assert get_soc_schedule_response.json["unit"] == "%" + assert get_soc_schedule_response.json["values"][0] == 50 + + sensor = fresh_db.session.get(Sensor, sensor.id) + assert sensor.generic_asset.get_attribute("soc_in_mwh") == pytest.approx(0.02) + + @pytest.mark.parametrize( "context_sensor, asset_sensor, parent_sensor, expect_sensor", [ diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 48b618dcca..473e7891f8 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -1309,15 +1309,6 @@ def add_schedule( # noqa C901 scheduler_module = "flexmeasures.data.models.planning.process" elif scheduler_class == "StorageScheduler": scheduler_module = "flexmeasures.data.models.planning.storage" - if soc_at_start is None and ( - "soc-min" in flex_model or "soc-max" in flex_model - ): - # for asset scheduling, soc at start should be part of the flex model - click.secho( - "For a storage device with SoC constraints, --soc-at-start is required.", - **MsgStyle.ERROR, - ) - raise click.Abort() if soc_at_start: flex_model["soc-at-start"] = soc_at_start.to("%") diff --git a/flexmeasures/cli/tests/test_data_add_fresh_db.py b/flexmeasures/cli/tests/test_data_add_fresh_db.py index 22292bb629..6abc036585 100644 --- a/flexmeasures/cli/tests/test_data_add_fresh_db.py +++ b/flexmeasures/cli/tests/test_data_add_fresh_db.py @@ -10,7 +10,7 @@ from flexmeasures import Asset from flexmeasures.cli.tests.utils import to_flags from flexmeasures.data.models.user import Account -from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.time_series import Sensor, TimedBelief from flexmeasures.cli.tests.utils import check_command_ran_without_error from flexmeasures.utils.time_utils import server_now @@ -463,3 +463,53 @@ def test_add_storage_schedule( check_command_ran_without_error(result) assert len(power_sensor.search_beliefs()) == 48 + + +def test_add_storage_schedule_uses_state_of_charge_sensor_for_soc_at_start( + app, + fresh_db, + add_market_prices_fresh_db, + add_charging_station_assets_fresh_db, + setup_sources_fresh_db, +): + from flexmeasures.cli.data_add import add_schedule + + charging_station = add_charging_station_assets_fresh_db["Test charging station"] + power_sensor = next(s for s in charging_station.sensors if s.name == "power") + soc_sensor = add_charging_station_assets_fresh_db["uni-soc"] + start = "2015-01-01T00:00:00+01:00" + + fresh_db.session.add( + TimedBelief( + sensor=soc_sensor, + source=setup_sources_fresh_db["Seita"], + event_start=datetime.fromisoformat(start), + event_value=2.5, + belief_time=datetime.fromisoformat(start), + ) + ) + fresh_db.session.commit() + + cli_input_params = { + "sensor": power_sensor.id, + "start": start, + "duration": "PT12H", + "scheduler": "StorageScheduler", + "flex-context": json.dumps( + {"consumption-price": {"sensor": add_market_prices_fresh_db["epex_da"].id}} + ), + "flex-model": json.dumps( + { + "state-of-charge": {"sensor": soc_sensor.id}, + "soc-min": "0 MWh", + "soc-max": "5 MWh", + "power-capacity": "2 MW", + } + ), + } + + result = app.test_cli_runner().invoke(add_schedule, to_flags(cli_input_params)) + + check_command_ran_without_error(result) + assert len(power_sensor.search_beliefs()) == 48 + assert power_sensor.generic_asset.get_attribute("soc_in_mwh") == 2.5 diff --git a/flexmeasures/data/models/planning/storage.py b/flexmeasures/data/models/planning/storage.py index 6fb7aa5e94..fc16128926 100644 --- a/flexmeasures/data/models/planning/storage.py +++ b/flexmeasures/data/models/planning/storage.py @@ -1036,6 +1036,10 @@ def deserialize_flex_config(self): self.flex_model ) for d, sensor_flex_model in enumerate(self.flex_model): + sensor_flex_model["sensor_flex_model"] = self.ensure_soc_at_start( + flex_model=sensor_flex_model["sensor_flex_model"], + sensor=sensor_flex_model.get("sensor"), + ) self.flex_model[d] = StorageFlexModelSchema( start=self.start, sensor=sensor_flex_model.get("sensor"), @@ -1065,6 +1069,135 @@ def has_soc_at_start(self) -> bool: and self.flex_model["soc-at-start"] is not None ) + @staticmethod + def has_soc_at_start_in(flex_model: dict) -> bool: + return ( + "soc-at-start" in flex_model + and flex_model["soc-at-start"] is not None + ) + + def _get_soc_lookup_window(self, sensor: Sensor | None = None) -> timedelta: + resolution = self.resolution + if resolution is None and sensor is not None: + resolution = sensor.event_resolution + if resolution is None: + resolution = self.default_resolution + return 4 * resolution + + def _get_soc_capacity_for_percent_conversion(self, flex_model: dict) -> str: + soc_max = flex_model.get("soc-max") + if soc_max is None: + raise ValueError( + "Cannot derive state of charge from a '%' state-of-charge source without `soc-max`." + ) + if isinstance(soc_max, Sensor): + raise ValueError( + "Cannot derive state of charge from a '%' state-of-charge source when `soc-max` is a sensor reference." + ) + return str(ur.Quantity(soc_max).to("MWh")) + + def _convert_soc_value_to_mwh( + self, value: float, from_unit: str, flex_model: dict + ) -> float: + capacity = ( + self._get_soc_capacity_for_percent_conversion(flex_model) + if from_unit == "%" + else None + ) + return convert_units( + data=value, + from_unit=from_unit, + to_unit="MWh", + capacity=capacity, + ) + + def _resolve_soc_at_start_from_sensor( + self, + state_of_charge_sensor: Sensor, + flex_model: dict, + sensor: Sensor | None = None, + ) -> float: + lookup_window = self._get_soc_lookup_window(sensor) + beliefs = state_of_charge_sensor.search_beliefs( + event_starts_after=self.start - lookup_window, + event_ends_before=self.start + lookup_window, + one_deterministic_belief_per_event=True, + ) + if beliefs.empty: + raise ValueError( + f"No recent state-of-charge value found for sensor {state_of_charge_sensor.id} " + f"within {lookup_window} of schedule start {self.start.isoformat()}." + ) + + beliefs_df = beliefs.reset_index() + beliefs_df["time_distance"] = ( + beliefs_df["event_start"] - pd.Timestamp(self.start) + ).abs() + nearest_belief = beliefs_df.sort_values( + by=["time_distance", "event_start"], ascending=[True, False] + ).iloc[0] + + return self._convert_soc_value_to_mwh( + value=nearest_belief["event_value"], + from_unit=state_of_charge_sensor.unit, + flex_model=flex_model, + ) + + def _resolve_soc_at_start_from_time_series( + self, soc_time_series: list[dict], sensor: Sensor | None = None + ) -> float: + lookup_window = self._get_soc_lookup_window(sensor) + matching_segments = [ + segment + for segment in soc_time_series + if segment["start"] <= self.start <= segment["end"] + ] + if matching_segments: + return ( + matching_segments[0]["value"] / ur.Quantity("MWh") + ).magnitude + + candidate_segments = [] + for segment in soc_time_series: + start_distance = abs(segment["start"] - self.start) + end_distance = abs(segment["end"] - self.start) + distance = min(start_distance, end_distance) + if distance <= lookup_window: + candidate_segments.append((distance, segment)) + + if not candidate_segments: + raise ValueError( + f"No recent state-of-charge value found in the provided `state-of-charge` time series " + f"within {lookup_window} of schedule start {self.start.isoformat()}." + ) + + _, nearest_segment = min(candidate_segments, key=lambda item: item[0]) + return (nearest_segment["value"] / ur.Quantity("MWh")).magnitude + + def _resolve_soc_at_start_from_state_of_charge_source( + self, flex_model: dict, sensor: Sensor | None = None + ) -> float | None: + state_of_charge_source = flex_model.get("state-of-charge") + if isinstance(state_of_charge_source, Sensor): + return self._resolve_soc_at_start_from_sensor( + state_of_charge_source, flex_model, sensor + ) + if isinstance(state_of_charge_source, list): + return self._resolve_soc_at_start_from_time_series( + state_of_charge_source, sensor + ) + if ( + isinstance(state_of_charge_source, dict) + and "sensor" in state_of_charge_source + ): + state_of_charge_sensor = Sensor.query.filter_by( + id=state_of_charge_source["sensor"] + ).one() + return self._resolve_soc_at_start_from_sensor( + state_of_charge_sensor, flex_model, sensor + ) + return None + def possibly_extend_end(self, soc_targets, sensor: Sensor = None): """Extend schedule period in case a target exceeds its end. @@ -1086,27 +1219,45 @@ def possibly_extend_end(self, soc_targets, sensor: Sensor = None): else: self.end = max_target_datetime - def ensure_soc_at_start(self): + def ensure_soc_at_start( + self, flex_model: dict | None = None, sensor: Sensor | None = None + ) -> dict: """ Ensure we have a starting state of charge - if needed. Preferably, a starting soc is given. - Otherwise, we try to retrieve the current state of charge from the (old-style) attribute (if that is the valid one at the start). - If that doesn't work, we default the starting soc to be 0 (only if there are soc limits, though, as some assets don't use the concept of a state of charge, - and without soc targets and limits the starting soc doesn't matter). + Otherwise, we try to retrieve the current state of charge from a configured state-of-charge source. + If that doesn't work, we try the (old-style) asset attribute. + Finally, we default the starting soc to 0 (only if there are soc limits, though, as some assets don't use + the concept of a state of charge, and without soc targets and limits the starting soc doesn't matter). """ - if not self.has_soc_at_start() and self.sensor is not None: + if flex_model is None: + flex_model = self.flex_model + if sensor is None: + sensor = self.sensor + + if ( + not self.has_soc_at_start_in(flex_model) + and "state-of-charge" in flex_model + ): + flex_model["soc-at-start"] = ( + self._resolve_soc_at_start_from_state_of_charge_source( + flex_model, sensor + ) + ) + + if not self.has_soc_at_start_in(flex_model) and sensor is not None: # TODO: remove this check when moving to v1.0 (requiring to also remove attributes from test data assets) if ( - self.start == self.sensor.get_attribute("soc_datetime") - and self.sensor.get_attribute("soc_in_mwh") is not None + self.start == sensor.get_attribute("soc_datetime") + and sensor.get_attribute("soc_in_mwh") is not None ): - self.flex_model["soc-at-start"] = self.sensor.get_attribute( - "soc_in_mwh" - ) - if not self.has_soc_at_start() and ( - "soc-min" in self.flex_model or "soc-max" in self.flex_model + flex_model["soc-at-start"] = sensor.get_attribute("soc_in_mwh") + if not self.has_soc_at_start_in(flex_model) and ( + "soc-min" in flex_model or "soc-max" in flex_model ): - self.flex_model["soc-at-start"] = 0 + flex_model["soc-at-start"] = 0 + + return flex_model def get_min_max_targets(self) -> tuple[float | None, float | None]: """This happens before deserializing the flex-model.""" diff --git a/flexmeasures/data/models/planning/tests/test_storage.py b/flexmeasures/data/models/planning/tests/test_storage.py index fae18f8715..97637df60a 100644 --- a/flexmeasures/data/models/planning/tests/test_storage.py +++ b/flexmeasures/data/models/planning/tests/test_storage.py @@ -1,6 +1,7 @@ from datetime import datetime, timedelta import pytz +import pytest import numpy as np import pandas as pd @@ -8,6 +9,7 @@ from flexmeasures.data.models.planning import Scheduler from flexmeasures.data.models.planning.storage import StorageScheduler from flexmeasures.data.models.planning.utils import initialize_index +from flexmeasures.data.models.time_series import TimedBelief from flexmeasures.data.models.planning.tests.utils import ( check_constraints, get_sensors_from_db, @@ -248,3 +250,113 @@ def test_battery_relaxation(add_battery_assets, db): costs["all consumption breaches device 0"], device_power_breach_price * consumption_capacity_in_mw * 1000 * 4, ) # 100 EUR/(kW*h) * 0.025 MW * 1000 kW/MW * 4 hours + + +def test_deserialize_storage_soc_at_start_from_state_of_charge_sensor( + add_charging_station_assets, setup_markets, setup_sources, db +): + start = pd.Timestamp("2015-01-01T00:00:00+01:00") + end = start + timedelta(hours=12) + charging_station = add_charging_station_assets["Test charging station"] + power_sensor = next(s for s in charging_station.sensors if s.name == "power") + soc_sensor = add_charging_station_assets["uni-soc"] + + db.session.add( + TimedBelief( + sensor=soc_sensor, + source=setup_sources["Seita"], + event_start=start - timedelta(minutes=45), + belief_horizon=timedelta(0), + event_value=2.75, + ) + ) + db.session.flush() + + scheduler = StorageScheduler( + power_sensor, + start, + end, + power_sensor.event_resolution, + flex_model={ + "state-of-charge": {"sensor": soc_sensor.id}, + "soc-min": "0 MWh", + "soc-max": "5 MWh", + "power-capacity": "2 MW", + }, + flex_context={"consumption-price": {"sensor": setup_markets["epex_da"].id}}, + ) + + scheduler.deserialize_config() + + assert scheduler.flex_model["soc_at_start"] == 2.75 + + +def test_deserialize_storage_soc_at_start_from_state_of_charge_time_series( + add_charging_station_assets, setup_markets +): + start = pd.Timestamp("2015-01-01T00:00:00+01:00") + end = start + timedelta(hours=12) + charging_station = add_charging_station_assets["Test charging station"] + power_sensor = next(s for s in charging_station.sensors if s.name == "power") + + scheduler = StorageScheduler( + power_sensor, + start, + end, + power_sensor.event_resolution, + flex_model={ + "state-of-charge": [ + { + "start": "2014-12-31T23:30:00+01:00", + "end": "2015-01-01T00:30:00+01:00", + "value": "3.1 MWh", + } + ], + "soc-min": "0 MWh", + "soc-max": "5 MWh", + "power-capacity": "2 MW", + }, + flex_context={"consumption-price": {"sensor": setup_markets["epex_da"].id}}, + ) + + scheduler.deserialize_config() + + assert scheduler.flex_model["soc_at_start"] == 3.1 + + +def test_deserialize_storage_soc_at_start_rejects_stale_state_of_charge_sensor( + add_charging_station_assets, setup_markets, setup_sources, db +): + start = pd.Timestamp("2015-01-01T06:00:00+01:00") + end = start + timedelta(hours=12) + charging_station = add_charging_station_assets["Test charging station"] + power_sensor = next(s for s in charging_station.sensors if s.name == "power") + soc_sensor = add_charging_station_assets["uni-soc"] + + db.session.add( + TimedBelief( + sensor=soc_sensor, + source=setup_sources["Seita"], + event_start=start - timedelta(hours=2), + belief_horizon=timedelta(0), + event_value=2.75, + ) + ) + db.session.flush() + + scheduler = StorageScheduler( + power_sensor, + start, + end, + power_sensor.event_resolution, + flex_model={ + "state-of-charge": {"sensor": soc_sensor.id}, + "soc-min": "0 MWh", + "soc-max": "5 MWh", + "power-capacity": "2 MW", + }, + flex_context={"consumption-price": {"sensor": setup_markets["epex_da"].id}}, + ) + + with pytest.raises(ValueError, match="No recent state-of-charge value found"): + scheduler.deserialize_config() diff --git a/flexmeasures/data/schemas/scheduling/metadata.py b/flexmeasures/data/schemas/scheduling/metadata.py index 39204d1bc3..044314c1e3 100644 --- a/flexmeasures/data/schemas/scheduling/metadata.py +++ b/flexmeasures/data/schemas/scheduling/metadata.py @@ -186,7 +186,7 @@ def to_dict(self): STATE_OF_CHARGE = MetaData( - description="Sensor used to record the scheduled state of charge. The sensor unit may be an energy unit (e.g. MWh or kWh) or a percentage (%). For sensors with a % unit, the ``soc-max`` flex-model field must be set to a non-zero value to allow converting the energy-based schedule to a percentage.", + description="Sensor or time series used for the storage state of charge. If ``soc-at-start`` is omitted, FlexMeasures will also use this field to infer the starting state of charge from a recent value near the schedule start. When a sensor is used, its unit may be an energy unit (e.g. MWh or kWh) or a percentage (%). For sensors with a % unit, the ``soc-max`` flex-model field must be set to a non-zero value to allow converting between the energy-based schedule and a percentage.", example={"sensor": 12}, ) SOC_AT_START = MetaData( diff --git a/flexmeasures/data/schemas/scheduling/storage.py b/flexmeasures/data/schemas/scheduling/storage.py index 8550cc094c..14ff12aeac 100644 --- a/flexmeasures/data/schemas/scheduling/storage.py +++ b/flexmeasures/data/schemas/scheduling/storage.py @@ -295,17 +295,16 @@ def check_whether_targets_exceed_max_planning_horizon(self, data: dict, **kwargs ) @validates("state_of_charge") - def validate_state_of_charge_is_sensor( + def validate_state_of_charge_source( self, state_of_charge: Sensor | list[dict] | ur.Quantity, **kwargs ): - if not isinstance(state_of_charge, Sensor): + if isinstance(state_of_charge, Sensor) and state_of_charge.event_resolution != timedelta(0): raise ValidationError( - "The `state-of-charge` field can only be a Sensor. In the future, the state-of-charge field will replace soc-at-start field." + "The field `state-of-charge` points to a sensor with a non-instantaneous event resolution. Please, use an instantaneous sensor." ) - - if state_of_charge.event_resolution != timedelta(0): + if not isinstance(state_of_charge, (Sensor, list)): raise ValidationError( - "The field `state-of-charge` points to a sensor with a non-instantaneous event resolution. Please, use an instantaneous sensor." + "The `state-of-charge` field can only be a Sensor or a time series." ) @validates("asset")