33 commits
921d958
fix: enhance job ID return structure in TrainPredictPipeline
BelhsanHmida Mar 9, 2026
b7068fa
fix: update forecasting job return structure in SensorAPI
BelhsanHmida Mar 9, 2026
505c284
fix: update job fetching logic in test_train_predict_pipeline
BelhsanHmida Mar 9, 2026
10b370a
chore: remove commented-out breakpoint in test_forecasting.py
BelhsanHmida Mar 9, 2026
6342867
fix: as_job is no longer in parameters
BelhsanHmida Mar 9, 2026
cc1bf1b
fix: update job count retrieval in add_forecast function
BelhsanHmida Mar 9, 2026
a7ff4a9
fix: add connection queue to fetch job
BelhsanHmida Mar 9, 2026
745e513
style: black
Flix6x Mar 9, 2026
47f160d
docs: changelog entry
Flix6x Mar 9, 2026
3af2c62
feat: check if wrap-up job actually finished rather than failed
Flix6x Mar 9, 2026
474b860
feat: add test case for 2 cycles, yielding 2 jobs and a wrap-up job
Flix6x Mar 9, 2026
7f824a7
dev: comment out failing assert, which needs to be investigated and up…
Flix6x Mar 9, 2026
29705a2
refactor: move checking the status of the wrap-up job to where it mat…
Flix6x Mar 9, 2026
f26c41b
fix: use job ID itself in case the returned job is the one existing c…
Flix6x Mar 9, 2026
f326efc
fix: add db.commit before forecasting jobs are created
BelhsanHmida Mar 10, 2026
67862ed
dev: uncomment test assertion statement
BelhsanHmida Mar 10, 2026
64465e9
Test(feat): search all beliefs forecasts saved into the sensor by the…
BelhsanHmida Mar 10, 2026
4ac1846
test(feat): add n_cycles variable to use to account for length of for…
BelhsanHmida Mar 10, 2026
bdc5e28
style: run pre-commit
BelhsanHmida Mar 10, 2026
5340350
fix: improve assertion message in test_train_predict_pipeline for cla…
BelhsanHmida Mar 10, 2026
8271e28
Merge branch 'main' into fix/small-forecasting-pipeline-fixes
BelhsanHmida Mar 11, 2026
60a85b3
Merge branch 'main' into fix/small-forecasting-pipeline-fixes
BelhsanHmida Mar 11, 2026
4ebaa97
fix: first create all jobs, then queue all jobs, giving the db.sessio…
Flix6x Mar 17, 2026
888b980
feat: enqueue job only after the transactional request
Flix6x Mar 17, 2026
56f825a
Revert "feat: enqueue job only after the transactional request"
Flix6x Mar 17, 2026
e41c1a5
docs: resolve silent merge conflict in changelog
Flix6x Mar 17, 2026
9085b9e
Merge remote-tracking branch 'origin/main' into fix/small-forecasting…
Flix6x Mar 17, 2026
064f9cb
docs: delete duplicate changelog entry
Flix6x Mar 16, 2026
7ff82d1
docs: add release date for v0.31.2
Flix6x Mar 17, 2026
657b0c0
Merge remote-tracking branch 'origin/main' into fix/small-forecasting…
Flix6x Mar 17, 2026
ff78988
docs: advance a different bugfix to v0.31.2
Flix6x Mar 17, 2026
4ed641a
fix: self.data_source found itself in a different session somehow, so…
Flix6x Mar 17, 2026
d276c8a
Revert "fix: first create all jobs, then queue all jobs, giving the d…
Flix6x Mar 17, 2026
12 changes: 6 additions & 6 deletions documentation/changelog.rst
Original file line number Diff line number Diff line change
@@ -13,24 +13,25 @@ New features
* Show sensor attributes on sensor page, if not empty [see `PR #2015 <https://www.github.com/FlexMeasures/flexmeasures/pull/2015>`_]
* Support saving state-of-charge schedules to sensors with ``"%"`` unit, using the ``soc-max`` flex-model field as the capacity for unit conversion [see `PR #1996 <https://www.github.com/FlexMeasures/flexmeasures/pull/1996>`_]

Bugfixes
-----------
* Stop failure in the API endpoint that lists available endpoints (/api/v3_0/) [see `PR #2032 <https://github.com/FlexMeasures/flexmeasures/pull/2032>`_]

Infrastructure / Support
----------------------
* Migrate from ``pip`` to ``uv`` for dependency management [see `PR #1973 <https://github.com/FlexMeasures/flexmeasures/pull/1973>`_]
* Migrate from ``make`` to ``poe`` for running tasks [see `PR #1973 <https://github.com/FlexMeasures/flexmeasures/pull/1973>`_]
* Make the test environment used by agents and by the test workflow identical [see `PR #1998 <https://www.github.com/FlexMeasures/flexmeasures/pull/1998>`_]
* Improve contact information to get in touch with the FlexMeasures community [see `PR #2022 <https://www.github.com/FlexMeasures/flexmeasures/pull/2022>`_]

Bugfixes
-----------

v0.31.2 | March XX, 2026

v0.31.2 | March 17, 2026
============================

Bugfixes
-----------
* Fix an issue where asset context was accessed in schemas that do not define a ``context`` attribute [see `PR #2014 <https://www.github.com/FlexMeasures/flexmeasures/pull/2014>`_]
* Fix wrap-up forecasting job [see `PR #2011 <https://www.github.com/FlexMeasures/flexmeasures/pull/2011>`_]
* Stop failure in the API endpoint that lists available endpoints (/api/v3_0/) [see `PR #2032 <https://github.com/FlexMeasures/flexmeasures/pull/2032>`_]


v0.31.1 | March 6, 2026
@@ -76,7 +77,6 @@ New features
* Improved the UX for creating sensors, clicking on ``Enter`` now validates and creates a sensor [see `PR #1876 <https://www.github.com/FlexMeasures/flexmeasures/pull/1876>`_]
* Show zero values in bar charts even though they have 0 area [see `PR #1932 <https://www.github.com/FlexMeasures/flexmeasures/pull/1932>`_ and `PR #1936 <https://www.github.com/FlexMeasures/flexmeasures/pull/1936>`_]
* Added ``root`` and ``depth`` fields to the `[GET] /assets` endpoint for listing assets, to allow selecting descendants of a given root asset up to a given depth [see `PR #1874 <https://www.github.com/FlexMeasures/flexmeasures/pull/1874>`_]
* Give ability to edit sensor timezone from the UI [see `PR #1900 <https://www.github.com/FlexMeasures/flexmeasures/pull/1900>`_]
* Support creating schedules with only information known prior to some time, now also via the CLI (the API already supported it) [see `PR #1871 <https://www.github.com/FlexMeasures/flexmeasures/pull/1871>`_].
* Added capability to update an asset's parent from the UI [`PR #1957 <https://www.github.com/FlexMeasures/flexmeasures/pull/1957>`_]
* Add ``fields`` param to the asset-listing endpoints, to save bandwidth in response data [see `PR #1884 <https://www.github.com/FlexMeasures/flexmeasures/pull/1884>`_]
4 changes: 2 additions & 2 deletions flexmeasures/api/v3_0/sensors.py
@@ -1646,13 +1646,13 @@ def trigger_forecast(self, id: int, **params):

# Queue forecasting job
try:
job_id = forecaster.compute(parameters=parameters, as_job=True)
pipeline_returns = forecaster.compute(parameters=parameters, as_job=True)
except Exception as e:
current_app.logger.exception("Forecast job failed to enqueue.")
return unprocessable_entity(str(e))

d, s = request_processed()
return dict(forecast=job_id, **d), s
return dict(forecast=pipeline_returns["job_id"], **d), s

@route("/<id>/forecasts/<uuid>", methods=["GET"])
@use_kwargs(
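For readers following the API change in `sensors.py` above: `forecaster.compute(..., as_job=True)` previously returned a bare job ID string, and after this PR it returns a dict carrying the job ID and job count. A minimal sketch of normalizing both shapes (the helper name is hypothetical, not part of the PR):

```python
def handle_pipeline_returns(pipeline_returns):
    """Normalize old (str) and new (dict) return shapes to (job_id, n_jobs)."""
    if isinstance(pipeline_returns, dict):
        # new contract from this PR: {"job_id": ..., "n_jobs": ...}
        return pipeline_returns["job_id"], pipeline_returns["n_jobs"]
    # legacy shape: a bare job ID, implying a single job
    return pipeline_returns, 1
```

Such a shim is only needed by callers that must tolerate both FlexMeasures versions; the endpoint itself simply reads `pipeline_returns["job_id"]`.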
4 changes: 2 additions & 2 deletions flexmeasures/cli/data_add.py
@@ -1151,8 +1151,8 @@ def add_forecast( # noqa: C901
return

# as_job case → list of job dicts like {"job-1": "<uuid>"}
if parameters.get("as_job"):
n_jobs = len(pipeline_returns)
if as_job:
n_jobs = pipeline_returns["n_jobs"]
click.secho(f"Created {n_jobs} forecasting job(s).", **MsgStyle.SUCCESS)
return

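The CLI change above relies on the same new contract: the `as_job` flag is checked directly (rather than re-read from `parameters`), and the job count comes from the returned dict's `n_jobs` key instead of `len(pipeline_returns)`. A hedged, click-free sketch of that reporting branch (function name hypothetical):

```python
def report_forecast_jobs(pipeline_returns, as_job: bool):
    """Mirror of the CLI branch: when jobs were queued, report how many,
    reading the count from the pipeline's new return structure."""
    if as_job:
        n_jobs = pipeline_returns["n_jobs"]
        return f"Created {n_jobs} forecasting job(s)."
    return None  # the non-job case is handled earlier in add_forecast
```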
21 changes: 16 additions & 5 deletions flexmeasures/data/models/forecasting/pipelines/train_predict.py
@@ -11,6 +11,7 @@

from flask import current_app

from flexmeasures.data import db
from flexmeasures.data.models.forecasting import Forecaster
from flexmeasures.data.models.forecasting.pipelines.predict import PredictPipeline
from flexmeasures.data.models.forecasting.pipelines.train import TrainPipeline
@@ -46,10 +47,11 @@ def __init__(

def run_wrap_up(self, cycle_job_ids: list[str]):
"""Log the status of all cycle jobs after completion."""
connection = current_app.queues["forecasting"].connection

for index, job_id in enumerate(cycle_job_ids):
logging.info(
f"forecasting job-{index}: {job_id} status: {Job.fetch(job_id).get_status()}"
)
status = Job.fetch(job_id, connection=connection).get_status()
logging.info(f"forecasting job-{index}: {job_id} status: {status}")

def run_cycle(
self,
@@ -258,6 +260,10 @@ def run(
if as_job:
cycle_job_ids = []

# Ensure the data source ID is available in the database when the job runs.
db.session.merge(self.data_source)
db.session.commit()

# job metadata for tracking
# Serialize start and end to ISO format strings
# Workaround for https://github.com/Parallels/rq-dashboard/issues/510
@@ -315,9 +321,14 @@ def run(

if len(cycle_job_ids) > 1:
# Return the wrap-up job ID if multiple cycle jobs are queued
return wrap_up_job.id
return {"job_id": wrap_up_job.id, "n_jobs": len(cycle_job_ids)}
else:
# Return the single cycle job ID if only one job is queued
return cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id
return {
"job_id": (
cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id
),
"n_jobs": 1,
}

return self.return_values
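The tail of `run()` above decides which job ID to surface: the wrap-up job when several cycle jobs were queued, else the single cycle job. That branching, extracted as a standalone sketch (function name hypothetical, logic taken from the diff):

```python
def summarize_jobs(cycle_job_ids: list[str], wrap_up_job_id: str) -> dict:
    """Return the {"job_id", "n_jobs"} structure this PR introduces:
    the wrap-up job's ID when multiple cycle jobs were queued,
    otherwise the single cycle job's ID (falling back to the wrap-up job)."""
    if len(cycle_job_ids) > 1:
        return {"job_id": wrap_up_job_id, "n_jobs": len(cycle_job_ids)}
    return {
        "job_id": cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job_id,
        "n_jobs": 1,
    }
```

Callers (the API endpoint and the CLI) then read `job_id` and `n_jobs` without having to know whether a wrap-up job exists.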
1 change: 0 additions & 1 deletion flexmeasures/data/schemas/tests/test_forecasting.py
@@ -439,7 +439,6 @@ def test_timing_parameters_of_forecaster_parameters_schema(
**timing_input,
}
)
# breakpoint()
for k, v in expected_timing_output.items():
# Convert kebab-case key to snake_case to match data dictionary keys returned by schema
snake_key = kebab_to_snake(k)
52 changes: 46 additions & 6 deletions flexmeasures/data/tests/test_forecasting_pipeline.py
@@ -58,6 +58,27 @@
True,
None,
),
(
{
# "model": "CustomLGBM",
"future-regressors": ["irradiance-sensor"],
"train-start": "2025-01-01T00:00+02:00",
"retrain-frequency": "PT12H",
},
{
"sensor": "solar-sensor",
"model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models",
"output-path": None,
"start": "2025-01-08T00:00+02:00", # start coincides with end of available data in sensor
"end": "2025-01-09T00:00+02:00",
"sensor-to-save": None,
"max-forecast-horizon": "PT1H",
"forecast-frequency": "PT12H", # 2 cycles and 2 viewpoint
"probabilistic": False,
},
True,
None,
),
(
{
# "model": "CustomLGBM",
@@ -176,17 +197,33 @@ def test_train_predict_pipeline( # noqa: C901
app.queues["forecasting"], exc_handler=handle_forecasting_exception
)

forecasts = sensor.search_beliefs(source_types=["forecaster"])
forecasts = sensor.search_beliefs(
source_types=["forecaster"], most_recent_beliefs_only=False
)

dg_params = pipeline._parameters # parameters stored in the data generator
m_viewpoints = (dg_params["end_date"] - dg_params["predict_start"]) / (
dg_params["forecast_frequency"]
)
# 1 hour of forecasts is saved over 4 15-minute resolution events
n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution
n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1)
n_cycles = max(
timedelta(hours=dg_params["predict_period_in_hours"])
// max(
pipeline._config["retrain_frequency"],
pipeline._parameters["forecast_frequency"],
),
1,
)
assert (
len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon
), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons"
len(forecasts)
== m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles
), (
f"we expect {n_events_per_horizon} event(s) per horizon, "
f"{n_hourly_horizons} horizon(s), {m_viewpoints} viewpoint(s)"
f"{f', {n_cycles} cycle(s)' if n_cycles > 1 else ''}"
)
assert (
forecasts.lineage.number_of_belief_times == m_viewpoints
), f"we expect {m_viewpoints} viewpoints"
Expand All @@ -197,15 +234,18 @@ def test_train_predict_pipeline( # noqa: C901

if as_job:
# Fetch returned job
job = app.queues["forecasting"].fetch_job(pipeline_returns)
job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"])

assert (
job is not None
), "a returned job should exist in the forecasting queue"

if job.dependency_ids:
cycle_job_ids = [job] # only one cycle job, no wrap-up job
if not job.dependency_ids:
cycle_job_ids = [job.id] # only one cycle job, no wrap-up job
else:
assert (
job.is_finished
), f"The wrap-up job should be finished, and not {job.get_status()}"
cycle_job_ids = job.kwargs.get("cycle_job_ids", []) # wrap-up job case

finished_jobs = app.queues["forecasting"].finished_job_registry
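The test assertion above multiplies four factors: viewpoints, hourly horizons, events per horizon, and (new in this PR) cycles. A worked example, plugging in the second new test case's parameters (PT12H forecast frequency, PT1H max horizon, 15-minute sensor resolution, PT12H retraining, 24-hour prediction window; variable names are illustrative, not the test's exact internals):

```python
from datetime import timedelta

predict_window = timedelta(hours=24)       # start .. end of the prediction window
forecast_frequency = timedelta(hours=12)   # "PT12H"
max_forecast_horizon = timedelta(hours=1)  # "PT1H"
event_resolution = timedelta(minutes=15)   # sensor resolution
retrain_frequency = timedelta(hours=12)    # "PT12H"

m_viewpoints = predict_window / forecast_frequency             # 2 viewpoints
n_events_per_horizon = timedelta(hours=1) / event_resolution   # 4 events per hourly horizon
n_hourly_horizons = max_forecast_horizon // timedelta(hours=1) # 1 horizon
n_cycles = max(
    predict_window // max(retrain_frequency, forecast_frequency), 1
)  # 2 cycles

expected = m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles
print(int(expected))  # 16 forecasts saved in total
```

This matches the new test case's comment ("2 cycles and 2 viewpoints"): each cycle retrains and re-predicts, so the saved forecasts scale linearly with `n_cycles` once `most_recent_beliefs_only=False` is passed to `search_beliefs`.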