Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions entsoe/entsoe.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
parse_unavailabilities, parse_contracted_reserve, parse_imbalance_prices_zip, \
parse_imbalance_volumes_zip, parse_netpositions, parse_procured_balancing_capacity, \
parse_water_hydro,parse_aggregated_bids, parse_activated_balancing_energy_prices, \
parse_offshore_unavailability, parse_imbalance_volumes
parse_offshore_unavailability, parse_imbalance_volumes, parse_balancing_state_xml, \
convert_balancing_state_data_to_dataframe
from .decorators import retry, paginated, year_limited, day_limited, documents_limited
import warnings

Expand Down Expand Up @@ -829,8 +830,9 @@ def query_imbalance_volumes(

def query_current_balancing_state(
self, country_code: Union[Area, str], start: pd.Timestamp,
end: pd.Timestamp) -> str:
"""
end: pd.Timestamp) -> bytes:
"""Query Current balancing state [GL EB] [12.3.A] from ENTSO-E API.

Parameters
----------
country_code : Area|str
Expand All @@ -839,7 +841,7 @@ def query_current_balancing_state(

Returns
-------
str
bytes
"""
area = lookup_area(country_code)
params = {
Expand All @@ -848,7 +850,7 @@ def query_current_balancing_state(
'area_Domain': area.code,
}
response = self._base_request(params=params, start=start, end=end)
return response.text
return response.content

def query_procured_balancing_capacity(
self, country_code: Union[Area, str], start: pd.Timestamp,
Expand Down Expand Up @@ -2003,26 +2005,48 @@ def query_imbalance_volumes(
df = df.truncate(before=start, after=end)
return df

@year_limited
def query_current_balancing_state(
self, country_code: Union[Area, str], start: pd.Timestamp,
end: pd.Timestamp) -> pd.DataFrame:
"""
Query Current balancing state [GL EB] [12.3.A] and return as DataFrame.

Parameters
----------
country_code : Area|str
Country code or Area object
start : pd.Timestamp
Start timestamp for the query period
end : pd.Timestamp
End timestamp for the query period

Returns
-------
pd.DataFrame
DataFrame with current balancing state data, indexed by timestamp
and converted to the appropriate timezone for the country

Note
----
The endpoint can return 90,000 records at once (62.5 days; just over 2 months).
The endpoint does not return any errors if you query a wider range.
This means the @year_limited decorator cannot be used.
"""
area = lookup_area(country_code)
text = super(EntsoePandasClient, self).query_current_balancing_state(
country_code=area, start=start, end=end)
df = -1*parse_imbalance_volumes(text)
df = df.tz_convert(area.tz)
df = df.truncate(before=start, after=end)

# Parse the XML response using the dedicated parser
data = parse_balancing_state_xml(text)

# Convert to DataFrame with proper timezone handling
df = convert_balancing_state_data_to_dataframe(
data=data,
area=area,
start=start,
end=end,
)

return df

@year_limited
Expand Down
168 changes: 168 additions & 0 deletions entsoe/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import bs4
from bs4.builder import XMLParsedAsHTMLWarning
import pandas as pd
import xmltodict
from datetime import datetime, timedelta

from .mappings import PSRTYPE_MAPPINGS, DOCSTATUS, BSNTYPE, Area
from .series_parsers import _extract_timeseries, _resolution_to_timedelta, _parse_datetimeindex, _parse_timeseries_generic,\
Expand Down Expand Up @@ -1010,3 +1012,169 @@ def _outage_parser(xml_file: bytes, headers, ts_func) -> pd.DataFrame:
d.append(row + t)
df = pd.DataFrame.from_records(d, columns=headers)
return df


def _validate_xml_document(text: bytes):
"""Validate XML response and extract the document structure."""
try:
doc = xmltodict.parse(text, process_namespaces=False)
except Exception as e:
raise ValueError(f"Failed to parse XML response: {e}")

try:
return doc["Balancing_MarketDocument"]
except KeyError as e:
raise KeyError(f"Missing required XML element: {e}")


def _extract_document_metadata(document):
"""Extract metadata fields from the current balancing state document."""
return {
"mrid": document.get("mRID"),
"revision_number": document.get("revisionNumber"),
"created_datetime": document.get("createdDateTime"),
}


def _extract_timeseries_list(document):
"""Extract and normalize timeseries list from current balancing state document."""
timeseries = document["TimeSeries"]
if isinstance(timeseries, dict):
timeseries = [timeseries]
return timeseries


def _extract_series_metadata(series):
"""Extract metadata fields from the current balancing state timeseries."""
return {
"mrid": series.get("mRID"),
"flow_direction": series.get("flowDirection.direction"),
"quantity_unit": series.get("quantity_Measure_Unit.name"),
"curve_type": series.get("curveType"),
"cancelled_ts": series.get("cancelledTS"),
}


def _parse_time_interval(period):
"""Parse time interval from period data."""
time_interval = period.get("timeInterval", {})
if not time_interval:
raise ValueError("No time 'timeInterval' found in 'period' in response from ENTSO-E.")

start_str = time_interval.get("start")
end_str = time_interval.get("end")
start_time = datetime.strptime(start_str, "%Y-%m-%dT%H:%MZ")
end_time = datetime.strptime(end_str, "%Y-%m-%dT%H:%MZ")
return start_time, end_time


def _get_flow_direction_factor(timeseries_metadata: dict):
flow_direction = timeseries_metadata.get("flow_direction", "")
return {
'A01': 1, # Up, to convey the surplus
'A02': -1, # Down, to convey the deficit
'A03': 1, # Up and Down, when system is balanced
}.get(flow_direction, 1) # Default to 1 if unknown code


def _flatten_points(points, start_time, timeseries_metadata, document_metadata):
"""Flatten point data into list of dictionaries."""
if isinstance(points, dict):
points = [points]

flow_direction_factor = _get_flow_direction_factor(timeseries_metadata)

flattened_data = []
for point in points:
position = int(point["position"]) # 1-based index
timestamp = start_time + timedelta(minutes=position - 1)
flattened_data.append(
{
"timestamp": timestamp.isoformat(),
"quantity": float(point["quantity"]) * flow_direction_factor,
**timeseries_metadata,
**document_metadata,
}
)

return flattened_data


def parse_balancing_state_xml(text: bytes):
"""
Parse ENTSO-E XML response and flatten time series data.

Parameters
----------
text : bytes
Raw XML response from ENTSO-E API

Returns
-------
list
List of dictionaries containing flattened time series data with fields:
- timestamp: ISO formatted timestamp
- quantity: Numeric quantity value
- flow_direction: Direction of flow
- A01: Up, to convey the surplus
- A02: Down, to convey the deficit
- A03: Up and Down, when system is balanced
- quantity_unit: Unit of measurement (MAW: Megawatt)
- curve_type: Type of curve
- A01: Sequential fixed block
- A03: Variable sized blocks
- cancelled_ts: Cancellation flag

Raises
------
ValueError
If the XML structure is invalid or missing required fields
KeyError
If required XML elements are not found
"""
document = _validate_xml_document(text)
document_metadata = _extract_document_metadata(document)
timeseries_list = _extract_timeseries_list(document)

flattened_data = []
for series in timeseries_list:
timeseries_metadata = _extract_series_metadata(series)
period = series.get("Period", {})
start_time, _ = _parse_time_interval(period)
points = period.get("Point", [])
series_data = _flatten_points(points, start_time, timeseries_metadata, document_metadata)
flattened_data.extend(series_data)

return flattened_data


def convert_balancing_state_data_to_dataframe(data, area, start, end):
"""
Process raw balancing state data into a properly formatted DataFrame.

Parameters
----------
data : list
Raw data list from XML parsing
area : Area
Area object containing timezone information
start : pd.Timestamp
Start timestamp for the query period
end : pd.Timestamp
End timestamp for the query period

Returns
-------
pd.DataFrame
DataFrame with timestamp index, converted to the appropriate timezone
and truncated to the specified time range
"""
df = pd.DataFrame(data)

df["timestamp"] = pd.to_datetime(df["timestamp"])
df = df.set_index("timestamp")
df = df.tz_localize("UTC")
df = df.tz_convert(area.tz)
df = df.truncate(before=start, after=end)

return df
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ requests
pytz
beautifulsoup4>=4.11.1
pandas>=2.2.0
xmltodict