Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions flexmeasures/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1830,6 +1830,103 @@ def add_test_solar_sensor_and_irradiance_with_forecasts(
return sensors


@pytest.fixture(scope="function")
def setup_fresh_test_forecast_data_with_anomalous_beliefs(
    fresh_db,
    app,
) -> dict[str, Sensor]:
    """Set up a sensor carrying normal early beliefs and anomalous late beliefs.

    Function-scoped so each test works against a fresh database session.
    Delegates to :func:`add_test_sensor_with_anomalous_beliefs` and returns its
    mapping of sensor name to the created Sensor.
    """
    return add_test_sensor_with_anomalous_beliefs(fresh_db)


def add_test_sensor_with_anomalous_beliefs(
    db: SQLAlchemy,
) -> dict[str, Sensor]:
    """Create a sensor whose events carry two layers of beliefs.

    Intended for testing that the 'prior' parameter correctly restricts which
    beliefs the forecasting pipeline uses as input.

    The sensor gets 7 days of hourly data (Jan 1–7, 2025), recorded twice:
    - Phase 1 (normal): values = 50, belief_time = Dec 31, 2024 (known in advance)
    - Phase 2 (anomalous): values = 5000, belief_time = Jan 10, 2025 (late revision)

    Running the forecasting pipeline with a ``prior`` earlier than Jan 10 means
    only the normal Phase 1 beliefs are visible as input data; with a ``prior``
    later than Jan 10, the anomalous Phase 2 beliefs become the most recent
    belief per event and are selected instead.

    :param db: database handle used to persist the test entities
    :returns: mapping of sensor name to the created Sensor
    """
    # Reuse the demo-script data source if an earlier fixture already made it.
    source = db.session.execute(
        select(DataSource).filter_by(name="Seita", type="demo script")
    ).scalar_one_or_none()
    if not source:
        source = DataSource(name="Seita", type="demo script")
        db.session.add(source)

    # Likewise for the "Site" asset type.
    site_type = db.session.execute(
        select(GenericAssetType).filter_by(name="Site")
    ).scalar_one_or_none()
    if not site_type:
        site_type = GenericAssetType(name="Site")
        db.session.add(site_type)

    # Flush so the source/type rows exist before the asset references them.
    db.session.flush()

    station = Asset(
        name="Test station anomaly",
        generic_asset_type=site_type,
        latitude=10,
        longitude=10,
    )
    db.session.add(station)

    sensor = Sensor(
        name="anomaly-sensor",
        generic_asset=station,
        unit="kW",
        event_resolution=timedelta(hours=1),
    )
    db.session.add(sensor)
    db.session.flush()  # assign the sensor an id before attaching beliefs

    event_starts = pd.date_range(
        datetime(2025, 1, 1), datetime(2025, 1, 7, 23, 0), freq="60min"
    )

    # One (event value, recording time) pair per phase:
    # normal values recorded well before the events, anomalous ones after.
    belief_phases = (
        (50.0, datetime(2024, 12, 31, 0, 0)),  # Phase 1: normal, early
        (5000.0, datetime(2025, 1, 10, 0, 0)),  # Phase 2: anomalous, late
    )
    for event_value, recorded_at in belief_phases:
        belief_time = as_server_time(recorded_at)
        db.session.add_all(
            [
                TimedBelief(
                    sensor=sensor,
                    event_start=as_server_time(event_start),
                    event_value=event_value,
                    belief_time=belief_time,
                    source=source,
                )
                for event_start in event_starts
            ]
        )

    db.session.commit()
    return {"anomaly-sensor": sensor}


@pytest.fixture
def freeze_server_now():
"""
Expand Down
3 changes: 3 additions & 0 deletions flexmeasures/data/models/forecasting/pipelines/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(
event_starts_after: datetime | None = None,
event_ends_before: datetime | None = None,
save_belief_time: datetime | None = None,
beliefs_before: datetime | None = None,
predict_start: datetime | None = None,
predict_end: datetime | None = None,
missing_threshold: float = 1.0,
Expand All @@ -82,6 +83,7 @@ def __init__(
self.save_belief_time = (
save_belief_time # non floored belief time to save forecasts with
)
self.beliefs_before = beliefs_before # restrict input data to beliefs recorded before this time
self.target_sensor = target_sensor
self.target = f"{target_sensor.name} (ID: {target_sensor.id})_target"
self.future_regressors = [
Expand Down Expand Up @@ -141,6 +143,7 @@ def load_data_all_beliefs(self) -> pd.DataFrame:
event_starts_after=sensor_event_starts_after,
event_ends_before=sensor_event_ends_before,
most_recent_beliefs_only=most_recent_beliefs_only,
beliefs_before=self.beliefs_before,
exclude_source_types=(
["forecaster"] if name == self.target else []
), # we exclude forecasters for target dataframe as to not use forecasts in target.
Expand Down
2 changes: 2 additions & 0 deletions flexmeasures/data/models/forecasting/pipelines/predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(
event_starts_after: datetime | None = None,
event_ends_before: datetime | None = None,
save_belief_time: datetime | None = None,
beliefs_before: datetime | None = None,
predict_start: datetime | None = None,
predict_end: datetime | None = None,
data_source: Source = None,
Expand Down Expand Up @@ -76,6 +77,7 @@ def __init__(
predict_end=predict_end,
missing_threshold=missing_threshold,
save_belief_time=save_belief_time,
beliefs_before=beliefs_before,
)
self.model_path = model_path
self.output_path = output_path
Expand Down
2 changes: 2 additions & 0 deletions flexmeasures/data/models/forecasting/pipelines/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(
event_starts_after: datetime | None = None,
event_ends_before: datetime | None = None,
save_belief_time: datetime | None = None,
beliefs_before: datetime | None = None,
probabilistic: bool = False,
ensure_positive: bool = False,
missing_threshold: float = 1.0,
Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
save_belief_time=save_belief_time,
beliefs_before=beliefs_before,
forecast_frequency=forecast_frequency,
missing_threshold=missing_threshold,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def run_cycle(
event_starts_after=train_start,
event_ends_before=train_end,
save_belief_time=self._parameters["save_belief_time"],
beliefs_before=self._parameters.get("beliefs_before"),
probabilistic=self._parameters["probabilistic"],
ensure_positive=self._config["ensure_positive"],
missing_threshold=self._config.get("missing_threshold"),
Expand Down Expand Up @@ -120,6 +121,7 @@ def run_cycle(
event_starts_after=train_start, # use beliefs about events before the start of the predict period
event_ends_before=predict_end, # ignore any beliefs about events beyond the end of the predict period
save_belief_time=self._parameters["save_belief_time"],
beliefs_before=self._parameters.get("beliefs_before"),
predict_start=predict_start,
predict_end=predict_end,
sensor_to_save=self._parameters["sensor_to_save"],
Expand Down
1 change: 1 addition & 0 deletions flexmeasures/data/schemas/forecasting/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ def resolve_config( # noqa: C901
probabilistic=data.get("probabilistic"),
sensor_to_save=sensor_to_save,
save_belief_time=save_belief_time,
beliefs_before=data.get("belief_time"),
m_viewpoints=m_viewpoints,
)

Expand Down
47 changes: 47 additions & 0 deletions flexmeasures/data/schemas/tests/test_forecasting.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,53 @@
"m_viewpoints": 2, # we expect 2 cycles from the retrain frequency and predict period given the end date
},
),
# Case 15: prior is given (no start)
#
# User expects save_belief_time to be set to the provided prior timestamp,
# overriding the default of server now. This is useful for simulations.
# Specifically, we expect:
# - save-belief-time = prior timestamp (not server now)
# - predict-start = server now floored to resolution (start not given)
# - 1 cycle, 1 belief time
(
{
"prior": "2025-01-10T10:00:00+01:00",
},
{
"predict-start": pd.Timestamp(
"2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam"
).floor("1h"),
"save-belief-time": pd.Timestamp(
"2025-01-10T10:00:00+01:00",
tz="Europe/Amsterdam",
),
"m_viewpoints": 1,
},
),
# Case 16: prior is given together with start
#
# User expects save_belief_time to be set to the provided prior timestamp,
# overriding the default of predict_start. This is useful for simulations.
# Specifically, we expect:
# - save-belief-time = prior timestamp (not predict_start)
# - predict-start = start
# - 1 cycle, 1 belief time
(
{
"start": "2025-01-15T12:00:00+01:00",
"prior": "2025-01-10T10:00:00+01:00",
},
{
"predict-start": pd.Timestamp(
"2025-01-15T12:00:00+01:00", tz="Europe/Amsterdam"
),
"save-belief-time": pd.Timestamp(
"2025-01-10T10:00:00+01:00",
tz="Europe/Amsterdam",
),
"m_viewpoints": 1,
},
),
],
)
def test_timing_parameters_of_forecaster_parameters_schema(
Expand Down
76 changes: 76 additions & 0 deletions flexmeasures/data/tests/test_forecasting_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,79 @@ def test_train_period_capped_logs_warning(
assert config_used["train_period_in_hours"] == timedelta(days=10) / timedelta(
hours=1
), "train_period_in_hours should be capped to max_training_period"


def test_prior_restricts_training_beliefs(
    app,
    setup_fresh_test_forecast_data_with_anomalous_beliefs,
):
    """
    Verify that the 'prior' parameter restricts which beliefs are used by the forecasting pipeline.

    The fixture provides a sensor with two sets of beliefs for Jan 1–7 2025:
    - Phase 1 (normal): value = 50 kW, recorded on Dec 31, 2024
    - Phase 2 (anomalous): value = 5000 kW, recorded on Jan 10, 2025

    With 'prior' set before Jan 10, only Phase 1 beliefs are visible, so the
    model trains on normal values and produces low forecasts. With 'prior'
    set after Jan 10, Phase 2 beliefs are the most recent per event, so the
    model trains on anomalous values and produces high forecasts.
    """
    sensor = setup_fresh_test_forecast_data_with_anomalous_beliefs["anomaly-sensor"]

    normal_value = 50.0
    anomalous_value = 5000.0

    config = {
        "train-start": "2025-01-01T00:00+00:00",
    }

    def mean_forecast(prior: str) -> float:
        # Run one full train+predict cycle with the given prior and
        # return the mean forecast value over the predict window.
        parameters = {
            "sensor": sensor.id,
            "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models",
            "output-path": None,
            "start": "2025-01-08T00:00+00:00",
            "end": "2025-01-09T00:00+00:00",
            "sensor-to-save": None,
            "max-forecast-horizon": "PT1H",
            "forecast-frequency": "PT1H",
            "probabilistic": False,
            "prior": prior,
        }
        pipeline = TrainPredictPipeline(config=config)
        returns = pipeline.compute(parameters=parameters)
        forecasts = returns[0]["data"]
        return float(forecasts["event_value"].mean())

    # Prior before the late beliefs (Jan 10) → only normal values used.
    mean_before = mean_forecast("2025-01-05T00:00+00:00")
    # Prior after the late beliefs (Jan 10) → anomalous values used.
    mean_after = mean_forecast("2025-01-15T00:00+00:00")

    # When prior is before the anomaly, forecasts should reflect normal values (close to 50)
    assert mean_before < anomalous_value / 2, (
        f"Forecasts with prior before anomaly should be well below {anomalous_value / 2}, "
        f"but mean was {mean_before:.1f}"
    )

    # When prior is after the anomaly, forecasts should reflect anomalous values (close to 5000)
    assert mean_after > normal_value * 5, (
        f"Forecasts with prior after anomaly should be well above {normal_value * 5}, "
        f"but mean was {mean_after:.1f}"
    )

    # The two runs must produce meaningfully different forecasts
    assert mean_after > mean_before * 10, (
        f"Forecasts after anomaly ({mean_after:.1f}) should be at least 10x higher "
        f"than forecasts before anomaly ({mean_before:.1f})"
    )
Loading