Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
d63afe5
dev: try using the _sending_queue instead
Flix6x Feb 25, 2026
564b8f1
Revert "dev: try using the _sending_queue instead"
Flix6x Feb 25, 2026
2db7a4f
fix: give the sending task enough time to flush the queue before the …
Flix6x Feb 25, 2026
35cc9ad
dev: use DEBUG logging level
Flix6x Feb 25, 2026
375b49c
feat: await message sent rather than message queued
Flix6x Feb 25, 2026
14f7792
fix: also close WS if closing CEM
Flix6x Feb 25, 2026
4799585
feat: skip setting up the toy account
Flix6x Feb 25, 2026
89d4338
feat: post prices in background task
Flix6x Feb 25, 2026
60ff6af
Revert "feat: skip setting up the toy account"
Flix6x Feb 25, 2026
404ed6b
chore: use modern method name
Flix6x Feb 25, 2026
5d4de10
fix: fall back on now in case FRBCActuatorStatus.transition_timestamp…
Flix6x Feb 25, 2026
f8ea658
fix: actuator status unit
Flix6x Feb 25, 2026
d59f340
feat: skip setting up the toy account
Flix6x Feb 25, 2026
6abd2a1
fix: copy-paste mistake
Flix6x Feb 25, 2026
5328de9
fix: discharge unit
Flix6x Feb 25, 2026
eae232b
docs: update instruction to run docker-compose stack
Flix6x Feb 25, 2026
0e72c25
Revert "fix: discharge unit"
Flix6x Feb 25, 2026
abb0a74
fix: schedule power sensor instead of dimensionless discharge sensor
Flix6x Feb 25, 2026
bbaac73
feat: trigger schedule with each storage status (not yet rate limited…
Flix6x Feb 25, 2026
374e44a
feat: post 1 year of data in background tasks
Flix6x Feb 25, 2026
a1d6ba7
Revert "feat: post 1 year of data in background tasks"
Flix6x Feb 25, 2026
f8e0d62
fix: set prior knowledge of prices and test with now
Flix6x Feb 25, 2026
be6d8d3
fix: floor the schedule start
Flix6x Feb 25, 2026
5334c0c
fix: messages should now be routed through cem.send_message
Flix6x Feb 25, 2026
2845036
fix: messages should now be routed through cem.send_message; update h…
Flix6x Feb 25, 2026
e01176d
feat: roll 3 days of test prices
Flix6x Feb 25, 2026
c780f98
style: black, isort
Flix6x Feb 25, 2026
26f4778
style: black
Flix6x Feb 25, 2026
c691cf2
Merge remote-tracking branch 'origin/main' into dev/fix-handshake-han…
Flix6x Feb 26, 2026
5fb3a26
feat: make schedules appear with consumption on the positive axis
Flix6x Feb 26, 2026
508611b
fix: update _sending_queue.put to send_message in FillRateBasedContro…
Flix6x Feb 26, 2026
07b55c5
feat: port send_fill_level_target_profile
Flix6x Feb 27, 2026
872224c
feat: save scheduled state-of-charge, too
Flix6x Feb 27, 2026
0b96cc5
feat: set sensors_to_show on CEM asset
Flix6x Feb 27, 2026
5ef6f64
fix: type annotation
Flix6x Feb 27, 2026
010d584
fix: flex-model soc-unit
Flix6x Feb 27, 2026
c389510
fix: get rid of valid_from_shift
Flix6x Feb 27, 2026
e5ad8a0
dev: log used SystemDescription
Flix6x Feb 27, 2026
54dd188
style: black
Flix6x Feb 27, 2026
2819e4f
feat: port send_usage_forecast
Flix6x Feb 27, 2026
6cf71ad
feat: relax constraints
Flix6x Feb 27, 2026
b78fbc8
feat: we already remove scheduled power values that are not a change …
Flix6x Feb 27, 2026
baac3bb
dev: debug log JSON instructions
Flix6x Feb 27, 2026
0577d53
dev: speed up polling for simulations
Flix6x Feb 27, 2026
b99b39b
docs: add instruction to create an admin user
Flix6x Feb 27, 2026
7ce4c35
refactor: rename variable
Flix6x Feb 27, 2026
806dae0
style: isort
Flix6x Feb 27, 2026
5be4b64
fix: mypy
Flix6x Feb 27, 2026
abb68e0
chore: resolve implicit todo
Flix6x Feb 27, 2026
59108c0
fix: obvious typo
Flix6x Feb 27, 2026
92fd551
fix: debug log instead of error log
Flix6x Feb 27, 2026
1b8d4c9
feat: port send_leakage_behaviour
Flix6x Feb 27, 2026
fe02d33
feat: CEM supports setting simulation time, by wrapping the S2 messag…
Flix6x Mar 2, 2026
fd91a62
style: isort, flake8
Flix6x Mar 2, 2026
e8e6006
feat: record data in FlexMeasures as if it was recorded at simulation…
Flix6x Mar 2, 2026
14bf5b8
style: silence mypy on overwriting a method with a lambda function
Flix6x Mar 2, 2026
7e6b29a
feat: force new scheduling job creation when using a prior
Flix6x Mar 9, 2026
a89a41e
fix: upgrade timely-beliefs to fix scheduler bug with resampling from…
Flix6x Mar 9, 2026
31bddec
fix: pass prior to trigger_and_get_schedule
Flix6x Mar 10, 2026
906a7f5
fix: start schedule from the time of the most recent storage status
Flix6x Mar 10, 2026
f61fd60
style: black
Flix6x Mar 10, 2026
2d5b0fe
fix: still make sure to run `flexmeasures add toy-account` once
Flix6x Mar 10, 2026
eda6b28
feat: add ability to customize fill_level_scale
Flix6x Mar 10, 2026
1abdd71
dev: remove todo (soc-at-start is now actually coming from the latest…
Flix6x Mar 10, 2026
9f20ab4
feat: derive consumption-capacity and production-capacity from operat…
Flix6x Mar 10, 2026
ea68aa0
docs: update docstring
Flix6x Mar 10, 2026
605980a
feat: move to J and W as default energy unit and power unit, respecti…
Flix6x Mar 10, 2026
c2bab5d
fix: wrong conversion
Flix6x Mar 10, 2026
00f363a
docs: clarify inline note
Flix6x Mar 10, 2026
80b2f61
refactor: prepare for more params
Flix6x Mar 10, 2026
2cc3b75
feat: support getting a schedule in a given unit
Flix6x Mar 10, 2026
3a547b1
feat: require minimum version for getting a schedule in a given unit
Flix6x Mar 10, 2026
d29e64c
fix: get schedule in the assumed power unit
Flix6x Mar 10, 2026
fca2c5b
feat: note the current FM server version
Flix6x Mar 10, 2026
fb6d947
fix: actually requires v0.32.0
Flix6x Mar 10, 2026
7d4c0c0
docs: clarify that the server ignores the parameter, not the client
Flix6x Mar 10, 2026
cc28d77
style: black
Flix6x Mar 10, 2026
0760825
fix: mistake while refactoring (or from running black?)
Flix6x Mar 10, 2026
67cbd0b
fix: misinterpreted the usage forecast scale
Flix6x Mar 11, 2026
6ba2337
fix: CEM should only relax soc-constraints (not capacity-constraints …
Flix6x Mar 11, 2026
2711aff
fix: stop flipping the values from the actuator status (now that we u…
Flix6x Mar 11, 2026
fbe2461
feat: use local flexmeasures repo in server and worker
Flix6x Mar 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions docker-compose.override.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# ------------------------------------------------------------------
# This allow to run the S2 CEM from your local FlexMeasures Client code in a docker compose stack.
# Assuming you have flexmeasures the repo next to your flexmeasures-client repo,
# run this from the flexmeasures folder (which contains the Dockerfile):
# docker compose \
# -f docker-compose.yml \
# -f ../flexmeasures-client/docker-compose.override.yml \
# up
# ------------------------------------------------------------------

services:
server:
volumes:
# A place for config and plugin code, and custom requirements.txt
# The 1st mount point is for running the FlexMeasures CLI, the 2nd for gunicorn
# We use :rw so flexmeasures CLI commands can write log files
- ./flexmeasures-instance/:/usr/var/flexmeasures-instance/:rw
- ./flexmeasures-instance/:/app/instance/:rw
- ../flexmeasures/flexmeasures:/app/flexmeasures:rw
command:
- |
pip install --break-system-packages -e /app
pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt
pip install timely-beliefs -U --break-system-packages
flexmeasures db upgrade
if ! flexmeasures show accounts | grep -q "Docker Toy Account"; then
flexmeasures add toy-account --name 'Docker Toy Account'
fi
gunicorn --bind 0.0.0.0:5000 --worker-tmp-dir /dev/shm --workers 2 --threads 4 wsgi:application
worker:
volumes:
# a place for config and plugin code, and custom requirements.txt
- ./flexmeasures-instance/:/usr/var/flexmeasures-instance/:rw
- ../flexmeasures/flexmeasures:/app/flexmeasures:rw
- ./flexmeasures-instance/:/app/instance/:rw
command:
- |
pip install --break-system-packages -e /app
pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt
pip install timely-beliefs -U --break-system-packages
flexmeasures jobs run-worker --name flexmeasures-worker --queue forecasting\|scheduling
cem:
build:
context: .
dockerfile: Dockerfile
depends_on:
- server
restart: always
ports:
- "8080:8080" # aiohttp default; adjust if you change it
environment:
# Optional, but useful if you later make this configurable
FLEXMEASURES_BASE_URL: http://server:5000
FLEXMEASURES_USER: toy-user@flexmeasures.io
FLEXMEASURES_PASSWORD: toy-password
LOGGING_LEVEL: DEBUG
volumes:
# If flexmeasures_client lives in your repo and you want live edits
- ../flexmeasures-client:/app/flexmeasures-client:rw
entrypoint: ["/bin/sh", "-c"]
command:
- |
# pip install --break-system-packages --no-cache-dir "git+https://github.com/FlexMeasures/flexmeasures-client.git@main#egg=flexmeasures-client[s2]"
pip install --break-system-packages -e /app/flexmeasures-client[s2]
python3 /app/flexmeasures-client/src/flexmeasures_client/s2/script/websockets_server.py
35 changes: 0 additions & 35 deletions docker-compose.yml

This file was deleted.

12 changes: 10 additions & 2 deletions docs/CEM.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ Then point your Resource Managers (RMs) to ``http://localhost:8080/ws`` and run:

python3 flexmeasures_client/s2/scripts/websockets_server.py

We also included a ``docker-compose.yaml`` that can be used to set up the CEM including the FlexMeasures server, creating a fully self-hosted HEMS.
We also included a ``docker-compose.override.yaml`` that can be used to set up the CEM including the FlexMeasures server, creating a fully self-hosted HEMS.
Assuming your ``flexmeasures`` and ``flexmeasures-client`` repo folders are located side by side, run this from your flexmeasures folder:

.. code-block:: bash

docker compose \
-f docker-compose.yml \
-f ../flexmeasures-client/docker-compose.yml \
-f ../flexmeasures-client/docker-compose.override.yml \
up


Expand All @@ -40,6 +40,14 @@ To test, run the included example RM:

python3 flexmeasures_client/s2/script/websockets_client.py

For full access via the UI, create an admin user for the Docker Toy Account (here, we assume it has ID 1):

.. code-block:: bash

docker exec -it flexmeasures-server-1 bash
flexmeasures show accounts
flexmeasures add user --roles admin --account 1 --email <email> --username <username>

Disclaimer
==========

Expand Down
27 changes: 19 additions & 8 deletions src/flexmeasures_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ async def get_schedule(
sensor_id: int,
schedule_id: str,
duration: str | timedelta | None = None,
unit: str | None = None,
) -> dict:
"""Get schedule with given ID.

Expand All @@ -607,12 +608,17 @@ async def get_schedule(
'unit': 'MW'
}
"""
params = {}
if duration is not None:
params = {
"duration": pd.Timedelta(duration).isoformat(), # for example: PT1H
}
else:
params = {}
params["duration"] = pd.Timedelta(duration).isoformat() # for example: PT1H
if unit is not None:
await self.ensure_server_version()
if Version(self.server_version) < Version("0.32.0"):
self.logger.warning(
"get_schedule(): The 'unit' parameter requires FlexMeasures server version 0.31.0 or above. "
f"This parameter will be ignored by the server, which is at version {self.server_version}."
)
params["unit"] = unit
schedule, status = await self.request(
uri=f"sensors/{sensor_id}/schedules/{schedule_id}",
method="GET",
Expand Down Expand Up @@ -746,7 +752,7 @@ async def get_assets(
if Version(self.server_version) < Version("0.31.0"):
self.logger.warning(
"get_assets(): The 'root', 'depth' and 'fields' parameters require FlexMeasures server version 0.31.0 or above. "
"These parameters will be ignored."
f"These parameters will be ignored for server version {self.server_version}."
)
if root and isinstance(root, int):
uri += f"&root={root}"
Expand Down Expand Up @@ -831,6 +837,7 @@ async def trigger_and_get_schedule(
asset_id: int | None = None,
prior: datetime | None = None,
scheduler: str | None = None,
unit: str | None = None,
) -> dict | list[dict]:
"""Trigger a schedule and then fetch it.

Expand Down Expand Up @@ -866,7 +873,10 @@ async def trigger_and_get_schedule(
if sensor_id is not None:
# Get the schedule for a single device
return await self.get_schedule(
sensor_id=sensor_id, schedule_id=schedule_id, duration=duration
sensor_id=sensor_id,
schedule_id=schedule_id,
duration=duration,
unit=unit,
)
elif flex_model is None:
# If there is no flex-model referencing power sensors, no power schedules are retrieved
Expand Down Expand Up @@ -1115,7 +1125,7 @@ async def update_asset(self, asset_id: int, updates: dict) -> dict:
if Version(self.server_version) < Version("0.31.0"):
self.logger.warning(
"update_asset(): The 'aggregate-power' flex-context field requires FlexMeasures server version 0.31.0 or above. "
"The 'aggregate-power' field will be ignored by the server."
f"The 'aggregate-power' field will be ignored by the server, which is at version {self.server_version}."
)
updates["flex_context"] = json.dumps(updates["flex_context"])
if "flex_model" in updates:
Expand Down Expand Up @@ -1228,6 +1238,7 @@ async def trigger_schedule(

if prior is not None:
message["prior"] = pd.Timestamp(prior).isoformat()
message["force_new_job_creation"] = True
if scheduler is not None:
if asset_id is None:
raise ValueError(
Expand Down
47 changes: 32 additions & 15 deletions src/flexmeasures_client/s2/cem.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class CEM(Handler):
] # maps the CommodityQuantity power measurement sensors to FM sensor IDs

_fm_client: FlexMeasuresClient
_sending_queue: Queue[pydantic.BaseModel]
_sending_queue: asyncio.Queue[tuple[pydantic.BaseModel, asyncio.Future]]

_timers: dict[str, datetime]
_datastore: dict
Expand Down Expand Up @@ -159,8 +159,8 @@ def register_control_type(self, control_type_handler: ControlTypeHandler):
# add fm_client to control_type handler
control_type_handler._fm_client = self._fm_client

# add sending queue
control_type_handler._sending_queue = self._sending_queue
# add send_message method so the handler can send messages
control_type_handler.send_message = self.send_message

# Add logger
control_type_handler._logger = self._logger
Expand All @@ -185,7 +185,19 @@ async def handle_message(self, message: Dict | pydantic.BaseModel | str):
if isinstance(message, str):
message = json.loads(message)

self._logger.debug(f"Received: {message}")
# Detect wrapper
if isinstance(message, dict) and "message" in message and "metadata" in message:
metadata = message["metadata"]
message = message["message"]
self._logger.debug("Received wrapped message")
self._logger.debug(f"Received message: {message}")
self._logger.debug(f"Received metadata: {metadata}")
if "dt" in metadata:
for control_type in self._control_types_handlers.values():
control_type.now = lambda: metadata["dt"] # type: ignore
self.now = lambda: metadata["dt"] # type: ignore
else:
self._logger.debug(f"Received: {message}")

# try to handle the message with the control_type handle
if (
Expand Down Expand Up @@ -216,7 +228,7 @@ async def handle_message(self, message: Dict | pydantic.BaseModel | str):
)

if response is not None:
await self._sending_queue.put(response)
await self.send_message(response)

def update_control_type(self, control_type: ControlType):
"""
Expand All @@ -225,21 +237,24 @@ def update_control_type(self, control_type: ControlType):
"""
self._control_type = control_type

async def get_message(self) -> str:
async def get_message(self) -> tuple[str, asyncio.Future]:
"""Call this function to get the messages to be sent to the RM

Returns:
str: message in JSON format
"""

message = await self._sending_queue.get()
await asyncio.sleep(0.3)
item = await self._sending_queue.get()

if not isinstance(item, tuple) or len(item) != 2:
raise RuntimeError(
"Invalid item in sending queue. All messages must go through send_message() rather than _sending_queue.put()."
)

# Pending for pydantic V2 to implement model.model_dump(mode="json") in
# PR #1409 (https://github.com/pydantic/pydantic/issues/1409)
message = json.loads(message.json())
message, fut = item
message = message.model_dump(mode="json")

return message
return message, fut

async def activate_control_type(
self, control_type: ControlType
Expand Down Expand Up @@ -279,8 +294,7 @@ async def activate_control_type(
].register_success_callbacks(
message_id, self.update_control_type, control_type=control_type
)

await self._sending_queue.put(
await self.send_message(
SelectControlType(message_id=message_id, control_type=control_type)
)
return None
Expand Down Expand Up @@ -425,8 +439,11 @@ def handle_revoke_object(self, message: RevokeObject):
return get_reception_status(message, ReceptionStatusValues.OK)

async def send_message(self, message):
loop = asyncio.get_running_loop()
fut = loop.create_future()
self._logger.debug(f"Sent: {message}")
await self._sending_queue.put(message)
await self._sending_queue.put((message, fut))
await fut # wait until actually sent


def get_commodity_unit(commodity_quantity) -> str:
Expand Down
2 changes: 1 addition & 1 deletion src/flexmeasures_client/s2/control_types/FRBC/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,4 @@ async def trigger_schedule(self, system_description_id: str):
)

# put instruction into the sending queue
await self._sending_queue.put(instruction)
await self.send_message(instruction)
Loading
Loading