diff --git a/flexmeasures/api/v3_0/assets.py b/flexmeasures/api/v3_0/assets.py index d63bbc2dda..b2f14db488 100644 --- a/flexmeasures/api/v3_0/assets.py +++ b/flexmeasures/api/v3_0/assets.py @@ -47,6 +47,7 @@ GenericAssetSchema as AssetSchema, GenericAssetIdField as AssetIdField, GenericAssetTypeSchema as AssetTypeSchema, + SensorsToShowSchema, ) from flexmeasures.data.schemas.scheduling.storage import StorageFlexModelSchema from flexmeasures.data.schemas.scheduling import AssetTriggerSchema, FlexContextSchema @@ -61,9 +62,6 @@ ) from flexmeasures.api.common.schemas.users import AccountIdField from flexmeasures.api.common.schemas.assets import default_response_fields -from flexmeasures.utils.coding_utils import ( - flatten_unique, -) from flexmeasures.ui.utils.view_utils import clear_session, set_session_variables from flexmeasures.auth.policy import check_access from flexmeasures.data.schemas.sensors import ( @@ -896,7 +894,7 @@ def get_chart_data(self, id: int, asset: GenericAsset, **kwargs): tags: - Assets """ - sensors = flatten_unique(asset.validate_sensors_to_show()) + sensors = SensorsToShowSchema.flatten(asset.validate_sensors_to_show()) return asset.search_beliefs(sensors=sensors, as_json=True, **kwargs) @route("//auditlog") diff --git a/flexmeasures/conftest.py b/flexmeasures/conftest.py index d3043286e4..f737bb4c82 100644 --- a/flexmeasures/conftest.py +++ b/flexmeasures/conftest.py @@ -548,6 +548,31 @@ def create_assets( ), ) db.session.add(sensor) + scc_sensor = Sensor( + name="site-consumption-capacity", + generic_asset=asset, + event_resolution=timedelta(minutes=15), + unit="MW", + ) + db.session.add(scc_sensor) + cc_sensor = Sensor( + name="consumption-capacity", + generic_asset=asset, + event_resolution=timedelta(minutes=15), + unit="MW", + ) + db.session.add(cc_sensor) + pc_sensor = Sensor( + name="production-capacity", + generic_asset=asset, + event_resolution=timedelta(minutes=15), + unit="MW", + ) + db.session.add(pc_sensor) + db.session.flush() # assign sensor IDs + asset.flex_model["consumption-capacity"] = {"sensor": cc_sensor.id} + asset.flex_model["production-capacity"] = {"sensor": pc_sensor.id} + asset.flex_context["site-consumption-capacity"] = {"sensor": scc_sensor.id} assets.append(asset) # one day of test data (one complete sine curve) diff --git a/flexmeasures/data/models/charts/belief_charts.py b/flexmeasures/data/models/charts/belief_charts.py index e155438520..4cf882d9ee 100644 --- a/flexmeasures/data/models/charts/belief_charts.py +++ b/flexmeasures/data/models/charts/belief_charts.py @@ -11,7 +11,6 @@ from flexmeasures.utils.flexmeasures_inflection import ( capitalize, ) -from flexmeasures.utils.coding_utils import flatten_unique from flexmeasures.utils.unit_utils import find_smallest_common_unit, get_unit_dimension @@ -499,8 +498,10 @@ def chart_for_multiple_sensors( combine_legend: bool = True, **override_chart_specs: dict, ): + from flexmeasures.data.schemas.generic_assets import SensorsToShowSchema + # Determine the shared data resolution - all_shown_sensors = flatten_unique(sensors_to_show) + all_shown_sensors = SensorsToShowSchema.flatten(sensors_to_show) condition = list( sensor.event_resolution for sensor in all_shown_sensors diff --git a/flexmeasures/data/models/generic_assets.py b/flexmeasures/data/models/generic_assets.py index 46bb19383c..48072f0556 100644 --- a/flexmeasures/data/models/generic_assets.py +++ b/flexmeasures/data/models/generic_assets.py @@ -30,7 +30,6 @@ CONSULTANT_ROLE, ) from flexmeasures.utils import geo_utils -from flexmeasures.utils.coding_utils import flatten_unique from flexmeasures.utils.time_utils import determine_minimum_resampling_resolution from flexmeasures.utils.unit_utils import find_smallest_common_unit @@ -287,7 +286,9 @@ def validate_sensors_to_show( sensor_ids_to_show = self.sensors_to_show # Import the schema for validation - from flexmeasures.data.schemas.utils import extract_sensors_from_flex_config + from flexmeasures.data.schemas.generic_assets import ( + extract_sensors_from_flex_config, + ) from flexmeasures.data.schemas.generic_assets import SensorsToShowSchema sensors_to_show_schema = SensorsToShowSchema() @@ -662,8 +663,10 @@ def chart( :param resolution: optionally set the resolution of data being displayed :returns: JSON string defining vega-lite chart specs """ + from flexmeasures.data.schemas.generic_assets import SensorsToShowSchema + processed_sensors_to_show = self.validate_sensors_to_show() - sensors = flatten_unique(processed_sensors_to_show) + sensors = SensorsToShowSchema.flatten(processed_sensors_to_show) for sensor in sensors: sensor.sensor_type = sensor.get_attribute("sensor_type", sensor.name) @@ -1023,7 +1026,9 @@ def get_timerange(cls, sensors: list["Sensor"]) -> dict[str, datetime]: # noqa 'end': datetime.datetime(2020, 12, 3, 14, 30, tzinfo=pytz.utc) } """ - sensor_ids = [s.id for s in flatten_unique(sensors)] + from flexmeasures.data.schemas.generic_assets import SensorsToShowSchema + + sensor_ids = [s.id for s in SensorsToShowSchema.flatten(sensors)] start, end = get_timerange(sensor_ids) return dict(start=start, end=end) diff --git a/flexmeasures/data/schemas/generic_assets.py b/flexmeasures/data/schemas/generic_assets.py index a6e2e0b8df..472b63aafb 100644 --- a/flexmeasures/data/schemas/generic_assets.py +++ b/flexmeasures/data/schemas/generic_assets.py @@ -3,7 +3,7 @@ from datetime import timedelta import json from http import HTTPStatus -from typing import Any +from typing import Any, TYPE_CHECKING from flask import abort from marshmallow import validates, ValidationError, fields, validates_schema @@ -11,7 +11,8 @@ from flask_security import current_user from sqlalchemy import select - +if TYPE_CHECKING: + from flexmeasures import Sensor from flexmeasures.data import ma, db from flexmeasures.data.models.user import Account from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType @@ -21,7 +22,6 @@ from flexmeasures.data.schemas.utils import ( FMValidationError, MarshmallowClickMixin, - extract_sensors_from_flex_config, ) from flexmeasures.auth.policy import user_has_admin_access from flexmeasures.cli import is_running as running_as_cli @@ -211,23 +211,34 @@ def _validate_flex_config_field_is_valid_choice( ) @classmethod - def flatten(cls, nested_list) -> list[int]: + def flatten(cls, nested_list: list) -> list[int] | list[Sensor]: """ - Flatten a nested list of sensors or sensor dictionaries into a unique list of sensor IDs. - - This method processes the following formats, for each of the entries of the nested list: - - A list of sensor IDs: `[1, 2, 3]` - - A list of dictionaries where each dictionary contains a `sensors` list, a `sensor` key or a `plots` key - `[{"title": "Temperature", "sensors": [1, 2]}, {"title": "Pressure", "sensor": 3}, {"title": "Pressure", "plots": [{"sensor": 4}, {"sensors": [5,6]}]}]` - - Mixed formats: `[{"title": "Temperature", "sensors": [1, 2]}, {"title": "Pressure", "sensor": 3}, 4, 5, 1]` - - It extracts all sensor IDs, removes duplicates, and returns a flattened list of unique sensor IDs. - - Args: - nested_list (list): A list containing sensor IDs, or dictionaries with `sensors` or `sensor` keys. - - Returns: - list: A unique list of sensor IDs. + Flatten a nested list of sensor IDs into a unique list. Also works for Sensor objects. + + This method processes the following formats for each entry in the list: + 1. A single sensor ID: + `3` + 2. A list of sensor IDs: + `[1, 2]` + 3. A dictionary with a `sensor` key: + `{"sensor": 3}` + 4. A dictionary with a `sensors` key: + `{"sensors": [1, 2]}` + 5. A dictionary with a `plots` key, containing a list of dictionaries, + each with a `sensor` or `sensors` key: + `{"plots": [{"sensor": 4}, {"sensors": [5, 6]}]}` + 6. A dictionary under the `plots` key containing the `asset` key together with a `flex-model` or `flex-context` key, + containing a field name or a list of field names: + `{"plots": [{"asset": 100, "flex-model": ["consumption-capacity", "production-capacity"], "flex-context": "site-power-capacity"}}` + 7. Mixed formats: + `[{"title": "Temperature", "sensors": [1, 2]}, {"title": "Pressure", "sensor": 3}, {"title": "Pressure", "plots": [{"sensor": 4}, {"sensors": [5, 6]}]}]` + + Example: + >>> SensorsToShowSchema.flatten([1, [2, 20, 6], 10, [6, 2], {"title": None,"sensors": [10, 15]}, 15, {"plots": [{"sensor": 1}, {"sensors": [20, 8]}]}]) + [1, 2, 20, 6, 10, 15, 8] + + :param nested_list: A list containing sensor IDs, or dictionaries with `sensors` or `sensor` keys. + :returns: A unique list of sensor IDs, or a unique list of Sensors """ all_objects = [] for s in nested_list: @@ -249,7 +260,6 @@ def flatten(cls, nested_list) -> list[int]: all_objects.extend(s["sensors"]) elif "sensor" in s: all_objects.append(s["sensor"]) - return list(dict.fromkeys(all_objects).keys()) @@ -427,3 +437,41 @@ def _deserialize(self, value: Any, attr, data, **kwargs) -> GenericAsset: def _serialize(self, value: GenericAsset, attr, obj, **kwargs) -> int: """Turn a GenericAsset into a generic asset id.""" return value.id + + +def extract_sensors_from_flex_config(plot: dict) -> list[Sensor]: + """ + Extracts a consolidated list of sensors from an asset based on + flex-context or flex-model definitions provided in a plot dictionary. + """ + all_sensors = [] + + asset = GenericAssetIdField().deserialize(plot.get("asset")) + + fields_to_check = { + "flex-context": asset.flex_context, + "flex-model": asset.flex_model, + } + + for plot_key, flex_config in fields_to_check.items(): + if plot_key not in plot: + continue + + field_keys = plot[plot_key] + data = flex_config or {} + + if isinstance(field_keys, str): + field_keys = [field_keys] + elif not isinstance(field_keys, list): + continue + + for field_key in field_keys: + field_value = data.get(field_key) + + if isinstance(field_value, dict): + # Add a single sensor if it exists + sensor = field_value.get("sensor") + if sensor: + all_sensors.append(sensor) + + return all_sensors diff --git a/flexmeasures/data/schemas/utils.py b/flexmeasures/data/schemas/utils.py index a628add7a8..149d891f39 100644 --- a/flexmeasures/data/schemas/utils.py +++ b/flexmeasures/data/schemas/utils.py @@ -5,7 +5,6 @@ from pint import DefinitionSyntaxError, DimensionalityError, UndefinedUnitError from flexmeasures.utils.unit_utils import to_preferred, ur -from flexmeasures.data.models.time_series import Sensor class MarshmallowClickMixin(click.ParamType): @@ -86,39 +85,6 @@ def convert_to_quantity(value: str, to_unit: str) -> ur.Quantity: ) -def extract_sensors_from_flex_config(plot: dict) -> list[Sensor]: - """ - Extracts a consolidated list of sensors from an asset based on - flex-context or flex-model definitions provided in a plot dictionary. - """ - all_sensors = [] - - from flexmeasures.data.schemas.generic_assets import ( - GenericAssetIdField, - ) # Import here to avoid circular imports - - asset = GenericAssetIdField().deserialize(plot.get("asset")) - - fields_to_check = { - "flex-context": asset.flex_context, - "flex-model": asset.flex_model, - } - - for plot_key, flex_config in fields_to_check.items(): - if plot_key in plot: - field_key = plot[plot_key] - data = flex_config or {} - field_value = data.get(field_key) - - if isinstance(field_value, dict): - # Add a single sensor if it exists - sensor = field_value.get("sensor") - if sensor: - all_sensors.append(sensor) - - return all_sensors - - def snake_to_kebab(key: str) -> str: """Convert snake_case to kebab-case.""" return key.replace("_", "-") diff --git a/flexmeasures/data/tests/test_SensorsToShowSchema.py b/flexmeasures/data/tests/test_SensorsToShowSchema.py index 95ca1ecbb6..2780baf5b2 100644 --- a/flexmeasures/data/tests/test_SensorsToShowSchema.py +++ b/flexmeasures/data/tests/test_SensorsToShowSchema.py @@ -1,6 +1,7 @@ import pytest from marshmallow import ValidationError +from flexmeasures import Sensor from flexmeasures.data.schemas.generic_assets import SensorsToShowSchema @@ -47,6 +48,38 @@ def test_dict_with_asset_and_no_title_plot(setup_test_data): assert schema.deserialize(input_value) == expected_output +def _get_sensor_by_name(sensors: list[Sensor], name: str) -> Sensor: + for sensor in sensors: + if sensor.name == name: + return sensor + raise ValueError(f"Sensor {name} not found") + + +def test_flatten_with_multiple_flex_config_fields(setup_test_data): + asset = setup_test_data["wind-asset-1"] + schema = SensorsToShowSchema() + input_value = [ + { + "plots": [ + { + "asset": asset.id, + "flex-model": ["consumption-capacity", "production-capacity"], + "flex-context": "site-consumption-capacity", + } + ] + } + ] + expected_output = [ + _get_sensor_by_name(asset.sensors, name).id + for name in ( + "site-consumption-capacity", + "consumption-capacity", + "production-capacity", + ) + ] + assert schema.flatten(input_value) == expected_output + + def test_invalid_sensor_string_input(): schema = SensorsToShowSchema() with pytest.raises( diff --git a/flexmeasures/utils/coding_utils.py b/flexmeasures/utils/coding_utils.py index 0c1607efc6..0a951bc053 100644 --- a/flexmeasures/utils/coding_utils.py +++ b/flexmeasures/utils/coding_utils.py @@ -71,54 +71,6 @@ def sort_dict(unsorted_dict: dict) -> dict: return sorted_dict -# This function is used for sensors_to_show in follow-up PR it will be moved and renamed to flatten_sensors_to_show -def flatten_unique(nested_list_of_objects: list) -> list: - """ - Get unique sensor IDs from a list of `sensors_to_show`. - - Handles: - - Lists of sensor IDs - - Dictionaries with a `sensors` key - - Nested lists (one level) - - Dictionaries with `plots` key containing lists of sensors or asset's flex-config reference - - Example: - Input: - [1, [2, 20, 6], 10, [6, 2], {"title":None,"sensors": [10, 15]}, 15, {"plots": [{"sensor": 1}, {"sensors": [20, 6]}]}] - - Output: - [1, 2, 20, 6, 10, 15] - """ - all_objects = [] - for s in nested_list_of_objects: - if isinstance(s, list): - all_objects.extend(s) - elif isinstance(s, int): - all_objects.append(s) - elif isinstance(s, dict): - if "sensors" in s: - all_objects.extend(s["sensors"]) - elif "sensor" in s: - all_objects.append(s["sensor"]) - elif "plots" in s: - from flexmeasures.data.schemas.utils import ( - extract_sensors_from_flex_config, - ) - - for entry in s["plots"]: - if "sensors" in entry: - all_objects.extend(entry["sensors"]) - if "sensor" in entry: - all_objects.append(entry["sensor"]) - if "asset" in entry: - sensors = extract_sensors_from_flex_config(entry) - all_objects.extend(sensors) - else: - all_objects.append(s) - - return list(dict.fromkeys(all_objects).keys()) - - def timeit(func): """Decorator for printing the time it took to execute the decorated function."""