Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions flexmeasures_s2/profile_steering/common/device_plan.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from typing import Optional, Union
from flexmeasures_s2.profile_steering.common.pydantic_base import FlexMeasuresBaseModel

from flexmeasures_s2.profile_steering.common.joule_profile import JouleProfile
from flexmeasures_s2.profile_steering.common.soc_profile import SoCProfile
from flexmeasures_s2.profile_steering.device_planner.frbc.s2_frbc_instruction_profile import (
S2FrbcInstructionProfile,
)
from flexmeasures_s2.profile_steering.device_planner.ddbc.s2_ddbc_instruction_profile import (
S2DdbcInstructionProfile,
)


class DevicePlan(FlexMeasuresBaseModel):
Expand All @@ -13,5 +17,7 @@ class DevicePlan(FlexMeasuresBaseModel):
device_name: str
connection_id: str
energy_profile: JouleProfile
fill_level_profile: SoCProfile
instruction_profile: S2FrbcInstructionProfile
fill_level_profile: Optional[SoCProfile] = None
instruction_profile: Optional[
Union[S2FrbcInstructionProfile, S2DdbcInstructionProfile]
] = None
5 changes: 5 additions & 0 deletions flexmeasures_s2/profile_steering/congestion_point_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ def _get_initial_device_plan(
print(
f"Error getting initial plan from device {device.device_id} in worker: {e}"
)
print(f"Exception type: {type(e).__name__}")
import traceback

print("Full traceback:")
traceback.print_exc()
return (device.device_id, None)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# DDBC (Demand-Driven Based Control) device planner package
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from datetime import datetime, timedelta
from typing import List, Any, Optional


class AvgDemandForecastElement:
"""Element representing average demand forecast for a time period."""

def __init__(self, start: datetime, end: datetime, avg_demand: float):
self.start = start
self.end = end
self.avg_demand = avg_demand

def get_start(self) -> datetime:
return self.start

def get_end(self) -> datetime:
return self.end

def get_avg_demand(self) -> float:
return self.avg_demand

def get_duration(self) -> timedelta:
return self.end - self.start

def __str__(self) -> str:
return f"AvgDemandForecastElement(avgDemand={self.avg_demand}, start={self.start}, end={self.end})"


class AvgDemandForecastProfile:
"""Profile of average demand forecast elements."""

def __init__(self, elements: List[AvgDemandForecastElement]):
self.elements = elements

def get_elements(self) -> List[AvgDemandForecastElement]:
return self.elements

def get_start(self) -> Optional[datetime]:
if not self.elements:
return None
return self.elements[0].get_start()

def get_end(self) -> Optional[datetime]:
if not self.elements:
return None
return self.elements[-1].get_end()

def sub_profile(self, start: datetime, end: datetime) -> "AvgDemandForecastProfile":
"""Get a sub-profile between start and end times."""
sub_elements = []

for element in self.elements:
element_start = max(element.get_start(), start)
element_end = min(element.get_end(), end)

if element_start < element_end:
sub_elements.append(
AvgDemandForecastElement(
element_start, element_end, element.get_avg_demand()
)
)

return AvgDemandForecastProfile(sub_elements)


class AvgDemandForecastUtil:
"""Utility class for converting DDBC average demand forecasts."""

@staticmethod
def from_avg_demand_rate_forecast(demand_forecast: Any) -> AvgDemandForecastProfile:
"""Convert a DDBC average demand rate forecast to a profile."""
elements: List[AvgDemandForecastElement] = []

start = demand_forecast.start_time

for element in demand_forecast.elements:
if isinstance(element.duration, (int, float)):
duration_seconds = element.duration
elif hasattr(element.duration, "root"):
duration_seconds = element.duration.root
else:
duration_seconds = int(element.duration)
end = start + timedelta(seconds=duration_seconds)
elements.append(
AvgDemandForecastElement(
start, end, float(element.demand_rate_expected)
)
)
start = end + timedelta(milliseconds=1)

return AvgDemandForecastProfile(elements)

@staticmethod
def get_avg_demand_forecast_for_timestep(
avg_demand_forecast: AvgDemandForecastProfile,
timestep_start: datetime,
timestep_end: datetime,
) -> Optional[float]:
"""Get weighted average demand forecast for a timestep."""
if avg_demand_forecast is None:
return None

timestep_end_adjusted = timestep_end - timedelta(milliseconds=1)

sub_profile = avg_demand_forecast.sub_profile(
timestep_start, timestep_end_adjusted
)

if not sub_profile.get_elements():
return None

demand = 0.0
for element in sub_profile.get_elements():
duration_ms = element.get_duration().total_seconds() * 1000
demand += element.get_avg_demand() * duration_ms

start = sub_profile.get_start()
end = sub_profile.get_end()
if start is None or end is None:
return None

total_duration_ms = (end - start).total_seconds() * 1000

if total_duration_ms == 0:
return None

return demand / total_duration_ms
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
from typing import List, Any, Dict, Optional
from flexmeasures_s2.profile_steering.common.power_range_wrapper import (
PowerRangeWrapper,
)
from flexmeasures_s2.profile_steering.device_planner.ddbc.number_range_wrapper import (
NumberRangeWrapper,
)
from s2python.common import CommodityQuantity


class DdbcOperationModeWrapper:
"""Wrapper for DDBC operation mode to provide utility methods."""

EPSILON = 1e-4

def __init__(self, ddbc_operation_mode: Any):
"""Initialize from a DDBC operation mode."""
self.id = (
ddbc_operation_mode.Id
if hasattr(ddbc_operation_mode, "Id")
else ddbc_operation_mode.id
)
self.diagnostic_label = getattr(ddbc_operation_mode, "diagnostic_label", None)
self.abnormal_condition_only = getattr(
ddbc_operation_mode, "abnormal_condition_only", False
)

self.power_ranges: List[PowerRangeWrapper] = []
for unwrapped_power_range in ddbc_operation_mode.power_ranges:
self.power_ranges.append(PowerRangeWrapper(unwrapped_power_range))

if isinstance(ddbc_operation_mode.supply_range, list):
self.supply_range = NumberRangeWrapper(ddbc_operation_mode.supply_range[0])
else:
self.supply_range = NumberRangeWrapper(ddbc_operation_mode.supply_range)

self.running_costs: Optional[NumberRangeWrapper]
if (
hasattr(ddbc_operation_mode, "running_costs")
and ddbc_operation_mode.running_costs is not None
):
self.running_costs = NumberRangeWrapper(ddbc_operation_mode.running_costs)
else:
self.running_costs = None

# Determine if this operation mode uses a factor
uses_factor = False

if (
abs(
self.supply_range.get_start_of_range()
- self.supply_range.get_end_of_range()
)
> self.EPSILON
):
uses_factor = True

for power_range in self.power_ranges:
if (
abs(power_range.get_start_of_range() - power_range.get_end_of_range())
> self.EPSILON
):
uses_factor = True

self.uses_factor = uses_factor

def get_id(self) -> str:
# Handle UUID objects with root attribute
if hasattr(self.id, "root"):
return str(self.id.root)
else:
return str(self.id)

def get_diagnostic_label(self) -> Optional[str]:
return self.diagnostic_label

def get_power_ranges(self) -> List[PowerRangeWrapper]:
return self.power_ranges

def get_supply_range(self) -> NumberRangeWrapper:
return self.supply_range

def get_running_costs(self) -> Optional[NumberRangeWrapper]:
return self.running_costs

def uses_factor_method(self) -> bool:
return self.uses_factor

def get_operation_mode_electrical_power(self, factor: float) -> float:
"""Calculate electrical power consumption for a given factor."""
power_watt = 0.0

electric_commodities = [
CommodityQuantity.ELECTRIC_POWER_L1,
CommodityQuantity.ELECTRIC_POWER_L2,
CommodityQuantity.ELECTRIC_POWER_L3,
CommodityQuantity.ELECTRIC_POWER_3_PHASE_SYMMETRIC,
]

for power_range in self.get_power_ranges():
if power_range.get_commodity_quantity() in electric_commodities:
start = power_range.get_start_of_range()
end = power_range.get_end_of_range()
power_watt += (end - start) * factor + start

return power_watt

def get_operation_mode_gas_consumption(self, factor: float) -> float:
"""Calculate natural gas consumption (liters per second) for a given factor."""
liters_gas_per_second = 0.0

for power_range in self.get_power_ranges():
if (
power_range.get_commodity_quantity()
== CommodityQuantity.NATURAL_GAS_FLOW_RATE
):
start = power_range.get_start_of_range()
end = power_range.get_end_of_range()
liters_gas_per_second += (end - start) * factor + start

return liters_gas_per_second

def get_operation_mode_supply_rate(self, factor: float) -> float:
"""Calculate supply rate for a given factor."""
start = self.get_supply_range().get_start_of_range()
end = self.get_supply_range().get_end_of_range()
return (end - start) * factor + start

def convert_to_actuator_config(self, factor: float):
"""Convert to S2DdbcActuatorConfiguration."""
from flexmeasures_s2.profile_steering.device_planner.ddbc.s2_ddbc_actuator_configuration import (
S2DdbcActuatorConfiguration,
)

power_per_commodity_quantity: Dict[str, float] = {}

for power_range in self.power_ranges:
commodity_quantity_value = power_range.get_commodity_quantity().value
power_per_commodity_quantity[
commodity_quantity_value
] = power_range.get_power(factor)

return S2DdbcActuatorConfiguration(
operation_mode_id=self.get_id(),
factor=factor,
supply_rate=self.get_operation_mode_supply_rate(factor),
power_per_commodity_quantity=power_per_commodity_quantity,
)
Loading
Loading