Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion documentation/tut/forecasting_scheduling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ Schedules can be queried by their UUID for up to 1 week after they were triggere
Afterwards, the exact schedule can still be retrieved through the `[GET] /sensors/<id>/data <../api/v3_0.html#get--api-v3_0-sensors-id-data>`_, using precise filter values for ``start``, ``prior`` and ``source``.
Besides the UUID, the endpoint for retrieving schedules takes a sensor ID, which is the sensor ID of one of the power sensors that was referenced in the flex model.

.. note:: If a ``state-of-charge`` sensor was referenced in the flex model (like in the example below), the scheduled state of charge can be retrieved using the same endpoint and UUID, but then using the state-of-charge sensor ID.
.. note:: If a ``state-of-charge`` sensor was referenced in the flex model (like in the example below), FlexMeasures can both persist the scheduled state of charge on that sensor and, when ``soc-at-start`` is omitted, infer the starting state of charge from a recent sensor value near the schedule start.
The scheduled state of charge can then be retrieved using the same endpoint and UUID, but with the state-of-charge sensor ID.

.. code-block:: json
:emphasize-lines: 3
Expand Down
65 changes: 65 additions & 0 deletions flexmeasures/api/v3_0/tests/test_sensor_schedules_fresh_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,71 @@ def test_trigger_and_get_schedule(
assert sensor.generic_asset.get_attribute("soc_in_mwh") == start_soc


@pytest.mark.parametrize(
"requesting_user", ["test_prosumer_user@seita.nl"], indirect=True
)
def test_trigger_schedule_uses_state_of_charge_sensor_for_soc_at_start(
app,
fresh_db,
add_market_prices_fresh_db,
add_battery_assets_fresh_db,
battery_soc_sensor_fresh_db,
setup_sources_fresh_db,
keep_scheduling_queue_empty,
requesting_user,
):
message = message_for_trigger_schedule(resolution="PT1H")
message["flex-context"] = {
"consumption-price": {"sensor": add_market_prices_fresh_db["epex_da"].id},
"production-price": {"sensor": add_market_prices_fresh_db["epex_da"].id},
"site-power-capacity": "1 TW",
}
message["flex-model"]["state-of-charge"] = {"sensor": battery_soc_sensor_fresh_db.id}
message["flex-model"].pop("soc-at-start")

fresh_db.session.add(
TimedBelief(
sensor=battery_soc_sensor_fresh_db,
source=setup_sources_fresh_db["Seita"],
event_start=parse_datetime(message["start"]),
belief_horizon=timedelta(0),
event_value=50,
)
)
fresh_db.session.commit()

sensor = (
Sensor.query.filter(Sensor.name == "power")
.join(GenericAsset, GenericAsset.id == Sensor.generic_asset_id)
.filter(GenericAsset.name == "Test battery")
.one_or_none()
)

with app.test_client() as client:
trigger_schedule_response = client.post(
url_for("SensorAPI:trigger_schedule", id=sensor.id),
json=message,
)
assert trigger_schedule_response.status_code == 200
job_id = trigger_schedule_response.json["schedule"]

work_on_rq(app.queues["scheduling"], exc_handler=handle_scheduling_exception)

with app.test_client() as client:
get_soc_schedule_response = client.get(
url_for(
"SensorAPI:get_schedule", id=battery_soc_sensor_fresh_db.id, uuid=job_id
),
query_string={"duration": "PT24H"},
)
assert get_soc_schedule_response.status_code == 200
assert get_soc_schedule_response.json["unit"] == "%"
assert get_soc_schedule_response.json["values"][0] == 50

sensor = fresh_db.session.get(Sensor, sensor.id)
assert sensor.generic_asset.get_attribute("soc_in_mwh") == pytest.approx(0.02)


@pytest.mark.parametrize(
"context_sensor, asset_sensor, parent_sensor, expect_sensor",
[
Expand Down
9 changes: 0 additions & 9 deletions flexmeasures/cli/data_add.py
Original file line number Diff line number Diff line change
Expand Up @@ -1309,15 +1309,6 @@ def add_schedule( # noqa C901
scheduler_module = "flexmeasures.data.models.planning.process"
elif scheduler_class == "StorageScheduler":
scheduler_module = "flexmeasures.data.models.planning.storage"
if soc_at_start is None and (
"soc-min" in flex_model or "soc-max" in flex_model
):
# for asset scheduling, soc at start should be part of the flex model
click.secho(
"For a storage device with SoC constraints, --soc-at-start is required.",
**MsgStyle.ERROR,
)
raise click.Abort()
if soc_at_start:
flex_model["soc-at-start"] = soc_at_start.to("%")

Expand Down
52 changes: 51 additions & 1 deletion flexmeasures/cli/tests/test_data_add_fresh_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from flexmeasures import Asset
from flexmeasures.cli.tests.utils import to_flags
from flexmeasures.data.models.user import Account
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.time_series import Sensor, TimedBelief

from flexmeasures.cli.tests.utils import check_command_ran_without_error
from flexmeasures.utils.time_utils import server_now
Expand Down Expand Up @@ -463,3 +463,53 @@ def test_add_storage_schedule(

check_command_ran_without_error(result)
assert len(power_sensor.search_beliefs()) == 48


def test_add_storage_schedule_uses_state_of_charge_sensor_for_soc_at_start(
app,
fresh_db,
add_market_prices_fresh_db,
add_charging_station_assets_fresh_db,
setup_sources_fresh_db,
):
from flexmeasures.cli.data_add import add_schedule

charging_station = add_charging_station_assets_fresh_db["Test charging station"]
power_sensor = next(s for s in charging_station.sensors if s.name == "power")
soc_sensor = add_charging_station_assets_fresh_db["uni-soc"]
start = "2015-01-01T00:00:00+01:00"

fresh_db.session.add(
TimedBelief(
sensor=soc_sensor,
source=setup_sources_fresh_db["Seita"],
event_start=datetime.fromisoformat(start),
event_value=2.5,
belief_time=datetime.fromisoformat(start),
)
)
fresh_db.session.commit()

cli_input_params = {
"sensor": power_sensor.id,
"start": start,
"duration": "PT12H",
"scheduler": "StorageScheduler",
"flex-context": json.dumps(
{"consumption-price": {"sensor": add_market_prices_fresh_db["epex_da"].id}}
),
"flex-model": json.dumps(
{
"state-of-charge": {"sensor": soc_sensor.id},
"soc-min": "0 MWh",
"soc-max": "5 MWh",
"power-capacity": "2 MW",
}
),
}

result = app.test_cli_runner().invoke(add_schedule, to_flags(cli_input_params))

check_command_ran_without_error(result)
assert len(power_sensor.search_beliefs()) == 48
assert power_sensor.generic_asset.get_attribute("soc_in_mwh") == 2.5
177 changes: 164 additions & 13 deletions flexmeasures/data/models/planning/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,10 @@ def deserialize_flex_config(self):
self.flex_model
)
for d, sensor_flex_model in enumerate(self.flex_model):
sensor_flex_model["sensor_flex_model"] = self.ensure_soc_at_start(
flex_model=sensor_flex_model["sensor_flex_model"],
sensor=sensor_flex_model.get("sensor"),
)
self.flex_model[d] = StorageFlexModelSchema(
start=self.start,
sensor=sensor_flex_model.get("sensor"),
Expand Down Expand Up @@ -1065,6 +1069,135 @@ def has_soc_at_start(self) -> bool:
and self.flex_model["soc-at-start"] is not None
)

@staticmethod
def has_soc_at_start_in(flex_model: dict) -> bool:
return (
"soc-at-start" in flex_model
and flex_model["soc-at-start"] is not None
)

def _get_soc_lookup_window(self, sensor: Sensor | None = None) -> timedelta:
resolution = self.resolution
if resolution is None and sensor is not None:
resolution = sensor.event_resolution
if resolution is None:
resolution = self.default_resolution
return 4 * resolution

def _get_soc_capacity_for_percent_conversion(self, flex_model: dict) -> str:
soc_max = flex_model.get("soc-max")
if soc_max is None:
raise ValueError(
"Cannot derive state of charge from a '%' state-of-charge source without `soc-max`."
)
if isinstance(soc_max, Sensor):
raise ValueError(
"Cannot derive state of charge from a '%' state-of-charge source when `soc-max` is a sensor reference."
)
return str(ur.Quantity(soc_max).to("MWh"))

def _convert_soc_value_to_mwh(
self, value: float, from_unit: str, flex_model: dict
) -> float:
capacity = (
self._get_soc_capacity_for_percent_conversion(flex_model)
if from_unit == "%"
else None
)
return convert_units(
data=value,
from_unit=from_unit,
to_unit="MWh",
capacity=capacity,
)

def _resolve_soc_at_start_from_sensor(
self,
state_of_charge_sensor: Sensor,
flex_model: dict,
sensor: Sensor | None = None,
) -> float:
lookup_window = self._get_soc_lookup_window(sensor)
beliefs = state_of_charge_sensor.search_beliefs(
event_starts_after=self.start - lookup_window,
event_ends_before=self.start + lookup_window,
one_deterministic_belief_per_event=True,
)
if beliefs.empty:
raise ValueError(
f"No recent state-of-charge value found for sensor {state_of_charge_sensor.id} "
f"within {lookup_window} of schedule start {self.start.isoformat()}."
)

beliefs_df = beliefs.reset_index()
beliefs_df["time_distance"] = (
beliefs_df["event_start"] - pd.Timestamp(self.start)
).abs()
nearest_belief = beliefs_df.sort_values(
by=["time_distance", "event_start"], ascending=[True, False]
).iloc[0]

return self._convert_soc_value_to_mwh(
value=nearest_belief["event_value"],
from_unit=state_of_charge_sensor.unit,
flex_model=flex_model,
)

def _resolve_soc_at_start_from_time_series(
self, soc_time_series: list[dict], sensor: Sensor | None = None
) -> float:
lookup_window = self._get_soc_lookup_window(sensor)
matching_segments = [
segment
for segment in soc_time_series
if segment["start"] <= self.start <= segment["end"]
]
if matching_segments:
return (
matching_segments[0]["value"] / ur.Quantity("MWh")
).magnitude

candidate_segments = []
for segment in soc_time_series:
start_distance = abs(segment["start"] - self.start)
end_distance = abs(segment["end"] - self.start)
distance = min(start_distance, end_distance)
if distance <= lookup_window:
candidate_segments.append((distance, segment))

if not candidate_segments:
raise ValueError(
f"No recent state-of-charge value found in the provided `state-of-charge` time series "
f"within {lookup_window} of schedule start {self.start.isoformat()}."
)

_, nearest_segment = min(candidate_segments, key=lambda item: item[0])
return (nearest_segment["value"] / ur.Quantity("MWh")).magnitude

def _resolve_soc_at_start_from_state_of_charge_source(
self, flex_model: dict, sensor: Sensor | None = None
) -> float | None:
state_of_charge_source = flex_model.get("state-of-charge")
if isinstance(state_of_charge_source, Sensor):
return self._resolve_soc_at_start_from_sensor(
state_of_charge_source, flex_model, sensor
)
if isinstance(state_of_charge_source, list):
return self._resolve_soc_at_start_from_time_series(
state_of_charge_source, sensor
)
if (
isinstance(state_of_charge_source, dict)
and "sensor" in state_of_charge_source
):
state_of_charge_sensor = Sensor.query.filter_by(
id=state_of_charge_source["sensor"]
).one()
return self._resolve_soc_at_start_from_sensor(
state_of_charge_sensor, flex_model, sensor
)
return None

def possibly_extend_end(self, soc_targets, sensor: Sensor = None):
"""Extend schedule period in case a target exceeds its end.

Expand All @@ -1086,27 +1219,45 @@ def possibly_extend_end(self, soc_targets, sensor: Sensor = None):
else:
self.end = max_target_datetime

def ensure_soc_at_start(self):
def ensure_soc_at_start(
self, flex_model: dict | None = None, sensor: Sensor | None = None
) -> dict:
"""
Ensure we have a starting state of charge - if needed.
Preferably, a starting soc is given.
Otherwise, we try to retrieve the current state of charge from the (old-style) attribute (if that is the valid one at the start).
If that doesn't work, we default the starting soc to be 0 (only if there are soc limits, though, as some assets don't use the concept of a state of charge,
and without soc targets and limits the starting soc doesn't matter).
Otherwise, we try to retrieve the current state of charge from a configured state-of-charge source.
If that doesn't work, we try the (old-style) asset attribute.
Finally, we default the starting soc to 0 (only if there are soc limits, though, as some assets don't use
the concept of a state of charge, and without soc targets and limits the starting soc doesn't matter).
"""
if not self.has_soc_at_start() and self.sensor is not None:
if flex_model is None:
flex_model = self.flex_model
if sensor is None:
sensor = self.sensor

if (
not self.has_soc_at_start_in(flex_model)
and "state-of-charge" in flex_model
):
flex_model["soc-at-start"] = (
self._resolve_soc_at_start_from_state_of_charge_source(
flex_model, sensor
)
)

if not self.has_soc_at_start_in(flex_model) and sensor is not None:
# TODO: remove this check when moving to v1.0 (requiring to also remove attributes from test data assets)
if (
self.start == self.sensor.get_attribute("soc_datetime")
and self.sensor.get_attribute("soc_in_mwh") is not None
self.start == sensor.get_attribute("soc_datetime")
and sensor.get_attribute("soc_in_mwh") is not None
):
self.flex_model["soc-at-start"] = self.sensor.get_attribute(
"soc_in_mwh"
)
if not self.has_soc_at_start() and (
"soc-min" in self.flex_model or "soc-max" in self.flex_model
flex_model["soc-at-start"] = sensor.get_attribute("soc_in_mwh")
if not self.has_soc_at_start_in(flex_model) and (
"soc-min" in flex_model or "soc-max" in flex_model
):
self.flex_model["soc-at-start"] = 0
flex_model["soc-at-start"] = 0

return flex_model

def get_min_max_targets(self) -> tuple[float | None, float | None]:
"""This happens before deserializing the flex-model."""
Expand Down
Loading
Loading