Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
translate_fill_level_target_profile,
translate_usage_forecast_to_fm,
)
from flexmeasures_client.s2.utils import get_reception_status, get_unique_id
from flexmeasures_client.s2.utils import get_reception_status, get_unique_id, rate_limit_measurements

RESOLUTION = "15min"
ENERGY_UNIT = "MWh"
Expand Down Expand Up @@ -161,9 +161,8 @@ def is_timer_due(self, name: str):
def now(self):
return datetime.now(self._timezone)

@rate_limit_measurements
async def send_storage_status(self, status: FRBCStorageStatus):
if not self.is_timer_due("storage_status"):
return

try:
await self._fm_client.post_measurements(
Expand All @@ -180,9 +179,8 @@ async def send_storage_status(self, status: FRBCStorageStatus):
)
await self._sending_queue.put(response)

@rate_limit_measurements
async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour):
if not self.is_timer_due("leakage_behaviour"):
return

try:
start = self.now()
Expand All @@ -208,9 +206,8 @@ async def send_leakage_behaviour(self, leakage: FRBCLeakageBehaviour):
)
await self._sending_queue.put(response)

@rate_limit_measurements
async def send_actuator_status(self, status: FRBCActuatorStatus):
if not self.is_timer_due("actuator_status"):
return

factor = status.operation_mode_factor
system_description: FRBCSystemDescription = list(
Expand Down Expand Up @@ -486,6 +483,7 @@ def handle_system_description(

return get_reception_status(message, status=ReceptionStatusValues.OK)

@rate_limit_measurements
async def send_conversion_efficiencies(
self, system_description: FRBCSystemDescription
):
Expand Down Expand Up @@ -559,15 +557,14 @@ async def close(self):

await self.stop_trigger_schedule()

@rate_limit_measurements
async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast):
"""
Send FRBC.UsageForecast to FlexMeasures.

Args:
usage_forecast (FRBCUsageForecast): The usage forecast to be translated and sent.
"""
if not self.is_timer_due("usage_forecast"):
return

start_time = usage_forecast.start_time

Expand All @@ -590,6 +587,7 @@ async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast):
duration=str(pd.Timedelta(RESOLUTION) * len(usage_forecast)),
)

@rate_limit_measurements
async def send_fill_level_target_profile(
self, fill_level_target_profile: FRBCFillLevelTargetProfile
):
Expand All @@ -599,8 +597,6 @@ async def send_fill_level_target_profile(
Args:
fill_level_target_profile (FRBCFillLevelTargetProfile): The fill level target profile to be translated and sent.
"""
if not self.is_timer_due("fill_level_target_profile"):
return

soc_minima, soc_maxima = translate_fill_level_target_profile(
fill_level_target_profile,
Expand Down
19 changes: 19 additions & 0 deletions src/flexmeasures_client/s2/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from collections import OrderedDict
from functools import wraps
from typing import Mapping, TypeVar
from uuid import uuid4

Expand Down Expand Up @@ -123,3 +124,21 @@ def get_latest_compatible_version(supported_versions, current_version, logger):
return cem_version

return latest_compatible_version


def rate_limit_measurements(func):
@wraps(func)
async def wrapper(self, *args, **kwargs):
# Use the actual function name
function_name = func.__name__

# Ensure `self` has `is_timer_due` method
if not hasattr(self, 'is_timer_due'):
raise AttributeError(f"{self} has no method is_timer_due")

if not self.is_timer_due(function_name):
return # Skip execution if timer is not due

return await func(self, *args, **kwargs)

return wrapper
Loading