Fix detached DataSource and expired sensors when enqueuing forecast jobs#2034
Draft

Files changed: 3 modified
The `TrainPredictPipeline` creates a `DataSource` via `get_or_create_source()`, which only flushes (not commits) the new source. When the RQ worker runs later, the data source may not be found in the DB if the API request's transaction was never committed.

Fix:

- Add `db.session.merge(self.data_source)` + `db.session.commit()` in `train_predict.py` before queuing forecast jobs, to ensure the data source is persisted in the DB before jobs run
- After commit, refresh sensor objects to prevent `DetachedInstanceError` when RQ pickles `self.run_cycle` for the worker
- Re-attach detached sensor objects in `run_cycle()` at job execution time, to handle the case where sensors are expired/detached after unpickling

Also fix `run_wrap_up` to pass the Redis connection to `Job.fetch()`, and update return values to include both `job_id` and `n_jobs` for callers. Update `sensors.py` and `data_add.py` to use the new dict return value.
…d fix assertions

Changes:

1. Add a new parametrize case for multi-cycle forecasting (retrain-frequency PT12H, forecast-frequency PT12H → 2 cycles, 2 viewpoints per cycle) with `as_job=True`
2. Fix the `search_beliefs` call to use `most_recent_beliefs_only=False` so that all belief times from all cycles are returned
3. Fix the forecast count assertion to account for `n_cycles` when computing the expected count (multi-cycle jobs produce `n_cycles` times more forecasts)
4. Fix the `as_job` section to use `pipeline_returns['job_id']` (dict return value from the updated `run()` method) and fix the wrap-up-job vs single-job detection logic
5. Add regression test `test_data_source_committed_before_forecast_jobs`:
   - Demonstrates that flushed-but-not-committed data sources are lost on rollback
   - Verifies that the merge+commit fix makes data sources findable after expunge
   - Provides clear debug output of SQLAlchemy instance states
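Point 3's adjusted expectation amounts to scaling the single-cycle count by the number of cycles. A sketch with illustrative numbers (the per-cycle count here is an assumption, not taken from the test file):

```python
# From the new parametrize case: retrain-frequency PT12H yields 2 cycles.
n_cycles = 2
# Illustrative assumption: forecast rows produced by one cycle's viewpoints.
single_cycle_count = 96
# Multi-cycle jobs produce n_cycles times more forecasts, so the
# assertion must scale accordingly:
expected = single_cycle_count * n_cycles
assert expected == 192
```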
- Fix 'viewpoint' -> 'viewpoints' in comment
- Add ', and' before `n_cycles` in assertion message for clarity
- Improve comment on `db.session.merge()` to explain why merge is needed before commit (`get_or_create_source` flushes but does not commit)
Co-authored-by: Flix6x <30658763+Flix6x@users.noreply.github.com>
Copilot changed the title from "[WIP] Debug detached data source issue in forecasting" to "Fix detached DataSource and expired sensors when enqueuing forecast jobs" on Mar 17, 2026.
`get_or_create_source()` only flushes a new `DataSource`; it never commits. When the API request context tears down without an explicit commit, the data source row is rolled back, and the RQ worker then fails to find it by ID. A secondary issue: `db.session.commit()` expires all session objects, causing sensor references pickled into RQ jobs (`future_regressors`, `past_regressors`, `sensor`, `sensor_to_save`) to raise `DetachedInstanceError` on the worker.

Fixes
`train_predict.py`:

- `db.session.merge(self.data_source)` + `db.session.commit()` before enqueuing jobs ensures the data source row exists in the DB when the worker runs
- `db.session.refresh(sensor)` calls after commit re-hydrate expired sensor objects before RQ serialises the job
- A `_reattach_if_needed()` static helper consolidates the repeated merge-if-detached-or-expired pattern used across `sensor`, `sensor_to_save`, `future_regressors`, and `past_regressors` in `run_cycle()`
- `run_wrap_up()`: `Job.fetch()` requires an explicit `connection` argument (was raising `TypeError`)
- `run()` now returns `{"job_id": ..., "n_jobs": ...}` instead of a bare job ID string

`api/v3_0/sensors.py`: consume `pipeline_returns["job_id"]`

`cli/data_add.py`: use a local `as_job` variable (was reading `parameters.get("as_job")`); consume `pipeline_returns["n_jobs"]`

Tests
- `test_data_source_committed_before_forecast_jobs`: demonstrates the flush-without-commit bug (source disappears after rollback) and verifies that the merge+commit fix makes the source findable in a fresh query
- New multi-cycle parametrize case (`retrain-frequency: PT12H`, 2 cycles, `as_job=True`): a previously untested code path for wrap-up jobs
- Forecast count assertions now account for `n_cycles`; switched `search_beliefs` to `most_recent_beliefs_only=False` to count all saved beliefs across cycles