diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 694ae8fda..1dc8c2f34 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -13,10 +13,6 @@ New features * Show sensor attributes on sensor page, if not empty [see `PR #2015 `_] * Support saving state-of-charge schedules to sensors with ``"%"`` unit, using the ``soc-max`` flex-model field as the capacity for unit conversion [see `PR #1996 `_] -Bugfixes ------------ -* Stop failure in the API endpoint that lists available endpoints (/api/v3_0/) [see `PR #2032 `_] - Infrastructure / Support ---------------------- * Migrate from ``pip`` to ``uv`` for dependency management [see `PR #1973 `_] @@ -24,13 +20,18 @@ Infrastructure / Support * Make the test environment used by agents and by the test workflow identical [see `PR #1998 `_] * Improve contact information to get in touch with the FlexMeasures community [see `PR #2022 `_] +Bugfixes +----------- -v0.31.2 | March XX, 2026 + +v0.31.2 | March 17, 2026 ============================ Bugfixes ----------- * Fix an issue where asset context was accessed in schemas that do not define a ``context`` attribute [see `PR #2014 `_] +* Fix wrap-up forecasting job [see `PR #2011 `_] +* Stop failure in the API endpoint that lists available endpoints (/api/v3_0/) [see `PR #2032 `_] v0.31.1 | March 6, 2026 @@ -76,7 +77,6 @@ New features * Improved the UX for creating sensors, clicking on ``Enter`` now validates and creates a sensor [see `PR #1876 `_] * Show zero values in bar charts even though they have 0 area [see `PR #1932 `_ and `PR #1936 `_] * Added ``root`` and ``depth`` fields to the `[GET] /assets` endpoint for listing assets, to allow selecting descendants of a given root asset up to a given depth [see `PR #1874 `_] -* Give ability to edit sensor timezone from the UI [see `PR #1900 `_] * Support creating schedules with only information known prior to some time, now also via the CLI (the API already supported it) [see `PR #1871 
`_]. * Added capability to update an asset's parent from the UI [`PR #1957 `_] * Add ``fields`` param to the asset-listing endpoints, to save bandwidth in response data [see `PR #1884 `_] diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 8e178b478..4fc4ac1ae 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1646,13 +1646,13 @@ def trigger_forecast(self, id: int, **params): # Queue forecasting job try: - job_id = forecaster.compute(parameters=parameters, as_job=True) + pipeline_returns = forecaster.compute(parameters=parameters, as_job=True) except Exception as e: current_app.logger.exception("Forecast job failed to enqueue.") return unprocessable_entity(str(e)) d, s = request_processed() - return dict(forecast=job_id, **d), s + return dict(forecast=pipeline_returns["job_id"], **d), s @route("//forecasts/", methods=["GET"]) @use_kwargs( diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 48b618dcc..64886cac1 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -1151,8 +1151,8 @@ def add_forecast( # noqa: C901 return # as_job case → list of job dicts like {"job-1": ""} - if parameters.get("as_job"): - n_jobs = len(pipeline_returns) + if as_job: + n_jobs = pipeline_returns["n_jobs"] click.secho(f"Created {n_jobs} forecasting job(s).", **MsgStyle.SUCCESS) return diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 6efb4d0fe..b21001fa0 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -8,9 +8,11 @@ from datetime import datetime, timedelta from rq.job import Job +from sqlalchemy import inspect as sa_inspect from flask import current_app +from flexmeasures.data import db from flexmeasures.data.models.forecasting import Forecaster from 
flexmeasures.data.models.forecasting.pipelines.predict import PredictPipeline
 from flexmeasures.data.models.forecasting.pipelines.train import TrainPipeline
@@ -44,12 +46,28 @@ def __init__(
         self.delete_model = delete_model
         self.return_values = []  # To store forecasts and jobs
 
+    @staticmethod
+    def _reattach_if_needed(obj):
+        """Re-merge a SQLAlchemy object into the current session if it is detached or expired.
+
+        After ``db.session.commit()``, all objects in the session are expired.
+        When RQ pickles ``self.run_cycle`` for a worker, expired or detached
+        objects may raise ``DetachedInstanceError`` on attribute access, so they
+        are merged back into the active session and the merged instance is returned.
+        Values SQLAlchemy cannot inspect (e.g. ``None``) are returned unchanged.
+        """
+        insp = sa_inspect(obj, raiseerr=False)
+        if insp is not None and (insp.detached or insp.expired):
+            return db.session.merge(obj)
+        return obj
+
     def run_wrap_up(self, cycle_job_ids: list[str]):
         """Log the status of all cycle jobs after completion."""
+        connection = current_app.queues["forecasting"].connection
+
         for index, job_id in enumerate(cycle_job_ids):
-            logging.info(
-                f"forecasting job-{index}: {job_id} status: {Job.fetch(job_id).get_status()}"
-            )
+            status = Job.fetch(job_id, connection=connection).get_status()
+            logging.info(f"forecasting job-{index}: {job_id} status: {status}")
 
     def run_cycle(
         self,
@@ -68,6 +86,25 @@ def run_cycle(
             f"Starting Train-Predict cycle from {train_start} to {predict_end}"
         )
 
+        # Re-attach sensor objects if they are detached after RQ pickles/unpickles self
+        # (this can happen when a commit expires objects before RQ serializes the job).
+ self._parameters["sensor"] = self._reattach_if_needed( + self._parameters["sensor"] + ) + sensor_to_save = self._parameters.get("sensor_to_save") + if sensor_to_save is not None: + self._parameters["sensor_to_save"] = self._reattach_if_needed( + sensor_to_save + ) + # Also re-attach regressor sensors stored in _config + self._config["future_regressors"] = [ + self._reattach_if_needed(s) + for s in self._config.get("future_regressors", []) + ] + self._config["past_regressors"] = [ + self._reattach_if_needed(s) for s in self._config.get("past_regressors", []) + ] + # Train model train_pipeline = TrainPipeline( future_regressors=self._config["future_regressors"], @@ -258,6 +295,12 @@ def run( if as_job: cycle_job_ids = [] + # Ensure the data source is attached to the current session before + # committing. get_or_create_source() only flushes (does not commit), so + # without this merge the data source would not be found by the worker. + db.session.merge(self.data_source) + db.session.commit() + # job metadata for tracking # Serialize start and end to ISO format strings # Workaround for https://github.com/Parallels/rq-dashboard/issues/510 @@ -315,9 +358,14 @@ def run( if len(cycle_job_ids) > 1: # Return the wrap-up job ID if multiple cycle jobs are queued - return wrap_up_job.id + return {"job_id": wrap_up_job.id, "n_jobs": len(cycle_job_ids)} else: # Return the single cycle job ID if only one job is queued - return cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id + return { + "job_id": ( + cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id + ), + "n_jobs": 1, + } return self.return_values diff --git a/flexmeasures/data/schemas/tests/test_forecasting.py b/flexmeasures/data/schemas/tests/test_forecasting.py index efb8a19e7..1ccdaf900 100644 --- a/flexmeasures/data/schemas/tests/test_forecasting.py +++ b/flexmeasures/data/schemas/tests/test_forecasting.py @@ -439,7 +439,6 @@ def test_timing_parameters_of_forecaster_parameters_schema( 
**timing_input, } ) - # breakpoint() for k, v in expected_timing_output.items(): # Convert kebab-case key to snake_case to match data dictionary keys returned by schema snake_key = kebab_to_snake(k) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 348eff7f9..4b727361d 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -58,6 +58,27 @@ True, None, ), + ( + { + # "model": "CustomLGBM", + "future-regressors": ["irradiance-sensor"], + "train-start": "2025-01-01T00:00+02:00", + "retrain-frequency": "PT12H", + }, + { + "sensor": "solar-sensor", + "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", + "output-path": None, + "start": "2025-01-08T00:00+02:00", # start coincides with end of available data in sensor + "end": "2025-01-09T00:00+02:00", + "sensor-to-save": None, + "max-forecast-horizon": "PT1H", + "forecast-frequency": "PT12H", # 2 cycles and 2 viewpoints + "probabilistic": False, + }, + True, + None, + ), ( { # "model": "CustomLGBM", @@ -176,36 +197,19 @@ def test_train_predict_pipeline( # noqa: C901 app.queues["forecasting"], exc_handler=handle_forecasting_exception ) - forecasts = sensor.search_beliefs(source_types=["forecaster"]) - dg_params = pipeline._parameters # parameters stored in the data generator - m_viewpoints = (dg_params["end_date"] - dg_params["predict_start"]) / ( - dg_params["forecast_frequency"] - ) - # 1 hour of forecasts is saved over 4 15-minute resolution events - n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution - n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1) - assert ( - len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon - ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly 
horizons" - assert ( - forecasts.lineage.number_of_belief_times == m_viewpoints - ), f"we expect {m_viewpoints} viewpoints" - source = forecasts.lineage.sources[0] - assert "TrainPredictPipeline" in str( - source - ), "string representation of the Forecaster (DataSource) should mention the used model" - - if as_job: # Fetch returned job - job = app.queues["forecasting"].fetch_job(pipeline_returns) + job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) assert ( job is not None ), "a returned job should exist in the forecasting queue" - if job.dependency_ids: - cycle_job_ids = [job] # only one cycle job, no wrap-up job + if not job.dependency_ids: + cycle_job_ids = [job.id] # only one cycle job, no wrap-up job else: + assert ( + job.is_finished + ), f"The wrap-up job should be finished, and not {job.get_status()}" cycle_job_ids = job.kwargs.get("cycle_job_ids", []) # wrap-up job case finished_jobs = app.queues["forecasting"].finished_job_registry @@ -217,7 +221,42 @@ def test_train_predict_pipeline( # noqa: C901 job_id in finished_jobs ), f"Job {job_id} should be in the finished registry" - else: + forecasts = sensor.search_beliefs( + source_types=["forecaster"], most_recent_beliefs_only=False + ) + + dg_params = pipeline._parameters # parameters stored in the data generator + m_viewpoints = (dg_params["end_date"] - dg_params["predict_start"]) / ( + dg_params["forecast_frequency"] + ) + # 1 hour of forecasts is saved over 4 15-minute resolution events + n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution + n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1) + n_cycles = max( + timedelta(hours=dg_params["predict_period_in_hours"]) + // max( + pipeline._config["retrain_frequency"], + pipeline._parameters["forecast_frequency"], + ), + 1, + ) + assert ( + len(forecasts) + == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles + ), ( + f"we expect {n_events_per_horizon} event(s) per 
horizon, " + f"{n_hourly_horizons} horizon(s), {m_viewpoints} viewpoint(s)" + f"{f', and {n_cycles} cycle(s)' if n_cycles > 1 else ''}" + ) + assert ( + forecasts.lineage.number_of_belief_times == m_viewpoints + ), f"we expect {m_viewpoints} viewpoints" + source = forecasts.lineage.sources[0] + assert "TrainPredictPipeline" in str( + source + ), "string representation of the Forecaster (DataSource) should mention the used model" + + if not as_job: # Sync case: pipeline returns a non-empty list assert ( isinstance(pipeline_returns, list) and len(pipeline_returns) > 0