From 921d958da7d120996ce6e9ec27f236273ae06771 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 10:37:00 +0100 Subject: [PATCH 01/33] fix: enhance job ID return structure in TrainPredictPipeline Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/train_predict.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 6efb4d0fe..6db583339 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -315,9 +315,9 @@ def run( if len(cycle_job_ids) > 1: # Return the wrap-up job ID if multiple cycle jobs are queued - return wrap_up_job.id + return {"job_id" : wrap_up_job.id, "n_jobs": len(cycle_job_ids)} else: # Return the single cycle job ID if only one job is queued - return cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id + return {"job_id" : cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id, "n_jobs": 1} return self.return_values From b7068faab4a93ae4a2202f6942e6386d4092f2e5 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 10:37:18 +0100 Subject: [PATCH 02/33] fix: update forecasting job return structure in SensorAPI Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/api/v3_0/sensors.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 1f64cd464..371f052b0 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1630,13 +1630,13 @@ def trigger_forecast(self, id: int, **params): # Queue forecasting job try: - job_id = forecaster.compute(parameters=parameters, as_job=True) + pipeline_returns = forecaster.compute(parameters=parameters, as_job=True) except Exception as e: 
current_app.logger.exception("Forecast job failed to enqueue.") return unprocessable_entity(str(e)) d, s = request_processed() - return dict(forecast=job_id, **d), s + return dict(forecast=pipeline_returns["job_id"], **d), s @route("//forecasts/", methods=["GET"]) @use_kwargs( From 505c2840e4cfa750fc43a551744334f5e7273c12 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 10:37:49 +0100 Subject: [PATCH 03/33] fix: update job fetching logic in test_train_predict_pipeline Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_forecasting_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 348eff7f9..ea2ed42c2 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -197,7 +197,7 @@ def test_train_predict_pipeline( # noqa: C901 if as_job: # Fetch returned job - job = app.queues["forecasting"].fetch_job(pipeline_returns) + job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) assert ( job is not None From 10b370a65f9dcfeb79e47a44aa60852994cb7ce6 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 10:38:21 +0100 Subject: [PATCH 04/33] chore: remove commented-out breakpoint in test_forecasting.py Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/schemas/tests/test_forecasting.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flexmeasures/data/schemas/tests/test_forecasting.py b/flexmeasures/data/schemas/tests/test_forecasting.py index efb8a19e7..1ccdaf900 100644 --- a/flexmeasures/data/schemas/tests/test_forecasting.py +++ b/flexmeasures/data/schemas/tests/test_forecasting.py @@ -439,7 +439,6 @@ def test_timing_parameters_of_forecaster_parameters_schema( **timing_input, } ) - # breakpoint() for k, v in expected_timing_output.items(): # Convert kebab-case key to snake_case to 
match data dictionary keys returned by schema snake_key = kebab_to_snake(k) From 6342867d15c62ab91ded7499cd6a8a0697e5f69a Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 10:39:33 +0100 Subject: [PATCH 05/33] fix: as_job is no longer in parameters Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/cli/data_add.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 48b618dcc..be5df0c73 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -1151,7 +1151,7 @@ def add_forecast( # noqa: C901 return # as_job case → list of job dicts like {"job-1": ""} - if parameters.get("as_job"): + if as_job: n_jobs = len(pipeline_returns) click.secho(f"Created {n_jobs} forecasting job(s).", **MsgStyle.SUCCESS) return From cc1bf1bf1e440c8583a3d06fe11832a6e606b9fb Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 10:40:05 +0100 Subject: [PATCH 06/33] fix: update job count retrieval in add_forecast function Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/cli/data_add.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index be5df0c73..64886cac1 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -1152,7 +1152,7 @@ def add_forecast( # noqa: C901 # as_job case → list of job dicts like {"job-1": ""} if as_job: - n_jobs = len(pipeline_returns) + n_jobs = pipeline_returns["n_jobs"] click.secho(f"Created {n_jobs} forecasting job(s).", **MsgStyle.SUCCESS) return From a7ff4a9cc7375bd77d8318231b45aef2bd4b8b53 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 11:02:31 +0100 Subject: [PATCH 07/33] fix: add connection queue to fetch job Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/train_predict.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git 
a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 6db583339..958357fe6 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -46,9 +46,12 @@ def __init__( def run_wrap_up(self, cycle_job_ids: list[str]): """Log the status of all cycle jobs after completion.""" + connection = current_app.queues["forecasting"].connection + for index, job_id in enumerate(cycle_job_ids): + status = Job.fetch(job_id, connection=connection).get_status() logging.info( - f"forecasting job-{index}: {job_id} status: {Job.fetch(job_id).get_status()}" + f"forecasting job-{index}: {job_id} status: {status}" ) def run_cycle( From 745e5137669e9c6cd1e43eafd4619709c79ab063 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:22:17 +0100 Subject: [PATCH 08/33] style: black Signed-off-by: F.N. Claessen --- .../models/forecasting/pipelines/train_predict.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 958357fe6..1d9578a86 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -50,9 +50,7 @@ def run_wrap_up(self, cycle_job_ids: list[str]): for index, job_id in enumerate(cycle_job_ids): status = Job.fetch(job_id, connection=connection).get_status() - logging.info( - f"forecasting job-{index}: {job_id} status: {status}" - ) + logging.info(f"forecasting job-{index}: {job_id} status: {status}") def run_cycle( self, @@ -318,9 +316,14 @@ def run( if len(cycle_job_ids) > 1: # Return the wrap-up job ID if multiple cycle jobs are queued - return {"job_id" : wrap_up_job.id, "n_jobs": len(cycle_job_ids)} + return {"job_id": wrap_up_job.id, "n_jobs": 
len(cycle_job_ids)} else: # Return the single cycle job ID if only one job is queued - return {"job_id" : cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id, "n_jobs": 1} + return { + "job_id": ( + cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id + ), + "n_jobs": 1, + } return self.return_values From 47f160dbbfc5248c5304f8aa363fd7744f6a9745 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:23:31 +0100 Subject: [PATCH 09/33] docs: changelog entry Signed-off-by: F.N. Claessen --- documentation/changelog.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 989fab9d0..cfe072fc1 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -18,6 +18,14 @@ Bugfixes ----------- +v0.31.2 | March XX, 2026 +============================ + +Bugfixes +----------- +* Fix wrap-up forecasting job [see `PR #2011 `_] + + v0.31.1 | March 6, 2026 ============================ From 3af2c6264b6865c5d6ef8aaa04197a32928be2fe Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:30:27 +0100 Subject: [PATCH 10/33] feat: check if wrap-up job actually finished rather than failed Signed-off-by: F.N. 
Claessen --- flexmeasures/data/tests/test_forecasting_pipeline.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index ea2ed42c2..7f185bd51 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -202,6 +202,9 @@ def test_train_predict_pipeline( # noqa: C901 assert ( job is not None ), "a returned job should exist in the forecasting queue" + assert ( + job.is_finished + ), f"The wrap-up job should be finished, and not {job.get_status()}" if job.dependency_ids: cycle_job_ids = [job] # only one cycle job, no wrap-up job From 474b860379e868b3ffd9f83f549dbf37e957f8f6 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:44:35 +0100 Subject: [PATCH 11/33] feat: add test case for 2 cycles, yielding 2 jobs and a wrap-up job Signed-off-by: F.N. Claessen --- .../data/tests/test_forecasting_pipeline.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 7f185bd51..4451f8f59 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -58,6 +58,27 @@ True, None, ), + ( + { + # "model": "CustomLGBM", + "future-regressors": ["irradiance-sensor"], + "train-start": "2025-01-01T00:00+02:00", + "retrain-frequency": "PT12H", + }, + { + "sensor": "solar-sensor", + "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", + "output-path": None, + "start": "2025-01-08T00:00+02:00", # start coincides with end of available data in sensor + "end": "2025-01-09T00:00+02:00", + "sensor-to-save": None, + "max-forecast-horizon": "PT1H", + "forecast-frequency": "PT12H", # 2 cycles and 2 viewpoint + "probabilistic": False, + }, + True, + None, + ), ( { # "model": "CustomLGBM", From 
7f824a7c754db58a92180f4e437af558e7c40417 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:45:36 +0100 Subject: [PATCH 12/33] dev: comment out failing assert, which needs to be investigated and updated Signed-off-by: F.N. Claessen --- flexmeasures/data/tests/test_forecasting_pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 4451f8f59..868ea78e4 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -205,9 +205,9 @@ def test_train_predict_pipeline( # noqa: C901 # 1 hour of forecasts is saved over 4 15-minute resolution events n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1) - assert ( - len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon - ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" + # assert ( + # len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon + # ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" assert ( forecasts.lineage.number_of_belief_times == m_viewpoints ), f"we expect {m_viewpoints} viewpoints" From 29705a2a0b5c0f498bc3dfb40e39bc2af396f26f Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:46:50 +0100 Subject: [PATCH 13/33] refactor: move checking the status of the wrap-up job to where it matters Signed-off-by: F.N.
Claessen --- flexmeasures/data/tests/test_forecasting_pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 868ea78e4..eaf093c3f 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -223,13 +223,13 @@ def test_train_predict_pipeline( # noqa: C901 assert ( job is not None ), "a returned job should exist in the forecasting queue" - assert ( - job.is_finished - ), f"The wrap-up job should be finished, and not {job.get_status()}" if job.dependency_ids: cycle_job_ids = [job] # only one cycle job, no wrap-up job else: + assert ( + job.is_finished + ), f"The wrap-up job should be finished, and not {job.get_status()}" cycle_job_ids = job.kwargs.get("cycle_job_ids", []) # wrap-up job case finished_jobs = app.queues["forecasting"].finished_job_registry From f26c41b47c0f8965c20c3b9f2df8e2ee68723390 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:55:09 +0100 Subject: [PATCH 14/33] fix: use job ID itself in case the returned job is the one existing cycle job Signed-off-by: F.N. 
Claessen --- flexmeasures/data/tests/test_forecasting_pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index eaf093c3f..de20f9b07 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -224,8 +224,8 @@ def test_train_predict_pipeline( # noqa: C901 job is not None ), "a returned job should exist in the forecasting queue" - if job.dependency_ids: - cycle_job_ids = [job] # only one cycle job, no wrap-up job + if not job.dependency_ids: + cycle_job_ids = [job.id] # only one cycle job, no wrap-up job else: assert ( job.is_finished From f326efc043673a29f4c8abe285c2b5b12251b448 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Tue, 10 Mar 2026 02:56:54 +0100 Subject: [PATCH 15/33] fix: add db.commit before forecasting jobs are created Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/models/forecasting/pipelines/train_predict.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 1d9578a86..2f933ec7e 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -11,6 +11,7 @@ from flask import current_app +from flexmeasures.data import db from flexmeasures.data.models.forecasting import Forecaster from flexmeasures.data.models.forecasting.pipelines.predict import PredictPipeline from flexmeasures.data.models.forecasting.pipelines.train import TrainPipeline @@ -262,6 +263,7 @@ def run( # job metadata for tracking # Serialize start and end to ISO format strings # Workaround for https://github.com/Parallels/rq-dashboard/issues/510 + db.session.commit() # Ensure the data source ID is available in the database when the job runs. 
job_metadata = { "data_source_info": {"id": self.data_source.id}, "start": self._parameters["predict_start"].isoformat(), From 67862ed9c7996e69c4cdaa403e1993af57fa8979 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 11 Mar 2026 00:43:28 +0100 Subject: [PATCH 16/33] dev: uncomment test assertion statement Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_forecasting_pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index de20f9b07..794716876 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -205,9 +205,9 @@ def test_train_predict_pipeline( # noqa: C901 # 1 hour of forecasts is saved over 4 15-minute resolution events n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1) - # assert ( - # len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon - # ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" + assert ( + len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles + ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" assert ( forecasts.lineage.number_of_belief_times == m_viewpoints ), f"we expect {m_viewpoints} viewpoints" From 64465e94ea0fe8768e472bf7f23fdca877c954e3 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 11 Mar 2026 00:44:26 +0100 Subject: [PATCH 17/33] Test(feat): search all beliefs forecasts saved into the sensor by the pipeline Signed-off-by: Mohamed Belhsan Hmida --- 
flexmeasures/data/tests/test_forecasting_pipeline.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 794716876..8c43a8759 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -197,7 +197,10 @@ def test_train_predict_pipeline( # noqa: C901 app.queues["forecasting"], exc_handler=handle_forecasting_exception ) - forecasts = sensor.search_beliefs(source_types=["forecaster"]) + forecasts = sensor.search_beliefs( + source_types=["forecaster"], most_recent_beliefs_only=False + ) + dg_params = pipeline._parameters # parameters stored in the data generator m_viewpoints = (dg_params["end_date"] - dg_params["predict_start"]) / ( dg_params["forecast_frequency"] From 4ac18462c1f3eaed9bb0be3442b143027b447d1d Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 11 Mar 2026 00:45:17 +0100 Subject: [PATCH 18/33] test(feat): add n_cycles variable to use to account for length of forecasts Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_forecasting_pipeline.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 8c43a8759..a551f1992 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -208,6 +208,14 @@ def test_train_predict_pipeline( # noqa: C901 # 1 hour of forecasts is saved over 4 15-minute resolution events n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1) + n_cycles = max( + timedelta(hours=dg_params["predict_period_in_hours"]) + // max( + pipeline._config["retrain_frequency"], + pipeline._parameters["forecast_frequency"], + ), + 1, + ) assert 
( len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" From bdc5e28c025940360bad93cc610e9efcbf436c7e Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 11 Mar 2026 00:49:29 +0100 Subject: [PATCH 19/33] style: run pre-commit Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_forecasting_pipeline.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index a551f1992..7d208f612 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -217,7 +217,8 @@ def test_train_predict_pipeline( # noqa: C901 1, ) assert ( - len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles + len(forecasts) + == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" assert ( forecasts.lineage.number_of_belief_times == m_viewpoints From 5340350c844ab6990005ebe0803c1bb9d5131738 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 11 Mar 2026 00:57:22 +0100 Subject: [PATCH 20/33] fix: improve assertion message in test_train_predict_pipeline for clarity Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_forecasting_pipeline.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 7d208f612..da72e2d4e 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ 
b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -219,7 +219,11 @@ def test_train_predict_pipeline( # noqa: C901 assert ( len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles - ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" + ), ( + f"we expect {n_events_per_horizon} event(s) per horizon, " + f"{n_hourly_horizons} horizon(s), {m_viewpoints} viewpoint(s)" + f"{f', {n_cycles} cycle(s)' if n_cycles > 1 else ''}" + ) assert ( forecasts.lineage.number_of_belief_times == m_viewpoints ), f"we expect {m_viewpoints} viewpoints" From 4ebaa9768aebbaabe3f71b00aa8003272f04fc42 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 15:57:31 +0100 Subject: [PATCH 21/33] fix: first create all jobs, then queue all jobs, giving the db.session.commit() some time to finish writing a new source to the db Signed-off-by: F.N. Claessen --- .../data/models/forecasting/pipelines/train_predict.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 2f933ec7e..6eca725cb 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -270,6 +270,7 @@ def run( "end": self._parameters["end_date"].isoformat(), "sensor_id": self._parameters["sensor_to_save"].id, } + jobs = [] for cycle_params in cycles_job_params: job = Job.create( @@ -294,7 +295,7 @@ def run( # Store the job ID for this cycle cycle_job_ids.append(job.id) - current_app.queues[queue].enqueue_job(job) + jobs.append(job) current_app.job_cache.add( self._parameters["sensor"].id, job_id=job.id, @@ -314,6 +315,8 @@ def run( ), meta=job_metadata, ) + for job in jobs: + current_app.queues[queue].enqueue_job(job) 
current_app.queues[queue].enqueue_job(wrap_up_job) if len(cycle_job_ids) > 1: From 888b98079c492352d76e46f3eabf52f3acbe4194 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 15:59:10 +0100 Subject: [PATCH 22/33] feat: enqueue job only after the transactional request Signed-off-by: F.N. Claessen --- .../models/forecasting/pipelines/train_predict.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 6eca725cb..38456f9df 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -7,10 +7,9 @@ import logging from datetime import datetime, timedelta +from flask import after_this_request, current_app from rq.job import Job -from flask import current_app - from flexmeasures.data import db from flexmeasures.data.models.forecasting import Forecaster from flexmeasures.data.models.forecasting.pipelines.predict import PredictPipeline @@ -315,9 +314,14 @@ def run( ), meta=job_metadata, ) - for job in jobs: - current_app.queues[queue].enqueue_job(job) - current_app.queues[queue].enqueue_job(wrap_up_job) + + @after_this_request + def enqueue_job(response): + """After the request, RQ jobs get to see committed data from the transaction.""" + for job in jobs: + current_app.queues[queue].enqueue_job(job) + current_app.queues[queue].enqueue_job(wrap_up_job) + return response if len(cycle_job_ids) > 1: # Return the wrap-up job ID if multiple cycle jobs are queued From 56f825a393c49c0e4e09d7f831c337c01b40ae0c Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 17:20:52 +0100 Subject: [PATCH 23/33] Revert "feat: enqueue job only after the transactional request" This reverts commit 888b98079c492352d76e46f3eabf52f3acbe4194. Signed-off-by: F.N. 
Claessen --- .../models/forecasting/pipelines/train_predict.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 38456f9df..6eca725cb 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -7,9 +7,10 @@ import logging from datetime import datetime, timedelta -from flask import after_this_request, current_app from rq.job import Job +from flask import current_app + from flexmeasures.data import db from flexmeasures.data.models.forecasting import Forecaster from flexmeasures.data.models.forecasting.pipelines.predict import PredictPipeline @@ -314,14 +315,9 @@ def run( ), meta=job_metadata, ) - - @after_this_request - def enqueue_job(response): - """After the request, RQ jobs get to see committed data from the transaction.""" - for job in jobs: - current_app.queues[queue].enqueue_job(job) - current_app.queues[queue].enqueue_job(wrap_up_job) - return response + for job in jobs: + current_app.queues[queue].enqueue_job(job) + current_app.queues[queue].enqueue_job(wrap_up_job) if len(cycle_job_ids) > 1: # Return the wrap-up job ID if multiple cycle jobs are queued From e41c1a56c41e2f669a53abb8d87982574c5a727c Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 17:23:11 +0100 Subject: [PATCH 24/33] docs: resolve silent merge conflict in changelog Signed-off-by: F.N. 
Claessen --- documentation/changelog.rst | 7 ------- 1 file changed, 7 deletions(-) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 5aa0382a5..b1626ea73 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -23,13 +23,6 @@ v0.31.2 | March XX, 2026 Bugfixes ----------- * Fix an issue where asset context was accessed in schemas that do not define a ``context`` attribute [see `PR #2014 `_] - - -v0.31.2 | March XX, 2026 -============================ - -Bugfixes ------------ * Fix wrap-up forecasting job [see `PR #2011 `_] From 064f9cb0531c85c6827706b948e4a33c624f8e8f Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 16 Mar 2026 13:00:41 +0100 Subject: [PATCH 25/33] docs: delete duplicate changelog entry Signed-off-by: F.N. Claessen --- documentation/changelog.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index d34bb23eb..e444f06d8 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -72,7 +72,6 @@ New features * Improved the UX for creating sensors, clicking on ``Enter`` now validates and creates a sensor [see `PR #1876 `_] * Show zero values in bar charts even though they have 0 area [see `PR #1932 `_ and `PR #1936 `_] * Added ``root`` and ``depth`` fields to the `[GET] /assets` endpoint for listing assets, to allow selecting descendants of a given root asset up to a given depth [see `PR #1874 `_] -* Give ability to edit sensor timezone from the UI [see `PR #1900 `_] * Support creating schedules with only information known prior to some time, now also via the CLI (the API already supported it) [see `PR #1871 `_]. * Added capability to update an asset's parent from the UI [`PR #1957 `_] * Add ``fields`` param to the asset-listing endpoints, to save bandwidth in response data [see `PR #1884 `_] From 7ff82d14774ba214472e8fe30f80d362743c625e Mon Sep 17 00:00:00 2001 From: "F.N. 
Claessen" Date: Tue, 17 Mar 2026 17:27:39 +0100 Subject: [PATCH 26/33] docs: add release date for v0.31.2 Signed-off-by: F.N. Claessen --- documentation/changelog.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index e444f06d8..441dc152f 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -20,7 +20,7 @@ Infrastructure / Support * Improve contact information to get in touch with the FlexMeasures community [see `PR #2022 `_] -v0.31.2 | March XX, 2026 +v0.31.2 | March 17, 2026 ============================ Bugfixes From ff789883b8f4be2b97ceebd766268196f16565ed Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 17:29:24 +0100 Subject: [PATCH 27/33] docs: advance a different bugfix to v0.31.2 Signed-off-by: F.N. Claessen --- documentation/changelog.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index c352a02c5..1dc8c2f34 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -13,10 +13,6 @@ New features * Show sensor attributes on sensor page, if not empty [see `PR #2015 `_] * Support saving state-of-charge schedules to sensors with ``"%"`` unit, using the ``soc-max`` flex-model field as the capacity for unit conversion [see `PR #1996 `_] -Bugfixes ------------ -* Stop failure in the API endpoint that lists available endpoints (/api/v3_0/) [see `PR #2032 `_] - Infrastructure / Support ---------------------- * Migrate from ``pip`` to ``uv`` for dependency management [see `PR #1973 `_] @@ -24,6 +20,9 @@ Infrastructure / Support * Make the test environment used by agents and by the test workflow identical [see `PR #1998 `_] * Improve contact information to get in touch with the FlexMeasures community [see `PR #2022 `_] +Bugfixes +----------- + v0.31.2 | March 17, 2026 ============================ @@ -32,6 +31,7 @@ Bugfixes ----------- * Fix 
an issue where asset context was accessed in schemas that do not define a ``context`` attribute [see `PR #2014 `_] * Fix wrap-up forecasting job [see `PR #2011 `_] +* Stop failure in the API endpoint that lists available endpoints (/api/v3_0/) [see `PR #2032 `_] v0.31.1 | March 6, 2026 From 4ed641a79a98c5c150c954faf67413d9102d4776 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 20:54:50 +0100 Subject: [PATCH 28/33] fix: self.data_source found itself in a different session somehow, so we put it back into the session we are committing Signed-off-by: F.N. Claessen --- .../data/models/forecasting/pipelines/train_predict.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 6eca725cb..5b016bdba 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -260,10 +260,13 @@ def run( if as_job: cycle_job_ids = [] + # Ensure the data source ID is available in the database when the job runs. + db.session.merge(self.data_source) + db.session.commit() + # job metadata for tracking # Serialize start and end to ISO format strings # Workaround for https://github.com/Parallels/rq-dashboard/issues/510 - db.session.commit() # Ensure the data source ID is available in the database when the job runs. job_metadata = { "data_source_info": {"id": self.data_source.id}, "start": self._parameters["predict_start"].isoformat(), From d276c8a2a64bbd172bbaa6fbcce9bac567b9106d Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 20:56:44 +0100 Subject: [PATCH 29/33] Revert "fix: first create all jobs, then queue all jobs, giving the db.session.commit() some time to finish writing a new source to the db" This reverts commit 4ebaa9768aebbaabe3f71b00aa8003272f04fc42. Signed-off-by: F.N. 
Claessen --- .../data/models/forecasting/pipelines/train_predict.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 5b016bdba..955eb5a01 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -273,7 +273,6 @@ def run( "end": self._parameters["end_date"].isoformat(), "sensor_id": self._parameters["sensor_to_save"].id, } - jobs = [] for cycle_params in cycles_job_params: job = Job.create( @@ -298,7 +297,7 @@ def run( # Store the job ID for this cycle cycle_job_ids.append(job.id) - jobs.append(job) + current_app.queues[queue].enqueue_job(job) current_app.job_cache.add( self._parameters["sensor"].id, job_id=job.id, @@ -318,8 +317,6 @@ def run( ), meta=job_metadata, ) - for job in jobs: - current_app.queues[queue].enqueue_job(job) current_app.queues[queue].enqueue_job(wrap_up_job) if len(cycle_job_ids) > 1: From d710365de26419070b558332351da203977f4ecd Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 21:26:02 +0100 Subject: [PATCH 30/33] refactor: move asserts to where they matter Signed-off-by: F.N. 
Claessen --- .../data/tests/test_forecasting_pipeline.py | 51 +++++++++---------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index da72e2d4e..bae39ff3c 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -197,6 +197,30 @@ def test_train_predict_pipeline( # noqa: C901 app.queues["forecasting"], exc_handler=handle_forecasting_exception ) + # Fetch returned job + job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) + + assert ( + job is not None + ), "a returned job should exist in the forecasting queue" + + if not job.dependency_ids: + cycle_job_ids = [job.id] # only one cycle job, no wrap-up job + else: + assert ( + job.is_finished + ), f"The wrap-up job should be finished, and not {job.get_status()}" + cycle_job_ids = job.kwargs.get("cycle_job_ids", []) # wrap-up job case + + finished_jobs = app.queues["forecasting"].finished_job_registry + + for job_id in cycle_job_ids: + job = app.queues["forecasting"].fetch_job(job_id) + assert job is not None, f"Job {job_id} should exist" + assert ( + job_id in finished_jobs + ), f"Job {job_id} should be in the finished registry" + forecasts = sensor.search_beliefs( source_types=["forecaster"], most_recent_beliefs_only=False ) @@ -232,32 +256,7 @@ def test_train_predict_pipeline( # noqa: C901 source ), "string representation of the Forecaster (DataSource) should mention the used model" - if as_job: - # Fetch returned job - job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) - - assert ( - job is not None - ), "a returned job should exist in the forecasting queue" - - if not job.dependency_ids: - cycle_job_ids = [job.id] # only one cycle job, no wrap-up job - else: - assert ( - job.is_finished - ), f"The wrap-up job should be finished, and not {job.get_status()}" - cycle_job_ids = 
job.kwargs.get("cycle_job_ids", []) # wrap-up job case - - finished_jobs = app.queues["forecasting"].finished_job_registry - - for job_id in cycle_job_ids: - job = app.queues["forecasting"].fetch_job(job_id) - assert job is not None, f"Job {job_id} should exist" - assert ( - job_id in finished_jobs - ), f"Job {job_id} should be in the finished registry" - - else: + if not as_job: # Sync case: pipeline returns a non-empty list assert ( isinstance(pipeline_returns, list) and len(pipeline_returns) > 0 From a6d09bbfa750b38789311d05d89d49b2b2a532f8 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 18 Mar 2026 09:26:16 +0100 Subject: [PATCH 31/33] fix: cherry-pick from d40246fa64cf78e0df325019ce6a547b12bfdc74 Signed-off-by: F.N. Claessen --- .../forecasting/pipelines/train_predict.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 955eb5a01..8c3a92ab0 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -70,6 +70,38 @@ def run_cycle( f"Starting Train-Predict cycle from {train_start} to {predict_end}" ) + # Re-attach sensor objects if they are detached after RQ pickles/unpickles self + # (this can happen when a commit expires objects before RQ serializes the job). 
+ sensor = self._parameters["sensor"] + from sqlalchemy import inspect as sa_inspect + + if sa_inspect(sensor).detached or sa_inspect(sensor).expired: + self._parameters["sensor"] = db.session.merge(sensor) + sensor_to_save = self._parameters.get("sensor_to_save") + if sensor_to_save is not None: + if ( + sa_inspect(sensor_to_save).detached + or sa_inspect(sensor_to_save).expired + ): + self._parameters["sensor_to_save"] = db.session.merge(sensor_to_save) + # Also re-attach regressor sensors stored in _config + self._config["future_regressors"] = [ + ( + db.session.merge(s) + if (sa_inspect(s).detached or sa_inspect(s).expired) + else s + ) + for s in self._config.get("future_regressors", []) + ] + self._config["past_regressors"] = [ + ( + db.session.merge(s) + if (sa_inspect(s).detached or sa_inspect(s).expired) + else s + ) + for s in self._config.get("past_regressors", []) + ] + # Train model train_pipeline = TrainPipeline( future_regressors=self._config["future_regressors"], From a05a7e8df332f24848f9f1e9d3bd946b186712cc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 23:04:07 +0000 Subject: [PATCH 32/33] fix: improve comments and assertions from code review - Fix 'viewpoint' -> 'viewpoints' in comment - Add ', and' before n_cycles in assertion message for clarity - Improve comment on db.session.merge() to explain why merge is needed before commit (get_or_create_source flushes but does not commit) --- .../data/models/forecasting/pipelines/train_predict.py | 4 +++- flexmeasures/data/tests/test_forecasting_pipeline.py | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 8c3a92ab0..d86e7c6d0 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ 
-292,7 +292,9 @@ def run( if as_job: cycle_job_ids = [] - # Ensure the data source ID is available in the database when the job runs. + # Ensure the data source is attached to the current session before + # committing. get_or_create_source() only flushes (does not commit), so + # without this merge the data source would not be found by the worker. db.session.merge(self.data_source) db.session.commit() diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index bae39ff3c..4b727361d 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -73,7 +73,7 @@ "end": "2025-01-09T00:00+02:00", "sensor-to-save": None, "max-forecast-horizon": "PT1H", - "forecast-frequency": "PT12H", # 2 cycles and 2 viewpoint + "forecast-frequency": "PT12H", # 2 cycles and 2 viewpoints "probabilistic": False, }, True, @@ -246,7 +246,7 @@ def test_train_predict_pipeline( # noqa: C901 ), ( f"we expect {n_events_per_horizon} event(s) per horizon, " f"{n_hourly_horizons} horizon(s), {m_viewpoints} viewpoint(s)" - f"{f', {n_cycles} cycle(s)' if n_cycles > 1 else ''}" + f"{f', and {n_cycles} cycle(s)' if n_cycles > 1 else ''}" ) assert ( forecasts.lineage.number_of_belief_times == m_viewpoints From 39351a3a767a1e858b3b19680a976f1acdf2ec29 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 23:09:36 +0000 Subject: [PATCH 33/33] refactor: move reattachment of sensor objects to session into a class method; cherry-picked from cbff3b48f0e23959e48cfb3f0f92fb7611c97a8e Co-authored-by: Flix6x <30658763+Flix6x@users.noreply.github.com> --- .../forecasting/pipelines/train_predict.py | 45 ++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 
d86e7c6d0..b21001fa0 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -8,6 +8,7 @@ from datetime import datetime, timedelta from rq.job import Job +from sqlalchemy import inspect as sa_inspect from flask import current_app @@ -45,6 +46,21 @@ def __init__( self.delete_model = delete_model self.return_values = [] # To store forecasts and jobs + @staticmethod + def _reattach_if_needed(obj): + """Re-merge a SQLAlchemy object into the current session if it is detached or expired. + + After ``db.session.commit()``, all objects in the session are expired. + When RQ pickles ``self.run_cycle`` for a worker, expired or detached + objects may raise ``DetachedInstanceError`` on attribute access. This + helper merges such objects back into the active session so they are + usable when the worker executes the job. + """ + insp = sa_inspect(obj) + if insp.detached or insp.expired: + return db.session.merge(obj) + return obj + def run_wrap_up(self, cycle_job_ids: list[str]): """Log the status of all cycle jobs after completion.""" connection = current_app.queues["forecasting"].connection @@ -72,34 +88,21 @@ def run_cycle( # Re-attach sensor objects if they are detached after RQ pickles/unpickles self # (this can happen when a commit expires objects before RQ serializes the job). 
- sensor = self._parameters["sensor"] - from sqlalchemy import inspect as sa_inspect - - if sa_inspect(sensor).detached or sa_inspect(sensor).expired: - self._parameters["sensor"] = db.session.merge(sensor) + self._parameters["sensor"] = self._reattach_if_needed( + self._parameters["sensor"] + ) sensor_to_save = self._parameters.get("sensor_to_save") if sensor_to_save is not None: - if ( - sa_inspect(sensor_to_save).detached - or sa_inspect(sensor_to_save).expired - ): - self._parameters["sensor_to_save"] = db.session.merge(sensor_to_save) + self._parameters["sensor_to_save"] = self._reattach_if_needed( + sensor_to_save + ) # Also re-attach regressor sensors stored in _config self._config["future_regressors"] = [ - ( - db.session.merge(s) - if (sa_inspect(s).detached or sa_inspect(s).expired) - else s - ) + self._reattach_if_needed(s) for s in self._config.get("future_regressors", []) ] self._config["past_regressors"] = [ - ( - db.session.merge(s) - if (sa_inspect(s).detached or sa_inspect(s).expired) - else s - ) - for s in self._config.get("past_regressors", []) + self._reattach_if_needed(s) for s in self._config.get("past_regressors", []) ] # Train model