2 changes: 1 addition & 1 deletion pom.xml
@@ -47,7 +47,7 @@
<!-- internal dependencies -->
<dependencies.version>2025.6.25</dependencies.version>
<step-grid.version>2.5.0</step-grid.version>
<step-framework.version>2.5.0</step-framework.version>
<step-framework.version>2025.12.10-693957f1a52e5331621ac16a</step-framework.version>

<!-- external, non-transitive, dependencies -->
<dep.groovy.version>3.0.23</dep.groovy.version>
@@ -52,6 +52,7 @@ public class TimeSeriesControllerPlugin extends AbstractControllerPlugin {
public static final String PARAM_KEY_ANALYTICS_DASHBOARD_ID = "plugins.timeseries.analytics.dashboard.id";
public static final String PARAM_KEY_RESPONSE_IDEAL_INTERVALS = "timeseries.response.intervals.ideal";
public static final String PARAM_KEY_RESPONSE_MAX_INTERVALS = "timeseries.response.intervals.max";
public static final String RESOLUTION_PERIOD_PROPERTY = "plugins.timeseries.resolution.period";

public static final String EXECUTION_DASHBOARD_PREPOPULATED_NAME = "Execution Dashboard";
public static final String ANALYTICS_DASHBOARD_PREPOPULATED_NAME = "Analytics Dashboard";
@@ -111,7 +112,7 @@ public void serverStart(GlobalContext context) {

WebApplicationConfigurationManager configurationManager = context.require(WebApplicationConfigurationManager.class);
// Following property is used by the UI. We could align its name with the configuration property in the future
configurationManager.registerHook(s -> Map.of("plugins.timeseries.resolution.period", String.valueOf(timeSeries.getDefaultCollection().getResolution())));
configurationManager.registerHook(s -> Map.of(RESOLUTION_PERIOD_PROPERTY, String.valueOf(timeSeries.getDefaultCollection().getResolution())));
}

@Override
@@ -29,12 +29,13 @@
@Plugin
public class ControllerSettingPlugin extends AbstractControllerPlugin {

public static final String SETTINGS = "settings";
private ControllerSettingAccessor controllerSettingAccessor;

@Override
public void serverStart(GlobalContext context) throws Exception {
controllerSettingAccessor = new ControllerSettingAccessorImpl(
context.getCollectionFactory().getCollection("settings", ControllerSetting.class));
context.getCollectionFactory().getCollection(SETTINGS, ControllerSetting.class));
context.put(ControllerSettingAccessor.class, controllerSettingAccessor);
}

@@ -3,8 +3,10 @@
import ch.exense.commons.app.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import step.controller.services.async.AsyncTaskManager;
import step.core.Controller;
import step.core.GlobalContext;
import step.core.artefacts.reports.aggregated.ReportNodeTimeSeries;
import step.core.controller.errorhandling.ErrorFilter;
import step.core.deployment.ControllerServices;
import step.core.execution.model.Execution;
@@ -124,6 +126,17 @@ public void recover(GlobalContext context) throws Exception {
@Override
public void finalizeStart(GlobalContext context) throws Exception {
context.require(ExecutionScheduler.class).start();
        //Initialize the new empty resolutions after start (requires the async task manager)
        //Because the ReportNodeTimeSeries is created directly in ControllerServer.init and not in a plugin, this is the right place to do it
ReportNodeTimeSeries reportNodeTimeSeries = context.require(ReportNodeTimeSeries.class);
AsyncTaskManager asyncTaskManager = context.require(AsyncTaskManager.class);
asyncTaskManager.scheduleAsyncTask((empty) -> {
logger.info("ReportNode timeSeries ingestion for empty resolutions has started");
reportNodeTimeSeries.getTimeSeries().ingestDataForEmptyCollections();

Review comment (Contributor):

  • I couldn't find the place where this was called previously. Wasn't this called?
  • Is it safe to do it asynchronously? I assume the controller would start and executions could start creating data too?


Reply (Contributor, author):

We have the same logic implemented for the time-series (response times), but it did not exist for the reportNodeTimeSeries.
Re-ingesting such new resolutions can take quite some time, so I would say doing it asynchronously is the only option. Since empty resolutions are determined before any new execution can be triggered and a dedicated ingestion pipeline is used, it should be safe. However, the filter currently used to re-ingest data is too permissive and could re-ingest "new" buckets (created while re-ingesting). I will update the filter to make sure begin < controller_start_time (in ingestDataForEmptyCollections):

Filter filter = (Filter) (collection.getTtl() > 0L
        ? Filters.gte("begin", System.currentTimeMillis() - collection.getTtl())
        : Filters.empty());
try (Stream<Bucket> bucketStream = previousCollection.findLazy(filter, searchOrder)) {
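
An illustrative sketch of that tightened filter (controllerStartTime is assumed to be captured before the re-ingestion task is scheduled; Filters.and and Filters.lt are assumed helpers, not confirmed by this diff):

    // keep the existing TTL lower bound, and additionally exclude buckets created after controller start
    Filter ttlFilter = collection.getTtl() > 0L
            ? Filters.gte("begin", System.currentTimeMillis() - collection.getTtl())
            : Filters.empty();
    Filter filter = Filters.and(ttlFilter, Filters.lt("begin", controllerStartTime));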


Review comment (Contributor):

Thanks for the clarification. I agree that it should be done asynchronously. There are, however, a few things we should think about:

  • Ensure the re-ingestion is not interrupted: we could explicitly write in the logs that the controller shouldn't be restarted during the re-ingestion, to avoid incomplete re-ingestions (a possible sketch follows below).
  • Ensure new data is not re-ingested: not sure the condition begin < controller_start_time would be enough for large resolutions. In theory the last bucket could fulfil this condition and still be used for new executions while it is being re-ingested. Right?
  • In any case, we should benchmark the re-ingestion for large data sets.
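
A minimal illustrative sketch of the first two points (controllerStartTime, resolutionMs and the Filters.lt helper are assumptions for illustration, not part of this change):

    // Warn operators that restarting the controller now would leave the re-ingestion incomplete
    logger.warn("Re-ingesting report node time series for empty resolutions; do not restart the controller until this task has finished");
    // Exclude the bucket that may still be receiving data by aligning the cutoff down to a resolution boundary
    long controllerStartTime = System.currentTimeMillis(); // in practice captured once at startup
    long resolutionMs = 5_000L;                            // e.g. the 5-second base resolution
    long cutoff = controllerStartTime - (controllerStartTime % resolutionMs);
    Filter filter = Filters.lt("begin", cutoff);           // Filters.lt is assumed, not confirmed by this diff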

logger.info("ReportNode timeSeries ingestion for empty resolutions has finished");
return null;
});

}

@Override
@@ -54,6 +54,7 @@ public void serverStart(GlobalContext context) throws Exception {
migrationManager.register(V27_4_DropResolvedPlanNodesIndexForPSQLMigrationTask.class);
migrationManager.register(V28_0_FixEmptyDefaultMavenSettingsMigrationTask.class);
migrationManager.register(V29_0_UpdateAutomationPackageModel.class);
migrationManager.register(V29_1_UpdateTimeSeriesCollectionsAndSettings.class);
}

@Override
@@ -0,0 +1,99 @@
/*
* ******************************************************************************
* * Copyright (C) 2020, exense GmbH
* *
* * This file is part of STEP
* *
* * STEP is free software: you can redistribute it and/or modify
* * it under the terms of the GNU Affero General Public License as published by
* * the Free Software Foundation, either version 3 of the License, or
* * (at your option) any later version.
* *
* * STEP is distributed in the hope that it will be useful,
* * but WITHOUT ANY WARRANTY; without even the implied warranty of
* * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* * GNU Affero General Public License for more details.
* *
* * You should have received a copy of the GNU Affero General Public License
* * along with STEP. If not, see <http://www.gnu.org/licenses/>.
* *****************************************************************************
*/
package step.migration.tasks;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import step.core.Version;
import step.core.collections.Collection;
import step.core.collections.CollectionFactory;
import step.core.collections.Document;
import step.core.collections.Filters;
import step.core.timeseries.Resolution;
import step.migration.MigrationContext;
import step.migration.MigrationTask;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import static step.core.controller.ControllerSettingPlugin.SETTINGS;

public class V29_1_UpdateTimeSeriesCollectionsAndSettings extends MigrationTask {

public static final String TIME_SERIES_MAIN_COLLECTION = "timeseries";
public static final String TIME_SERIES_MAIN_COLLECTION_NEW_NAME = "timeseries_5_seconds";
public static final String TIME_SERIES_MAIN_COLLECTION_REPORTS = "reportNodeTimeSeries";
public static final String TIME_SERIES_MAIN_COLLECTION_REPORTS_NEW_NAME = "reportNodeTimeSeries_5_seconds";
public static final String HOUSEKEEPING_TIME_SERIES_DEFAULT_TTL = "housekeeping_time_series_default_ttl";
public static final String HOUSEKEEPING_TIME_SERIES_PER_MINUTE_TTL = "housekeeping_time_series_per_minute_ttl";
public static final String HOUSEKEEPING_TIME_SERIES_HOURLY_TTL = "housekeeping_time_series_hourly_ttl";
public static final String HOUSEKEEPING_TIME_SERIES_DAILY_TTL = "housekeeping_time_series_daily_ttl";
public static final String HOUSEKEEPING_TIME_SERIES_WEEKLY_TTL = "housekeeping_time_series_weekly_ttl";
public static final String HOUSEKEEPING_TIME_SERIES_TTL_PREFIX = "housekeeping_time_series_";
public static final String HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX = "_ttl";

private static final Logger log = LoggerFactory.getLogger(V29_1_UpdateTimeSeriesCollectionsAndSettings.class);
private final Collection<Document> timeseriesCollection;
private final Collection<Document> settings;
private final Collection<Document> reportNodeTimeseriesCollection;
protected AtomicInteger successCount;

public V29_1_UpdateTimeSeriesCollectionsAndSettings(CollectionFactory collectionFactory, MigrationContext migrationContext) {
super(new Version(3,29,1), collectionFactory, migrationContext);
timeseriesCollection = collectionFactory.getCollection(TIME_SERIES_MAIN_COLLECTION, Document.class);
reportNodeTimeseriesCollection = collectionFactory.getCollection(TIME_SERIES_MAIN_COLLECTION_REPORTS, Document.class);
settings = collectionFactory.getCollection(SETTINGS, Document.class);
}

@Override
public void runUpgradeScript() {
log.info("Renaming the 'main' collection of the response times time-series to include its resolution");
timeseriesCollection.rename(TIME_SERIES_MAIN_COLLECTION_NEW_NAME);

log.info("Renaming the 'main' collection of the report nodes time-series to include its resolution");
reportNodeTimeseriesCollection.rename(TIME_SERIES_MAIN_COLLECTION_REPORTS_NEW_NAME);


log.info("Renaming time-series housekeeping setting keys");
        //use names from the Resolution enum so that the setting keys are aligned with the collection names
updateSettingKeyIfPresent(HOUSEKEEPING_TIME_SERIES_DEFAULT_TTL, HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + Resolution.FIVE_SECONDS.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX);
updateSettingKeyIfPresent(HOUSEKEEPING_TIME_SERIES_PER_MINUTE_TTL, HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + Resolution.ONE_MINUTE.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX);
updateSettingKeyIfPresent(HOUSEKEEPING_TIME_SERIES_HOURLY_TTL, HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + Resolution.ONE_HOUR.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX);
updateSettingKeyIfPresent(HOUSEKEEPING_TIME_SERIES_DAILY_TTL, HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + Resolution.ONE_DAY.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX);
updateSettingKeyIfPresent(HOUSEKEEPING_TIME_SERIES_WEEKLY_TTL, HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + Resolution.ONE_WEEK.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX);
log.info("Time-series housekeeping setting keys renamed");
}
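
For illustration, the five calls above could also be driven by a single map from old key to Resolution, since the new keys are derived purely from the enum name (a sketch only, assuming java.util.Map is imported):

    Map<String, Resolution> oldKeyToResolution = Map.of(
            HOUSEKEEPING_TIME_SERIES_DEFAULT_TTL, Resolution.FIVE_SECONDS,
            HOUSEKEEPING_TIME_SERIES_PER_MINUTE_TTL, Resolution.ONE_MINUTE,
            HOUSEKEEPING_TIME_SERIES_HOURLY_TTL, Resolution.ONE_HOUR,
            HOUSEKEEPING_TIME_SERIES_DAILY_TTL, Resolution.ONE_DAY,
            HOUSEKEEPING_TIME_SERIES_WEEKLY_TTL, Resolution.ONE_WEEK);
    oldKeyToResolution.forEach((oldKey, resolution) -> updateSettingKeyIfPresent(oldKey,
            HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + resolution.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX));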

private void updateSettingKeyIfPresent(String oldKey, String newKey) {
Optional<Document> setting = settings.find(Filters.equals("key", oldKey), null, null, null, 0).findFirst();
setting.ifPresent(s -> {
s.put("key", newKey);

Review comment (Contributor):

I would add an explicit log entry here to be fully transparent

settings.save(s);
logger.info("Time-series housekeeping setting key {} renamed to {}", oldKey, newKey);
});

}

@Override
public void runDowngradeScript() {

}
}
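
The downgrade script is left empty. For reference, a reversal of the setting-key renaming would simply swap the arguments of the helper above (an illustrative sketch only, not part of this change; the collection renames would need to be reverted as well):

    updateSettingKeyIfPresent(HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + Resolution.FIVE_SECONDS.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX, HOUSEKEEPING_TIME_SERIES_DEFAULT_TTL);
    updateSettingKeyIfPresent(HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + Resolution.ONE_MINUTE.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX, HOUSEKEEPING_TIME_SERIES_PER_MINUTE_TTL);
    // and likewise for the hourly, daily and weekly keys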
2 changes: 1 addition & 1 deletion step-core/src/main/java/step/core/Constants.java
@@ -19,7 +19,7 @@
package step.core;

public interface Constants {
String STEP_API_VERSION_STRING = "3.29.0";
String STEP_API_VERSION_STRING = "3.29.1";
Version STEP_API_VERSION = new Version(STEP_API_VERSION_STRING);

String STEP_YAML_SCHEMA_VERSION_STRING = "1.2.0";
@@ -20,11 +20,13 @@
import step.core.execution.model.ExecutionAccessor;
import step.core.execution.model.ExecutionStatus;
import step.core.plugins.threadmanager.ThreadManager;
import step.core.timeseries.Resolution;
import step.core.timeseries.TimeSeriesCollectionsSettings;
import step.core.timeseries.bucket.Bucket;
import step.core.timeseries.bucket.BucketBuilder;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -112,15 +114,23 @@ public AggregatedReport buildAggregatedReport(AggregatedReportViewRequest reques
}
}
// Generate complete aggregated report tree
//First aggregate time series data for the given execution context grouped by artefactHash
Map<String, Map<String, Bucket>> countByHashAndStatus = mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, STATUS);
Map<String, Map<String, Bucket>> countByHashAndErrorMessage = mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, ERROR_MESSAGE);
//First aggregate time series data for the given execution context grouped by artefactHash and status as well as artefactHash and error message
CompletableFuture<Map<String, Map<String, Bucket>>> countByHashAndStatusFuture =
CompletableFuture.supplyAsync(() ->
mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, STATUS)
);
CompletableFuture<Map<String, Map<String, Bucket>>> countByHashAndErrorMessageFuture =
CompletableFuture.supplyAsync(() ->
mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, ERROR_MESSAGE)
);
Map<String, Map<String, Bucket>> countByHashAndStatus = countByHashAndStatusFuture.join();
Map<String, Map<String, Bucket>> countByHashAndErrorMessage = countByHashAndErrorMessageFuture.join();
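
Editorial note: both aggregation queries are now fanned out with CompletableFuture.supplyAsync (here and in the partial-report branch below). By default this runs on the common ForkJoinPool, and join() rethrows any failure as a CompletionException. A sketch of the same pattern with a dedicated executor (the executor and its sizing are illustrative assumptions, not part of this change):

    ExecutorService aggregationExecutor = Executors.newFixedThreadPool(2); // hypothetical dedicated pool
    try {
        CompletableFuture<Map<String, Map<String, Bucket>>> byStatus = CompletableFuture.supplyAsync(
                () -> mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, STATUS), aggregationExecutor);
        CompletableFuture<Map<String, Map<String, Bucket>>> byError = CompletableFuture.supplyAsync(
                () -> mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, ERROR_MESSAGE), aggregationExecutor);
        // waits for both queries and surfaces the first failure as a CompletionException
        CompletableFuture.allOf(byStatus, byError).join();
        Map<String, Map<String, Bucket>> countByHashAndStatus = byStatus.join();
        Map<String, Map<String, Bucket>> countByHashAndErrorMessage = byError.join();
    } finally {
        aggregationExecutor.shutdown();
    }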
//Because the time series time range is extended to match the time series resolution, we need to use the same range when querying the report nodes
Range resolvedRange = getResolvedRange(request, countByHashAndStatus);
return new AggregatedReport(recursivelyBuildAggregatedReportTree(rootResolvedPlanNode, request, countByHashAndStatus, countByHashAndErrorMessage, mainReportNodeAccessor, null, runningCountByArtefactHash, operationsByArtefactHash, resolvedRange));
} else {
// a node is selected to generate a partial aggregated report
try (ReportNodeTimeSeries partialReportNodesTimeSeries = getInMemoryReportNodeTimeSeries()) {
try (ReportNodeTimeSeries partialReportNodesTimeSeries = getPartialTreeReportNodeTimeSeries()) {
InMemoryReportNodeAccessor inMemoryReportNodeAccessor = new InMemoryReportNodeAccessor();
AggregatedReport aggregatedReport = new AggregatedReport();
//We now (SED-3882) also want to get the count for RUNNING artefacts, which can only be retrieved from report nodes RAW data
@@ -130,9 +140,17 @@ public AggregatedReport buildAggregatedReport(AggregatedReportViewRequest reques
Set<String> reportArtefactHashSet = buildPartialReportNodeTimeSeries(aggregatedReport, request.selectedReportNodeId, partialReportNodesTimeSeries, inMemoryReportNodeAccessor, runningCountByArtefactHash, operationsByArtefactHash, request.fetchCurrentOperations);
// Only pass the reportArtefactHashSet if aggregate view filtering is enabled
reportArtefactHashSet = (request.filterResolvedPlanNodes) ? reportArtefactHashSet : null;
//Aggregate time series data for the given execution reporting context grouped by artefactHash
Map<String, Map<String, Bucket>> countByHashAndStatus = partialReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, STATUS);
Map<String, Map<String, Bucket>> countByHashAndErrorMessage = mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, ERROR_MESSAGE);
//Aggregate time series data for the given execution reporting context grouped by artefactHash and status as well as artefactHash and error message
CompletableFuture<Map<String, Map<String, Bucket>>> countByHashAndStatusFuture =
CompletableFuture.supplyAsync(() ->
partialReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, STATUS)
);
CompletableFuture<Map<String, Map<String, Bucket>>> countByHashAndErrorMessageFuture =
CompletableFuture.supplyAsync(() ->
mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, ERROR_MESSAGE)
);
Map<String, Map<String, Bucket>> countByHashAndStatus = countByHashAndStatusFuture.join();
Map<String, Map<String, Bucket>> countByHashAndErrorMessage = countByHashAndErrorMessageFuture.join();
//Because the time series time range is extended to match the time series resolution, we need to use the same range when querying the report nodes
Range resolvedRange = getResolvedRange(request, countByHashAndStatus);
aggregatedReport.aggregatedReportView = recursivelyBuildAggregatedReportTree(rootResolvedPlanNode, request, countByHashAndStatus, countByHashAndErrorMessage, inMemoryReportNodeAccessor, reportArtefactHashSet, runningCountByArtefactHash, operationsByArtefactHash, resolvedRange);
@@ -153,13 +171,6 @@ private ReportNodeTimeSeries.Range getResolvedRange(AggregatedReportViewRequest
Expand All @@ -153,13 +171,6 @@ private ReportNodeTimeSeries.Range getResolvedRange(AggregatedReportViewRequest
return result;
}

private ReportNodeTimeSeries getInMemoryReportNodeTimeSeries() {
//Need to create a configuration with all time series details
return new ReportNodeTimeSeries(new InMemoryCollectionFactory(new Properties()),
// to build the report we only need a single time bucket and can flush only once all reports are ingested
TimeSeriesCollectionsSettings.buildSingleResolutionSettings(Long.MAX_VALUE, 0), true);
}

/**
* Populate an in memory timeseries and report node accessor with all data required to build a partial aggregated report tree
* This aggregated tree will be filtered for the execution path of this single report node. If available we filter on the wrapping (nested) iteration
@@ -4,15 +4,18 @@
import step.core.artefacts.reports.ReportNode;
import step.core.artefacts.reports.ReportNodeStatus;
import step.core.collections.*;
import step.core.collections.inmemory.InMemoryCollectionFactory;
import step.core.timeseries.*;
import step.core.timeseries.aggregation.TimeSeriesAggregationQueryBuilder;
import step.core.timeseries.aggregation.TimeSeriesOptimizationType;
import step.core.timeseries.bucket.Bucket;
import step.core.timeseries.bucket.BucketAttributes;
import step.core.timeseries.ingestion.TimeSeriesIngestionPipeline;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

@@ -48,6 +51,31 @@ private List<TimeSeriesCollection> getTimeSeriesCollections(TimeSeriesCollection
return timeSeriesCollectionsBuilder.getTimeSeriesCollections(TIME_SERIES_MAIN_COLLECTION, collectionsSettings, Set.of(EXECUTION_ID));
}

/**
* Create an in memory report tree time-series, with a single resolution
* The resolution is coarse to ingest only one time bucket
* The flush interval is 0 as we only need to flush once after the ingestion
*/
protected ReportNodeTimeSeries(Duration resolution, long flushInterval) {
InMemoryCollectionFactory inMemoryCollectionFactory = new InMemoryCollectionFactory(new Properties());
TimeSeriesCollectionsBuilder timeSeriesCollectionsBuilder = new TimeSeriesCollectionsBuilder(inMemoryCollectionFactory);
List<TimeSeriesCollection> timeSeriesCollections = timeSeriesCollectionsBuilder.getSingleTimeSeriesCollections(TIME_SERIES_MAIN_COLLECTION, new TimeSeriesCollectionsSettings(), resolution, flushInterval);
timeSeries = new TimeSeriesBuilder().registerCollections(timeSeriesCollections).build();
ingestionPipeline = timeSeries.getIngestionPipeline();
timeSeries.createIndexes(Set.of(new IndexField(EXECUTION_ID, Order.ASC, String.class)));
this.ingestionEnabled = true;
}

/**
* Get an in memory report tree time-series, with a single resolution
* The resolution is coarse to ingest only one time bucket
* The flush interval is 0 as we only need to flush once after the ingestion
* @return the in memory report node time-series
*/
public static ReportNodeTimeSeries getPartialTreeReportNodeTimeSeries() {
return new ReportNodeTimeSeries(Duration.ofMillis(Long.MAX_VALUE), 0);
}
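
For context, the factory above is consumed with try-with-resources when building a partial aggregated report (see the buildAggregatedReport changes above). A minimal usage sketch (executionId, range and the grouping constants come from the calling context):

    try (ReportNodeTimeSeries partialReportNodesTimeSeries = ReportNodeTimeSeries.getPartialTreeReportNodeTimeSeries()) {
        // ingest the selected report nodes, then aggregate them into the single coarse bucket
        Map<String, Map<String, Bucket>> countByHashAndStatus =
                partialReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, range, ARTEFACT_HASH, STATUS);
    }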

public TimeSeries getTimeSeries() {
return timeSeries;
}
@@ -99,7 +127,7 @@ public Map<String, Map<String, Bucket>> queryByExecutionIdAndGroupBy(String exec
Filter filter = Filters.equals("attributes." + EXECUTION_ID, executionId);
Set<String> groupBy = Set.of(groupLevel1, groupLevel2);
TimeSeriesAggregationQueryBuilder queryBuilder = new TimeSeriesAggregationQueryBuilder()
.withOptimizationType(TimeSeriesOptimizationType.MOST_ACCURATE)
.withOptimizationType(TimeSeriesOptimizationType.MOST_EFFICIENT)
.withFilter(filter)
.withGroupDimensions(groupBy)
.split(1);