From 925f9f159881b0255cd97960d05e4063c2fe95cb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 22:41:52 +0000 Subject: [PATCH 1/5] Initial plan From d40246fa64cf78e0df325019ce6a547b12bfdc74 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 23:01:57 +0000 Subject: [PATCH 2/5] fix: commit data source before enqueueing forecast jobs The TrainPredictPipeline creates a DataSource via get_or_create_source() which only flushes (but does not commit) the new source. When the RQ worker runs later, the data source may not be found in the DB if the API request's transaction was never committed. Fix: - Add db.session.merge(self.data_source) + db.session.commit() in train_predict.py before queuing forecast jobs to ensure the data source is persisted in the DB before jobs run - After commit, refresh sensor objects to prevent DetachedInstanceError when RQ pickles self.run_cycle for the worker - Re-attach detached sensor objects in run_cycle() at job execution time to handle the case where sensors are expired/detached after unpickling Also fix run_wrap_up to pass the Redis connection to Job.fetch(), and update return values to include both job_id and n_jobs for callers. Update sensors.py and data_add.py to use the new dict return value. 
--- flexmeasures/api/v3_0/sensors.py | 4 +- flexmeasures/cli/data_add.py | 4 +- .../forecasting/pipelines/train_predict.py | 59 +++++++++++++++++-- 3 files changed, 58 insertions(+), 9 deletions(-) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 8e178b478..4fc4ac1ae 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1646,13 +1646,13 @@ def trigger_forecast(self, id: int, **params): # Queue forecasting job try: - job_id = forecaster.compute(parameters=parameters, as_job=True) + pipeline_returns = forecaster.compute(parameters=parameters, as_job=True) except Exception as e: current_app.logger.exception("Forecast job failed to enqueue.") return unprocessable_entity(str(e)) d, s = request_processed() - return dict(forecast=job_id, **d), s + return dict(forecast=pipeline_returns["job_id"], **d), s @route("//forecasts/", methods=["GET"]) @use_kwargs( diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 48b618dcc..64886cac1 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -1151,8 +1151,8 @@ def add_forecast( # noqa: C901 return # as_job case → list of job dicts like {"job-1": ""} - if parameters.get("as_job"): - n_jobs = len(pipeline_returns) + if as_job: + n_jobs = pipeline_returns["n_jobs"] click.secho(f"Created {n_jobs} forecasting job(s).", **MsgStyle.SUCCESS) return diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 6efb4d0fe..7d946e903 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -11,6 +11,7 @@ from flask import current_app +from flexmeasures.data import db from flexmeasures.data.models.forecasting import Forecaster from flexmeasures.data.models.forecasting.pipelines.predict import PredictPipeline from 
flexmeasures.data.models.forecasting.pipelines.train import TrainPipeline @@ -46,10 +47,11 @@ def __init__( def run_wrap_up(self, cycle_job_ids: list[str]): """Log the status of all cycle jobs after completion.""" + connection = current_app.queues["forecasting"].connection + for index, job_id in enumerate(cycle_job_ids): - logging.info( - f"forecasting job-{index}: {job_id} status: {Job.fetch(job_id).get_status()}" - ) + status = Job.fetch(job_id, connection=connection).get_status() + logging.info(f"forecasting job-{index}: {job_id} status: {status}") def run_cycle( self, @@ -68,6 +70,38 @@ def run_cycle( f"Starting Train-Predict cycle from {train_start} to {predict_end}" ) + # Re-attach sensor objects if they are detached after RQ pickles/unpickles self + # (this can happen when a commit expires objects before RQ serializes the job). + sensor = self._parameters["sensor"] + from sqlalchemy import inspect as sa_inspect + + if sa_inspect(sensor).detached or sa_inspect(sensor).expired: + self._parameters["sensor"] = db.session.merge(sensor) + sensor_to_save = self._parameters.get("sensor_to_save") + if sensor_to_save is not None: + if ( + sa_inspect(sensor_to_save).detached + or sa_inspect(sensor_to_save).expired + ): + self._parameters["sensor_to_save"] = db.session.merge(sensor_to_save) + # Also re-attach regressor sensors stored in _config + self._config["future_regressors"] = [ + ( + db.session.merge(s) + if (sa_inspect(s).detached or sa_inspect(s).expired) + else s + ) + for s in self._config.get("future_regressors", []) + ] + self._config["past_regressors"] = [ + ( + db.session.merge(s) + if (sa_inspect(s).detached or sa_inspect(s).expired) + else s + ) + for s in self._config.get("past_regressors", []) + ] + # Train model train_pipeline = TrainPipeline( future_regressors=self._config["future_regressors"], @@ -258,6 +292,16 @@ def run( if as_job: cycle_job_ids = [] + # Ensure the data source ID is available in the database when the job runs. 
+ db.session.merge(self.data_source) + db.session.commit() + + # After commit, SQLAlchemy expires all objects. Refresh sensor objects so + # they have loaded attributes when RQ pickles self.run_cycle for the worker. + db.session.refresh(self._parameters["sensor"]) + if self._parameters.get("sensor_to_save"): + db.session.refresh(self._parameters["sensor_to_save"]) + # job metadata for tracking # Serialize start and end to ISO format strings # Workaround for https://github.com/Parallels/rq-dashboard/issues/510 @@ -315,9 +359,14 @@ def run( if len(cycle_job_ids) > 1: # Return the wrap-up job ID if multiple cycle jobs are queued - return wrap_up_job.id + return {"job_id": wrap_up_job.id, "n_jobs": len(cycle_job_ids)} else: # Return the single cycle job ID if only one job is queued - return cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id + return { + "job_id": ( + cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id + ), + "n_jobs": 1, + } return self.return_values From 51250c3e06f1c870e9cfa2705d97b453b1bb2870 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 23:02:09 +0000 Subject: [PATCH 3/5] tests/forecasting_pipeline: add multi-cycle test, regression test, and fix assertions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Changes: 1. Add new parametrize case for multi-cycle forecasting (retrain-frequency PT12H, forecast-frequency PT12H → 2 cycles, 2 viewpoints per cycle) with as_job=True 2. Fix search_beliefs call to use most_recent_beliefs_only=False so that all belief times from all cycles are returned 3. Fix forecast count assertion to account for n_cycles when computing expected count (multi-cycle jobs produce n_cycles times more forecasts) 4. Fix as_job section to use pipeline_returns['job_id'] (dict return value from updated run() method) and fix wrap-up job vs single-job detection logic 5. 
Add regression test test_data_source_committed_before_forecast_jobs: - Demonstrates that flushed-but-not-committed data sources are lost on rollback - Verifies that the merge+commit fix makes data sources findable after expunge - Provides clear debug output of SQLAlchemy instance states --- .../data/tests/test_forecasting_pipeline.py | 137 +++++++++++++++++- 1 file changed, 131 insertions(+), 6 deletions(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 348eff7f9..f4076a808 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -58,6 +58,27 @@ True, None, ), + ( + { + # "model": "CustomLGBM", + "future-regressors": ["irradiance-sensor"], + "train-start": "2025-01-01T00:00+02:00", + "retrain-frequency": "PT12H", + }, + { + "sensor": "solar-sensor", + "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", + "output-path": None, + "start": "2025-01-08T00:00+02:00", # start coincides with end of available data in sensor + "end": "2025-01-09T00:00+02:00", + "sensor-to-save": None, + "max-forecast-horizon": "PT1H", + "forecast-frequency": "PT12H", # 2 cycles and 2 viewpoint + "probabilistic": False, + }, + True, + None, + ), ( { # "model": "CustomLGBM", @@ -176,7 +197,9 @@ def test_train_predict_pipeline( # noqa: C901 app.queues["forecasting"], exc_handler=handle_forecasting_exception ) - forecasts = sensor.search_beliefs(source_types=["forecaster"]) + forecasts = sensor.search_beliefs( + source_types=["forecaster"], most_recent_beliefs_only=False + ) dg_params = pipeline._parameters # parameters stored in the data generator m_viewpoints = (dg_params["end_date"] - dg_params["predict_start"]) / ( dg_params["forecast_frequency"] @@ -184,9 +207,22 @@ def test_train_predict_pipeline( # noqa: C901 # 1 hour of forecasts is saved over 4 15-minute resolution events n_events_per_horizon = timedelta(hours=1) / 
dg_params["sensor"].event_resolution n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1) + n_cycles = max( + timedelta(hours=dg_params["predict_period_in_hours"]) + // max( + pipeline._config["retrain_frequency"], + pipeline._parameters["forecast_frequency"], + ), + 1, + ) assert ( - len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon - ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" + len(forecasts) + == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles + ), ( + f"we expect {n_events_per_horizon} event(s) per horizon, " + f"{n_hourly_horizons} horizon(s), {m_viewpoints} viewpoint(s)" + f"{f', {n_cycles} cycle(s)' if n_cycles > 1 else ''}" + ) assert ( forecasts.lineage.number_of_belief_times == m_viewpoints ), f"we expect {m_viewpoints} viewpoints" @@ -197,15 +233,18 @@ def test_train_predict_pipeline( # noqa: C901 if as_job: # Fetch returned job - job = app.queues["forecasting"].fetch_job(pipeline_returns) + job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) assert ( job is not None ), "a returned job should exist in the forecasting queue" - if job.dependency_ids: - cycle_job_ids = [job] # only one cycle job, no wrap-up job + if not job.dependency_ids: + cycle_job_ids = [job.id] # only one cycle job, no wrap-up job else: + assert ( + job.is_finished + ), f"The wrap-up job should be finished, and not {job.get_status()}" cycle_job_ids = job.kwargs.get("cycle_job_ids", []) # wrap-up job case finished_jobs = app.queues["forecasting"].finished_job_registry @@ -406,3 +445,89 @@ def test_train_period_capped_logs_warning( assert config_used["train_period_in_hours"] == timedelta(days=10) / timedelta( hours=1 ), "train_period_in_hours should be capped to max_training_period" + + +def test_data_source_committed_before_forecast_jobs( + app, + setup_fresh_test_forecast_data, +): + 
""" + Regression test: verifies that the data source is committed to the database + before forecast jobs are enqueued, so that the worker can find it. + + Root cause: get_or_create_source() only flushes (does not commit) the new DataSource. + If the API request context ends without a commit, the worker cannot find the data source + by ID, causing the forecast job to fail. + + The fix: db.session.merge(self.data_source) + db.session.commit() in train_predict.py + ensures the data source is persisted before jobs are queued. + """ + from sqlalchemy import inspect as sa_inspect + from sqlalchemy import select + from flexmeasures.data import db + from flexmeasures.data.models.data_sources import DataSource + from flexmeasures.data.services.data_sources import get_or_create_source + + with app.app_context(): + # === Part 1: Demonstrate the problem (flushed but not committed) === + + # Create a new data source (similar to what the pipeline does internally) + test_source = get_or_create_source( + source="TestForecaster", + source_type="forecaster", + model="TestModel", + attributes={"data_generator": {"config": {"test": True}}}, + ) + # Flush assigns an ID but does not commit + source_id = test_source.id + assert source_id is not None, "Flush should have assigned an ID" + + # Verify the source is persistent in the current session + insp = sa_inspect(test_source) + print( + f"DEBUG: state after flush - detached={insp.detached}, " + f"persistent={insp.persistent}, pending={insp.pending}" + ) + + # Expunge from session to simulate what happens when session is closed/rolled back + db.session.expunge(test_source) + + insp_after = sa_inspect(test_source) + print(f"DEBUG: state after expunge - detached={insp_after.detached}") + assert insp_after.detached, "After expunge, object should be detached" + + # Now try to find the source in DB - it should NOT be found (was never committed) + db.session.rollback() # Simulate session teardown without commit + found_source = 
db.session.execute( + select(DataSource).filter_by(id=source_id) + ).scalar_one_or_none() + print(f"DEBUG: found_source after rollback: {found_source}") + assert ( + found_source is None + ), "After rollback, the flushed-but-not-committed source should not be in DB" + + # === Part 2: Demonstrate the fix (merge + commit) === + + # Create same source again + test_source_2 = get_or_create_source( + source="TestForecaster2", + source_type="forecaster", + model="TestModel2", + attributes={"data_generator": {"config": {"test": True}}}, + ) + source_id_2 = test_source_2.id + + # Apply the fix: merge back into session and commit + db.session.merge(test_source_2) + db.session.commit() + + # Now verify the source IS in DB, even after expunge + db.session.expunge_all() + found_source_2 = db.session.execute( + select(DataSource).filter_by(id=source_id_2) + ).scalar_one_or_none() + print(f"DEBUG: found_source_2 after merge+commit: {found_source_2}") + assert ( + found_source_2 is not None + ), "After merge+commit, the source should be findable in DB" + assert found_source_2.id == source_id_2 From 60bedd859fc1ca90902be04c3b18eb62e4e70c6f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 23:04:07 +0000 Subject: [PATCH 4/5] fix: improve comments and assertions from code review - Fix 'viewpoint' -> 'viewpoints' in comment - Add ', and' before n_cycles in assertion message for clarity - Improve comment on db.session.merge() to explain why merge is needed before commit (get_or_create_source flushes but does not commit) --- .../data/models/forecasting/pipelines/train_predict.py | 4 +++- flexmeasures/data/tests/test_forecasting_pipeline.py | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 7d946e903..a8ef20121 100644 --- 
a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -292,7 +292,9 @@ def run( if as_job: cycle_job_ids = [] - # Ensure the data source ID is available in the database when the job runs. + # Ensure the data source is attached to the current session before + # committing. get_or_create_source() only flushes (does not commit), so + # without this merge the data source would not be found by the worker. db.session.merge(self.data_source) db.session.commit() diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index f4076a808..0dfe1e50e 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -73,7 +73,7 @@ "end": "2025-01-09T00:00+02:00", "sensor-to-save": None, "max-forecast-horizon": "PT1H", - "forecast-frequency": "PT12H", # 2 cycles and 2 viewpoint + "forecast-frequency": "PT12H", # 2 cycles and 2 viewpoints "probabilistic": False, }, True, @@ -221,7 +221,7 @@ def test_train_predict_pipeline( # noqa: C901 ), ( f"we expect {n_events_per_horizon} event(s) per horizon, " f"{n_hourly_horizons} horizon(s), {m_viewpoints} viewpoint(s)" - f"{f', {n_cycles} cycle(s)' if n_cycles > 1 else ''}" + f"{f', and {n_cycles} cycle(s)' if n_cycles > 1 else ''}" ) assert ( forecasts.lineage.number_of_belief_times == m_viewpoints From cbff3b48f0e23959e48cfb3f0f92fb7611c97a8e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 23:09:36 +0000 Subject: [PATCH 5/5] style: apply black formatting to train_predict.py Co-authored-by: Flix6x <30658763+Flix6x@users.noreply.github.com> --- .../forecasting/pipelines/train_predict.py | 45 ++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py 
b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index a8ef20121..38db50623 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -8,6 +8,7 @@ from datetime import datetime, timedelta from rq.job import Job +from sqlalchemy import inspect as sa_inspect from flask import current_app @@ -45,6 +46,21 @@ def __init__( self.delete_model = delete_model self.return_values = [] # To store forecasts and jobs + @staticmethod + def _reattach_if_needed(obj): + """Re-merge a SQLAlchemy object into the current session if it is detached or expired. + + After ``db.session.commit()``, all objects in the session are expired. + When RQ pickles ``self.run_cycle`` for a worker, expired or detached + objects may raise ``DetachedInstanceError`` on attribute access. This + helper merges such objects back into the active session so they are + usable when the worker executes the job. + """ + insp = sa_inspect(obj) + if insp.detached or insp.expired: + return db.session.merge(obj) + return obj + def run_wrap_up(self, cycle_job_ids: list[str]): """Log the status of all cycle jobs after completion.""" connection = current_app.queues["forecasting"].connection @@ -72,34 +88,21 @@ def run_cycle( # Re-attach sensor objects if they are detached after RQ pickles/unpickles self # (this can happen when a commit expires objects before RQ serializes the job). 
- sensor = self._parameters["sensor"] - from sqlalchemy import inspect as sa_inspect - - if sa_inspect(sensor).detached or sa_inspect(sensor).expired: - self._parameters["sensor"] = db.session.merge(sensor) + self._parameters["sensor"] = self._reattach_if_needed( + self._parameters["sensor"] + ) sensor_to_save = self._parameters.get("sensor_to_save") if sensor_to_save is not None: - if ( - sa_inspect(sensor_to_save).detached - or sa_inspect(sensor_to_save).expired - ): - self._parameters["sensor_to_save"] = db.session.merge(sensor_to_save) + self._parameters["sensor_to_save"] = self._reattach_if_needed( + sensor_to_save + ) # Also re-attach regressor sensors stored in _config self._config["future_regressors"] = [ - ( - db.session.merge(s) - if (sa_inspect(s).detached or sa_inspect(s).expired) - else s - ) + self._reattach_if_needed(s) for s in self._config.get("future_regressors", []) ] self._config["past_regressors"] = [ - ( - db.session.merge(s) - if (sa_inspect(s).detached or sa_inspect(s).expired) - else s - ) - for s in self._config.get("past_regressors", []) + self._reattach_if_needed(s) for s in self._config.get("past_regressors", []) ] # Train model