From 83f38eab5a42c5abb7b7137fb487c9696c496a9a Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 16 May 2025 11:05:38 +0200 Subject: [PATCH 1/2] fix: rate limit sending data from system descriptions to FM Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_tunes.py | 3 ++- src/flexmeasures_client/s2/utils.py | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py index 20984d69..27301e45 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py @@ -51,7 +51,7 @@ translate_fill_level_target_profile, translate_usage_forecast_to_fm, ) -from flexmeasures_client.s2.utils import get_reception_status, get_unique_id +from flexmeasures_client.s2.utils import get_reception_status, get_unique_id, rate_limit_measurements RESOLUTION = "15min" ENERGY_UNIT = "MWh" @@ -486,6 +486,7 @@ def handle_system_description( return get_reception_status(message, status=ReceptionStatusValues.OK) + @rate_limit_measurements async def send_conversion_efficiencies( self, system_description: FRBCSystemDescription ): diff --git a/src/flexmeasures_client/s2/utils.py b/src/flexmeasures_client/s2/utils.py index 50eabaab..7fbe3c45 100644 --- a/src/flexmeasures_client/s2/utils.py +++ b/src/flexmeasures_client/s2/utils.py @@ -1,6 +1,7 @@ from __future__ import annotations from collections import OrderedDict +from functools import wraps from typing import Mapping, TypeVar from uuid import uuid4 @@ -123,3 +124,21 @@ def get_latest_compatible_version(supported_versions, current_version, logger): return cem_version return latest_compatible_version + + +def rate_limit_measurements(func): + @wraps(func) + async def wrapper(self, *args, **kwargs): + # Use the actual function name + function_name = func.__name__ + + # Ensure `self` has `is_timer_due` method + if not hasattr(self, 'is_timer_due'): + raise AttributeError(f"{self} has no method is_timer_due") + + if not self.is_timer_due(function_name): + return # Skip execution if timer is not due + + return await func(self, *args, **kwargs) + + return wrapper From 724f56e51ea2d7dd1270cdd921399ec68425c368 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Fri, 16 May 2025 11:07:45 +0200 Subject: [PATCH 2/2] refactor: use decorator Signed-off-by: F.N. Claessen --- .../s2/control_types/FRBC/frbc_tunes.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py index 27301e45..65751c78 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py @@ -161,9 +161,8 @@ def is_timer_due(self, name: str): def now(self): return datetime.now(self._timezone) + @rate_limit_measurements async def send_storage_status(self, status: FRBCStorageStatus): - if not self.is_timer_due("storage_status"): - return try: await self._fm_client.post_measurements( @@ -180,9 +179,8 @@ async def send_storage_status(self, status: FRBCStorageStatus): ) await self._sending_queue.put(response) + @rate_limit_measurements async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour): - if not self.is_timer_due("leakage_behaviour"): - return try: start = self.now() @@ -208,9 +206,8 @@ async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour): ) await self._sending_queue.put(response) + @rate_limit_measurements async def send_actuator_status(self, status: FRBCActuatorStatus): - if not self.is_timer_due("actuator_status"): - return factor = status.operation_mode_factor system_description: FRBCSystemDescription = list( @@ -560,6 +557,7 @@ async def close(self): await self.stop_trigger_schedule() + @rate_limit_measurements async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast): """ Send FRBC.UsageForecast to FlexMeasures. @@ -567,8 +565,6 @@ async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast): Args: usage_forecast (FRBCUsageForecast): The usage forecast to be translated and sent. """ - if not self.is_timer_due("usage_forecast"): - return start_time = usage_forecast.start_time @@ -591,6 +587,7 @@ async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast): duration=str(pd.Timedelta(RESOLUTION) * len(usage_forecast)), ) + @rate_limit_measurements async def send_fill_level_target_profile( self, fill_level_target_profile: FRBCFillLevelTargetProfile ): @@ -600,8 +597,6 @@ async def send_fill_level_target_profile( Args: fill_level_target_profile (FRBCFillLevelTargetProfile): The fill level target profile to be translated and sent. """ - if not self.is_timer_due("fill_level_target_profile"): - return soc_minima, soc_maxima = translate_fill_level_target_profile( fill_level_target_profile,