diff --git a/docker-compose.override.yml b/docker-compose.override.yml new file mode 100644 index 00000000..0b7db4a0 --- /dev/null +++ b/docker-compose.override.yml @@ -0,0 +1,65 @@ +# ------------------------------------------------------------------ +# This allows you to run the S2 CEM from your local FlexMeasures Client code in a docker compose stack. +# Assuming you have the flexmeasures repo next to your flexmeasures-client repo, +# run this from the flexmeasures folder (which contains the Dockerfile): +# docker compose \ +# -f docker-compose.yml \ +# -f ../flexmeasures-client/docker-compose.override.yml \ +# up +# ------------------------------------------------------------------ + +services: + server: + volumes: + # A place for config and plugin code, and custom requirements.txt + # The 1st mount point is for running the FlexMeasures CLI, the 2nd for gunicorn + # We use :rw so flexmeasures CLI commands can write log files + - ./flexmeasures-instance/:/usr/var/flexmeasures-instance/:rw + - ./flexmeasures-instance/:/app/instance/:rw + - ../flexmeasures/flexmeasures:/app/flexmeasures:rw + command: + - | + pip install --break-system-packages -e /app + pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt + pip install timely-beliefs -U --break-system-packages + flexmeasures db upgrade + if ! 
flexmeasures show accounts | grep -q "Docker Toy Account"; then + flexmeasures add toy-account --name 'Docker Toy Account' + fi + gunicorn --bind 0.0.0.0:5000 --worker-tmp-dir /dev/shm --workers 2 --threads 4 wsgi:application + worker: + volumes: + # a place for config and plugin code, and custom requirements.txt + - ./flexmeasures-instance/:/usr/var/flexmeasures-instance/:rw + - ../flexmeasures/flexmeasures:/app/flexmeasures:rw + - ./flexmeasures-instance/:/app/instance/:rw + command: + - | + pip install --break-system-packages -e /app + pip install --break-system-packages -r /usr/var/flexmeasures-instance/requirements.txt + pip install timely-beliefs -U --break-system-packages + flexmeasures jobs run-worker --name flexmeasures-worker --queue forecasting\|scheduling + cem: + build: + context: . + dockerfile: Dockerfile + depends_on: + - server + restart: always + ports: + - "8080:8080" # aiohttp default; adjust if you change it + environment: + # Optional, but useful if you later make this configurable + FLEXMEASURES_BASE_URL: http://server:5000 + FLEXMEASURES_USER: toy-user@flexmeasures.io + FLEXMEASURES_PASSWORD: toy-password + LOGGING_LEVEL: DEBUG + volumes: + # If flexmeasures_client lives in your repo and you want live edits + - ../flexmeasures-client:/app/flexmeasures-client:rw + entrypoint: ["/bin/sh", "-c"] + command: + - | + # pip install --break-system-packages --no-cache-dir "git+https://github.com/FlexMeasures/flexmeasures-client.git@main#egg=flexmeasures-client[s2]" + pip install --break-system-packages -e /app/flexmeasures-client[s2] + python3 /app/flexmeasures-client/src/flexmeasures_client/s2/script/websockets_server.py diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 8068b475..00000000 --- a/docker-compose.yml +++ /dev/null @@ -1,35 +0,0 @@ -# ------------------------------------------------------------------ -# This allow to run the S2 CEM from your local FlexMeasures Client code in a docker compose stack. 
-# Assuming you have flexmeasures the repo next to your flexmeasures-client repo, -# run this from the flexmeasures folder (which contains the Dockerfile): -# docker compose \ -# -f docker-compose.yml \ -# -f ../flexmeasures-client/docker-compose.yml \ -# up -# ------------------------------------------------------------------ - -services: - cem: - build: - context: . - dockerfile: Dockerfile - depends_on: - - server - restart: always - ports: - - "8080:8080" # aiohttp default; adjust if you change it - environment: - # Optional, but useful if you later make this configurable - FLEXMEASURES_BASE_URL: http://server:5000 - FLEXMEASURES_USER: toy-user@flexmeasures.io - FLEXMEASURES_PASSWORD: toy-password - LOGGING_LEVEL: INFO - volumes: - # If flexmeasures_client lives in your repo and you want live edits - - ../flexmeasures-client:/app/flexmeasures-client:rw - entrypoint: ["/bin/sh", "-c"] - command: - - | - # pip install --break-system-packages --no-cache-dir "git+https://github.com/FlexMeasures/flexmeasures-client.git@main#egg=flexmeasures-client[s2]" - pip install --break-system-packages -e /app/flexmeasures-client[s2] - python3 /app/flexmeasures-client/src/flexmeasures_client/s2/script/websockets_server.py diff --git a/docs/CEM.rst b/docs/CEM.rst index 72661c75..5a604d45 100644 --- a/docs/CEM.rst +++ b/docs/CEM.rst @@ -15,14 +15,14 @@ Then point your Resource Managers (RMs) to ``http://localhost:8080/ws`` and run: python3 flexmeasures_client/s2/scripts/websockets_server.py -We also included a ``docker-compose.yaml`` that can be used to set up the CEM including the FlexMeasures server, creating a fully self-hosted HEMS. +We also included a ``docker-compose.override.yml`` that can be used to set up the CEM including the FlexMeasures server, creating a fully self-hosted HEMS. Assuming your ``flexmeasures`` and ``flexmeasures-client`` repo folders are located side by side, run this from your flexmeasures folder: .. 
code-block:: bash docker compose \ -f docker-compose.yml \ - -f ../flexmeasures-client/docker-compose.yml \ + -f ../flexmeasures-client/docker-compose.override.yml \ up @@ -40,6 +40,14 @@ To test, run the included example RM: python3 flexmeasures_client/s2/script/websockets_client.py +For full access via the UI, create an admin user for the Docker Toy Account (here, we assume it has ID 1): + +.. code-block:: bash + + docker exec -it flexmeasures-server-1 bash + flexmeasures show accounts + flexmeasures add user --roles admin --account 1 --email <email> --username <username> + Disclaimer ========== diff --git a/src/flexmeasures_client/client.py b/src/flexmeasures_client/client.py index 395fa109..7b3cc149 100644 --- a/src/flexmeasures_client/client.py +++ b/src/flexmeasures_client/client.py @@ -596,6 +596,7 @@ async def get_schedule( sensor_id: int, schedule_id: str, duration: str | timedelta | None = None, + unit: str | None = None, ) -> dict: """Get schedule with given ID. @@ -607,12 +608,17 @@ 'unit': 'MW' } """ + params = {} if duration is not None: - params = { - "duration": pd.Timedelta(duration).isoformat(), # for example: PT1H - } - else: - params = {} + params["duration"] = pd.Timedelta(duration).isoformat() # for example: PT1H + if unit is not None: + await self.ensure_server_version() + if Version(self.server_version) < Version("0.32.0"): + self.logger.warning( + "get_schedule(): The 'unit' parameter requires FlexMeasures server version 0.32.0 or above. " + f"This parameter will be ignored by the server, which is at version {self.server_version}." + ) + params["unit"] = unit schedule, status = await self.request( uri=f"sensors/{sensor_id}/schedules/{schedule_id}", method="GET", @@ -746,7 +752,7 @@ async def get_assets( if Version(self.server_version) < Version("0.31.0"): self.logger.warning( "get_assets(): The 'root', 'depth' and 'fields' parameters require FlexMeasures server version 0.31.0 or above. " - "These parameters will be ignored." 
+ f"These parameters will be ignored for server version {self.server_version}." ) if root and isinstance(root, int): uri += f"&root={root}" @@ -831,6 +837,7 @@ async def trigger_and_get_schedule( asset_id: int | None = None, prior: datetime | None = None, scheduler: str | None = None, + unit: str | None = None, ) -> dict | list[dict]: """Trigger a schedule and then fetch it. @@ -866,7 +873,10 @@ async def trigger_and_get_schedule( if sensor_id is not None: # Get the schedule for a single device return await self.get_schedule( - sensor_id=sensor_id, schedule_id=schedule_id, duration=duration + sensor_id=sensor_id, + schedule_id=schedule_id, + duration=duration, + unit=unit, ) elif flex_model is None: # If there is no flex-model referencing power sensors, no power schedules are retrieved @@ -1115,7 +1125,7 @@ async def update_asset(self, asset_id: int, updates: dict) -> dict: if Version(self.server_version) < Version("0.31.0"): self.logger.warning( "update_asset(): The 'aggregate-power' flex-context field requires FlexMeasures server version 0.31.0 or above. " - "The 'aggregate-power' field will be ignored by the server." + f"The 'aggregate-power' field will be ignored by the server, which is at version {self.server_version}." 
) updates["flex_context"] = json.dumps(updates["flex_context"]) if "flex_model" in updates: @@ -1228,6 +1238,7 @@ async def trigger_schedule( if prior is not None: message["prior"] = pd.Timestamp(prior).isoformat() + message["force_new_job_creation"] = True if scheduler is not None: if asset_id is None: raise ValueError( diff --git a/src/flexmeasures_client/s2/cem.py b/src/flexmeasures_client/s2/cem.py index ecbdc8fa..d0594783 100644 --- a/src/flexmeasures_client/s2/cem.py +++ b/src/flexmeasures_client/s2/cem.py @@ -59,7 +59,7 @@ class CEM(Handler): ] # maps the CommodityQuantity power measurement sensors to FM sensor IDs _fm_client: FlexMeasuresClient - _sending_queue: Queue[pydantic.BaseModel] + _sending_queue: asyncio.Queue[tuple[pydantic.BaseModel, asyncio.Future]] _timers: dict[str, datetime] _datastore: dict @@ -159,8 +159,8 @@ def register_control_type(self, control_type_handler: ControlTypeHandler): # add fm_client to control_type handler control_type_handler._fm_client = self._fm_client - # add sending queue - control_type_handler._sending_queue = self._sending_queue + # add send_message method so the handler can send messages + control_type_handler.send_message = self.send_message # Add logger control_type_handler._logger = self._logger @@ -185,7 +185,19 @@ async def handle_message(self, message: Dict | pydantic.BaseModel | str): if isinstance(message, str): message = json.loads(message) - self._logger.debug(f"Received: {message}") + # Detect wrapper + if isinstance(message, dict) and "message" in message and "metadata" in message: + metadata = message["metadata"] + message = message["message"] + self._logger.debug("Received wrapped message") + self._logger.debug(f"Received message: {message}") + self._logger.debug(f"Received metadata: {metadata}") + if "dt" in metadata: + for control_type in self._control_types_handlers.values(): + control_type.now = lambda: metadata["dt"] # type: ignore + self.now = lambda: metadata["dt"] # type: ignore + else: + 
self._logger.debug(f"Received: {message}") # try to handle the message with the control_type handle if ( @@ -216,7 +228,7 @@ async def handle_message(self, message: Dict | pydantic.BaseModel | str): ) if response is not None: - await self._sending_queue.put(response) + await self.send_message(response) def update_control_type(self, control_type: ControlType): """ @@ -225,21 +237,24 @@ def update_control_type(self, control_type: ControlType): """ self._control_type = control_type - async def get_message(self) -> str: + async def get_message(self) -> tuple[str, asyncio.Future]: """Call this function to get the messages to be sent to the RM Returns: str: message in JSON format """ - message = await self._sending_queue.get() - await asyncio.sleep(0.3) + item = await self._sending_queue.get() + + if not isinstance(item, tuple) or len(item) != 2: + raise RuntimeError( + "Invalid item in sending queue. All messages must go through send_message() rather than _sending_queue.put()." + ) - # Pending for pydantic V2 to implement model.model_dump(mode="json") in - # PR #1409 (https://github.com/pydantic/pydantic/issues/1409) - message = json.loads(message.json()) + message, fut = item + message = message.model_dump(mode="json") - return message + return message, fut async def activate_control_type( self, control_type: ControlType @@ -279,8 +294,7 @@ async def activate_control_type( ].register_success_callbacks( message_id, self.update_control_type, control_type=control_type ) - - await self._sending_queue.put( + await self.send_message( SelectControlType(message_id=message_id, control_type=control_type) ) return None @@ -425,8 +439,11 @@ def handle_revoke_object(self, message: RevokeObject): return get_reception_status(message, ReceptionStatusValues.OK) async def send_message(self, message): + loop = asyncio.get_running_loop() + fut = loop.create_future() self._logger.debug(f"Sent: {message}") - await self._sending_queue.put(message) + await self._sending_queue.put((message, 
fut)) + await fut # wait until actually sent def get_commodity_unit(commodity_quantity) -> str: diff --git a/src/flexmeasures_client/s2/control_types/FRBC/__init__.py b/src/flexmeasures_client/s2/control_types/FRBC/__init__.py index 2b4ccf15..94c85b43 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/__init__.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/__init__.py @@ -196,4 +196,4 @@ async def trigger_schedule(self, system_description_id: str): ) # put instruction into the sending queue - await self._sending_queue.put(instruction) + await self.send_message(instruction) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 6e81f291..3a3c98d7 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -5,13 +5,17 @@ from datetime import datetime, timedelta +import pandas as pd import pytz try: from s2python.frbc import ( FRBCActuatorStatus, + FRBCFillLevelTargetProfile, + FRBCLeakageBehaviour, FRBCStorageStatus, FRBCSystemDescription, + FRBCUsageForecast, ) except ImportError: raise ImportError( @@ -25,6 +29,11 @@ fm_schedule_to_instructions, get_soc_min_max, ) +from flexmeasures_client.s2.control_types.translations import ( + leakage_behaviour_to_storage_efficiency, + translate_fill_level_target_profile, + translate_usage_forecast_to_fm, +) class FRBCSimple(FRBC): @@ -32,8 +41,13 @@ class FRBCSimple(FRBC): _price_sensor_id: int _soc_sensor_id: int _rm_discharge_sensor_id: int + _soc_minima_sensor_id: int + _soc_maxima_sensor_id: int + _usage_forecast_sensor_id: int + _leakage_behaviour_sensor_id: int _schedule_duration: timedelta - _valid_from_shift: timedelta + _fill_level_scale: float + _resolution = "15min" def __init__( self, @@ -41,10 +55,16 @@ def __init__( soc_sensor_id: int, rm_discharge_sensor_id: int, price_sensor_id: int, + soc_minima_sensor_id: int, + 
soc_maxima_sensor_id: int, + usage_forecast_sensor_id: int, + leakage_behaviour_sensor_id: int, timezone: str = "UTC", schedule_duration: timedelta = timedelta(hours=12), max_size: int = 100, - valid_from_shift: timedelta = timedelta(days=1), + fill_level_scale: float = 1, + power_unit: str = "W", + energy_unit: str = "J", ) -> None: super().__init__(max_size) self._power_sensor_id = power_sensor_id @@ -52,23 +72,29 @@ def __init__( self._schedule_duration = schedule_duration self._soc_sensor_id = soc_sensor_id self._rm_discharge_sensor_id = rm_discharge_sensor_id + self._soc_minima_sensor_id = soc_minima_sensor_id + self._soc_maxima_sensor_id = soc_maxima_sensor_id + self._usage_forecast_sensor_id = usage_forecast_sensor_id + self._leakage_behaviour_sensor_id = leakage_behaviour_sensor_id self._timezone = pytz.timezone(timezone) - - # delay the start of the schedule from the time `valid_from` - # of the FRBC.SystemDescription. - self._valid_from_shift = valid_from_shift + self._fill_level_scale = fill_level_scale + self.power_unit = power_unit + self.energy_unit = energy_unit def now(self): return self._timezone.localize(datetime.now()) async def send_storage_status(self, status: FRBCStorageStatus): - await self._fm_client.post_measurements( + now = self.now() + await self._fm_client.post_sensor_data( self._soc_sensor_id, - start=self.now(), - values=[status.present_fill_level], - unit="MWh", + start=now, + prior=now, + values=[status.present_fill_level * self._fill_level_scale], + unit=self.energy_unit, duration=timedelta(minutes=1), ) + await self.trigger_schedule(now) async def send_actuator_status(self, status: FRBCActuatorStatus): factor = status.operation_mode_factor @@ -78,61 +104,225 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): power = ( fill_rate.start_of_range + (fill_rate.end_of_range - fill_rate.start_of_range) * factor - ) + ) * self._fill_level_scale - dt = status.transition_timestamp # self.now() + start = 
status.transition_timestamp or self.now() - await self._fm_client.post_measurements( - self._rm_discharge_sensor_id, - start=dt, - values=[-power], - unit="MWh", + await self._fm_client.post_sensor_data( + self._power_sensor_id, + start=start, + prior=self.now(), + values=[power], + unit=self.power_unit, duration=timedelta(minutes=15), ) - async def trigger_schedule(self, system_description_id: str): - """Translates S2 System Description into FM API calls""" + async def trigger_schedule( + self, start: datetime, system_description_id: str | None = None + ): + """ + Ask FlexMeasures for a new schedule and create FRBC.Instructions to send back to the ResourceManager + """ - system_description: FRBCSystemDescription = self._system_description_history[ - system_description_id - ] + if system_description_id: + system_description: FRBCSystemDescription = ( + self._system_description_history[system_description_id] + ) + else: + # Use last SystemDescription + system_description: FRBCSystemDescription = list( + self._system_description_history.values() + )[-1] + system_descriptions = self._system_description_history.values() + self._logger.debug( + list( + [ + system_description.valid_from + for system_description in system_descriptions + ] + ) + ) + self._logger.debug(f"Using system description: {system_description}") if len(self._storage_status_history) > 0: - soc_at_start = list(self._storage_status_history.values())[ - -1 - ].present_fill_level + soc_at_start = ( + list(self._storage_status_history.values())[-1].present_fill_level + * self._fill_level_scale + ) else: print("Can't trigger schedule without knowing the status of the storage...") return - soc_min, soc_max = get_soc_min_max(system_description) + # Assume a single actuator + actuator = system_description.actuators[0] + + # Derive the overall power range + charging_capacity = None + discharging_capacity = None + for operation_mode in actuator.operation_modes: + for element in operation_mode.elements: + for 
power_range in element.power_ranges: + # todo: distinguish power range per commodity + p_min = power_range.end_of_range + if discharging_capacity is None: + discharging_capacity = p_min + else: + discharging_capacity = min(discharging_capacity, p_min) + p_max = power_range.start_of_range + if charging_capacity is None: + charging_capacity = p_max + else: + charging_capacity = max(charging_capacity, p_max) + + soc_min, soc_max = get_soc_min_max(system_description, self._fill_level_scale) + + # Support for J energy unit (FM server scheduling trigger endpoint only accepts kWh and MWh) + if self.energy_unit == "J": + f = 3.6 * 10**6 + energy_unit = "kWh" + soc_at_start /= f + soc_min /= f + soc_max /= f + else: + energy_unit = self.energy_unit # call schedule + if isinstance(start, str): + start = pd.Timestamp(start) schedule = await self._fm_client.trigger_and_get_schedule( - start=system_description.valid_from - + self._valid_from_shift, # TODO: localize datetime + start=start.replace( + minute=(start.minute // 15) * 15, second=0, microsecond=0 + ), + prior=start, sensor_id=self._power_sensor_id, flex_context={ "production-price": {"sensor": self._price_sensor_id}, "consumption-price": {"sensor": self._price_sensor_id}, "site-power-capacity": f"{3 * 25 * 230} VA", + "relax-soc-constraints": True, }, flex_model={ - "soc-unit": "MWh", - "soc-at-start": soc_at_start, # TODO: use forecast of the SOC instead + "soc-unit": energy_unit, + "soc-at-start": soc_at_start, "soc-min": soc_min, "soc-max": soc_max, + "soc-minima": {"sensor": self._soc_minima_sensor_id}, + "soc-maxima": {"sensor": self._soc_maxima_sensor_id}, + "state-of-charge": {"sensor": self._soc_sensor_id}, + "soc-usage": [{"sensor": self._usage_forecast_sensor_id}], + "storage-efficiency": {"sensor": self._leakage_behaviour_sensor_id}, + "consumption-capacity": f"{charging_capacity} {self.power_unit}", + "production-capacity": f"{discharging_capacity} {self.power_unit}", }, duration=self._schedule_duration, # 
next 12 hours # TODO: add SOC MAX AND SOC MIN FROM fill_level_range, # this needs changes on the client + unit=self.power_unit, ) # translate FlexMeasures schedule into instructions. SOC -> Power -> PowerFactor instructions = fm_schedule_to_instructions( - schedule, system_description, initial_fill_level=soc_at_start + schedule, + system_description, + initial_fill_level=soc_at_start / self._fill_level_scale, ) # put instructions to sending queue for instruction in instructions: - await self._sending_queue.put(instruction) + await self.send_message(instruction) + + async def send_fill_level_target_profile( + self, fill_level_target_profile: FRBCFillLevelTargetProfile + ): + """ + Send FRBC.FillLevelTargetProfile to FlexMeasures. + + Args: + fill_level_target_profile (FRBCFillLevelTargetProfile): The fill level target profile to be translated and sent. + """ + # if not self._is_timer_due("fill_level_target_profile"): + # return + + soc_minima, soc_maxima = translate_fill_level_target_profile( + fill_level_target_profile=fill_level_target_profile, + resolution=self._resolution, + fill_level_scale=self._fill_level_scale, + ) + + duration = str(pd.Timedelta(self._resolution) * len(soc_maxima)) + + # POST SOC Minima measurements to FlexMeasures + await self._fm_client.post_sensor_data( + sensor_id=self._soc_minima_sensor_id, + start=fill_level_target_profile.start_time, + prior=self.now(), + values=soc_minima.tolist(), + unit=self.energy_unit, + duration=duration, + ) + + # POST SOC Maxima measurements to FlexMeasures + await self._fm_client.post_sensor_data( + sensor_id=self._soc_maxima_sensor_id, + start=fill_level_target_profile.start_time, + prior=self.now(), + values=soc_maxima.tolist(), + unit=self.energy_unit, + duration=duration, + ) + + async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast): + """ + Send FRBC.UsageForecast to FlexMeasures. + + Args: + usage_forecast (FRBCUsageForecast): The usage forecast to be translated and sent. 
+ """ + # if not self._is_timer_due("usage_forecast"): + # return + + start_time = usage_forecast.start_time + + # flooring to previous 15min tick + start_time = start_time.replace( + minute=(start_time.minute // 15) * 15, second=0, microsecond=0 + ) + + usage_forecast = translate_usage_forecast_to_fm( + usage_forecast, + self._resolution, + strategy="mean", + fill_level_scale=self._fill_level_scale, + ) + + await self._fm_client.post_sensor_data( + sensor_id=self._usage_forecast_sensor_id, + start=start_time, + prior=self.now(), + values=usage_forecast.tolist(), + unit=self.power_unit, # e.g. [0, 100] MW/(15 min) # todo: or: f"{self.energy_unit}/s" to scale usage forecast e.g. [0, 100] %/s -> [0, 100] %/(15 min) + duration=str(pd.Timedelta(self._resolution) * len(usage_forecast)), + ) + + async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour): + # if not self._is_timer_due("leakage_behaviour"): + # return + + start = leakage.valid_from or self.now() + start = start.replace(minute=(start.minute // 15) * 15, second=0, microsecond=0) + + storage_efficiency = leakage_behaviour_to_storage_efficiency( + message=leakage, + resolution=timedelta(minutes=15), + fill_level_scale=self._fill_level_scale, + ) + self._logger.debug(storage_efficiency) + + await self._fm_client.post_sensor_data( + self._leakage_behaviour_sensor_id, + start=start, + prior=self.now(), + values=[storage_efficiency], + unit="%", + duration=timedelta(hours=48), + ) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py index 6b822791..58428840 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py @@ -123,7 +123,6 @@ def __init__( schedule_duration: timedelta = timedelta(hours=12), max_size: int = 100, fill_level_scale: float = 0.1, - valid_from_shift: timedelta = timedelta(days=1), timers: dict[str, datetime] | None = None, 
datastore: dict | None = None, **kwargs, @@ -154,10 +153,6 @@ def __init__( self._production_price_sensor_id = production_price_sensor self._timezone = pytz.timezone(timezone) - - # delay the start of the schedule from the time `valid_from` of the FRBC.SystemDescription - self._valid_from_shift = valid_from_shift - self._fill_level_scale = fill_level_scale self._active_recurring_schedule = False @@ -215,7 +210,7 @@ async def send_storage_status(self, status: FRBCStorageStatus): subject_message_id=status.message_id, status=ReceptionStatusValues.PERMANENT_ERROR, ) - await self._sending_queue.put(response) + await self.send_message(response) await self.trigger_schedule() async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour): @@ -235,7 +230,7 @@ async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour): leakage_behaviour_to_storage_efficiency( message=leakage, resolution=timedelta(minutes=15), - fill_level_scale=self._fill_level_scalefill_level_scale, + fill_level_scale=self._fill_level_scale, ) ], unit=PERCENTAGE, @@ -246,7 +241,7 @@ async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour): subject_message_id=leakage.message_id, status=ReceptionStatusValues.PERMANENT_ERROR, ) - await self._sending_queue.put(response) + await self.send_message(response) async def send_actuator_status(self, status: FRBCActuatorStatus): if not self._is_timer_due("actuator_status"): @@ -514,12 +509,12 @@ async def trigger_schedule(self): object_id=message_id, ) self._logger.debug(f"Sending revoke instruction for {message_id}") - await self._sending_queue.put(revoke_instruction) + await self.send_message(revoke_instruction) self._datastore["instructions"] = {} # Put the instruction in the sending queue for instruction in instructions: - await self._sending_queue.put(instruction) + await self.send_message(instruction) # Store instructions for instruction in instructions: diff --git a/src/flexmeasures_client/s2/control_types/FRBC/utils.py 
b/src/flexmeasures_client/s2/control_types/FRBC/utils.py index 04ce1056..46cc7d4e 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/utils.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/utils.py @@ -1,4 +1,6 @@ +import json import logging +import uuid from datetime import timedelta from math import isclose from typing import List @@ -9,6 +11,7 @@ try: from s2python.common import NumberRange from s2python.frbc import ( + FRBCActuatorDescription, FRBCInstruction, FRBCLeakageBehaviour, FRBCOperationMode, @@ -155,12 +158,18 @@ def fm_schedule_to_instructions( f"{len(system_description.actuators)} were provided" ) - operation_modes: list[FRBCOperationMode] = actuator.operation_modes + actuators: dict[uuid.UUID, FRBCActuatorDescription] = { + a.id: a for a in system_description.actuators + } + operation_modes: dict[uuid.UUID, FRBCOperationMode] = { + om.id: om for om in actuator.operation_modes + } fill_level = initial_fill_level deltaT = timedelta(minutes=15) / timedelta(hours=1) + previous_instruction = None for timestamp, row in schedule.iterrows(): power = row["schedule"] usage = row.get("usage_forecast", 0) @@ -169,7 +178,7 @@ def fm_schedule_to_instructions( # Convert from power to fill rate results = [ (om, *power_to_fill_rate_with_metrics(om, power, fill_level)) - for om in operation_modes + for om in operation_modes.values() ] # Step 1: minimize fill-level penalty (primary) @@ -221,10 +230,34 @@ def fm_schedule_to_instructions( execution_time=timestamp, abnormal_condition=False, ) + if previous_instruction and all( + getattr(previous_instruction, attr) == getattr(instruction, attr) + for attr in ( + "actuator_id", + "operation_mode", + "operation_mode_factor", + "abnormal_condition", + ) + ): + logger.info("Instruction removed, no changes to previous instruction") + continue logger.info( f"Instruction created: at {timestamp} set {actuator.diagnostic_label if isinstance(actuator.diagnostic_label, str) else actuator} to 
{best_operation_mode.diagnostic_label if isinstance(best_operation_mode.diagnostic_label, str) else best_operation_mode} with factor {operation_mode_factor}" ) + previous_instruction = instruction instructions.append(instruction) + logger.debug( + "Instructions JSON: %s", + json.dumps( + [ + serialize_instruction( + instr, actuators=actuators, operation_modes=operation_modes + ) + for instr in instructions + ], + indent=2, + ), + ) # Update fill level fill_level = compute_next_fill_level( @@ -381,3 +414,29 @@ def explain_choice( lines.append(f"{label} (element={element_label}): rejected due to {reason}") return "; ".join(lines) + + +def serialize_instruction( + instr: FRBCInstruction, + actuators: dict[uuid.UUID, FRBCActuatorDescription], + operation_modes: dict[uuid.UUID, FRBCOperationMode], +): + """Create dict of instructions suitable for logging.""" + actuator = ( + getattr(actuators[instr.actuator_id], "diagnostic_label", None) + or instr.actuator_id + ) + operation_mode = ( + getattr(operation_modes[instr.operation_mode], "diagnostic_label", None) + or instr.operation_mode + ) + return { + "message_type": instr.message_type, + "message_id": str(instr.message_id), + "instruction_id": str(instr.id), + "actuator": str(actuator), + "operation_mode": str(operation_mode), + "operation_mode_factor": instr.operation_mode_factor, + "execution_time": instr.execution_time.isoformat(), + "abnormal_condition": instr.abnormal_condition, + } diff --git a/src/flexmeasures_client/s2/control_types/__init__.py b/src/flexmeasures_client/s2/control_types/__init__.py index b3e19f5b..429ad419 100644 --- a/src/flexmeasures_client/s2/control_types/__init__.py +++ b/src/flexmeasures_client/s2/control_types/__init__.py @@ -1,8 +1,7 @@ from __future__ import annotations -from asyncio import Queue from logging import Logger -from typing import cast +from typing import Callable, cast from pydantic import BaseModel from s2python.common import ( @@ -22,7 +21,7 @@ class 
ControlTypeHandler(Handler): _instruction_history: SizeLimitOrderedDict[str, BaseModel] _instruction_status_history: SizeLimitOrderedDict[str, InstructionStatus] _fm_client: FlexMeasuresClient - _sending_queue: Queue + send_message: Callable _logger: Logger def __init__(self, max_size: int = 100) -> None: diff --git a/src/flexmeasures_client/s2/script/websockets_server.py b/src/flexmeasures_client/s2/script/websockets_server.py index afceeb56..6ef67321 100644 --- a/src/flexmeasures_client/s2/script/websockets_server.py +++ b/src/flexmeasures_client/s2/script/websockets_server.py @@ -2,6 +2,8 @@ import json import logging import os +from datetime import datetime +from zoneinfo import ZoneInfo import aiohttp from aiohttp import web @@ -47,9 +49,13 @@ async def websocket_producer(ws, cem: CEM): cem._logger.debug("start websocket message producer") cem._logger.debug(f"IS CLOSED? {cem.is_closed()}") while not cem.is_closed(): - message = await cem.get_message() - cem._logger.debug("sending message") - await ws.send_json(message) + message, fut = await cem.get_message() + try: + cem._logger.debug("sending message") + await ws.send_json(message) + fut.set_result(True) + except Exception as exc: + fut.set_exception(exc) cem._logger.debug("cem closed") @@ -71,6 +77,7 @@ async def websocket_consumer(ws, cem: CEM): elif msg.type == aiohttp.WSMsgType.ERROR: cem._logger.debug("close...") cem.close() + await ws.close() cem._logger.error(f"ws connection closed with exception {ws.exception()}") # TODO: save cem state? 
@@ -83,12 +90,22 @@ async def websocket_handler(request): site_name = "My CEM" fm_client = FlexMeasuresClient( - "toy-password", "toy-user@flexmeasures.io", host="server:5000" + host="server:5000", + email="toy-user@flexmeasures.io", + password="toy-password", + polling_interval=0.5, ) - price_sensor, power_sensor, soc_sensor, rm_discharge_sensor = await configure_site( - site_name, fm_client - ) + ( + price_sensor, + power_sensor, + soc_sensor, + rm_discharge_sensor, + soc_minima_sensor, + soc_maxima_sensor, + usage_forecast_sensor, + leakage_behaviour_sensor_id, + ) = await configure_site(site_name, fm_client) cem = CEM( sensor_id=power_sensor["id"], @@ -100,6 +117,10 @@ async def websocket_handler(request): price_sensor_id=price_sensor["id"], soc_sensor_id=soc_sensor["id"], rm_discharge_sensor_id=rm_discharge_sensor["id"], + soc_minima_sensor_id=soc_minima_sensor["id"], + soc_maxima_sensor_id=soc_maxima_sensor["id"], + usage_forecast_sensor_id=usage_forecast_sensor["id"], + leakage_behaviour_sensor_id=leakage_behaviour_sensor_id["id"], ) cem.register_control_type(frbc) @@ -115,11 +136,11 @@ async def websocket_handler(request): async def configure_site( site_name: str, fm_client: FlexMeasuresClient -) -> tuple[dict, dict, dict, dict]: +) -> tuple[dict, dict, dict, dict, dict, dict, dict, dict]: account = await fm_client.get_account() assets = await fm_client.get_assets(parse_json_fields=True) - site_asset = None + site_asset: dict | None = None for asset in assets: if asset["name"] == site_name: site_asset = asset @@ -146,6 +167,10 @@ async def configure_site( power_sensor = None soc_sensor = None rm_discharge_sensor = None + soc_minima_sensor = None + soc_maxima_sensor = None + usage_forecast_sensor = None + leakage_behaviour_sensor = None for sensor in sensors: if sensor["name"] == "price": price_sensor = sensor @@ -155,6 +180,14 @@ async def configure_site( soc_sensor = sensor elif sensor["name"] == "RM discharge": rm_discharge_sensor = sensor + elif 
sensor["name"] == "soc-minima": + soc_minima_sensor = sensor + elif sensor["name"] == "soc-maxima": + soc_maxima_sensor = sensor + elif sensor["name"] == "usage-forecast": + usage_forecast_sensor = sensor + elif sensor["name"] == "leakage-behaviour": + leakage_behaviour_sensor = sensor if price_sensor is None: price_sensor = await fm_client.add_sensor( @@ -164,12 +197,23 @@ async def configure_site( generic_asset_id=site_asset["id"], timezone="Europe/Amsterdam", ) - await fm_client.post_sensor_data( - sensor_id=price_sensor["id"], - start="2026-01-15T00:00+01", # 2026-01-01T00:00+01 - duration="P3D", # P1M - values=[0.3], - unit="EUR/kWh", + + # Continue immediately without awaiting + LOGGER.debug("Posting 3 days of prices in a background task..") + start_of_today = ( + datetime.now(ZoneInfo("Europe/Amsterdam")) + .replace(hour=0, minute=0, second=0, microsecond=0) + .isoformat() + ) + asyncio.create_task( + fm_client.post_sensor_data( + sensor_id=price_sensor["id"], + start=start_of_today, + prior="2026-01-01T00:00+01", # 2026-01-01T00:00+01 + duration="P3D", # P1M + values=[0.3], + unit="EUR/kWh", + ) ) if power_sensor is None: power_sensor = await fm_client.add_sensor( @@ -178,6 +222,7 @@ async def configure_site( unit="kW", generic_asset_id=site_asset["id"], timezone="Europe/Amsterdam", + attributes={"consumption_is_positive": True}, ) if soc_sensor is None: soc_sensor = await fm_client.add_sensor( @@ -195,7 +240,66 @@ async def configure_site( generic_asset_id=site_asset["id"], timezone="Europe/Amsterdam", ) - return price_sensor, power_sensor, soc_sensor, rm_discharge_sensor + if soc_minima_sensor is None: + soc_minima_sensor = await fm_client.add_sensor( + name="soc-minima", + event_resolution="PT15M", + unit="kWh", + generic_asset_id=site_asset["id"], + timezone="Europe/Amsterdam", + ) + if soc_maxima_sensor is None: + soc_maxima_sensor = await fm_client.add_sensor( + name="soc-maxima", + event_resolution="PT15M", + unit="kWh", + 
generic_asset_id=site_asset["id"], + timezone="Europe/Amsterdam", + ) + if usage_forecast_sensor is None: + usage_forecast_sensor = await fm_client.add_sensor( + name="usage-forecast", + event_resolution="PT15M", + unit="kW", + generic_asset_id=site_asset["id"], + timezone="Europe/Amsterdam", + ) + if leakage_behaviour_sensor is None: + leakage_behaviour_sensor = await fm_client.add_sensor( + name="leakage-behaviour", + event_resolution="PT15M", + unit="%", + generic_asset_id=site_asset["id"], + timezone="Europe/Amsterdam", + ) + sensors_to_show = [ + { + "title": "State of charge", + "sensors": [ + soc_minima_sensor["id"], + soc_maxima_sensor["id"], + soc_sensor["id"], + ], + }, + { + "title": "Prices", + "sensor": price_sensor["id"], + }, + ] + await fm_client.update_asset( + asset_id=site_asset["id"], + updates=dict(sensors_to_show=sensors_to_show), + ) + return ( + price_sensor, + power_sensor, + soc_sensor, + rm_discharge_sensor, + soc_minima_sensor, + soc_maxima_sensor, + usage_forecast_sensor, + leakage_behaviour_sensor, + ) app = web.Application()