From d63afe5684d7e7eb97120335d6f81db2d1906b1b Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 13:27:41 +0100 Subject: [PATCH 01/81] dev: try using the _sending_queue instead Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/cem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/cem.py b/src/flexmeasures_client/s2/cem.py index ecbdc8fa..83d6c866 100644 --- a/src/flexmeasures_client/s2/cem.py +++ b/src/flexmeasures_client/s2/cem.py @@ -290,7 +290,7 @@ async def handle_handshake(self, message: Handshake): # TODO: check the version that the RM is using and send a # `selected_protocol_version` that matches the one of the RM # TODO: Return a TBD "CloseConnection" message to close the connection - await self.send_message(get_reception_status(message, ReceptionStatusValues.OK)) + await self._sending_queue.put(get_reception_status(message, ReceptionStatusValues.OK)) latest_compatible_version = get_latest_compatible_version( message.supported_protocol_versions, From 564b8f1193fb0236040ee8d30a81bb7716620d42 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 15:00:53 +0100 Subject: [PATCH 02/81] Revert "dev: try using the _sending_queue instead" This reverts commit d63afe5684d7e7eb97120335d6f81db2d1906b1b. --- src/flexmeasures_client/s2/cem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/cem.py b/src/flexmeasures_client/s2/cem.py index 83d6c866..ecbdc8fa 100644 --- a/src/flexmeasures_client/s2/cem.py +++ b/src/flexmeasures_client/s2/cem.py @@ -290,7 +290,7 @@ async def handle_handshake(self, message: Handshake): # TODO: check the version that the RM is using and send a # `selected_protocol_version` that matches the one of the RM # TODO: Return a TBD "CloseConnection" message to close the connection - await self._sending_queue.put(get_reception_status(message, ReceptionStatusValues.OK)) + await self.send_message(get_reception_status(message, ReceptionStatusValues.OK)) latest_compatible_version = get_latest_compatible_version( message.supported_protocol_versions, From 2db7a4f5dbb439bb3693c3f81ffa66d2b707d42a Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 14:22:17 +0100 Subject: [PATCH 03/81] fix: give the sending task enough time to flush the queue before the connection shuts down Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/cem.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/flexmeasures_client/s2/cem.py b/src/flexmeasures_client/s2/cem.py index ecbdc8fa..b39da1c1 100644 --- a/src/flexmeasures_client/s2/cem.py +++ b/src/flexmeasures_client/s2/cem.py @@ -291,6 +291,7 @@ async def handle_handshake(self, message: Handshake): # `selected_protocol_version` that matches the one of the RM # TODO: Return a TBD "CloseConnection" message to close the connection await self.send_message(get_reception_status(message, ReceptionStatusValues.OK)) + await asyncio.sleep(0.5) latest_compatible_version = get_latest_compatible_version( message.supported_protocol_versions, From 35cc9add109290b20d7be8849aa773944f5c19e0 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 14:58:34 +0100 Subject: [PATCH 04/81] dev: use DEBUG logging level Signed-off-by: F.N. Claessen --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 8068b475..051c038c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,7 +23,7 @@ services: FLEXMEASURES_BASE_URL: http://server:5000 FLEXMEASURES_USER: toy-user@flexmeasures.io FLEXMEASURES_PASSWORD: toy-password - LOGGING_LEVEL: INFO + LOGGING_LEVEL: DEBUG volumes: # If flexmeasures_client lives in your repo and you want live edits - ../flexmeasures-client:/app/flexmeasures-client:rw From 375b49ce9482f6cf7fdc849dfd9d4bde8c0911dc Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 15:00:01 +0100 Subject: [PATCH 05/81] feat: await message sent rather than message queued Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/cem.py | 25 ++++++++++++------- .../s2/script/websockets_server.py | 10 +++++--- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/flexmeasures_client/s2/cem.py b/src/flexmeasures_client/s2/cem.py index b39da1c1..a87c8017 100644 --- a/src/flexmeasures_client/s2/cem.py +++ b/src/flexmeasures_client/s2/cem.py @@ -59,7 +59,7 @@ class CEM(Handler): ] # maps the CommodityQuantity power measurement sensors to FM sensor IDs _fm_client: FlexMeasuresClient - _sending_queue: Queue[pydantic.BaseModel] + _sending_queue: asyncio.Queue[tuple[pydantic.BaseModel, asyncio.Future]] _timers: dict[str, datetime] _datastore: dict @@ -216,7 +216,7 @@ async def handle_message(self, message: Dict | pydantic.BaseModel | str): ) if response is not None: - await self._sending_queue.put(response) + await self.send_message(response) def update_control_type(self, control_type: ControlType): """ @@ -232,14 +232,20 @@ async def get_message(self) -> str: str: message in JSON format """ - message = await self._sending_queue.get() - await asyncio.sleep(0.3) + item = await self._sending_queue.get() + + if not isinstance(item, tuple) or len(item) != 2: + raise RuntimeError( + "Invalid item in sending queue. All messages must go through send_message() rather than _sending_queue.put()." + ) + + message, fut = item # Pending for pydantic V2 to implement model.model_dump(mode="json") in # PR #1409 (https://github.com/pydantic/pydantic/issues/1409) message = json.loads(message.json()) - return message + return message, fut async def activate_control_type( self, control_type: ControlType @@ -279,8 +285,7 @@ async def activate_control_type( ].register_success_callbacks( message_id, self.update_control_type, control_type=control_type ) - - await self._sending_queue.put( + await self.send_message( SelectControlType(message_id=message_id, control_type=control_type) ) return None @@ -291,7 +296,6 @@ async def handle_handshake(self, message: Handshake): # `selected_protocol_version` that matches the one of the RM # TODO: Return a TBD "CloseConnection" message to close the connection await self.send_message(get_reception_status(message, ReceptionStatusValues.OK)) - await asyncio.sleep(0.5) latest_compatible_version = get_latest_compatible_version( message.supported_protocol_versions, @@ -426,8 +430,11 @@ def handle_revoke_object(self, message: RevokeObject): return get_reception_status(message, ReceptionStatusValues.OK) async def send_message(self, message): + loop = asyncio.get_running_loop() + fut = loop.create_future() self._logger.debug(f"Sent: {message}") - await self._sending_queue.put(message) + await self._sending_queue.put((message, fut)) + await fut # wait until actually sent def get_commodity_unit(commodity_quantity) -> str: diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index afceeb56..8c223237 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -47,9 +47,13 @@ async def websocket_producer(ws, cem: CEM): cem._logger.debug("start websocket message producer") cem._logger.debug(f"IS CLOSED? {cem.is_closed()}") while not cem.is_closed(): - message = await cem.get_message() - cem._logger.debug("sending message") - await ws.send_json(message) + message, fut = await cem.get_message() + try: + cem._logger.debug("sending message") + await ws.send_json(message) + fut.set_result(True) + except Exception as exc: + fut.set_exception(exc) cem._logger.debug("cem closed") From 14f77924f25e235031c9a752cf04c55b897236f5 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 15:00:19 +0100 Subject: [PATCH 06/81] fix: also close WS if closing CEM Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/script/websockets_server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index 8c223237..cd3a27ff 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -75,6 +75,7 @@ async def websocket_consumer(ws, cem: CEM): elif msg.type == aiohttp.WSMsgType.ERROR: cem._logger.debug("close...") cem.close() + await ws.close() cem._logger.error(f"ws connection closed with exception {ws.exception()}") # TODO: save cem state? From 47995853ab9914c4b47cd7856874af964a619c18 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 15:27:29 +0100 Subject: [PATCH 07/81] feat: skip setting up the toy account Signed-off-by: F.N. Claessen --- docker-compose.yml => docker-compose.override.yml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) rename docker-compose.yml => docker-compose.override.yml (79%) diff --git a/docker-compose.yml b/docker-compose.override.yml similarity index 79% rename from docker-compose.yml rename to docker-compose.override.yml index 051c038c..25273db2 100644 --- a/docker-compose.yml +++ b/docker-compose.override.yml @@ -4,11 +4,18 @@ # run this from the flexmeasures folder (which contains the Dockerfile): # docker compose \ # -f docker-compose.yml \ -# -f ../flexmeasures-client/docker-compose.yml \ +# -f ../flexmeasures-client/docker-compose.override.yml \ # up # ------------------------------------------------------------------ services: + server: + command: + - | + pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt + flexmeasures db upgrade + # toy account step removed + gunicorn --bind 0.0.0.0:5000 --worker-tmp-dir /dev/shm --workers 2 --threads 4 wsgi:applicationservices: cem: build: context: . From 89d433800d3d21ec8eb1058b3dd58a9a1ae3090f Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 15:30:44 +0100 Subject: [PATCH 08/81] feat: post prices in background task Signed-off-by: F.N. Claessen --- .../s2/script/websockets_server.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index cd3a27ff..9a10af37 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -169,12 +169,17 @@ async def configure_site( generic_asset_id=site_asset["id"], timezone="Europe/Amsterdam", ) - await fm_client.post_sensor_data( - sensor_id=price_sensor["id"], - start="2026-01-15T00:00+01", # 2026-01-01T00:00+01 - duration="P3D", # P1M - values=[0.3], - unit="EUR/kWh", + + # Continue immediately without awaiting + LOGGER.debug("Posting 3 days of prices in a background task..") + asyncio.create_task( + fm_client.post_sensor_data( + sensor_id=price_sensor["id"], + start="2026-01-15T00:00+01", # 2026-01-01T00:00+01 + duration="P3D", # P1M + values=[0.3], + unit="EUR/kWh", + ) ) if power_sensor is None: power_sensor = await fm_client.add_sensor( From 60ff6af74133ac12547e8508f89072d3eef3d661 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 15:33:19 +0100 Subject: [PATCH 09/81] Revert "feat: skip setting up the toy account" This reverts commit 47995853ab9914c4b47cd7856874af964a619c18. --- docker-compose.override.yml => docker-compose.yml | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) rename docker-compose.override.yml => docker-compose.yml (79%) diff --git a/docker-compose.override.yml b/docker-compose.yml similarity index 79% rename from docker-compose.override.yml rename to docker-compose.yml index 25273db2..051c038c 100644 --- a/docker-compose.override.yml +++ b/docker-compose.yml @@ -4,18 +4,11 @@ # run this from the flexmeasures folder (which contains the Dockerfile): # docker compose \ # -f docker-compose.yml \ -# -f ../flexmeasures-client/docker-compose.override.yml \ +# -f ../flexmeasures-client/docker-compose.yml \ # up # ------------------------------------------------------------------ services: - server: - command: - - | - pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt - flexmeasures db upgrade - # toy account step removed - gunicorn --bind 0.0.0.0:5000 --worker-tmp-dir /dev/shm --workers 2 --threads 4 wsgi:applicationservices: cem: build: context: . From 404ed6b373dc66cddad8c7de31888acebc833896 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 15:54:37 +0100 Subject: [PATCH 10/81] chore: use modern method name Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 6e81f291..691c8dc7 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -62,7 +62,7 @@ def now(self): return self._timezone.localize(datetime.now()) async def send_storage_status(self, status: FRBCStorageStatus): - await self._fm_client.post_measurements( + await self._fm_client.post_sensor_data( self._soc_sensor_id, start=self.now(), values=[status.present_fill_level], @@ -82,7 +82,7 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): dt = status.transition_timestamp # self.now() - await self._fm_client.post_measurements( + await self._fm_client.post_sensor_data( self._rm_discharge_sensor_id, start=dt, values=[-power], From 5d4de1070fe156a9abe1e9530c039a56f9591ccf Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 15:55:07 +0100 Subject: [PATCH 11/81] fix: fall back on now in case FRBCActuatorStatus.transition_timestamp is missing Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 691c8dc7..f9244837 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -80,7 +80,7 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): + (fill_rate.end_of_range - fill_rate.start_of_range) * factor ) - dt = status.transition_timestamp # self.now() + dt = status.transition_timestamp or self.now() await self._fm_client.post_sensor_data( self._rm_discharge_sensor_id, From f8ea65849cdd492a8ccb16b43e5f28f74d7df058 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 15:57:21 +0100 Subject: [PATCH 12/81] fix: actuator status unit Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index f9244837..72af00b8 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -45,6 +45,8 @@ def __init__( schedule_duration: timedelta = timedelta(hours=12), max_size: int = 100, valid_from_shift: timedelta = timedelta(days=1), + power_unit: str = "kW", + energy_unit: str = "kWh", ) -> None: super().__init__(max_size) self._power_sensor_id = power_sensor_id @@ -57,6 +59,8 @@ def __init__( # delay the start of the schedule from the time `valid_from` # of the FRBC.SystemDescription. self._valid_from_shift = valid_from_shift + self.power_unit = power_unit + self.energy_unit = energy_unit def now(self): return self._timezone.localize(datetime.now()) @@ -66,7 +70,7 @@ async def send_storage_status(self, status: FRBCStorageStatus): self._soc_sensor_id, start=self.now(), values=[status.present_fill_level], - unit="MWh", + unit=self.energy_unit, duration=timedelta(minutes=1), ) @@ -86,7 +90,7 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): self._rm_discharge_sensor_id, start=dt, values=[-power], - unit="MWh", + unit=self.power_unit, duration=timedelta(minutes=15), ) From d59f3407a8d10ac2de5717fe9d7bea0aaa32052c Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 15:27:29 +0100 Subject: [PATCH 13/81] feat: skip setting up the toy account Signed-off-by: F.N. Claessen --- docker-compose.yml => docker-compose.override.yml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) rename docker-compose.yml => docker-compose.override.yml (79%) diff --git a/docker-compose.yml b/docker-compose.override.yml similarity index 79% rename from docker-compose.yml rename to docker-compose.override.yml index 051c038c..25273db2 100644 --- a/docker-compose.yml +++ b/docker-compose.override.yml @@ -4,11 +4,18 @@ # run this from the flexmeasures folder (which contains the Dockerfile): # docker compose \ # -f docker-compose.yml \ -# -f ../flexmeasures-client/docker-compose.yml \ +# -f ../flexmeasures-client/docker-compose.override.yml \ # up # ------------------------------------------------------------------ services: + server: + command: + - | + pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt + flexmeasures db upgrade + # toy account step removed + gunicorn --bind 0.0.0.0:5000 --worker-tmp-dir /dev/shm --workers 2 --threads 4 wsgi:applicationservices: cem: build: context: . From 6abd2a11deac510c9f246044048e0af94ea491d5 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 16:02:41 +0100 Subject: [PATCH 14/81] fix: copy-paste mistake Signed-off-by: F.N. Claessen --- docker-compose.override.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.override.yml b/docker-compose.override.yml index 25273db2..e562a60c 100644 --- a/docker-compose.override.yml +++ b/docker-compose.override.yml @@ -15,7 +15,7 @@ services: pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt flexmeasures db upgrade # toy account step removed - gunicorn --bind 0.0.0.0:5000 --worker-tmp-dir /dev/shm --workers 2 --threads 4 wsgi:applicationservices: + gunicorn --bind 0.0.0.0:5000 --worker-tmp-dir /dev/shm --workers 2 --threads 4 wsgi:application cem: build: context: . From 5328de99f14f5797cf96105eb4f97a00b0f7a30a Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 16:05:13 +0100 Subject: [PATCH 15/81] fix: discharge unit Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/script/websockets_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index 9a10af37..6d6e7550 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -201,7 +201,7 @@ async def configure_site( rm_discharge_sensor = await fm_client.add_sensor( name="RM discharge", event_resolution="PT15M", - unit="dimensionless", + unit="kW", generic_asset_id=site_asset["id"], timezone="Europe/Amsterdam", ) From eae232bc3c4a930b71b07a340421e91621b4de60 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 16:06:33 +0100 Subject: [PATCH 16/81] docs: update instruction to run docker-compose stack Signed-off-by: F.N. Claessen --- docs/CEM.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/CEM.rst b/docs/CEM.rst index 72661c75..98fe41d5 100644 --- a/docs/CEM.rst +++ b/docs/CEM.rst @@ -15,14 +15,14 @@ Then point your Resource Managers (RMs) to ``http://localhost:8080/ws`` and run: python3 flexmeasures_client/s2/scripts/websockets_server.py -We also included a ``docker-compose.yaml`` that can be used to set up the CEM including the FlexMeasures server, creating a fully self-hosted HEMS. +We also included a ``docker-compose.override.yaml`` that can be used to set up the CEM including the FlexMeasures server, creating a fully self-hosted HEMS. Assuming your ``flexmeasures`` and ``flexmeasures-client`` repo folders are located side by side, run this from your flexmeasures folder: .. code-block:: bash docker compose \ -f docker-compose.yml \ - -f ../flexmeasures-client/docker-compose.yml \ + -f ../flexmeasures-client/docker-compose.override.yml \ up From 0e72c25776b4c25d8790cb8356cff42786032fbf Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 16:13:12 +0100 Subject: [PATCH 17/81] Revert "fix: discharge unit" This reverts commit 5328de99f14f5797cf96105eb4f97a00b0f7a30a. --- src/flexmeasures_client/s2/script/websockets_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index 6d6e7550..9a10af37 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -201,7 +201,7 @@ async def configure_site( rm_discharge_sensor = await fm_client.add_sensor( name="RM discharge", event_resolution="PT15M", - unit="kW", + unit="dimensionless", generic_asset_id=site_asset["id"], timezone="Europe/Amsterdam", ) From abb0a7436643faeaa0bd493e34fbedd4c291ec57 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 16:13:38 +0100 Subject: [PATCH 18/81] fix: schedule power sensor instead of dimensionless discharge sensor Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 72af00b8..8a1c3353 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -87,7 +87,7 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): dt = status.transition_timestamp or self.now() await self._fm_client.post_sensor_data( - self._rm_discharge_sensor_id, + self._power_sensor_id, start=dt, values=[-power], unit=self.power_unit, From bbaac73bfaa9e8e83a7a10f39c6a9a94864771b6 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 16:39:45 +0100 Subject: [PATCH 19/81] feat: trigger schedule with each storage status (not yet rate limited) and make system_description_id optional Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 8a1c3353..dfb70b8b 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -73,6 +73,7 @@ async def send_storage_status(self, status: FRBCStorageStatus): unit=self.energy_unit, duration=timedelta(minutes=1), ) + await self.trigger_schedule() async def send_actuator_status(self, status: FRBCActuatorStatus): factor = status.operation_mode_factor @@ -94,12 +95,15 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): duration=timedelta(minutes=15), ) - async def trigger_schedule(self, system_description_id: str): + async def trigger_schedule(self, system_description_id: str | None = None): """Translates S2 System Description into FM API calls""" - system_description: FRBCSystemDescription = self._system_description_history[ - system_description_id - ] + if system_description_id: + system_description: FRBCSystemDescription = self._system_description_history[ + system_description_id + ] + else: + system_description: FRBCSystemDescription = list(self._system_description_history.values())[-1] if len(self._storage_status_history) > 0: soc_at_start = list(self._storage_status_history.values())[ From 374e44aff236795522dffcdf077af979020b4117 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 16:39:59 +0100 Subject: [PATCH 20/81] feat: post 1 year of data in background tasks Signed-off-by: F.N. Claessen --- .../s2/script/websockets_server.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index 9a10af37..10076db2 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -171,16 +171,17 @@ async def configure_site( ) # Continue immediately without awaiting - LOGGER.debug("Posting 3 days of prices in a background task..") - asyncio.create_task( - fm_client.post_sensor_data( - sensor_id=price_sensor["id"], - start="2026-01-15T00:00+01", # 2026-01-01T00:00+01 - duration="P3D", # P1M - values=[0.3], - unit="EUR/kWh", + LOGGER.debug("Posting 1 year of prices in monthly background tasks..") + for m in range(12): + asyncio.create_task( + fm_client.post_sensor_data( + sensor_id=price_sensor["id"], + start=f"2026-{m+ 1:02}-01T00:00+01", # 2026-01-01T00:00+01 + m months + duration="P31D", + values=[0.3], + unit="EUR/kWh", + ) ) - ) if power_sensor is None: power_sensor = await fm_client.add_sensor( name="power", From a1d6ba749c0e39ebded28967f71168908aad3b55 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 16:40:08 +0100 Subject: [PATCH 21/81] Revert "feat: post 1 year of data in background tasks" This reverts commit 374e44aff236795522dffcdf077af979020b4117. --- .../s2/script/websockets_server.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index 10076db2..9a10af37 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -171,17 +171,16 @@ async def configure_site( ) # Continue immediately without awaiting - LOGGER.debug("Posting 1 year of prices in monthly background tasks..") - for m in range(12): - asyncio.create_task( - fm_client.post_sensor_data( - sensor_id=price_sensor["id"], - start=f"2026-{m+ 1:02}-01T00:00+01", # 2026-01-01T00:00+01 + m months - duration="P31D", - values=[0.3], - unit="EUR/kWh", - ) + LOGGER.debug("Posting 3 days of prices in a background task..") + asyncio.create_task( + fm_client.post_sensor_data( + sensor_id=price_sensor["id"], + start="2026-01-15T00:00+01", # 2026-01-01T00:00+01 + duration="P3D", # P1M + values=[0.3], + unit="EUR/kWh", ) + ) if power_sensor is None: power_sensor = await fm_client.add_sensor( name="power", From f8e0d621efa883b23db7612277693d0dfdcc95da Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 16:42:16 +0100 Subject: [PATCH 22/81] fix: set prior knowledge of prices and test with now Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/script/websockets_server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index 9a10af37..a22d8780 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -175,7 +175,8 @@ async def configure_site( asyncio.create_task( fm_client.post_sensor_data( sensor_id=price_sensor["id"], - start="2026-01-15T00:00+01", # 2026-01-01T00:00+01 + start="2026-02-25T00:00+01", # now + prior="2026-01-01T00:00+01", # 2026-01-01T00:00+01 duration="P3D", # P1M values=[0.3], unit="EUR/kWh", From be6d8d30176b21ab651e118682f8b70699cf7d31 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 16:52:16 +0100 Subject: [PATCH 23/81] fix: floor the schedule start Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index dfb70b8b..5336cfef 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -116,9 +116,10 @@ async def trigger_schedule(self, system_description_id: str | None = None): soc_min, soc_max = get_soc_min_max(system_description) # call schedule + start = system_description.valid_from + self._valid_from_shift # TODO: localize datetime + start = start.replace(minute=(start.minute // 15) * 15, second=0, microsecond=0) schedule = await self._fm_client.trigger_and_get_schedule( - start=system_description.valid_from - + self._valid_from_shift, # TODO: localize datetime + start=start, sensor_id=self._power_sensor_id, flex_context={ "production-price": {"sensor": self._price_sensor_id}, From 5334c0c083f9281f3f6f7d9fd634b1b198f319bc Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 16:54:10 +0100 Subject: [PATCH 24/81] fix: messages should now be routed through cem.send_message Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 5336cfef..0131ac87 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -144,4 +144,4 @@ async def trigger_schedule(self, system_description_id: str | None = None): # put instructions to sending queue for instruction in instructions: - await self._sending_queue.put(instruction) + await self.send_message(instruction) From 2845036cade18fee7f2d139c34719b448f1bf52b Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 17:01:56 +0100 Subject: [PATCH 25/81] fix: messages should now be routed through cem.send_message; update handlers accordingly Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/cem.py | 4 ++-- src/flexmeasures_client/s2/control_types/FRBC/__init__.py | 2 +- src/flexmeasures_client/s2/control_types/__init__.py | 5 ++--- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/flexmeasures_client/s2/cem.py b/src/flexmeasures_client/s2/cem.py index a87c8017..46bc7f99 100644 --- a/src/flexmeasures_client/s2/cem.py +++ b/src/flexmeasures_client/s2/cem.py @@ -159,8 +159,8 @@ def register_control_type(self, control_type_handler: ControlTypeHandler): # add fm_client to control_type handler control_type_handler._fm_client = self._fm_client - # add sending queue - control_type_handler._sending_queue = self._sending_queue + # add send_message method so the handler can send messages + control_type_handler.send_message = self.send_message # Add logger control_type_handler._logger = self._logger diff --git a/src/flexmeasures_client/s2/control_types/FRBC/__init__.py b/src/flexmeasures_client/s2/control_types/FRBC/__init__.py index 2b4ccf15..94c85b43 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/__init__.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/__init__.py @@ -196,4 +196,4 @@ async def trigger_schedule(self, system_description_id: str): ) # put instruction into the sending queue - await self._sending_queue.put(instruction) + await self.send_message(instruction) diff --git a/src/flexmeasures_client/s2/control_types/__init__.py b/src/flexmeasures_client/s2/control_types/__init__.py index b3e19f5b..7fc6137c 100644 --- a/src/flexmeasures_client/s2/control_types/__init__.py +++ b/src/flexmeasures_client/s2/control_types/__init__.py @@ -1,8 +1,7 @@ from __future__ import annotations -from asyncio import Queue from logging import Logger -from typing import cast +from typing import cast, Callable from pydantic import BaseModel from s2python.common import ( @@ -22,7 +21,7 @@ class ControlTypeHandler(Handler): _instruction_history: SizeLimitOrderedDict[str, BaseModel] _instruction_status_history: SizeLimitOrderedDict[str, InstructionStatus] _fm_client: FlexMeasuresClient - _sending_queue: Queue + send_message: Callable _logger: Logger def __init__(self, max_size: int = 100) -> None: From e01176d9e8c72860f7c619b7e73177c53001de5b Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 17:12:39 +0100 Subject: [PATCH 26/81] feat: roll 3 days of test prices Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/script/websockets_server.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index a22d8780..e5a3103e 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -2,6 +2,8 @@ import json import logging import os +from datetime import datetime +from zoneinfo import ZoneInfo import aiohttp from aiohttp import web @@ -172,10 +174,11 @@ async def configure_site( # Continue immediately without awaiting LOGGER.debug("Posting 3 days of prices in a background task..") + start_of_today = datetime.now(ZoneInfo("Europe/Amsterdam")).replace(hour=0, minute=0, second=0, microsecond=0).isoformat() asyncio.create_task( fm_client.post_sensor_data( sensor_id=price_sensor["id"], - start="2026-02-25T00:00+01", # now + start=start_of_today, prior="2026-01-01T00:00+01", # 2026-01-01T00:00+01 duration="P3D", # P1M values=[0.3], From c780f9899e2dd9a3ff255c417757e907c026ee7a Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 17:37:30 +0100 Subject: [PATCH 27/81] style: black, isort Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 14 +++++++++----- .../s2/control_types/__init__.py | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 0131ac87..1a3afbe7 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -99,11 +99,13 @@ async def trigger_schedule(self, system_description_id: str | None = None): """Translates S2 System Description into FM API calls""" if system_description_id: - system_description: FRBCSystemDescription = self._system_description_history[ - system_description_id - ] + system_description: FRBCSystemDescription = ( + self._system_description_history[system_description_id] + ) else: - system_description: FRBCSystemDescription = list(self._system_description_history.values())[-1] + system_description: FRBCSystemDescription = list( + self._system_description_history.values() + )[-1] if len(self._storage_status_history) > 0: soc_at_start = list(self._storage_status_history.values())[ @@ -116,7 +118,9 @@ async def trigger_schedule(self, system_description_id: str | None = None): soc_min, soc_max = get_soc_min_max(system_description) # call schedule - start = system_description.valid_from + self._valid_from_shift # TODO: localize datetime + start = ( + system_description.valid_from + self._valid_from_shift + ) # TODO: localize datetime start = start.replace(minute=(start.minute // 15) * 15, second=0, microsecond=0) schedule = await self._fm_client.trigger_and_get_schedule( start=start, diff --git a/src/flexmeasures_client/s2/control_types/__init__.py b/src/flexmeasures_client/s2/control_types/__init__.py index 7fc6137c..429ad419 100644 --- a/src/flexmeasures_client/s2/control_types/__init__.py +++ b/src/flexmeasures_client/s2/control_types/__init__.py @@ -1,7 +1,7 @@ from __future__ import annotations from logging import Logger -from typing import cast, Callable +from typing import Callable, cast from pydantic import BaseModel from s2python.common import ( From 26f4778349dd7334d251987a6a518a7eb8962505 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 25 Feb 2026 17:48:03 +0100 Subject: [PATCH 28/81] style: black Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/script/websockets_server.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index e5a3103e..7a08d025 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -174,7 +174,11 @@ async def configure_site( # Continue immediately without awaiting LOGGER.debug("Posting 3 days of prices in a background task..") - start_of_today = datetime.now(ZoneInfo("Europe/Amsterdam")).replace(hour=0, minute=0, second=0, microsecond=0).isoformat() + start_of_today = ( + datetime.now(ZoneInfo("Europe/Amsterdam")) + .replace(hour=0, minute=0, second=0, microsecond=0) + .isoformat() + ) asyncio.create_task( fm_client.post_sensor_data( sensor_id=price_sensor["id"], From 5fb3a262d11659262294988969f101d6f2258687 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Thu, 26 Feb 2026 13:37:26 +0100 Subject: [PATCH 29/81] feat: make schedules appear with consumption on the positive axis Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/script/websockets_server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index 7a08d025..a56c6eac 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -196,6 +196,7 @@ async def configure_site( unit="kW", generic_asset_id=site_asset["id"], timezone="Europe/Amsterdam", + attributes={"consumption_is_positive": True}, ) if soc_sensor is None: soc_sensor = await fm_client.add_sensor( From 508611b631bfed4114191efbed0854b772c16c94 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Thu, 26 Feb 2026 13:39:57 +0100 Subject: [PATCH 30/81] fix: update _sending_queue.put to send_message in FillRateBasedControlTUNES, too Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_tunes.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py index 6b822791..2c647209 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py @@ -215,7 +215,7 @@ async def send_storage_status(self, status: FRBCStorageStatus): subject_message_id=status.message_id, status=ReceptionStatusValues.PERMANENT_ERROR, ) - await self._sending_queue.put(response) + await self.send_message(response) await self.trigger_schedule() async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour): @@ -246,7 +246,7 @@ async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour): subject_message_id=leakage.message_id, status=ReceptionStatusValues.PERMANENT_ERROR, ) - await self._sending_queue.put(response) + await self.send_message(response) async def send_actuator_status(self, status: FRBCActuatorStatus): if not self._is_timer_due("actuator_status"): @@ -514,12 +514,12 @@ async def trigger_schedule(self): object_id=message_id, ) self._logger.debug(f"Sending revoke instruction for {message_id}") - await self._sending_queue.put(revoke_instruction) + await self.send_message(revoke_instruction) self._datastore["instructions"] = {} # Put the instruction in the sending queue for instruction in instructions: - await self._sending_queue.put(instruction) + await self.send_message(instruction) # Store instructions for instruction in instructions: From 07b55c51f2ddc1cc7b2e1c66c24847b4705e77be Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 09:15:11 +0100 Subject: [PATCH 31/81] feat: port send_fill_level_target_profile Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 51 +++++++++++++++++++ .../s2/script/websockets_server.py | 28 +++++++++- 2 files changed, 77 insertions(+), 2 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 1a3afbe7..e8533c69 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -7,9 +7,12 @@ import pytz +import pandas as pd + try: from s2python.frbc import ( FRBCActuatorStatus, + FRBCFillLevelTargetProfile, FRBCStorageStatus, FRBCSystemDescription, ) @@ -25,6 +28,9 @@ fm_schedule_to_instructions, get_soc_min_max, ) +from flexmeasures_client.s2.control_types.translations import ( + translate_fill_level_target_profile, +) class FRBCSimple(FRBC): @@ -41,6 +47,8 @@ def __init__( soc_sensor_id: int, rm_discharge_sensor_id: int, price_sensor_id: int, + soc_minima_sensor_id: int, + soc_maxima_sensor_id: int, timezone: str = "UTC", schedule_duration: timedelta = timedelta(hours=12), max_size: int = 100, @@ -54,6 +62,8 @@ def __init__( self._schedule_duration = schedule_duration self._soc_sensor_id = soc_sensor_id self._rm_discharge_sensor_id = rm_discharge_sensor_id + self._soc_minima_sensor_id = soc_minima_sensor_id + self._soc_maxima_sensor_id = soc_maxima_sensor_id self._timezone = pytz.timezone(timezone) # delay the start of the schedule from the time `valid_from` @@ -135,6 +145,8 @@ async def trigger_schedule(self, system_description_id: str | None = None): "soc-at-start": soc_at_start, # TODO: use forecast of the SOC instead "soc-min": soc_min, "soc-max": soc_max, + "soc-minima": {"sensor": self._soc_minima_sensor_id}, + "soc-maxima": {"sensor": self._soc_maxima_sensor_id}, }, duration=self._schedule_duration, # next 12 hours # TODO: add SOC MAX AND SOC MIN FROM fill_level_range, @@ -149,3 +161,42 @@ async def trigger_schedule(self, system_description_id: str | None = None): # put instructions to sending queue for instruction in instructions: await self.send_message(instruction) + + async def send_fill_level_target_profile( + self, fill_level_target_profile: FRBCFillLevelTargetProfile + ): + """ + Send FRBC.FillLevelTargetProfile to FlexMeasures. + + Args: + fill_level_target_profile (FRBCFillLevelTargetProfile): The fill level target profile to be translated and sent. + """ + # if not self._is_timer_due("fill_level_target_profile"): + # return + RESOLUTION = "15min" + + soc_minima, soc_maxima = translate_fill_level_target_profile( + fill_level_target_profile=fill_level_target_profile, + resolution=RESOLUTION, + fill_level_scale=1, + ) + + duration = str(pd.Timedelta(RESOLUTION) * len(soc_maxima)) + + # POST SOC Minima measurements to FlexMeasures + await self._fm_client.post_sensor_data( + sensor_id=self._soc_minima_sensor_id, + start=fill_level_target_profile.start_time, + values=soc_minima.tolist(), + unit=self.energy_unit, + duration=duration, + ) + + # POST SOC Maxima measurements to FlexMeasures + await self._fm_client.post_sensor_data( + sensor_id=self._soc_maxima_sensor_id, + start=fill_level_target_profile.start_time, + values=soc_maxima.tolist(), + unit=self.energy_unit, + duration=duration, + ) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index a56c6eac..6bf6e57b 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -93,7 +93,7 @@ async def websocket_handler(request): "toy-password", "toy-user@flexmeasures.io", host="server:5000" ) - price_sensor, power_sensor, soc_sensor, rm_discharge_sensor = await configure_site( + price_sensor, power_sensor, soc_sensor, rm_discharge_sensor, soc_minima_sensor, soc_maxima_sensor = await configure_site( site_name, fm_client ) @@ -107,6 +107,8 @@ async def websocket_handler(request): price_sensor_id=price_sensor["id"], soc_sensor_id=soc_sensor["id"], rm_discharge_sensor_id=rm_discharge_sensor["id"], + soc_minima_sensor_id=soc_minima_sensor["id"], + soc_maxima_sensor_id=soc_maxima_sensor["id"], ) cem.register_control_type(frbc) @@ -153,6 +155,8 @@ async def configure_site( power_sensor = None soc_sensor = None rm_discharge_sensor = None + soc_minima_sensor = None + soc_maxima_sensor = None for sensor in sensors: if sensor["name"] == "price": price_sensor = sensor @@ -162,6 +166,10 @@ async def configure_site( soc_sensor = sensor elif sensor["name"] == "RM discharge": rm_discharge_sensor = sensor + elif sensor["name"] == "soc-minima": + soc_minima_sensor = sensor + elif sensor["name"] == "soc-maxima": + soc_maxima_sensor = sensor if price_sensor is None: price_sensor = await fm_client.add_sensor( @@ -214,7 +222,23 @@ async def configure_site( generic_asset_id=site_asset["id"], timezone="Europe/Amsterdam", ) - return price_sensor, power_sensor, soc_sensor, rm_discharge_sensor + if soc_minima_sensor is None: + soc_minima_sensor = await fm_client.add_sensor( + name="soc-minima", + event_resolution="PT15M", + unit="kWh", + generic_asset_id=site_asset["id"], + timezone="Europe/Amsterdam", + ) + if soc_maxima_sensor is None: + soc_maxima_sensor = await fm_client.add_sensor( + name="soc-maxima", + event_resolution="PT15M", + unit="kWh", + generic_asset_id=site_asset["id"], + timezone="Europe/Amsterdam", + ) + return price_sensor, power_sensor, soc_sensor, rm_discharge_sensor, soc_minima_sensor, soc_maxima_sensor app = web.Application() From 872224c5036bd6ba1a3f563db5ab4bdd4597bc15 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 09:16:43 +0100 Subject: [PATCH 32/81] feat: save scheduled state-of-charge, too Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index e8533c69..81213c88 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -147,6 +147,7 @@ async def trigger_schedule(self, system_description_id: str | None = None): "soc-max": soc_max, "soc-minima": {"sensor": self._soc_minima_sensor_id}, "soc-maxima": {"sensor": self._soc_maxima_sensor_id}, + "state-of-charge": {"sensor": self._soc_sensor_id}, }, duration=self._schedule_duration, # next 12 hours # TODO: add SOC MAX AND SOC MIN FROM fill_level_range, From 0b96cc5f9d86bb4c7d31ace4d9042ef06aa58715 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 10:19:38 +0100 Subject: [PATCH 33/81] feat: set sensors_to_show on CEM asset Signed-off-by: F.N. Claessen --- .../s2/script/websockets_server.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index 6bf6e57b..b55a67db 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -128,7 +128,7 @@ async def configure_site( account = await fm_client.get_account() assets = await fm_client.get_assets(parse_json_fields=True) - site_asset = None + site_asset: dict | None = None for asset in assets: if asset["name"] == site_name: site_asset = asset @@ -238,6 +238,20 @@ async def configure_site( generic_asset_id=site_asset["id"], timezone="Europe/Amsterdam", ) + sensors_to_show = [ + { + "title": "State of charge", + "sensors": [soc_minima_sensor["id"], soc_maxima_sensor["id"], soc_sensor["id"]], + }, + { + "title": "Prices", + "sensor": price_sensor["id"], + }, + ] + await fm_client.update_asset( + asset_id=site_asset["id"], + updates=dict(sensors_to_show=sensors_to_show), + ) return price_sensor, power_sensor, soc_sensor, rm_discharge_sensor, soc_minima_sensor, soc_maxima_sensor From 5ef6f6431b44ac4edce331fda029eadcb29bba85 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 10:20:04 +0100 Subject: [PATCH 34/81] fix: type annotation Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/script/websockets_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index b55a67db..575f57e4 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -124,7 +124,7 @@ async def websocket_handler(request): async def configure_site( site_name: str, fm_client: FlexMeasuresClient -) -> tuple[dict, dict, dict, dict]: +) -> tuple[dict, dict, dict, dict, dict, dict]: account = await fm_client.get_account() assets = await fm_client.get_assets(parse_json_fields=True) From 010d584f50542d8b8696e7975160d80ae34ba703 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 10:24:42 +0100 Subject: [PATCH 35/81] fix: flex-model soc-unit Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 81213c88..432fda1a 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -141,7 +141,7 @@ async def trigger_schedule(self, system_description_id: str | None = None): "site-power-capacity": f"{3 * 25 * 230} VA", }, flex_model={ - "soc-unit": "MWh", + "soc-unit": self.energy_unit, "soc-at-start": soc_at_start, # TODO: use forecast of the SOC instead "soc-min": soc_min, "soc-max": soc_max, From c38951040e185f821c7f92f08109bfb317c57d42 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 10:42:25 +0100 Subject: [PATCH 36/81] fix: get rid of valid_from_shift Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 8 +------- .../s2/control_types/FRBC/frbc_tunes.py | 5 ----- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 432fda1a..80874739 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -39,7 +39,6 @@ class FRBCSimple(FRBC): _soc_sensor_id: int _rm_discharge_sensor_id: int _schedule_duration: timedelta - _valid_from_shift: timedelta def __init__( self, @@ -52,7 +51,6 @@ def __init__( timezone: str = "UTC", schedule_duration: timedelta = timedelta(hours=12), max_size: int = 100, - valid_from_shift: timedelta = timedelta(days=1), power_unit: str = "kW", energy_unit: str = "kWh", ) -> None: @@ -65,10 +63,6 @@ def __init__( self._soc_minima_sensor_id = soc_minima_sensor_id self._soc_maxima_sensor_id = soc_maxima_sensor_id self._timezone = pytz.timezone(timezone) - - # delay the start of the schedule from the time `valid_from` - # of the FRBC.SystemDescription. - self._valid_from_shift = valid_from_shift self.power_unit = power_unit self.energy_unit = energy_unit @@ -129,7 +123,7 @@ async def trigger_schedule(self, system_description_id: str | None = None): # call schedule start = ( - system_description.valid_from + self._valid_from_shift + system_description.valid_from ) # TODO: localize datetime start = start.replace(minute=(start.minute // 15) * 15, second=0, microsecond=0) schedule = await self._fm_client.trigger_and_get_schedule( diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py index 2c647209..21217669 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py @@ -123,7 +123,6 @@ def __init__( schedule_duration: timedelta = timedelta(hours=12), max_size: int = 100, fill_level_scale: float = 0.1, - valid_from_shift: timedelta = timedelta(days=1), timers: dict[str, datetime] | None = None, datastore: dict | None = None, **kwargs, @@ -154,10 +153,6 @@ def __init__( self._production_price_sensor_id = production_price_sensor self._timezone = pytz.timezone(timezone) - - # delay the start of the schedule from the time `valid_from` of the FRBC.SystemDescription - self._valid_from_shift = valid_from_shift - self._fill_level_scale = fill_level_scale self._active_recurring_schedule = False From e5ad8a0a9f86bb248dc09161d8bab92e7697ae4a Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 10:45:01 +0100 Subject: [PATCH 37/81] dev: log used SystemDescription Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 80874739..672cda95 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -107,9 +107,11 @@ async def trigger_schedule(self, system_description_id: str | None = None): self._system_description_history[system_description_id] ) else: + # Use last SystemDescription system_description: FRBCSystemDescription = list( self._system_description_history.values() )[-1] + self._logger.debug(f"Using system description: {system_description}") if len(self._storage_status_history) > 0: soc_at_start = list(self._storage_status_history.values())[ From 54dd188138d2b4737610d97afa326ec16b8875b4 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 10:47:07 +0100 Subject: [PATCH 38/81] style: black Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 13 +++++++--- .../s2/script/websockets_server.py | 26 +++++++++++++++---- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 672cda95..b81a8c53 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -111,6 +111,15 @@ async def trigger_schedule(self, system_description_id: str | None = None): system_description: FRBCSystemDescription = list( self._system_description_history.values() )[-1] + system_descriptions = self._system_description_history.values() + self._logger.error( + list( + [ + system_description.valid_from + for system_description in system_descriptions + ] + ) + ) self._logger.debug(f"Using system description: {system_description}") if len(self._storage_status_history) > 0: @@ -124,9 +133,7 @@ async def trigger_schedule(self, system_description_id: str | None = None): soc_min, soc_max = get_soc_min_max(system_description) # call schedule - start = ( - system_description.valid_from - ) # TODO: localize datetime + start = system_description.valid_from # TODO: localize datetime start = start.replace(minute=(start.minute // 15) * 15, second=0, microsecond=0) schedule = await self._fm_client.trigger_and_get_schedule( start=start, diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index 575f57e4..b1018adc 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -93,9 +93,14 @@ async def websocket_handler(request): "toy-password", "toy-user@flexmeasures.io", host="server:5000" ) - price_sensor, power_sensor, soc_sensor, rm_discharge_sensor, soc_minima_sensor, soc_maxima_sensor = await configure_site( - site_name, fm_client - ) + ( + price_sensor, + power_sensor, + soc_sensor, + rm_discharge_sensor, + soc_minima_sensor, + soc_maxima_sensor, + ) = await configure_site(site_name, fm_client) cem = CEM( sensor_id=power_sensor["id"], @@ -241,7 +246,11 @@ async def configure_site( sensors_to_show = [ { "title": "State of charge", - "sensors": [soc_minima_sensor["id"], soc_maxima_sensor["id"], soc_sensor["id"]], + "sensors": [ + soc_minima_sensor["id"], + soc_maxima_sensor["id"], + soc_sensor["id"], + ], }, { "title": "Prices", @@ -252,7 +261,14 @@ async def configure_site( asset_id=site_asset["id"], updates=dict(sensors_to_show=sensors_to_show), ) - return price_sensor, power_sensor, soc_sensor, rm_discharge_sensor, soc_minima_sensor, soc_maxima_sensor + return ( + price_sensor, + power_sensor, + soc_sensor, + rm_discharge_sensor, + soc_minima_sensor, + soc_maxima_sensor, + ) app = web.Application() From 2819e4f4da0f7661725e2344db981b58f69531d0 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 10:55:10 +0100 Subject: [PATCH 39/81] feat: port send_usage_forecast Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 53 +++++++++++++++++-- .../s2/script/websockets_server.py | 16 +++++- 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index b81a8c53..849d3a92 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -15,6 +15,7 @@ FRBCFillLevelTargetProfile, FRBCStorageStatus, FRBCSystemDescription, + FRBCUsageForecast, ) except ImportError: raise ImportError( @@ -30,6 +31,7 @@ ) from flexmeasures_client.s2.control_types.translations import ( translate_fill_level_target_profile, + translate_usage_forecast_to_fm, ) @@ -38,7 +40,12 @@ class FRBCSimple(FRBC): _price_sensor_id: int _soc_sensor_id: int _rm_discharge_sensor_id: int + _soc_minima_sensor_id: int + _soc_maxima_sensor_id: int + _usage_forecast_sensor_id: int _schedule_duration: timedelta + _fill_level_scale: int = 1 + _resolution = "15min" def __init__( self, @@ -48,6 +55,7 @@ def __init__( price_sensor_id: int, soc_minima_sensor_id: int, soc_maxima_sensor_id: int, + usage_forecast_sensor_id: int, timezone: str = "UTC", schedule_duration: timedelta = timedelta(hours=12), max_size: int = 100, @@ -62,6 +70,7 @@ def __init__( self._rm_discharge_sensor_id = rm_discharge_sensor_id self._soc_minima_sensor_id = soc_minima_sensor_id self._soc_maxima_sensor_id = soc_maxima_sensor_id + self._usage_forecast_sensor_id = usage_forecast_sensor_id self._timezone = pytz.timezone(timezone) self.power_unit = power_unit self.energy_unit = energy_unit @@ -151,6 +160,7 @@ async def trigger_schedule(self, system_description_id: str | None = None): "soc-minima": {"sensor": self._soc_minima_sensor_id}, "soc-maxima": {"sensor": self._soc_maxima_sensor_id}, "state-of-charge": {"sensor": self._soc_sensor_id}, + "soc-usage": [{"sensor": self._usage_forecast_sensor_id}], }, duration=self._schedule_duration, # next 12 hours # TODO: add SOC MAX AND SOC MIN FROM fill_level_range, @@ -177,15 +187,14 @@ async def send_fill_level_target_profile( """ # if not self._is_timer_due("fill_level_target_profile"): # return - RESOLUTION = "15min" soc_minima, soc_maxima = translate_fill_level_target_profile( fill_level_target_profile=fill_level_target_profile, - resolution=RESOLUTION, - fill_level_scale=1, + resolution=self._resolution, + fill_level_scale=self._fill_level_scale, ) - duration = str(pd.Timedelta(RESOLUTION) * len(soc_maxima)) + duration = str(pd.Timedelta(self._resolution) * len(soc_maxima)) # POST SOC Minima measurements to FlexMeasures await self._fm_client.post_sensor_data( @@ -204,3 +213,39 @@ async def send_fill_level_target_profile( unit=self.energy_unit, duration=duration, ) + + async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast): + """ + Send FRBC.UsageForecast to FlexMeasures. + + Args: + usage_forecast (FRBCUsageForecast): The usage forecast to be translated and sent. + """ + # if not self._is_timer_due("usage_forecast"): + # return + + start_time = usage_forecast.start_time + + # flooring to previous 15min tick + start_time = start_time.replace( + minute=(start_time.minute // 15) * 15, second=0, microsecond=0 + ) + + usage_forecast = translate_usage_forecast_to_fm( + usage_forecast, + self._resolution, + strategy="mean", + fill_level_scale=self._fill_level_scale, + ) + + # Scale usage forecast e.g. [0, 100] %/s -> [0, 100] %/(15 min) + scale = timedelta(minutes=15) / timedelta(seconds=1) + scaled_usage_forecast = usage_forecast * scale + + await self._fm_client.post_sensor_data( + sensor_id=self._usage_forecast_sensor_id, + start=start_time, + values=scaled_usage_forecast.tolist(), + unit=self.power_unit, # e.g. [0, 100] MW/(15 min) + duration=str(pd.Timedelta(self._resolution) * len(usage_forecast)), + ) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index b1018adc..19c104cb 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -100,6 +100,7 @@ async def websocket_handler(request): rm_discharge_sensor, soc_minima_sensor, soc_maxima_sensor, + usage_forecast_sensor, ) = await configure_site(site_name, fm_client) cem = CEM( @@ -114,6 +115,7 @@ async def websocket_handler(request): rm_discharge_sensor_id=rm_discharge_sensor["id"], soc_minima_sensor_id=soc_minima_sensor["id"], soc_maxima_sensor_id=soc_maxima_sensor["id"], + usage_forecast_sensor_id=usage_forecast_sensor["id"], ) cem.register_control_type(frbc) @@ -129,7 +131,7 @@ async def websocket_handler(request): async def configure_site( site_name: str, fm_client: FlexMeasuresClient -) -> tuple[dict, dict, dict, dict, dict, dict]: +) -> tuple[dict, dict, dict, dict, dict, dict, dict]: account = await fm_client.get_account() assets = await fm_client.get_assets(parse_json_fields=True) @@ -162,6 +164,7 @@ async def configure_site( rm_discharge_sensor = None soc_minima_sensor = None soc_maxima_sensor = None + usage_forecast_sensor = None for sensor in sensors: if sensor["name"] == "price": price_sensor = sensor @@ -175,6 +178,8 @@ async def configure_site( soc_minima_sensor = sensor elif sensor["name"] == "soc-maxima": soc_maxima_sensor = sensor + elif sensor["name"] == "usage-forecast": + usage_forecast_sensor = sensor if price_sensor is None: price_sensor = await fm_client.add_sensor( @@ -243,6 +248,14 @@ async def configure_site( generic_asset_id=site_asset["id"], timezone="Europe/Amsterdam", ) + if usage_forecast_sensor is None: + usage_forecast_sensor = await fm_client.add_sensor( + name="usage-forecast", + event_resolution="PT15M", + unit="kW", + generic_asset_id=site_asset["id"], + timezone="Europe/Amsterdam", + ) sensors_to_show = [ { "title": "State of charge", @@ -268,6 +281,7 @@ async def configure_site( rm_discharge_sensor, soc_minima_sensor, soc_maxima_sensor, + usage_forecast_sensor, ) From 6cf71ad21d17c4f02f169c0c80772a24ccf5ecd5 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 11:04:42 +0100 Subject: [PATCH 40/81] feat: relax constraints Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 849d3a92..4a1b1f39 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -151,6 +151,7 @@ async def trigger_schedule(self, system_description_id: str | None = None): "production-price": {"sensor": self._price_sensor_id}, "consumption-price": {"sensor": self._price_sensor_id}, "site-power-capacity": f"{3 * 25 * 230} VA", + "relax-constraints": True, }, flex_model={ "soc-unit": self.energy_unit, From b78fbc8cd956665637a8e063c544f0d1af47a0a3 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 14:11:53 +0100 Subject: [PATCH 41/81] feat: we already remove scheduled power values that are not a change with respect to the previous value, but here we also remove instructions that are not a change to the previous instructions; because sometimes different scheduled power values can map to the same instruction Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/utils.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/utils.py b/src/flexmeasures_client/s2/control_types/FRBC/utils.py index 04ce1056..ebb1319a 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/utils.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/utils.py @@ -161,6 +161,7 @@ def fm_schedule_to_instructions( deltaT = timedelta(minutes=15) / timedelta(hours=1) + previous_instruction = None for timestamp, row in schedule.iterrows(): power = row["schedule"] usage = row.get("usage_forecast", 0) @@ -221,9 +222,21 @@ def fm_schedule_to_instructions( execution_time=timestamp, abnormal_condition=False, ) + if previous_instruction and all( + getattr(previous_instruction, attr) == getattr(instruction, attr) + for attr in ( + "actuator_id", + "operation_mode", + "operation_mode_factor", + "abnormal_condition", + ) + ): + logger.info("Instruction removed, no changes to previous instruction") + continue logger.info( f"Instruction created: at {timestamp} set {actuator.diagnostic_label if isinstance(actuator.diagnostic_label, str) else actuator} to {best_operation_mode.diagnostic_label if isinstance(best_operation_mode.diagnostic_label, str) else best_operation_mode} with factor {operation_mode_factor}" ) + previous_instruction = instruction instructions.append(instruction) # Update fill level From baac3bb3ae922502e0b590e823029a05b759cdb1 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 14:14:17 +0100 Subject: [PATCH 42/81] dev: debug log JSON instructions Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/utils.py | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/utils.py b/src/flexmeasures_client/s2/control_types/FRBC/utils.py index ebb1319a..46cc7d4e 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/utils.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/utils.py @@ -1,4 +1,6 @@ +import json import logging +import uuid from datetime import timedelta from math import isclose from typing import List @@ -9,6 +11,7 @@ try: from s2python.common import NumberRange from s2python.frbc import ( + FRBCActuatorDescription, FRBCInstruction, FRBCLeakageBehaviour, FRBCOperationMode, @@ -155,7 +158,12 @@ def fm_schedule_to_instructions( f"{len(system_description.actuators)} were provided" ) - operation_modes: list[FRBCOperationMode] = actuator.operation_modes + actuators: dict[uuid.UUID, FRBCActuatorDescription] = { + a.id: a for a in system_description.actuators + } + operation_modes: dict[uuid.UUID, FRBCOperationMode] = { + om.id: om for om in actuator.operation_modes + } fill_level = initial_fill_level @@ -170,7 +178,7 @@ def fm_schedule_to_instructions( # Convert from power to fill rate results = [ (om, *power_to_fill_rate_with_metrics(om, power, fill_level)) - for om in operation_modes + for om in operation_modes.values() ] # Step 1: minimize fill-level penalty (primary) @@ -238,6 +246,18 @@ def fm_schedule_to_instructions( ) previous_instruction = instruction instructions.append(instruction) + logger.debug( + "Instructions JSON: %s", + json.dumps( + [ + serialize_instruction( + instr, actuators=actuators, operation_modes=operation_modes + ) + for instr in instructions + ], + indent=2, + ), + ) # Update fill level fill_level = compute_next_fill_level( @@ -394,3 +414,29 @@ def explain_choice( lines.append(f"{label} (element={element_label}): rejected due to {reason}") return "; ".join(lines) + + +def serialize_instruction( + instr: FRBCInstruction, + actuators: dict[uuid.UUID, FRBCActuatorDescription], + operation_modes: dict[uuid.UUID, FRBCOperationMode], +): + """Create dict of instructions suitable for logging.""" + actuator = ( + getattr(actuators[instr.actuator_id], "diagnostic_label", None) + or instr.actuator_id + ) + operation_mode = ( + getattr(operation_modes[instr.operation_mode], "diagnostic_label", None) + or instr.operation_mode + ) + return { + "message_type": instr.message_type, + "message_id": str(instr.message_id), + "instruction_id": str(instr.id), + "actuator": str(actuator), + "operation_mode": str(operation_mode), + "operation_mode_factor": instr.operation_mode_factor, + "execution_time": instr.execution_time.isoformat(), + "abnormal_condition": instr.abnormal_condition, + } From 0577d534c5cb5e34faa5863676a7185bedf6ad1f Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 14:18:59 +0100 Subject: [PATCH 43/81] dev: speed up polling for simulations Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/script/websockets_server.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index 19c104cb..0660922e 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -90,7 +90,10 @@ async def websocket_handler(request): site_name = "My CEM" fm_client = FlexMeasuresClient( - "toy-password", "toy-user@flexmeasures.io", host="server:5000" + host="server:5000", + email="toy-user@flexmeasures.io", + password="toy-password", + polling_interval=0.5, ) ( From b99b39b0508b3ae65207a930534ce85873ba9304 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 16:03:14 +0100 Subject: [PATCH 44/81] docs: add instruction to create an admin user Signed-off-by: F.N. Claessen --- docs/CEM.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/CEM.rst b/docs/CEM.rst index 98fe41d5..5a604d45 100644 --- a/docs/CEM.rst +++ b/docs/CEM.rst @@ -40,6 +40,14 @@ To test, run the included example RM: python3 flexmeasures_client/s2/script/websockets_client.py +For full access via the UI, create an admin user for the Docker Toy Account (here, we assume it has ID 1): + +.. code-block:: bash + + docker exec -it flexmeasures-server-1 bash + flexmeasures show accounts + flexmeasures add user --roles admin --account 1 --email --username + Disclaimer ========== From 7ce4c35bbcc6edb76c8d2f9f8f1425b130a5e52b Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 16:10:48 +0100 Subject: [PATCH 45/81] refactor: rename variable Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 4a1b1f39..d85ec637 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -98,11 +98,11 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): + (fill_rate.end_of_range - fill_rate.start_of_range) * factor ) - dt = status.transition_timestamp or self.now() + start = status.transition_timestamp or self.now() await self._fm_client.post_sensor_data( self._power_sensor_id, - start=dt, + start=start, values=[-power], unit=self.power_unit, duration=timedelta(minutes=15), From 806dae0cf51e041923ddf676656da37236d39721 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 16:23:19 +0100 Subject: [PATCH 46/81] style: isort Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index d85ec637..8b7604fb 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -5,9 +5,8 @@ from datetime import datetime, timedelta -import pytz - import pandas as pd +import pytz try: from s2python.frbc import ( From 5be4b64ac9f55a1fa0ef500cca5f1d641d9b6e82 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 16:27:44 +0100 Subject: [PATCH 47/81] fix: mypy Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/cem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/cem.py b/src/flexmeasures_client/s2/cem.py index 46bc7f99..4abebde7 100644 --- a/src/flexmeasures_client/s2/cem.py +++ b/src/flexmeasures_client/s2/cem.py @@ -225,7 +225,7 @@ def update_control_type(self, control_type: ControlType): """ self._control_type = control_type - async def get_message(self) -> str: + async def get_message(self) -> tuple[str, asyncio.Future]: """Call this function to get the messages to be sent to the RM Returns: From abb68e062124e36c5070fe7af9bc9351538e1945 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 16:33:54 +0100 Subject: [PATCH 48/81] chore: resolve implicit todo Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/cem.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/flexmeasures_client/s2/cem.py b/src/flexmeasures_client/s2/cem.py index 4abebde7..ace85a71 100644 --- a/src/flexmeasures_client/s2/cem.py +++ b/src/flexmeasures_client/s2/cem.py @@ -240,10 +240,7 @@ async def get_message(self) -> tuple[str, asyncio.Future]: ) message, fut = item - - # Pending for pydantic V2 to implement model.model_dump(mode="json") in - # PR #1409 (https://github.com/pydantic/pydantic/issues/1409) - message = json.loads(message.json()) + message = message.model_dump(mode="json") return message, fut From 59108c0dec5223bad98988778d79b55cfdbf20d0 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 16:35:29 +0100 Subject: [PATCH 49/81] fix: obvious typo Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py index 21217669..58428840 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py @@ -230,7 +230,7 @@ async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour): leakage_behaviour_to_storage_efficiency( message=leakage, resolution=timedelta(minutes=15), - fill_level_scale=self._fill_level_scalefill_level_scale, + fill_level_scale=self._fill_level_scale, ) ], unit=PERCENTAGE, From 92fd551132d302646552311898d00e56a72833c0 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 16:47:14 +0100 Subject: [PATCH 50/81] fix: debug log instead of error log Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 8b7604fb..76d314ef 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -120,7 +120,7 @@ async def trigger_schedule(self, system_description_id: str | None = None): self._system_description_history.values() )[-1] system_descriptions = self._system_description_history.values() - self._logger.error( + self._logger.debug( list( [ system_description.valid_from From 1b8d4c96e1e86817fd1a15703a9dd0a89970dbb9 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 27 Feb 2026 16:52:13 +0100 Subject: [PATCH 51/81] feat: port send_leakage_behaviour Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 28 +++++++++++++++++++ .../s2/script/websockets_server.py | 16 ++++++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 76d314ef..c8d70631 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -12,6 +12,7 @@ from s2python.frbc import ( FRBCActuatorStatus, FRBCFillLevelTargetProfile, + FRBCLeakageBehaviour, FRBCStorageStatus, FRBCSystemDescription, FRBCUsageForecast, @@ -31,6 +32,7 @@ from flexmeasures_client.s2.control_types.translations import ( translate_fill_level_target_profile, translate_usage_forecast_to_fm, + leakage_behaviour_to_storage_efficiency, ) @@ -42,6 +44,7 @@ class FRBCSimple(FRBC): _soc_minima_sensor_id: int _soc_maxima_sensor_id: int _usage_forecast_sensor_id: int + _leakage_behaviour_sensor_id: int _schedule_duration: timedelta _fill_level_scale: int = 1 _resolution = "15min" @@ -55,6 +58,7 @@ def __init__( soc_minima_sensor_id: int, soc_maxima_sensor_id: int, usage_forecast_sensor_id: int, + leakage_behaviour_sensor_id: int, timezone: str = "UTC", schedule_duration: timedelta = timedelta(hours=12), max_size: int = 100, @@ -70,6 +74,7 @@ def __init__( self._soc_minima_sensor_id = soc_minima_sensor_id self._soc_maxima_sensor_id = soc_maxima_sensor_id self._usage_forecast_sensor_id = usage_forecast_sensor_id + self._leakage_behaviour_sensor_id = leakage_behaviour_sensor_id self._timezone = pytz.timezone(timezone) self.power_unit = power_unit self.energy_unit = energy_unit @@ -161,6 +166,7 @@ async def trigger_schedule(self, system_description_id: str | None = None): "soc-maxima": {"sensor": self._soc_maxima_sensor_id}, "state-of-charge": {"sensor": self._soc_sensor_id}, "soc-usage": [{"sensor": self._usage_forecast_sensor_id}], + "storage-efficiency": {"sensor": self._leakage_behaviour_sensor_id}, }, duration=self._schedule_duration, # next 12 hours # TODO: add SOC MAX AND SOC MIN FROM fill_level_range, @@ -249,3 +255,25 @@ async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast): unit=self.power_unit, # e.g. [0, 100] MW/(15 min) duration=str(pd.Timedelta(self._resolution) * len(usage_forecast)), ) + + async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour): + # if not self._is_timer_due("leakage_behaviour"): + # return + + start = leakage.valid_from or self.now() + start = start.replace(minute=(start.minute // 15) * 15, second=0, microsecond=0) + + storage_efficiency = leakage_behaviour_to_storage_efficiency( + message=leakage, + resolution=timedelta(minutes=15), + fill_level_scale=self._fill_level_scale, + ) + self._logger.debug(storage_efficiency) + + await self._fm_client.post_sensor_data( + self._leakage_behaviour_sensor_id, + start=start, + values=[storage_efficiency], + unit="%", + duration=timedelta(hours=48), + ) diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index 0660922e..6ef67321 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -104,6 +104,7 @@ async def websocket_handler(request): soc_minima_sensor, soc_maxima_sensor, usage_forecast_sensor, + leakage_behaviour_sensor_id, ) = await configure_site(site_name, fm_client) cem = CEM( @@ -119,6 +120,7 @@ async def websocket_handler(request): soc_minima_sensor_id=soc_minima_sensor["id"], soc_maxima_sensor_id=soc_maxima_sensor["id"], usage_forecast_sensor_id=usage_forecast_sensor["id"], + leakage_behaviour_sensor_id=leakage_behaviour_sensor_id["id"], ) cem.register_control_type(frbc) @@ -134,7 +136,7 @@ async def websocket_handler(request): async def configure_site( site_name: str, fm_client: FlexMeasuresClient -) -> tuple[dict, dict, dict, dict, dict, dict, dict]: +) -> tuple[dict, dict, dict, dict, dict, dict, dict, dict]: account = await fm_client.get_account() assets = await fm_client.get_assets(parse_json_fields=True) @@ -168,6 +170,7 @@ async def configure_site( soc_minima_sensor = None soc_maxima_sensor = None usage_forecast_sensor = None + leakage_behaviour_sensor = None for sensor in sensors: if sensor["name"] == "price": price_sensor = sensor @@ -183,6 +186,8 @@ async def configure_site( soc_maxima_sensor = sensor elif sensor["name"] == "usage-forecast": usage_forecast_sensor = sensor + elif sensor["name"] == "leakage-behaviour": + leakage_behaviour_sensor = sensor if price_sensor is None: price_sensor = await fm_client.add_sensor( @@ -259,6 +264,14 @@ async def configure_site( generic_asset_id=site_asset["id"], timezone="Europe/Amsterdam", ) + if leakage_behaviour_sensor is None: + leakage_behaviour_sensor = await fm_client.add_sensor( + name="leakage-behaviour", + event_resolution="PT15M", + unit="%", + generic_asset_id=site_asset["id"], + timezone="Europe/Amsterdam", + ) sensors_to_show = [ { "title": "State of charge", @@ -285,6 +298,7 @@ async def configure_site( soc_minima_sensor, soc_maxima_sensor, usage_forecast_sensor, + leakage_behaviour_sensor, ) From fe02d3313db3a94ff217f104b7c02bb1b2de24d2 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 2 Mar 2026 17:28:52 +0100 Subject: [PATCH 52/81] feat: CEM supports setting simulation time, by wrapping the S2 message together with metadata in an envelope, where the metadata contains a dt Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/cem.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/cem.py b/src/flexmeasures_client/s2/cem.py index ace85a71..6b1ea8fd 100644 --- a/src/flexmeasures_client/s2/cem.py +++ b/src/flexmeasures_client/s2/cem.py @@ -185,7 +185,19 @@ async def handle_message(self, message: Dict | pydantic.BaseModel | str): if isinstance(message, str): message = json.loads(message) - self._logger.debug(f"Received: {message}") + # Detect wrapper + if isinstance(message, dict) and "message" in message and "metadata" in message: + metadata = message["metadata"] + message = message["message"] + self._logger.debug(f"Received wrapped message") + self._logger.debug(f"Received message: {message}") + self._logger.debug(f"Received metadata: {metadata}") + if "dt" in metadata: + for control_type in self._control_types_handlers.values(): + control_type.now = lambda: metadata["dt"] + self.now = lambda: metadata["dt"] + else: + self._logger.debug(f"Received: {message}") # try to handle the message with the control_type handle if ( From fd91a624f538d76568fab7123ed0ef7cc88bfd84 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 2 Mar 2026 19:48:47 +0100 Subject: [PATCH 53/81] style: isort, flake8 Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/cem.py | 2 +- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flexmeasures_client/s2/cem.py b/src/flexmeasures_client/s2/cem.py index 6b1ea8fd..e94394ff 100644 --- a/src/flexmeasures_client/s2/cem.py +++ b/src/flexmeasures_client/s2/cem.py @@ -189,7 +189,7 @@ async def handle_message(self, message: Dict | pydantic.BaseModel | str): if isinstance(message, dict) and "message" in message and "metadata" in message: metadata = message["metadata"] message = message["message"] - self._logger.debug(f"Received wrapped message") + self._logger.debug("Received wrapped message") self._logger.debug(f"Received message: {message}") self._logger.debug(f"Received metadata: {metadata}") if "dt" in metadata: diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index c8d70631..1e5d3e64 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -30,9 +30,9 @@ get_soc_min_max, ) from flexmeasures_client.s2.control_types.translations import ( + leakage_behaviour_to_storage_efficiency, translate_fill_level_target_profile, translate_usage_forecast_to_fm, - leakage_behaviour_to_storage_efficiency, ) From e8e60061c3fd422dbf473482a8f841eaa61831aa Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 2 Mar 2026 19:55:29 +0100 Subject: [PATCH 54/81] feat: record data in FlexMeasures as if it was recorded at simulation time Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 1e5d3e64..309fd441 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -86,6 +86,7 @@ async def send_storage_status(self, status: FRBCStorageStatus): await self._fm_client.post_sensor_data( self._soc_sensor_id, start=self.now(), + prior=self.now(), values=[status.present_fill_level], unit=self.energy_unit, duration=timedelta(minutes=1), @@ -107,6 +108,7 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): await self._fm_client.post_sensor_data( self._power_sensor_id, start=start, + prior=self.now(), values=[-power], unit=self.power_unit, duration=timedelta(minutes=15), @@ -206,6 +208,7 @@ async def send_fill_level_target_profile( await self._fm_client.post_sensor_data( sensor_id=self._soc_minima_sensor_id, start=fill_level_target_profile.start_time, + prior=self.now(), values=soc_minima.tolist(), unit=self.energy_unit, duration=duration, @@ -215,6 +218,7 @@ async def send_fill_level_target_profile( await self._fm_client.post_sensor_data( sensor_id=self._soc_maxima_sensor_id, start=fill_level_target_profile.start_time, + prior=self.now(), values=soc_maxima.tolist(), unit=self.energy_unit, duration=duration, @@ -251,6 +255,7 @@ async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast): await self._fm_client.post_sensor_data( sensor_id=self._usage_forecast_sensor_id, start=start_time, + prior=self.now(), values=scaled_usage_forecast.tolist(), unit=self.power_unit, # e.g. [0, 100] MW/(15 min) duration=str(pd.Timedelta(self._resolution) * len(usage_forecast)), @@ -273,6 +278,7 @@ async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour): await self._fm_client.post_sensor_data( self._leakage_behaviour_sensor_id, start=start, + prior=self.now(), values=[storage_efficiency], unit="%", duration=timedelta(hours=48), From 14bf5b8802eaf412329bfee73bfba2e799950724 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 2 Mar 2026 20:02:45 +0100 Subject: [PATCH 55/81] style: silence mypy on overwriting a method with a lambda function Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/cem.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flexmeasures_client/s2/cem.py b/src/flexmeasures_client/s2/cem.py index e94394ff..d0594783 100644 --- a/src/flexmeasures_client/s2/cem.py +++ b/src/flexmeasures_client/s2/cem.py @@ -194,8 +194,8 @@ async def handle_message(self, message: Dict | pydantic.BaseModel | str): self._logger.debug(f"Received metadata: {metadata}") if "dt" in metadata: for control_type in self._control_types_handlers.values(): - control_type.now = lambda: metadata["dt"] - self.now = lambda: metadata["dt"] + control_type.now = lambda: metadata["dt"] # type: ignore + self.now = lambda: metadata["dt"] # type: ignore else: self._logger.debug(f"Received: {message}") From 7e6b29ae50bb6d110c3b8c1308397b1b8c026f27 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 18:06:18 +0100 Subject: [PATCH 56/81] feat: force new scheduling job creation when using a prior Signed-off-by: F.N. Claessen --- src/flexmeasures_client/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/flexmeasures_client/client.py b/src/flexmeasures_client/client.py index 395fa109..9a25b85e 100644 --- a/src/flexmeasures_client/client.py +++ b/src/flexmeasures_client/client.py @@ -1228,6 +1228,7 @@ async def trigger_schedule( if prior is not None: message["prior"] = pd.Timestamp(prior).isoformat() + message["force_new_job_creation"] = True if scheduler is not None: if asset_id is None: raise ValueError( From a89a41e95784652773ccefe7a92822be241b0f96 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 18:07:23 +0100 Subject: [PATCH 57/81] fix: upgrade timely-beliefs to fix scheduler bug with resampling from non-instantaneous to instantaneous Signed-off-by: F.N. Claessen --- docker-compose.override.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docker-compose.override.yml b/docker-compose.override.yml index e562a60c..ad076c39 100644 --- a/docker-compose.override.yml +++ b/docker-compose.override.yml @@ -13,9 +13,16 @@ services: command: - | pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt + pip install timely-beliefs -U --break-system-packages flexmeasures db upgrade # toy account step removed gunicorn --bind 0.0.0.0:5000 --worker-tmp-dir /dev/shm --workers 2 --threads 4 wsgi:application + worker: + command: + - | + pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt + pip install timely-beliefs -U --break-system-packages + flexmeasures jobs run-worker --name flexmeasures-worker --queue forecasting\|scheduling cem: build: context: . From 31bddecbe3801b4bc2f844b6c79ca292ba0b9695 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 10:01:15 +0100 Subject: [PATCH 58/81] fix: pass prior to trigger_and_get_schedule Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 309fd441..27a45347 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -149,9 +149,9 @@ async def trigger_schedule(self, system_description_id: str | None = None): # call schedule start = system_description.valid_from # TODO: localize datetime - start = start.replace(minute=(start.minute // 15) * 15, second=0, microsecond=0) schedule = await self._fm_client.trigger_and_get_schedule( - start=start, + start=start.replace(minute=(start.minute // 15) * 15, second=0, microsecond=0), + prior=start, sensor_id=self._power_sensor_id, flex_context={ "production-price": {"sensor": self._price_sensor_id}, From 906a7f57df68558140b586ac96ff1083d262e7a9 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 10:04:40 +0100 Subject: [PATCH 59/81] fix: start schedule from the time of the most recent storage status Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 27a45347..f15d390c 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -83,15 +83,16 @@ def now(self): return self._timezone.localize(datetime.now()) async def send_storage_status(self, status: FRBCStorageStatus): + now = self.now() await self._fm_client.post_sensor_data( self._soc_sensor_id, - start=self.now(), - prior=self.now(), + start=now, + prior=now, values=[status.present_fill_level], unit=self.energy_unit, duration=timedelta(minutes=1), ) - await self.trigger_schedule() + await self.trigger_schedule(now) async def send_actuator_status(self, status: FRBCActuatorStatus): factor = status.operation_mode_factor @@ -114,7 +115,7 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): duration=timedelta(minutes=15), ) - async def trigger_schedule(self, system_description_id: str | None = None): + async def trigger_schedule(self, start: datetime, system_description_id: str | None = None): """Translates S2 System Description into FM API calls""" if system_description_id: @@ -148,7 +149,8 @@ async def trigger_schedule(self, system_description_id: str | None = None): soc_min, soc_max = get_soc_min_max(system_description) # call schedule - start = system_description.valid_from # TODO: localize datetime + if isinstance(start, str): + start = pd.Timestamp(start) schedule = await self._fm_client.trigger_and_get_schedule( start=start.replace(minute=(start.minute // 15) * 15, second=0, microsecond=0), prior=start, From f61fd60783bd8f60d7763be5070daadeb5150dc0 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 10:17:27 +0100 Subject: [PATCH 60/81] style: black Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index f15d390c..3907d587 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -115,7 +115,9 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): duration=timedelta(minutes=15), ) - async def trigger_schedule(self, start: datetime, system_description_id: str | None = None): + async def trigger_schedule( + self, start: datetime, system_description_id: str | None = None + ): """Translates S2 System Description into FM API calls""" if system_description_id: @@ -152,7 +154,9 @@ async def trigger_schedule(self, start: datetime, system_description_id: str | N if isinstance(start, str): start = pd.Timestamp(start) schedule = await self._fm_client.trigger_and_get_schedule( - start=start.replace(minute=(start.minute // 15) * 15, second=0, microsecond=0), + start=start.replace( + minute=(start.minute // 15) * 15, second=0, microsecond=0 + ), prior=start, sensor_id=self._power_sensor_id, flex_context={ From 2d5b0fe6af316845ed01fb9185a5234b97aa36aa Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 10:59:25 +0100 Subject: [PATCH 61/81] fix: still make sure to run `flexmeasures add toy-account` once Signed-off-by: F.N. Claessen --- docker-compose.override.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docker-compose.override.yml b/docker-compose.override.yml index ad076c39..93b0e749 100644 --- a/docker-compose.override.yml +++ b/docker-compose.override.yml @@ -15,7 +15,9 @@ services: pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt pip install timely-beliefs -U --break-system-packages flexmeasures db upgrade - # toy account step removed + if ! flexmeasures show accounts | grep -q "Docker Toy Account"; then + flexmeasures add toy-account --name 'Docker Toy Account' + fi gunicorn --bind 0.0.0.0:5000 --worker-tmp-dir /dev/shm --workers 2 --threads 4 wsgi:application worker: command: From eda6b2854fc8fde751b0139b1daf44ea2667bbe5 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 14:39:27 +0100 Subject: [PATCH 62/81] feat: add ability to customize fill_level_scale Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 3907d587..cd6f0fec 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -46,7 +46,7 @@ class FRBCSimple(FRBC): _usage_forecast_sensor_id: int _leakage_behaviour_sensor_id: int _schedule_duration: timedelta - _fill_level_scale: int = 1 + _fill_level_scale: float _resolution = "15min" def __init__( @@ -62,6 +62,7 @@ def __init__( timezone: str = "UTC", schedule_duration: timedelta = timedelta(hours=12), max_size: int = 100, + fill_level_scale: float = 1, power_unit: str = "kW", energy_unit: str = "kWh", ) -> None: @@ -76,6 +77,7 @@ def __init__( self._usage_forecast_sensor_id = usage_forecast_sensor_id self._leakage_behaviour_sensor_id = leakage_behaviour_sensor_id self._timezone = pytz.timezone(timezone) + self._fill_level_scale = fill_level_scale self.power_unit = power_unit self.energy_unit = energy_unit @@ -88,7 +90,7 @@ async def send_storage_status(self, status: FRBCStorageStatus): self._soc_sensor_id, start=now, prior=now, - values=[status.present_fill_level], + values=[status.present_fill_level * self._fill_level_scale], unit=self.energy_unit, duration=timedelta(minutes=1), ) @@ -102,7 +104,7 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): power = ( fill_rate.start_of_range + (fill_rate.end_of_range - fill_rate.start_of_range) * factor - ) + ) * self._fill_level_scale start = status.transition_timestamp or self.now() @@ -141,14 +143,15 @@ async def trigger_schedule( self._logger.debug(f"Using system description: {system_description}") if len(self._storage_status_history) > 0: - soc_at_start = list(self._storage_status_history.values())[ - -1 - ].present_fill_level + soc_at_start = ( + list(self._storage_status_history.values())[-1].present_fill_level + * self._fill_level_scale + ) else: print("Can't trigger schedule without knowing the status of the storage...") return - soc_min, soc_max = get_soc_min_max(system_description) + soc_min, soc_max = get_soc_min_max(system_description, self._fill_level_scale) # call schedule if isinstance(start, str): @@ -183,7 +186,9 @@ async def trigger_schedule( # translate FlexMeasures schedule into instructions. SOC -> Power -> PowerFactor instructions = fm_schedule_to_instructions( - schedule, system_description, initial_fill_level=soc_at_start + schedule, + system_description, + initial_fill_level=soc_at_start / self._fill_level_scale, ) # put instructions to sending queue From 1abdd718f2b0f848f679d068e9261f6d53393c14 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 14:45:18 +0100 Subject: [PATCH 63/81] dev: remove todo (soc-at-start is now actually coming from the latest storage status) Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index cd6f0fec..2259d7b0 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -170,7 +170,7 @@ async def trigger_schedule( }, flex_model={ "soc-unit": self.energy_unit, - "soc-at-start": soc_at_start, # TODO: use forecast of the SOC instead + "soc-at-start": soc_at_start, "soc-min": soc_min, "soc-max": soc_max, "soc-minima": {"sensor": self._soc_minima_sensor_id}, From 9f20ab4379d0c06bdafaee70f3067d67752a6657 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 15:16:26 +0100 Subject: [PATCH 64/81] feat: derive consumption-capacity and production-capacity from operation mode elements Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 2259d7b0..47576f6d 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -151,6 +151,27 @@ async def trigger_schedule( print("Can't trigger schedule without knowing the status of the storage...") return + # Assume a single actuator + actuator = system_description.actuators[0] + + # Derive the overall power range + charging_capacity = None + discharging_capacity = None + for operation_mode in actuator.operation_modes: + for element in operation_mode.elements: + for power_range in element.power_ranges: + # todo: distinguish power range per commodity + p_min = power_range.end_of_range + if discharging_capacity is None: + discharging_capacity = p_min + else: + discharging_capacity = min(discharging_capacity, p_min) + p_max = power_range.start_of_range + if charging_capacity is None: + charging_capacity = p_max + else: + charging_capacity = max(charging_capacity, p_max) + soc_min, soc_max = get_soc_min_max(system_description, self._fill_level_scale) # call schedule @@ -178,6 +199,8 @@ async def trigger_schedule( "state-of-charge": {"sensor": self._soc_sensor_id}, "soc-usage": [{"sensor": self._usage_forecast_sensor_id}], "storage-efficiency": {"sensor": self._leakage_behaviour_sensor_id}, + "consumption-capacity": f"{charging_capacity} {self.power_unit}", + "production-capacity": f"{discharging_capacity} {self.power_unit}", }, duration=self._schedule_duration, # next 12 hours # TODO: add SOC MAX AND SOC MIN FROM fill_level_range, From ea68aa0360f00e4779e67fa6d4b060683a8c68ba Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 15:19:42 +0100 Subject: [PATCH 65/81] docs: update docstring Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 47576f6d..9429f947 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -120,7 +120,9 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): async def trigger_schedule( self, start: datetime, system_description_id: str | None = None ): - """Translates S2 System Description into FM API calls""" + """ + Ask FlexMeasures for a new schedule and create FRBC.Instructions to send back to the ResourceManager + """ if system_description_id: system_description: FRBCSystemDescription = ( From 605980ac730a5b194e8840b87f661ac80e7096a6 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 15:36:40 +0100 Subject: [PATCH 66/81] feat: move to J and W as default energy unit and power unit, respectively Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 9429f947..98b98f57 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -63,8 +63,8 @@ def __init__( schedule_duration: timedelta = timedelta(hours=12), max_size: int = 100, fill_level_scale: float = 1, - power_unit: str = "kW", - energy_unit: str = "kWh", + power_unit: str = "W", + energy_unit: str = "J", ) -> None: super().__init__(max_size) self._power_sensor_id = power_sensor_id @@ -176,6 +176,16 @@ async def trigger_schedule( soc_min, soc_max = get_soc_min_max(system_description, self._fill_level_scale) + # Support for J energy unit (FM server only accepts kWh and MWh) + if self.energy_unit == "J": + f = 3.6 * 10 ** 6 + energy_unit = "kWh" + soc_at_start *= f + soc_min *= f + soc_max *= f + else: + energy_unit = self.energy_unit + # call schedule if isinstance(start, str): start = pd.Timestamp(start) @@ -192,7 +202,7 @@ async def trigger_schedule( "relax-constraints": True, }, flex_model={ - "soc-unit": self.energy_unit, + "soc-unit": energy_unit, "soc-at-start": soc_at_start, "soc-min": soc_min, "soc-max": soc_max, From c2bab5d1a18b320ac0c51da5926bfcf4d413f646 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 16:22:54 +0100 Subject: [PATCH 67/81] fix: wrong conversion Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 98b98f57..99daa69b 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -180,9 +180,9 @@ async def trigger_schedule( if self.energy_unit == "J": f = 3.6 * 10 ** 6 energy_unit = "kWh" - soc_at_start *= f - soc_min *= f - soc_max *= f + soc_at_start /= f + soc_min /= f + soc_max /= f else: energy_unit = self.energy_unit From 00f363ab24551631a2080de6998207ba630ba00b Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 16:30:19 +0100 Subject: [PATCH 68/81] docs: clarify inline note Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 99daa69b..5e3d6b2c 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -176,7 +176,7 @@ async def trigger_schedule( soc_min, soc_max = get_soc_min_max(system_description, self._fill_level_scale) - # Support for J energy unit (FM server only accepts kWh and MWh) + # Support for J energy unit (FM server scheduling trigger endpoint only accepts kWh and MWh) if self.energy_unit == "J": f = 3.6 * 10 ** 6 energy_unit = "kWh" From 80b2f6154ab128a7358877e3d44d1664d09e3ee5 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 20:01:51 +0100 Subject: [PATCH 69/81] refactor: prepare for more params Signed-off-by: F.N. Claessen --- src/flexmeasures_client/client.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/flexmeasures_client/client.py b/src/flexmeasures_client/client.py index 9a25b85e..26a579d1 100644 --- a/src/flexmeasures_client/client.py +++ b/src/flexmeasures_client/client.py @@ -607,12 +607,9 @@ async def get_schedule( 'unit': 'MW' } """ + params = {} if duration is not None: - params = { - "duration": pd.Timedelta(duration).isoformat(), # for example: PT1H - } - else: - params = {} + params["duration"] = pd.Timedelta(duration).isoformat(), # for example: PT1H schedule, status = await self.request( uri=f"sensors/{sensor_id}/schedules/{schedule_id}", method="GET", From 2cc3b750a73fe11dca24e4c1a188d1c9afc22251 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 20:04:39 +0100 Subject: [PATCH 70/81] feat: support getting a schedule in a given unit Signed-off-by: F.N. Claessen --- src/flexmeasures_client/client.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/flexmeasures_client/client.py b/src/flexmeasures_client/client.py index 26a579d1..58260ca3 100644 --- a/src/flexmeasures_client/client.py +++ b/src/flexmeasures_client/client.py @@ -596,6 +596,7 @@ async def get_schedule( sensor_id: int, schedule_id: str, duration: str | timedelta | None = None, + unit: str | None = None, ) -> dict: """Get schedule with given ID. @@ -609,7 +610,11 @@ async def get_schedule( """ params = {} if duration is not None: - params["duration"] = pd.Timedelta(duration).isoformat(), # for example: PT1H + params["duration"] = ( + pd.Timedelta(duration).isoformat(), + ) # for example: PT1H + if unit is not None: + params["unit"] = unit schedule, status = await self.request( uri=f"sensors/{sensor_id}/schedules/{schedule_id}", method="GET", @@ -828,6 +833,7 @@ async def trigger_and_get_schedule( asset_id: int | None = None, prior: datetime | None = None, scheduler: str | None = None, + unit: str | None = None, ) -> dict | list[dict]: """Trigger a schedule and then fetch it. @@ -863,7 +869,10 @@ async def trigger_and_get_schedule( if sensor_id is not None: # Get the schedule for a single device return await self.get_schedule( - sensor_id=sensor_id, schedule_id=schedule_id, duration=duration + sensor_id=sensor_id, + schedule_id=schedule_id, + duration=duration, + unit=unit, ) elif flex_model is None: # If there is no flex-model referencing power sensors, no power schedules are retrieved From 3a547b1c5046aa60f5cc5397221dd224cd57b1bb Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 20:05:28 +0100 Subject: [PATCH 71/81] feat: require minimum version for getting a schedule in a given unit Signed-off-by: F.N. Claessen --- src/flexmeasures_client/client.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/flexmeasures_client/client.py b/src/flexmeasures_client/client.py index 58260ca3..f2d99bcb 100644 --- a/src/flexmeasures_client/client.py +++ b/src/flexmeasures_client/client.py @@ -614,6 +614,12 @@ async def get_schedule( pd.Timedelta(duration).isoformat(), ) # for example: PT1H if unit is not None: + await self.ensure_server_version() + if Version(self.server_version) < Version("0.31.0"): + self.logger.warning( + "get_schedule(): The 'unit' parameter requires FlexMeasures server version 0.31.0 or above. " + "This parameter will be ignored." + ) params["unit"] = unit schedule, status = await self.request( uri=f"sensors/{sensor_id}/schedules/{schedule_id}", From d29e64c9d1291d72630cf85e1aa19ad69b60a7da Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 20:06:00 +0100 Subject: [PATCH 72/81] fix: get schedule in the assumed power unit Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 5e3d6b2c..57c46595 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -217,6 +217,7 @@ async def trigger_schedule( duration=self._schedule_duration, # next 12 hours # TODO: add SOC MAX AND SOC MIN FROM fill_level_range, # this needs changes on the client + unit=self.power_unit, ) # translate FlexMeasures schedule into instructions. SOC -> Power -> PowerFactor From fca2c5ba575fb9cc69803986a5efe561213255ce Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 20:13:27 +0100 Subject: [PATCH 73/81] feat: note the current FM server version Signed-off-by: F.N. Claessen --- src/flexmeasures_client/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/flexmeasures_client/client.py b/src/flexmeasures_client/client.py index f2d99bcb..77604dc2 100644 --- a/src/flexmeasures_client/client.py +++ b/src/flexmeasures_client/client.py @@ -618,7 +618,7 @@ async def get_schedule( if Version(self.server_version) < Version("0.31.0"): self.logger.warning( "get_schedule(): The 'unit' parameter requires FlexMeasures server version 0.31.0 or above. " - "This parameter will be ignored." + f"This parameter will be ignored for server version {self.server_version}." ) params["unit"] = unit schedule, status = await self.request( @@ -754,7 +754,7 @@ async def get_assets( if Version(self.server_version) < Version("0.31.0"): self.logger.warning( "get_assets(): The 'root', 'depth' and 'fields' parameters require FlexMeasures server version 0.31.0 or above. " - "These parameters will be ignored." + f"These parameters will be ignored for server version {self.server_version}." ) if root and isinstance(root, int): uri += f"&root={root}" @@ -1127,7 +1127,7 @@ async def update_asset(self, asset_id: int, updates: dict) -> dict: if Version(self.server_version) < Version("0.31.0"): self.logger.warning( "update_asset(): The 'aggregate-power' flex-context field requires FlexMeasures server version 0.31.0 or above. " - "The 'aggregate-power' field will be ignored by the server." + f"The 'aggregate-power' field will be ignored by the server, which is at version {self.server_version}." ) updates["flex_context"] = json.dumps(updates["flex_context"]) if "flex_model" in updates: From fb6d9478514d1077196ca5948e7c5f9cbee863f2 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 20:17:20 +0100 Subject: [PATCH 74/81] fix: actually requires v0.32.0 Signed-off-by: F.N. Claessen --- src/flexmeasures_client/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/client.py b/src/flexmeasures_client/client.py index 77604dc2..0ee72861 100644 --- a/src/flexmeasures_client/client.py +++ b/src/flexmeasures_client/client.py @@ -615,7 +615,7 @@ async def get_schedule( ) # for example: PT1H if unit is not None: await self.ensure_server_version() - if Version(self.server_version) < Version("0.31.0"): + if Version(self.server_version) < Version("0.32.0"): self.logger.warning( "get_schedule(): The 'unit' parameter requires FlexMeasures server version 0.31.0 or above. " f"This parameter will be ignored for server version {self.server_version}." From 7d4c0c0916d975e01d3b319c98f9f1f13194a7f0 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 20:18:43 +0100 Subject: [PATCH 75/81] docs: clarify that the server ignores the parameter, not the client Signed-off-by: F.N. Claessen --- src/flexmeasures_client/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/client.py b/src/flexmeasures_client/client.py index 0ee72861..eca21e8c 100644 --- a/src/flexmeasures_client/client.py +++ b/src/flexmeasures_client/client.py @@ -618,7 +618,7 @@ async def get_schedule( if Version(self.server_version) < Version("0.32.0"): self.logger.warning( "get_schedule(): The 'unit' parameter requires FlexMeasures server version 0.31.0 or above. " - f"This parameter will be ignored for server version {self.server_version}." + f"This parameter will be ignored by the server, which is at version {self.server_version}." ) params["unit"] = unit schedule, status = await self.request( From cc28d77e7556af1c79c28b8e837453960e527efe Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 21:23:17 +0100 Subject: [PATCH 76/81] style: black Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 57c46595..65797a6e 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -178,7 +178,7 @@ async def trigger_schedule( # Support for J energy unit (FM server scheduling trigger endpoint only accepts kWh and MWh) if self.energy_unit == "J": - f = 3.6 * 10 ** 6 + f = 3.6 * 10**6 energy_unit = "kWh" soc_at_start /= f soc_min /= f From 07608256240c3e024cfeafecf6d69f6c044d38e9 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 10 Mar 2026 21:31:05 +0100 Subject: [PATCH 77/81] fix: mistake while refactoring (or from running black?) Signed-off-by: F.N. Claessen --- src/flexmeasures_client/client.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/flexmeasures_client/client.py b/src/flexmeasures_client/client.py index eca21e8c..7b3cc149 100644 --- a/src/flexmeasures_client/client.py +++ b/src/flexmeasures_client/client.py @@ -610,9 +610,7 @@ async def get_schedule( """ params = {} if duration is not None: - params["duration"] = ( - pd.Timedelta(duration).isoformat(), - ) # for example: PT1H + params["duration"] = pd.Timedelta(duration).isoformat() # for example: PT1H if unit is not None: await self.ensure_server_version() if Version(self.server_version) < Version("0.32.0"): From 67cbd0b459fb32a9bec5dc798c2beb579fc57d91 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 11 Mar 2026 15:15:49 +0100 Subject: [PATCH 78/81] fix: misinterpreted the usage forecast scale Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_simple.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 65797a6e..8f9a1780 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -295,16 +295,12 @@ async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast): fill_level_scale=self._fill_level_scale, ) - # Scale usage forecast e.g. [0, 100] %/s -> [0, 100] %/(15 min) - scale = timedelta(minutes=15) / timedelta(seconds=1) - scaled_usage_forecast = usage_forecast * scale - await self._fm_client.post_sensor_data( sensor_id=self._usage_forecast_sensor_id, start=start_time, prior=self.now(), - values=scaled_usage_forecast.tolist(), - unit=self.power_unit, # e.g. [0, 100] MW/(15 min) + values=usage_forecast.tolist(), + unit=self.power_unit, # e.g. [0, 100] MW/(15 min) # todo: or: f"{self.energy_unit}/s" to scale usage forecast e.g. [0, 100] %/s -> [0, 100] %/(15 min) duration=str(pd.Timedelta(self._resolution) * len(usage_forecast)), ) From 6ba2337a00545bd8e7d34bab2dda116f06ee1c5a Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 11 Mar 2026 15:42:07 +0100 Subject: [PATCH 79/81] fix: CEM should only relax soc-constraints (not capacity-constraints and not site-capacity-constraints) Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 8f9a1780..d9a10cb9 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -199,7 +199,7 @@ async def trigger_schedule( "production-price": {"sensor": self._price_sensor_id}, "consumption-price": {"sensor": self._price_sensor_id}, "site-power-capacity": f"{3 * 25 * 230} VA", - "relax-constraints": True, + "relax-soc-constraints": True, }, flex_model={ "soc-unit": energy_unit, From 2711aff35ff647e1accfcf31519a8cc74488b22b Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 11 Mar 2026 15:44:11 +0100 Subject: [PATCH 80/81] fix: stop flipping the values from the actuator status (now that we use consumption_is_positive on the power sensor Signed-off-by: F.N. Claessen --- src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index d9a10cb9..3a3c98d7 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -112,7 +112,7 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): self._power_sensor_id, start=start, prior=self.now(), - values=[-power], + values=[power], unit=self.power_unit, duration=timedelta(minutes=15), ) From fbe2461df525673a8fdb26c7145c02d40055ba56 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Thu, 12 Mar 2026 10:43:22 +0100 Subject: [PATCH 81/81] feat: use local flexmeasures repo in server and worker Signed-off-by: F.N. Claessen --- docker-compose.override.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docker-compose.override.yml b/docker-compose.override.yml index 93b0e749..0b7db4a0 100644 --- a/docker-compose.override.yml +++ b/docker-compose.override.yml @@ -10,8 +10,16 @@ services: server: + volumes: + # A place for config and plugin code, and custom requirements.txt + # The 1st mount point is for running the FlexMeasures CLI, the 2nd for gunicorn + # We use :rw so flexmeasures CLI commands can write log files + - ./flexmeasures-instance/:/usr/var/flexmeasures-instance/:rw + - ./flexmeasures-instance/:/app/instance/:rw + - ../flexmeasures/flexmeasures:/app/flexmeasures:rw command: - | + pip install --break-system-packages -e /app pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt pip install timely-beliefs -U --break-system-packages flexmeasures db upgrade @@ -20,8 +28,14 @@ services: fi gunicorn --bind 0.0.0.0:5000 --worker-tmp-dir /dev/shm --workers 2 --threads 4 wsgi:application worker: + volumes: + # a place for config and plugin code, and custom requirements.txt + - ./flexmeasures-instance/:/usr/var/flexmeasures-instance/:rw + - ../flexmeasures/flexmeasures:/app/flexmeasures:rw + - ./flexmeasures-instance/:/app/instance/:rw command: - | + pip install --break-system-packages -e /app pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt pip install timely-beliefs -U --break-system-packages flexmeasures jobs run-worker --name flexmeasures-worker --queue forecasting\|scheduling