diff --git a/flexmeasures/conftest.py b/flexmeasures/conftest.py
index d3043286e4..27e9968f18 100644
--- a/flexmeasures/conftest.py
+++ b/flexmeasures/conftest.py
@@ -1830,6 +1830,103 @@ def add_test_solar_sensor_and_irradiance_with_forecasts(
     return sensors
 
 
+@pytest.fixture(scope="function")
+def setup_fresh_test_forecast_data_with_anomalous_beliefs(
+    fresh_db,
+    app,
+) -> dict[str, Sensor]:
+    return add_test_sensor_with_anomalous_beliefs(fresh_db)
+
+
+def add_test_sensor_with_anomalous_beliefs(
+    db: SQLAlchemy,
+) -> dict[str, Sensor]:
+    """Sensor with two layers of beliefs: normal values recorded early, anomalous values recorded later.
+
+    This fixture is designed for testing that the 'prior' parameter correctly restricts
+    which beliefs are used by the forecasting pipeline.
+
+    The sensor has 7 days of hourly data (Jan 1–7, 2025):
+    - Phase 1 (normal): values = 50, belief_time = Dec 31, 2024 (known in advance)
+    - Phase 2 (anomalous): values = 5000, belief_time = Jan 10, 2025 (late revision)
+
+    When the forecasting pipeline is run with ``prior`` before Jan 10,
+    only the normal beliefs (Phase 1) are used as input data.
+    When run with ``prior`` after Jan 10, the anomalous beliefs (Phase 2)
+    are selected as the most recent belief per event.
+    """
+    data_source = db.session.execute(
+        select(DataSource).filter_by(name="Seita", type="demo script")
+    ).scalar_one_or_none()
+    if not data_source:
+        data_source = DataSource(name="Seita", type="demo script")
+        db.session.add(data_source)
+
+    asset_type = db.session.execute(
+        select(GenericAssetType).filter_by(name="Site")
+    ).scalar_one_or_none()
+    if not asset_type:
+        asset_type = GenericAssetType(name="Site")
+        db.session.add(asset_type)
+
+    db.session.flush()
+
+    asset = Asset(
+        name="Test station anomaly",
+        generic_asset_type=asset_type,
+        latitude=10,
+        longitude=10,
+    )
+    db.session.add(asset)
+
+    sensor = Sensor(
+        name="anomaly-sensor",
+        generic_asset=asset,
+        unit="kW",
+        event_resolution=timedelta(hours=1),
+    )
+    db.session.add(sensor)
+    db.session.flush()
+
+    time_slots = pd.date_range(
+        datetime(2025, 1, 1), datetime(2025, 1, 7, 23, 0), freq="60min"
+    )
+
+    normal_value = 50.0
+    anomalous_value = 5000.0
+
+    # Phase 1: normal beliefs, recorded well before the events (Dec 31)
+    early_belief_time = as_server_time(datetime(2024, 12, 31, 0, 0))
+    phase1_beliefs = [
+        TimedBelief(
+            sensor=sensor,
+            event_start=as_server_time(dt),
+            event_value=normal_value,
+            belief_time=early_belief_time,
+            source=data_source,
+        )
+        for dt in time_slots
+    ]
+    db.session.add_all(phase1_beliefs)
+
+    # Phase 2: anomalous beliefs, recorded after the events (Jan 10)
+    late_belief_time = as_server_time(datetime(2025, 1, 10, 0, 0))
+    phase2_beliefs = [
+        TimedBelief(
+            sensor=sensor,
+            event_start=as_server_time(dt),
+            event_value=anomalous_value,
+            belief_time=late_belief_time,
+            source=data_source,
+        )
+        for dt in time_slots
+    ]
+    db.session.add_all(phase2_beliefs)
+
+    db.session.commit()
+    return {"anomaly-sensor": sensor}
+
+
 @pytest.fixture
 def freeze_server_now():
     """
diff --git a/flexmeasures/data/models/forecasting/pipelines/base.py b/flexmeasures/data/models/forecasting/pipelines/base.py
index bb84cfd7e5..8d5054f2b9 100644
--- a/flexmeasures/data/models/forecasting/pipelines/base.py
+++ b/flexmeasures/data/models/forecasting/pipelines/base.py
@@ -65,6 +65,7 @@ def __init__(
         event_starts_after: datetime | None = None,
         event_ends_before: datetime | None = None,
         save_belief_time: datetime | None = None,
+        beliefs_before: datetime | None = None,
         predict_start: datetime | None = None,
         predict_end: datetime | None = None,
         missing_threshold: float = 1.0,
@@ -82,6 +83,7 @@
         self.save_belief_time = (
            save_belief_time  # non floored belief time to save forecasts with
         )
+        self.beliefs_before = beliefs_before  # restrict input data to beliefs recorded before this time
         self.target_sensor = target_sensor
         self.target = f"{target_sensor.name} (ID: {target_sensor.id})_target"
         self.future_regressors = [
@@ -141,6 +143,7 @@ def load_data_all_beliefs(self) -> pd.DataFrame:
                 event_starts_after=sensor_event_starts_after,
                 event_ends_before=sensor_event_ends_before,
                 most_recent_beliefs_only=most_recent_beliefs_only,
+                beliefs_before=self.beliefs_before,
                 exclude_source_types=(
                     ["forecaster"] if name == self.target else []
                 ),  # we exclude forecasters for target dataframe as to not use forecasts in target.
diff --git a/flexmeasures/data/models/forecasting/pipelines/predict.py b/flexmeasures/data/models/forecasting/pipelines/predict.py
index 78fca20420..54ae993059 100644
--- a/flexmeasures/data/models/forecasting/pipelines/predict.py
+++ b/flexmeasures/data/models/forecasting/pipelines/predict.py
@@ -35,6 +35,7 @@ def __init__(
         event_starts_after: datetime | None = None,
         event_ends_before: datetime | None = None,
         save_belief_time: datetime | None = None,
+        beliefs_before: datetime | None = None,
         predict_start: datetime | None = None,
         predict_end: datetime | None = None,
         data_source: Source = None,
@@ -76,6 +77,7 @@
             predict_end=predict_end,
             missing_threshold=missing_threshold,
             save_belief_time=save_belief_time,
+            beliefs_before=beliefs_before,
         )
         self.model_path = model_path
         self.output_path = output_path
diff --git a/flexmeasures/data/models/forecasting/pipelines/train.py b/flexmeasures/data/models/forecasting/pipelines/train.py
index 7a086f9543..dc9aa48a76 100644
--- a/flexmeasures/data/models/forecasting/pipelines/train.py
+++ b/flexmeasures/data/models/forecasting/pipelines/train.py
@@ -28,6 +28,7 @@ def __init__(
         event_starts_after: datetime | None = None,
         event_ends_before: datetime | None = None,
         save_belief_time: datetime | None = None,
+        beliefs_before: datetime | None = None,
         probabilistic: bool = False,
         ensure_positive: bool = False,
         missing_threshold: float = 1.0,
@@ -64,6 +65,7 @@
             event_starts_after=event_starts_after,
             event_ends_before=event_ends_before,
             save_belief_time=save_belief_time,
+            beliefs_before=beliefs_before,
             forecast_frequency=forecast_frequency,
             missing_threshold=missing_threshold,
         )
diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py
index 1d52bfd401..a5dd5c1481 100644
--- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py
+++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py
@@ -82,6 +82,7 @@ def run_cycle(
             event_starts_after=train_start,
             event_ends_before=train_end,
             save_belief_time=self._parameters["save_belief_time"],
+            beliefs_before=self._parameters.get("beliefs_before"),
             probabilistic=self._parameters["probabilistic"],
             ensure_positive=self._config["ensure_positive"],
             missing_threshold=self._config.get("missing_threshold"),
@@ -120,6 +121,7 @@ def run_cycle(
             event_starts_after=train_start,  # use beliefs about events before the start of the predict period
             event_ends_before=predict_end,  # ignore any beliefs about events beyond the end of the predict period
             save_belief_time=self._parameters["save_belief_time"],
+            beliefs_before=self._parameters.get("beliefs_before"),
             predict_start=predict_start,
             predict_end=predict_end,
             sensor_to_save=self._parameters["sensor_to_save"],
diff --git a/flexmeasures/data/schemas/forecasting/pipeline.py b/flexmeasures/data/schemas/forecasting/pipeline.py
index 80d1d8aec7..f11b36e1e9 100644
--- a/flexmeasures/data/schemas/forecasting/pipeline.py
+++ b/flexmeasures/data/schemas/forecasting/pipeline.py
@@ -506,6 +506,7 @@ def resolve_config(  # noqa: C901
             probabilistic=data.get("probabilistic"),
             sensor_to_save=sensor_to_save,
             save_belief_time=save_belief_time,
+            beliefs_before=data.get("belief_time"),
             m_viewpoints=m_viewpoints,
         )
diff --git a/flexmeasures/data/schemas/tests/test_forecasting.py b/flexmeasures/data/schemas/tests/test_forecasting.py
index ed14afa2f8..cbae510c27 100644
--- a/flexmeasures/data/schemas/tests/test_forecasting.py
+++ b/flexmeasures/data/schemas/tests/test_forecasting.py
@@ -577,6 +577,53 @@
                 "m_viewpoints": 2,  # we expect 2 cycles from the retrain frequency and predict period given the end date
             },
         ),
+        # Case 15: prior is given (no start)
+        #
+        # User expects save_belief_time to be set to the provided prior timestamp,
+        # overriding the default of server now. This is useful for simulations.
+        # Specifically, we expect:
+        # - save-belief-time = prior timestamp (not server now)
+        # - predict-start = server now floored to resolution (start not given)
+        # - 1 cycle, 1 belief time
+        (
+            {
+                "prior": "2025-01-10T10:00:00+01:00",
+            },
+            {
+                "predict-start": pd.Timestamp(
+                    "2025-01-15T12:23:58.387422+01", tz="Europe/Amsterdam"
+                ).floor("1h"),
+                "save-belief-time": pd.Timestamp(
+                    "2025-01-10T10:00:00+01:00",
+                    tz="Europe/Amsterdam",
+                ),
+                "m_viewpoints": 1,
+            },
+        ),
+        # Case 16: prior is given together with start
+        #
+        # User expects save_belief_time to be set to the provided prior timestamp,
+        # overriding the default of predict_start. This is useful for simulations.
+        # Specifically, we expect:
+        # - save-belief-time = prior timestamp (not predict_start)
+        # - predict-start = start
+        # - 1 cycle, 1 belief time
+        (
+            {
+                "start": "2025-01-15T12:00:00+01:00",
+                "prior": "2025-01-10T10:00:00+01:00",
+            },
+            {
+                "predict-start": pd.Timestamp(
+                    "2025-01-15T12:00:00+01:00", tz="Europe/Amsterdam"
+                ),
+                "save-belief-time": pd.Timestamp(
+                    "2025-01-10T10:00:00+01:00",
+                    tz="Europe/Amsterdam",
+                ),
+                "m_viewpoints": 1,
+            },
+        ),
     ],
 )
 def test_timing_parameters_of_forecaster_parameters_schema(
diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py
index 348eff7f9a..4ccd897f7d 100644
--- a/flexmeasures/data/tests/test_forecasting_pipeline.py
+++ b/flexmeasures/data/tests/test_forecasting_pipeline.py
@@ -406,3 +406,79 @@ def test_train_period_capped_logs_warning(
     assert config_used["train_period_in_hours"] == timedelta(days=10) / timedelta(
         hours=1
     ), "train_period_in_hours should be capped to max_training_period"
+
+
+def test_prior_restricts_training_beliefs(
+    app,
+    setup_fresh_test_forecast_data_with_anomalous_beliefs,
+):
+    """
+    Verify that the 'prior' parameter restricts which beliefs are used by the forecasting pipeline.
+
+    The fixture provides a sensor with two sets of beliefs for Jan 1–7 2025:
+    - Phase 1 (normal): value = 50 kW, recorded on Dec 31, 2024
+    - Phase 2 (anomalous): value = 5000 kW, recorded on Jan 10, 2025
+
+    When 'prior' is set before Jan 10, only Phase 1 beliefs are available,
+    so the model is trained on normal values and produces low forecasts.
+    When 'prior' is set after Jan 10, Phase 2 beliefs become the most recent
+    per event, so the model is trained on anomalous values and produces high forecasts.
+    """
+    sensor = setup_fresh_test_forecast_data_with_anomalous_beliefs["anomaly-sensor"]
+
+    normal_value = 50.0
+    anomalous_value = 5000.0
+
+    base_config = {
+        "train-start": "2025-01-01T00:00+00:00",
+    }
+    base_params = {
+        "sensor": sensor.id,
+        "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models",
+        "output-path": None,
+        "start": "2025-01-08T00:00+00:00",
+        "end": "2025-01-09T00:00+00:00",
+        "sensor-to-save": None,
+        "max-forecast-horizon": "PT1H",
+        "forecast-frequency": "PT1H",
+        "probabilistic": False,
+    }
+
+    # Run with prior before the anomalous beliefs (Jan 10) → only normal values used
+    params_before_anomaly = {
+        **base_params,
+        "prior": "2025-01-05T00:00+00:00",  # before late_belief_time (Jan 10)
+    }
+    pipeline_before = TrainPredictPipeline(config=base_config)
+    returns_before = pipeline_before.compute(parameters=params_before_anomaly)
+    forecasts_before = returns_before[0]["data"]
+
+    # Run with prior after the anomalous beliefs (Jan 10) → anomalous values used
+    params_after_anomaly = {
+        **base_params,
+        "prior": "2025-01-15T00:00+00:00",  # after late_belief_time (Jan 10)
+    }
+    pipeline_after = TrainPredictPipeline(config=base_config)
+    returns_after = pipeline_after.compute(parameters=params_after_anomaly)
+    forecasts_after = returns_after[0]["data"]
+
+    mean_before = float(forecasts_before["event_value"].mean())
+    mean_after = float(forecasts_after["event_value"].mean())
+
+    # When prior is before the anomaly, forecasts should reflect normal values (close to 50)
+    assert mean_before < anomalous_value / 2, (
+        f"Forecasts with prior before anomaly should be well below {anomalous_value / 2}, "
+        f"but mean was {mean_before:.1f}"
+    )
+
+    # When prior is after the anomaly, forecasts should reflect anomalous values (close to 5000)
+    assert mean_after > normal_value * 5, (
+        f"Forecasts with prior after anomaly should be well above {normal_value * 5}, "
+        f"but mean was {mean_after:.1f}"
+    )
+
+    # The two runs must produce meaningfully different forecasts
+    assert mean_after > mean_before * 10, (
+        f"Forecasts after anomaly ({mean_after:.1f}) should be at least 10x higher "
+        f"than forecasts before anomaly ({mean_before:.1f})"
+    )