Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions flexmeasures/api/v3_0/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -1646,13 +1646,13 @@ def trigger_forecast(self, id: int, **params):

# Queue forecasting job
try:
job_id = forecaster.compute(parameters=parameters, as_job=True)
pipeline_returns = forecaster.compute(parameters=parameters, as_job=True)
except Exception as e:
current_app.logger.exception("Forecast job failed to enqueue.")
return unprocessable_entity(str(e))

d, s = request_processed()
return dict(forecast=job_id, **d), s
return dict(forecast=pipeline_returns["job_id"], **d), s

@route("/<id>/forecasts/<uuid>", methods=["GET"])
@use_kwargs(
Expand Down
4 changes: 2 additions & 2 deletions flexmeasures/cli/data_add.py
Original file line number Diff line number Diff line change
Expand Up @@ -1151,8 +1151,8 @@ def add_forecast( # noqa: C901
return

# as_job case → dict like {"job_id": "<uuid>", "n_jobs": <int>}
if parameters.get("as_job"):
n_jobs = len(pipeline_returns)
if as_job:
n_jobs = pipeline_returns["n_jobs"]
click.secho(f"Created {n_jobs} forecasting job(s).", **MsgStyle.SUCCESS)
return

Expand Down
64 changes: 59 additions & 5 deletions flexmeasures/data/models/forecasting/pipelines/train_predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from datetime import datetime, timedelta

from rq.job import Job
from sqlalchemy import inspect as sa_inspect

from flask import current_app

from flexmeasures.data import db
from flexmeasures.data.models.forecasting import Forecaster
from flexmeasures.data.models.forecasting.pipelines.predict import PredictPipeline
from flexmeasures.data.models.forecasting.pipelines.train import TrainPipeline
Expand Down Expand Up @@ -44,12 +46,28 @@ def __init__(
self.delete_model = delete_model
self.return_values = [] # To store forecasts and jobs

@staticmethod
def _reattach_if_needed(obj):
    """Return a version of a SQLAlchemy object that is usable in the current session.

    ``db.session.commit()`` expires every object in the session, and when RQ
    pickles ``self.run_cycle`` for a worker, such expired or detached objects
    may raise ``DetachedInstanceError`` on attribute access.

    If ``obj`` is detached or expired, it is merged into ``db.session`` and the
    result of that merge is returned; otherwise ``obj`` itself is returned.
    """
    state = sa_inspect(obj)
    needs_merge = state.detached or state.expired
    return db.session.merge(obj) if needs_merge else obj

def run_wrap_up(self, cycle_job_ids: list[str]):
    """Log the status of all cycle jobs after completion.

    :param cycle_job_ids: IDs of the cycle jobs whose status should be logged.
    """
    # Fetch jobs through the forecasting queue's own connection instead of
    # relying on an implicitly resolved one.
    connection = current_app.queues["forecasting"].connection

    for index, job_id in enumerate(cycle_job_ids):
        # Log each job exactly once, using the connection-aware fetch.
        status = Job.fetch(job_id, connection=connection).get_status()
        logging.info(f"forecasting job-{index}: {job_id} status: {status}")

def run_cycle(
self,
Expand All @@ -68,6 +86,25 @@ def run_cycle(
f"Starting Train-Predict cycle from {train_start} to {predict_end}"
)

# Re-attach sensor objects if they are detached after RQ pickles/unpickles self
# (this can happen when a commit expires objects before RQ serializes the job).
self._parameters["sensor"] = self._reattach_if_needed(
self._parameters["sensor"]
)
sensor_to_save = self._parameters.get("sensor_to_save")
if sensor_to_save is not None:
self._parameters["sensor_to_save"] = self._reattach_if_needed(
sensor_to_save
)
# Also re-attach regressor sensors stored in _config
self._config["future_regressors"] = [
self._reattach_if_needed(s)
for s in self._config.get("future_regressors", [])
]
self._config["past_regressors"] = [
self._reattach_if_needed(s) for s in self._config.get("past_regressors", [])
]

# Train model
train_pipeline = TrainPipeline(
future_regressors=self._config["future_regressors"],
Expand Down Expand Up @@ -258,6 +295,18 @@ def run(
if as_job:
cycle_job_ids = []

# Ensure the data source is attached to the current session before
# committing. get_or_create_source() only flushes (does not commit), so
# without this merge the data source would not be found by the worker.
db.session.merge(self.data_source)
db.session.commit()

# After commit, SQLAlchemy expires all objects. Refresh sensor objects so
# they have loaded attributes when RQ pickles self.run_cycle for the worker.
db.session.refresh(self._parameters["sensor"])
if self._parameters.get("sensor_to_save"):
db.session.refresh(self._parameters["sensor_to_save"])

# job metadata for tracking
# Serialize start and end to ISO format strings
# Workaround for https://github.com/Parallels/rq-dashboard/issues/510
Expand Down Expand Up @@ -315,9 +364,14 @@ def run(

if len(cycle_job_ids) > 1:
# Return the wrap-up job ID if multiple cycle jobs are queued
return wrap_up_job.id
return {"job_id": wrap_up_job.id, "n_jobs": len(cycle_job_ids)}
else:
# Return the single cycle job ID if only one job is queued
return cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id
return {
"job_id": (
cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id
),
"n_jobs": 1,
}

return self.return_values
137 changes: 131 additions & 6 deletions flexmeasures/data/tests/test_forecasting_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,27 @@
True,
None,
),
(
{
# "model": "CustomLGBM",
"future-regressors": ["irradiance-sensor"],
"train-start": "2025-01-01T00:00+02:00",
"retrain-frequency": "PT12H",
},
{
"sensor": "solar-sensor",
"model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models",
"output-path": None,
"start": "2025-01-08T00:00+02:00", # start coincides with end of available data in sensor
"end": "2025-01-09T00:00+02:00",
"sensor-to-save": None,
"max-forecast-horizon": "PT1H",
"forecast-frequency": "PT12H", # 2 cycles and 2 viewpoints
"probabilistic": False,
},
True,
None,
),
(
{
# "model": "CustomLGBM",
Expand Down Expand Up @@ -176,17 +197,32 @@ def test_train_predict_pipeline( # noqa: C901
app.queues["forecasting"], exc_handler=handle_forecasting_exception
)

forecasts = sensor.search_beliefs(source_types=["forecaster"])
forecasts = sensor.search_beliefs(
source_types=["forecaster"], most_recent_beliefs_only=False
)
dg_params = pipeline._parameters # parameters stored in the data generator
m_viewpoints = (dg_params["end_date"] - dg_params["predict_start"]) / (
dg_params["forecast_frequency"]
)
# 1 hour of forecasts is saved over 4 15-minute resolution events
n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution
n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1)
n_cycles = max(
timedelta(hours=dg_params["predict_period_in_hours"])
// max(
pipeline._config["retrain_frequency"],
pipeline._parameters["forecast_frequency"],
),
1,
)
assert (
len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon
), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons"
len(forecasts)
== m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles
), (
f"we expect {n_events_per_horizon} event(s) per horizon, "
f"{n_hourly_horizons} horizon(s), {m_viewpoints} viewpoint(s)"
f"{f', and {n_cycles} cycle(s)' if n_cycles > 1 else ''}"
)
assert (
forecasts.lineage.number_of_belief_times == m_viewpoints
), f"we expect {m_viewpoints} viewpoints"
Expand All @@ -197,15 +233,18 @@ def test_train_predict_pipeline( # noqa: C901

if as_job:
# Fetch returned job
job = app.queues["forecasting"].fetch_job(pipeline_returns)
job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"])

assert (
job is not None
), "a returned job should exist in the forecasting queue"

if job.dependency_ids:
cycle_job_ids = [job] # only one cycle job, no wrap-up job
if not job.dependency_ids:
cycle_job_ids = [job.id] # only one cycle job, no wrap-up job
else:
assert (
job.is_finished
), f"The wrap-up job should be finished, and not {job.get_status()}"
cycle_job_ids = job.kwargs.get("cycle_job_ids", []) # wrap-up job case

finished_jobs = app.queues["forecasting"].finished_job_registry
Expand Down Expand Up @@ -406,3 +445,89 @@ def test_train_period_capped_logs_warning(
assert config_used["train_period_in_hours"] == timedelta(days=10) / timedelta(
hours=1
), "train_period_in_hours should be capped to max_training_period"


def test_data_source_committed_before_forecast_jobs(
    app,
    setup_fresh_test_forecast_data,
):
    """
    Regression test: verifies that the data source is committed to the database
    before forecast jobs are enqueued, so that the worker can find it.

    Root cause: get_or_create_source() only flushes (does not commit) the new DataSource.
    If the API request context ends without a commit, the worker cannot find the data source
    by ID, causing the forecast job to fail.

    The fix: db.session.merge(self.data_source) + db.session.commit() in train_predict.py
    ensures the data source is persisted before jobs are queued.
    """
    from sqlalchemy import inspect as sa_inspect
    from sqlalchemy import select
    from flexmeasures.data import db
    from flexmeasures.data.models.data_sources import DataSource
    from flexmeasures.data.services.data_sources import get_or_create_source

    with app.app_context():
        # === Part 1: Demonstrate the problem (flushed but not committed) ===

        # Create a new data source (similar to what the pipeline does internally)
        test_source = get_or_create_source(
            source="TestForecaster",
            source_type="forecaster",
            model="TestModel",
            attributes={"data_generator": {"config": {"test": True}}},
        )
        # Flush assigns an ID but does not commit
        source_id = test_source.id
        assert source_id is not None, "Flush should have assigned an ID"

        # Verify the source is persistent in the current session
        assert sa_inspect(
            test_source
        ).persistent, "After flush, the source should be persistent in the session"

        # Expunge from session to simulate what happens when session is closed/rolled back
        db.session.expunge(test_source)
        assert sa_inspect(
            test_source
        ).detached, "After expunge, object should be detached"

        # Now try to find the source in DB - it should NOT be found (was never committed)
        db.session.rollback()  # Simulate session teardown without commit
        found_source = db.session.execute(
            select(DataSource).filter_by(id=source_id)
        ).scalar_one_or_none()
        assert (
            found_source is None
        ), "After rollback, the flushed-but-not-committed source should not be in DB"

        # === Part 2: Demonstrate the fix (merge + commit) ===

        # Create a second, distinct source in the same way
        test_source_2 = get_or_create_source(
            source="TestForecaster2",
            source_type="forecaster",
            model="TestModel2",
            attributes={"data_generator": {"config": {"test": True}}},
        )
        source_id_2 = test_source_2.id
        assert source_id_2 is not None, "Flush should have assigned an ID"

        # Apply the fix: merge back into session and commit
        db.session.merge(test_source_2)
        db.session.commit()

        # Now verify the source IS in DB, even after expunge
        db.session.expunge_all()
        found_source_2 = db.session.execute(
            select(DataSource).filter_by(id=source_id_2)
        ).scalar_one_or_none()
        assert (
            found_source_2 is not None
        ), "After merge+commit, the source should be findable in DB"
        assert found_source_2.id == source_id_2