diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 8e178b478..4fc4ac1ae 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1646,13 +1646,13 @@ def trigger_forecast(self, id: int, **params): # Queue forecasting job try: - job_id = forecaster.compute(parameters=parameters, as_job=True) + pipeline_returns = forecaster.compute(parameters=parameters, as_job=True) except Exception as e: current_app.logger.exception("Forecast job failed to enqueue.") return unprocessable_entity(str(e)) d, s = request_processed() - return dict(forecast=job_id, **d), s + return dict(forecast=pipeline_returns["job_id"], **d), s @route("//forecasts/", methods=["GET"]) @use_kwargs( diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 48b618dcc..64886cac1 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -1151,8 +1151,8 @@ def add_forecast( # noqa: C901 return # as_job case → list of job dicts like {"job-1": ""} - if parameters.get("as_job"): - n_jobs = len(pipeline_returns) + if as_job: + n_jobs = pipeline_returns["n_jobs"] click.secho(f"Created {n_jobs} forecasting job(s).", **MsgStyle.SUCCESS) return diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 6efb4d0fe..38db50623 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -8,9 +8,11 @@ from datetime import datetime, timedelta from rq.job import Job +from sqlalchemy import inspect as sa_inspect from flask import current_app +from flexmeasures.data import db from flexmeasures.data.models.forecasting import Forecaster from flexmeasures.data.models.forecasting.pipelines.predict import PredictPipeline from flexmeasures.data.models.forecasting.pipelines.train import TrainPipeline @@ -44,12 +46,28 @@ def __init__( 
self.delete_model = delete_model self.return_values = [] # To store forecasts and jobs + @staticmethod + def _reattach_if_needed(obj): + """Re-merge a SQLAlchemy object into the current session if it is detached or expired. + + After ``db.session.commit()``, all objects in the session are expired. + When RQ pickles ``self.run_cycle`` for a worker, expired or detached + objects may raise ``DetachedInstanceError`` on attribute access. This + helper merges such objects back into the active session so they are + usable when the worker executes the job. + """ + insp = sa_inspect(obj) + if insp.detached or insp.expired: + return db.session.merge(obj) + return obj + def run_wrap_up(self, cycle_job_ids: list[str]): """Log the status of all cycle jobs after completion.""" + connection = current_app.queues["forecasting"].connection + for index, job_id in enumerate(cycle_job_ids): - logging.info( - f"forecasting job-{index}: {job_id} status: {Job.fetch(job_id).get_status()}" - ) + status = Job.fetch(job_id, connection=connection).get_status() + logging.info(f"forecasting job-{index}: {job_id} status: {status}") def run_cycle( self, @@ -68,6 +86,25 @@ def run_cycle( f"Starting Train-Predict cycle from {train_start} to {predict_end}" ) + # Re-attach sensor objects if they are detached after RQ pickles/unpickles self + # (this can happen when a commit expires objects before RQ serializes the job). 
+ self._parameters["sensor"] = self._reattach_if_needed( + self._parameters["sensor"] + ) + sensor_to_save = self._parameters.get("sensor_to_save") + if sensor_to_save is not None: + self._parameters["sensor_to_save"] = self._reattach_if_needed( + sensor_to_save + ) + # Also re-attach regressor sensors stored in _config + self._config["future_regressors"] = [ + self._reattach_if_needed(s) + for s in self._config.get("future_regressors", []) + ] + self._config["past_regressors"] = [ + self._reattach_if_needed(s) for s in self._config.get("past_regressors", []) + ] + # Train model train_pipeline = TrainPipeline( future_regressors=self._config["future_regressors"], @@ -258,6 +295,18 @@ def run( if as_job: cycle_job_ids = [] + # Ensure the data source is attached to the current session before + # committing. get_or_create_source() only flushes (does not commit), so + # without this merge the data source would not be found by the worker. + db.session.merge(self.data_source) + db.session.commit() + + # After commit, SQLAlchemy expires all objects. Refresh sensor objects so + # they have loaded attributes when RQ pickles self.run_cycle for the worker. 
+ db.session.refresh(self._parameters["sensor"]) + if self._parameters.get("sensor_to_save"): + db.session.refresh(self._parameters["sensor_to_save"]) + # job metadata for tracking # Serialize start and end to ISO format strings # Workaround for https://github.com/Parallels/rq-dashboard/issues/510 @@ -315,9 +364,14 @@ def run( if len(cycle_job_ids) > 1: # Return the wrap-up job ID if multiple cycle jobs are queued - return wrap_up_job.id + return {"job_id": wrap_up_job.id, "n_jobs": len(cycle_job_ids)} else: # Return the single cycle job ID if only one job is queued - return cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id + return { + "job_id": ( + cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id + ), + "n_jobs": 1, + } return self.return_values diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 348eff7f9..0dfe1e50e 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -58,6 +58,27 @@ True, None, ), + ( + { + # "model": "CustomLGBM", + "future-regressors": ["irradiance-sensor"], + "train-start": "2025-01-01T00:00+02:00", + "retrain-frequency": "PT12H", + }, + { + "sensor": "solar-sensor", + "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", + "output-path": None, + "start": "2025-01-08T00:00+02:00", # start coincides with end of available data in sensor + "end": "2025-01-09T00:00+02:00", + "sensor-to-save": None, + "max-forecast-horizon": "PT1H", + "forecast-frequency": "PT12H", # 2 cycles and 2 viewpoints + "probabilistic": False, + }, + True, + None, + ), ( { # "model": "CustomLGBM", @@ -176,7 +197,9 @@ def test_train_predict_pipeline( # noqa: C901 app.queues["forecasting"], exc_handler=handle_forecasting_exception ) - forecasts = sensor.search_beliefs(source_types=["forecaster"]) + forecasts = sensor.search_beliefs( + source_types=["forecaster"], 
most_recent_beliefs_only=False + ) dg_params = pipeline._parameters # parameters stored in the data generator m_viewpoints = (dg_params["end_date"] - dg_params["predict_start"]) / ( dg_params["forecast_frequency"] @@ -184,9 +207,22 @@ def test_train_predict_pipeline( # noqa: C901 # 1 hour of forecasts is saved over 4 15-minute resolution events n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1) + n_cycles = max( + timedelta(hours=dg_params["predict_period_in_hours"]) + // max( + pipeline._config["retrain_frequency"], + pipeline._parameters["forecast_frequency"], + ), + 1, + ) assert ( - len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon - ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" + len(forecasts) + == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles + ), ( + f"we expect {n_events_per_horizon} event(s) per horizon, " + f"{n_hourly_horizons} horizon(s), {m_viewpoints} viewpoint(s)" + f"{f', and {n_cycles} cycle(s)' if n_cycles > 1 else ''}" + ) assert ( forecasts.lineage.number_of_belief_times == m_viewpoints ), f"we expect {m_viewpoints} viewpoints" @@ -197,15 +233,18 @@ def test_train_predict_pipeline( # noqa: C901 if as_job: # Fetch returned job - job = app.queues["forecasting"].fetch_job(pipeline_returns) + job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) assert ( job is not None ), "a returned job should exist in the forecasting queue" - if job.dependency_ids: - cycle_job_ids = [job] # only one cycle job, no wrap-up job + if not job.dependency_ids: + cycle_job_ids = [job.id] # only one cycle job, no wrap-up job else: + assert ( + job.is_finished + ), f"The wrap-up job should be finished, and not {job.get_status()}" cycle_job_ids = job.kwargs.get("cycle_job_ids", []) # wrap-up 
job case finished_jobs = app.queues["forecasting"].finished_job_registry @@ -406,3 +445,89 @@ def test_train_period_capped_logs_warning( assert config_used["train_period_in_hours"] == timedelta(days=10) / timedelta( hours=1 ), "train_period_in_hours should be capped to max_training_period" + + +def test_data_source_committed_before_forecast_jobs( + app, + setup_fresh_test_forecast_data, +): + """ + Regression test: verifies that the data source is committed to the database + before forecast jobs are enqueued, so that the worker can find it. + + Root cause: get_or_create_source() only flushes (does not commit) the new DataSource. + If the API request context ends without a commit, the worker cannot find the data source + by ID, causing the forecast job to fail. + + The fix: db.session.merge(self.data_source) + db.session.commit() in train_predict.py + ensures the data source is persisted before jobs are queued. + """ + from sqlalchemy import inspect as sa_inspect + from sqlalchemy import select + from flexmeasures.data import db + from flexmeasures.data.models.data_sources import DataSource + from flexmeasures.data.services.data_sources import get_or_create_source + + with app.app_context(): + # === Part 1: Demonstrate the problem (flushed but not committed) === + + # Create a new data source (similar to what the pipeline does internally) + test_source = get_or_create_source( + source="TestForecaster", + source_type="forecaster", + model="TestModel", + attributes={"data_generator": {"config": {"test": True}}}, + ) + # Flush assigns an ID but does not commit + source_id = test_source.id + assert source_id is not None, "Flush should have assigned an ID" + + # Verify the source is persistent in the current session + insp = sa_inspect(test_source) + print( + f"DEBUG: state after flush - detached={insp.detached}, " + f"persistent={insp.persistent}, pending={insp.pending}" + ) + + # Expunge from session to simulate what happens when session is closed/rolled back + 
db.session.expunge(test_source) + + insp_after = sa_inspect(test_source) + print(f"DEBUG: state after expunge - detached={insp_after.detached}") + assert insp_after.detached, "After expunge, object should be detached" + + # Now try to find the source in DB - it should NOT be found (was never committed) + db.session.rollback() # Simulate session teardown without commit + found_source = db.session.execute( + select(DataSource).filter_by(id=source_id) + ).scalar_one_or_none() + print(f"DEBUG: found_source after rollback: {found_source}") + assert ( + found_source is None + ), "After rollback, the flushed-but-not-committed source should not be in DB" + + # === Part 2: Demonstrate the fix (merge + commit) === + + # Create a second, distinct source (different name and model) + test_source_2 = get_or_create_source( + source="TestForecaster2", + source_type="forecaster", + model="TestModel2", + attributes={"data_generator": {"config": {"test": True}}}, + ) + source_id_2 = test_source_2.id + + # Apply the fix: merge back into session and commit + db.session.merge(test_source_2) + db.session.commit() + + # Now verify the source IS in DB, even after expunge + db.session.expunge_all() + found_source_2 = db.session.execute( + select(DataSource).filter_by(id=source_id_2) + ).scalar_one_or_none() + print(f"DEBUG: found_source_2 after merge+commit: {found_source_2}") + assert ( + found_source_2 is not None + ), "After merge+commit, the source should be findable in DB" + assert found_source_2.id == source_id_2