diff --git a/pom.xml b/pom.xml index 0269fb9959..e78b273dc8 100644 --- a/pom.xml +++ b/pom.xml @@ -47,7 +47,7 @@ 2025.6.25 2.5.0 - 2.5.0 + 2025.12.10-693957f1a52e5331621ac16a 3.0.23 diff --git a/step-controller-plugins/step-controller-plugins-timeseries/src/main/java/step/plugins/timeseries/TimeSeriesControllerPlugin.java b/step-controller-plugins/step-controller-plugins-timeseries/src/main/java/step/plugins/timeseries/TimeSeriesControllerPlugin.java index abcd972505..de39c5f3ac 100644 --- a/step-controller-plugins/step-controller-plugins-timeseries/src/main/java/step/plugins/timeseries/TimeSeriesControllerPlugin.java +++ b/step-controller-plugins/step-controller-plugins-timeseries/src/main/java/step/plugins/timeseries/TimeSeriesControllerPlugin.java @@ -52,6 +52,7 @@ public class TimeSeriesControllerPlugin extends AbstractControllerPlugin { public static final String PARAM_KEY_ANALYTICS_DASHBOARD_ID = "plugins.timeseries.analytics.dashboard.id"; public static final String PARAM_KEY_RESPONSE_IDEAL_INTERVALS = "timeseries.response.intervals.ideal"; public static final String PARAM_KEY_RESPONSE_MAX_INTERVALS = "timeseries.response.intervals.max"; + public static final String RESOLUTION_PERIOD_PROPERTY = "plugins.timeseries.resolution.period"; public static final String EXECUTION_DASHBOARD_PREPOPULATED_NAME = "Execution Dashboard"; public static final String ANALYTICS_DASHBOARD_PREPOPULATED_NAME = "Analytics Dashboard"; @@ -111,7 +112,7 @@ public void serverStart(GlobalContext context) { WebApplicationConfigurationManager configurationManager = context.require(WebApplicationConfigurationManager.class); // Following property is used by the UI. We could align its name with the configuration property in the future - configurationManager.registerHook(s -> Map.of("plugins.timeseries.resolution.period", String.valueOf(timeSeries.getDefaultCollection().getResolution()))); + configurationManager.registerHook(s -> Map.of(RESOLUTION_PERIOD_PROPERTY, String.valueOf(timeSeries.getDefaultCollection().getResolution()))); } @Override diff --git a/step-controller/step-controller-server/src/main/java/step/core/controller/ControllerSettingPlugin.java b/step-controller/step-controller-server/src/main/java/step/core/controller/ControllerSettingPlugin.java index 30fb2e4ccb..39bfb4aed3 100644 --- a/step-controller/step-controller-server/src/main/java/step/core/controller/ControllerSettingPlugin.java +++ b/step-controller/step-controller-server/src/main/java/step/core/controller/ControllerSettingPlugin.java @@ -29,12 +29,13 @@ @Plugin public class ControllerSettingPlugin extends AbstractControllerPlugin { + public static final String SETTINGS = "settings"; private ControllerSettingAccessor controllerSettingAccessor; @Override public void serverStart(GlobalContext context) throws Exception { controllerSettingAccessor = new ControllerSettingAccessorImpl( - context.getCollectionFactory().getCollection("settings", ControllerSetting.class)); + context.getCollectionFactory().getCollection(SETTINGS, ControllerSetting.class)); context.put(ControllerSettingAccessor.class, controllerSettingAccessor); } diff --git a/step-controller/step-controller-server/src/main/java/step/core/controller/StepControllerPlugin.java b/step-controller/step-controller-server/src/main/java/step/core/controller/StepControllerPlugin.java index ed5708faf2..11de5756d4 100644 --- a/step-controller/step-controller-server/src/main/java/step/core/controller/StepControllerPlugin.java +++ 
b/step-controller/step-controller-server/src/main/java/step/core/controller/StepControllerPlugin.java @@ -3,8 +3,10 @@ import ch.exense.commons.app.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import step.controller.services.async.AsyncTaskManager; import step.core.Controller; import step.core.GlobalContext; +import step.core.artefacts.reports.aggregated.ReportNodeTimeSeries; import step.core.controller.errorhandling.ErrorFilter; import step.core.deployment.ControllerServices; import step.core.execution.model.Execution; @@ -124,6 +126,17 @@ public void recover(GlobalContext context) throws Exception { @Override public void finalizeStart(GlobalContext context) throws Exception { context.require(ExecutionScheduler.class).start(); + //Initialize new empty resolutions after start (requires the async task manager) + //Because the ReportNodeTimeSeries is created in ControllerServer.init directly and not in a plugin, this is the right place to do it + ReportNodeTimeSeries reportNodeTimeSeries = context.require(ReportNodeTimeSeries.class); + AsyncTaskManager asyncTaskManager = context.require(AsyncTaskManager.class); + asyncTaskManager.scheduleAsyncTask((empty) -> { + logger.info("ReportNode timeSeries ingestion for empty resolutions has started"); + reportNodeTimeSeries.getTimeSeries().ingestDataForEmptyCollections(); + logger.info("ReportNode timeSeries ingestion for empty resolutions has finished"); + return null; + }); + } @Override diff --git a/step-controller/step-controller-server/src/main/java/step/migration/tasks/MigrationManagerTasksPlugin.java b/step-controller/step-controller-server/src/main/java/step/migration/tasks/MigrationManagerTasksPlugin.java index 1a591be5a9..5478239cee 100644 --- a/step-controller/step-controller-server/src/main/java/step/migration/tasks/MigrationManagerTasksPlugin.java +++ b/step-controller/step-controller-server/src/main/java/step/migration/tasks/MigrationManagerTasksPlugin.java @@ -54,6 +54,7 @@ public void serverStart(GlobalContext context) throws Exception { migrationManager.register(V27_4_DropResolvedPlanNodesIndexForPSQLMigrationTask.class); migrationManager.register(V28_0_FixEmptyDefaultMavenSettingsMigrationTask.class); migrationManager.register(V29_0_UpdateAutomationPackageModel.class); + migrationManager.register(V29_1_UpdateTimeSeriesCollectionsAndSettings.class); } @Override diff --git a/step-controller/step-controller-server/src/main/java/step/migration/tasks/V29_1_UpdateTimeSeriesCollectionsAndSettings.java b/step-controller/step-controller-server/src/main/java/step/migration/tasks/V29_1_UpdateTimeSeriesCollectionsAndSettings.java new file mode 100644 index 0000000000..207f56ab1b --- /dev/null +++ b/step-controller/step-controller-server/src/main/java/step/migration/tasks/V29_1_UpdateTimeSeriesCollectionsAndSettings.java @@ -0,0 +1,99 @@ +/* + * ****************************************************************************** + * * Copyright (C) 2020, exense GmbH + * * + * * This file is part of STEP + * * + * * STEP is free software: you can redistribute it and/or modify + * * it under the terms of the GNU Affero General Public License as published by + * * the Free Software Foundation, either version 3 of the License, or + * * (at your option) any later version. + * * + * * STEP is distributed in the hope that it will be useful, + * * but WITHOUT ANY WARRANTY; without even the implied warranty of + * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * * GNU Affero General Public License for more details.
+ * * + * * You should have received a copy of the GNU Affero General Public License + * * along with STEP. If not, see . + * ***************************************************************************** + */ +package step.migration.tasks; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import step.core.Version; +import step.core.collections.Collection; +import step.core.collections.CollectionFactory; +import step.core.collections.Document; +import step.core.collections.Filters; +import step.core.timeseries.Resolution; +import step.migration.MigrationContext; +import step.migration.MigrationTask; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import static step.core.controller.ControllerSettingPlugin.SETTINGS; + +public class V29_1_UpdateTimeSeriesCollectionsAndSettings extends MigrationTask { + + public static final String TIME_SERIES_MAIN_COLLECTION = "timeseries"; + public static final String TIME_SERIES_MAIN_COLLECTION_NEW_NAME = "timeseries_5_seconds"; + public static final String TIME_SERIES_MAIN_COLLECTION_REPORTS = "reportNodeTimeSeries"; + public static final String TIME_SERIES_MAIN_COLLECTION_REPORTS_NEW_NAME = "reportNodeTimeSeries_5_seconds"; + public static final String HOUSEKEEPING_TIME_SERIES_DEFAULT_TTL = "housekeeping_time_series_default_ttl"; + public static final String HOUSEKEEPING_TIME_SERIES_PER_MINUTE_TTL = "housekeeping_time_series_per_minute_ttl"; + public static final String HOUSEKEEPING_TIME_SERIES_HOURLY_TTL = "housekeeping_time_series_hourly_ttl"; + public static final String HOUSEKEEPING_TIME_SERIES_DAILY_TTL = "housekeeping_time_series_daily_ttl"; + public static final String HOUSEKEEPING_TIME_SERIES_WEEKLY_TTL = "housekeeping_time_series_weekly_ttl"; + public static final String HOUSEKEEPING_TIME_SERIES_TTL_PREFIX = "housekeeping_time_series_"; + public static final String HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX = "_ttl"; + + private static final Logger log = LoggerFactory.getLogger(V29_1_UpdateTimeSeriesCollectionsAndSettings.class); + private final Collection timeseriesCollection; + private final Collection settings; + private final Collection reportNodeTimeseriesCollection; + protected AtomicInteger successCount; + + public V29_1_UpdateTimeSeriesCollectionsAndSettings(CollectionFactory collectionFactory, MigrationContext migrationContext) { + super(new Version(3,29,1), collectionFactory, migrationContext); + timeseriesCollection = collectionFactory.getCollection(TIME_SERIES_MAIN_COLLECTION, Document.class); + reportNodeTimeseriesCollection = collectionFactory.getCollection(TIME_SERIES_MAIN_COLLECTION_REPORTS, Document.class); + settings = collectionFactory.getCollection(SETTINGS, Document.class); + } + + @Override + public void runUpgradeScript() { + log.info("Renaming the 'main' collection of the response times time-series to include its resolution"); + timeseriesCollection.rename(TIME_SERIES_MAIN_COLLECTION_NEW_NAME); + + log.info("Renaming the 'main' collection of the report nodes time-series to include its resolution"); + reportNodeTimeseriesCollection.rename(TIME_SERIES_MAIN_COLLECTION_REPORTS_NEW_NAME); + + + log.info("Renaming time-series housekeeping setting keys"); + //use names from enum which will then be aligned with the collection names + updateSettingKeyIfPresent(HOUSEKEEPING_TIME_SERIES_DEFAULT_TTL, HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + Resolution.FIVE_SECONDS.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX); + updateSettingKeyIfPresent(HOUSEKEEPING_TIME_SERIES_PER_MINUTE_TTL, 
HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + Resolution.ONE_MINUTE.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX); + updateSettingKeyIfPresent(HOUSEKEEPING_TIME_SERIES_HOURLY_TTL, HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + Resolution.ONE_HOUR.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX); + updateSettingKeyIfPresent(HOUSEKEEPING_TIME_SERIES_DAILY_TTL, HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + Resolution.ONE_DAY.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX); + updateSettingKeyIfPresent(HOUSEKEEPING_TIME_SERIES_WEEKLY_TTL, HOUSEKEEPING_TIME_SERIES_TTL_PREFIX + Resolution.ONE_WEEK.name + HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX); + log.info("Time-series housekeeping setting keys renamed"); + } + + private void updateSettingKeyIfPresent(String oldKey, String newKey) { + Optional setting = settings.find(Filters.equals("key", oldKey), null, null, null, 0).findFirst(); + setting.ifPresent(s -> { + s.put("key", newKey); + settings.save(s); + logger.info("Time-series housekeeping setting key {} renamed to {}", oldKey, newKey); + }); + + } + + @Override + public void runDowngradeScript() { + + } +} diff --git a/step-core/src/main/java/step/core/Constants.java b/step-core/src/main/java/step/core/Constants.java index 1da256f2a5..3762b32ae8 100644 --- a/step-core/src/main/java/step/core/Constants.java +++ b/step-core/src/main/java/step/core/Constants.java @@ -19,7 +19,7 @@ package step.core; public interface Constants { - String STEP_API_VERSION_STRING = "3.29.0"; + String STEP_API_VERSION_STRING = "3.29.1"; Version STEP_API_VERSION = new Version(STEP_API_VERSION_STRING); String STEP_YAML_SCHEMA_VERSION_STRING = "1.2.0"; diff --git a/step-plans/step-plans-core/src/main/java/step/core/artefacts/reports/aggregated/AggregatedReportViewBuilder.java b/step-plans/step-plans-core/src/main/java/step/core/artefacts/reports/aggregated/AggregatedReportViewBuilder.java index 41db30a91f..0ec5e2566b 100644 --- a/step-plans/step-plans-core/src/main/java/step/core/artefacts/reports/aggregated/AggregatedReportViewBuilder.java +++ b/step-plans/step-plans-core/src/main/java/step/core/artefacts/reports/aggregated/AggregatedReportViewBuilder.java @@ -20,11 +20,13 @@ import step.core.execution.model.ExecutionAccessor; import step.core.execution.model.ExecutionStatus; import step.core.plugins.threadmanager.ThreadManager; +import step.core.timeseries.Resolution; import step.core.timeseries.TimeSeriesCollectionsSettings; import step.core.timeseries.bucket.Bucket; import step.core.timeseries.bucket.BucketBuilder; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -112,15 +114,23 @@ public AggregatedReport buildAggregatedReport(AggregatedReportViewRequest reques } } // Generate complete aggregated report tree - //First aggregate time series data for the given execution context grouped by artefactHash - Map> countByHashAndStatus = mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, STATUS); - Map> countByHashAndErrorMessage = mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, ERROR_MESSAGE); + //First aggregate time series data for the given execution context grouped by artefactHash and status as well as artefactHash and error message + CompletableFuture>> countByHashAndStatusFuture = + CompletableFuture.supplyAsync(() -> + mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, STATUS) + ); + CompletableFuture>>
countByHashAndErrorMessageFuture = + CompletableFuture.supplyAsync(() -> + mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, ERROR_MESSAGE) + ); + Map> countByHashAndStatus = countByHashAndStatusFuture.join(); + Map> countByHashAndErrorMessage = countByHashAndErrorMessageFuture.join(); //Because the time series time range extends to the time series resolution we need to use the same range when querying the report nodes Range resolvedRange = getResolvedRange(request, countByHashAndStatus); return new AggregatedReport(recursivelyBuildAggregatedReportTree(rootResolvedPlanNode, request, countByHashAndStatus, countByHashAndErrorMessage, mainReportNodeAccessor, null, runningCountByArtefactHash, operationsByArtefactHash, resolvedRange)); } else { // a node is selected to generate a partial aggregated report - try (ReportNodeTimeSeries partialReportNodesTimeSeries = getInMemoryReportNodeTimeSeries()) { + try (ReportNodeTimeSeries partialReportNodesTimeSeries = getPartialTreeReportNodeTimeSeries()) { InMemoryReportNodeAccessor inMemoryReportNodeAccessor = new InMemoryReportNodeAccessor(); AggregatedReport aggregatedReport = new AggregatedReport(); //We now (SED-3882) also want to get the count for RUNNING artefacts which can only be retrieved from report nodes RAW data @@ -130,9 +140,17 @@ public AggregatedReport buildAggregatedReport(AggregatedReportViewRequest reques Set reportArtefactHashSet = buildPartialReportNodeTimeSeries(aggregatedReport, request.selectedReportNodeId, partialReportNodesTimeSeries, inMemoryReportNodeAccessor, runningCountByArtefactHash, operationsByArtefactHash, request.fetchCurrentOperations); // Only pass the reportArtefactHashSet if aggregate view filtering is enabled reportArtefactHashSet = (request.filterResolvedPlanNodes) ?
reportArtefactHashSet : null; - //Aggregate time series data for the given execution reporting context grouped by artefactHash - Map> countByHashAndStatus = partialReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, STATUS); - Map> countByHashAndErrorMessage = mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, ERROR_MESSAGE); + //Aggregate time series data for the given execution reporting context grouped by artefactHash and status as well as artefactHash and error messages + CompletableFuture>> countByHashAndStatusFuture = + CompletableFuture.supplyAsync(() -> + partialReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, STATUS) + ); + CompletableFuture>> countByHashAndErrorMessageFuture = + CompletableFuture.supplyAsync(() -> + mainReportNodesTimeSeries.queryByExecutionIdAndGroupBy(executionId, request.range, ARTEFACT_HASH, ERROR_MESSAGE) + ); + Map> countByHashAndStatus = countByHashAndStatusFuture.join(); + Map> countByHashAndErrorMessage = countByHashAndErrorMessageFuture.join(); //Because the time series time range extends to the time series resolution we need to use the same range when querying the report nodes Range resolvedRange = getResolvedRange(request, countByHashAndStatus); aggregatedReport.aggregatedReportView = recursivelyBuildAggregatedReportTree(rootResolvedPlanNode, request, countByHashAndStatus, countByHashAndErrorMessage, inMemoryReportNodeAccessor, reportArtefactHashSet, runningCountByArtefactHash, operationsByArtefactHash, resolvedRange); @@ -153,13 +171,6 @@ private ReportNodeTimeSeries.Range getResolvedRange(AggregatedReportViewRequest return result; } - private ReportNodeTimeSeries getInMemoryReportNodeTimeSeries() { - //Need to create a configuration with all time series details - return new ReportNodeTimeSeries(new InMemoryCollectionFactory(new Properties()), - // to build the report we only need a single time bucket and can flush only once all reports are ingested - TimeSeriesCollectionsSettings.buildSingleResolutionSettings(Long.MAX_VALUE, 0), true); - } - /** * Populate an in memory timeseries and report node accessor with all data required to build a partial aggregated report tree * This aggregated tree will be filtered for the execution path of this single report node.
If available we filter on the wrapping (nested) iteration diff --git a/step-plans/step-plans-core/src/main/java/step/core/artefacts/reports/aggregated/ReportNodeTimeSeries.java b/step-plans/step-plans-core/src/main/java/step/core/artefacts/reports/aggregated/ReportNodeTimeSeries.java index 095f33229c..fb7578815c 100644 --- a/step-plans/step-plans-core/src/main/java/step/core/artefacts/reports/aggregated/ReportNodeTimeSeries.java +++ b/step-plans/step-plans-core/src/main/java/step/core/artefacts/reports/aggregated/ReportNodeTimeSeries.java @@ -4,6 +4,7 @@ import step.core.artefacts.reports.ReportNode; import step.core.artefacts.reports.ReportNodeStatus; import step.core.collections.*; +import step.core.collections.inmemory.InMemoryCollectionFactory; import step.core.timeseries.*; import step.core.timeseries.aggregation.TimeSeriesAggregationQueryBuilder; import step.core.timeseries.aggregation.TimeSeriesOptimizationType; @@ -11,8 +12,10 @@ import step.core.timeseries.bucket.BucketAttributes; import step.core.timeseries.ingestion.TimeSeriesIngestionPipeline; +import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; @@ -48,6 +51,31 @@ private List getTimeSeriesCollections(TimeSeriesCollection return timeSeriesCollectionsBuilder.getTimeSeriesCollections(TIME_SERIES_MAIN_COLLECTION, collectionsSettings, Set.of(EXECUTION_ID)); } + /** + * Create an in memory report tree time-series, with a single resolution + * The resolution is coarse to ingest only one time bucket + * The flush interval is 0 as we only need to flush once after the ingestion + */ + protected ReportNodeTimeSeries(Duration resolution, long flushInterval) { + InMemoryCollectionFactory inMemoryCollectionFactory = new InMemoryCollectionFactory(new Properties()); + TimeSeriesCollectionsBuilder timeSeriesCollectionsBuilder = new TimeSeriesCollectionsBuilder(inMemoryCollectionFactory); + List timeSeriesCollections = timeSeriesCollectionsBuilder.getSingleTimeSeriesCollections(TIME_SERIES_MAIN_COLLECTION, new TimeSeriesCollectionsSettings(), resolution, flushInterval); + timeSeries = new TimeSeriesBuilder().registerCollections(timeSeriesCollections).build(); + ingestionPipeline = timeSeries.getIngestionPipeline(); + timeSeries.createIndexes(Set.of(new IndexField(EXECUTION_ID, Order.ASC, String.class))); + this.ingestionEnabled = true; + } + + /** + * Get an in memory report tree time-series, with a single resolution + * The resolution is coarse to ingest only one time bucket + * The flush interval is 0 as we only need to flush once after the ingestion + * @return the in memory report node time-series + */ + public static ReportNodeTimeSeries getPartialTreeReportNodeTimeSeries() { + return new ReportNodeTimeSeries(Duration.ofMillis(Long.MAX_VALUE), 0); + } + public TimeSeries getTimeSeries() { return timeSeries; } @@ -99,7 +127,7 @@ public Map> queryByExecutionIdAndGroupBy(String exec Filter filter = Filters.equals("attributes." 
+ EXECUTION_ID, executionId); Set groupBy = Set.of(groupLevel1, groupLevel2); TimeSeriesAggregationQueryBuilder queryBuilder = new TimeSeriesAggregationQueryBuilder() - .withOptimizationType(TimeSeriesOptimizationType.MOST_ACCURATE) + .withOptimizationType(TimeSeriesOptimizationType.MOST_EFFICIENT) .withFilter(filter) .withGroupDimensions(groupBy) .split(1); diff --git a/step-plans/step-plans-core/src/main/java/step/core/timeseries/Resolution.java b/step-plans/step-plans-core/src/main/java/step/core/timeseries/Resolution.java new file mode 100644 index 0000000000..0fa42cdab9 --- /dev/null +++ b/step-plans/step-plans-core/src/main/java/step/core/timeseries/Resolution.java @@ -0,0 +1,49 @@ +package step.core.timeseries; + +import java.time.Duration; +import java.util.Arrays; + +/** + * This enum defines the time series resolutions supported in Step. It is critical to keep them ordered by resolution from the finest to the coarsest + */ +public enum Resolution { + FIVE_SECONDS("5_seconds", Duration.ofSeconds(5), Duration.ofSeconds(1), false), + FIFTEEN_SECONDS("15_seconds", Duration.ofSeconds(15), Duration.ofSeconds(5), false), + ONE_MINUTE("minute", Duration.ofMinutes(1), Duration.ofSeconds(10), false), + FIFTEEN_MINUTES("15_minutes", Duration.ofMinutes(15), Duration.ofMinutes(1), false), + ONE_HOUR("hour", Duration.ofHours(1), Duration.ofMinutes(5), true), + SIX_HOURS("6_hours", Duration.ofHours(6), Duration.ofMinutes(30), true), + ONE_DAY("day", Duration.ofDays(1), Duration.ofHours(1), true), + ONE_WEEK("week", Duration.ofDays(7), Duration.ofHours(2), true); + + /** + * The name of the resolution, which is used to define the collection names as well as the Step properties and setting keys + */ + public final String name; + /** + * The resolution duration + */ + public final Duration resolution; + /** + * The default flush period if not configured in step.properties + */ + public final Duration defaultFlushPeriod; + /** + * Whether this resolution is coarse; coarse resolutions exclude the attributes specified by the time-series creator (i.e.
the execution id) + */ + public final boolean coarseResolution; + + Resolution(String name, Duration resolution, Duration defaultFlushPeriod, boolean coarseResolution) { + this.name = name; + this.resolution = resolution; + this.defaultFlushPeriod = defaultFlushPeriod; + this.coarseResolution = coarseResolution; + } + + public static Resolution fromName(String name) { + return Arrays.stream(values()) + .filter(r -> r.name.equals(name)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Unknown resolution: " + name)); + } +} diff --git a/step-plans/step-plans-core/src/main/java/step/core/timeseries/TimeSeriesCollectionsBuilder.java b/step-plans/step-plans-core/src/main/java/step/core/timeseries/TimeSeriesCollectionsBuilder.java index 9db714c8a7..601a50a29f 100644 --- a/step-plans/step-plans-core/src/main/java/step/core/timeseries/TimeSeriesCollectionsBuilder.java +++ b/step-plans/step-plans-core/src/main/java/step/core/timeseries/TimeSeriesCollectionsBuilder.java @@ -19,6 +19,8 @@ package step.core.timeseries; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import step.core.collections.CollectionFactory; import step.core.timeseries.bucket.Bucket; @@ -29,10 +31,9 @@ public class TimeSeriesCollectionsBuilder { - public static final String TIME_SERIES_SUFFIX_PER_MINUTE = "_minute"; - public static final String TIME_SERIES_SUFFIX_HOURLY = "_hour"; - public static final String TIME_SERIES_SUFFIX_DAILY = "_day"; - public static final String TIME_SERIES_SUFFIX_WEEKLY = "_week"; + private static final Logger logger = LoggerFactory.getLogger(TimeSeriesCollectionsBuilder.class); + + public static final String COLLECTION_NAME_SEPARATOR = "_"; private final CollectionFactory collectionFactory; @@ -44,11 +45,28 @@ public List getTimeSeriesCollections(String mainCollection List enabledCollections = new ArrayList<>(); int flushSeriesQueueSize = collectionsSettings.getFlushSeriesQueueSize(); int flushAsyncQueueSize = collectionsSettings.getFlushAsyncQueueSize(); - addIfEnabled(enabledCollections, mainCollectionName, Duration.ofSeconds(1), collectionsSettings.getMainFlushInterval(), flushSeriesQueueSize, flushAsyncQueueSize,null, true); - addIfEnabled(enabledCollections, mainCollectionName + TIME_SERIES_SUFFIX_PER_MINUTE, Duration.ofMinutes(1), collectionsSettings.getPerMinuteFlushInterval(), flushSeriesQueueSize, flushAsyncQueueSize,null, collectionsSettings.isPerMinuteEnabled()); - addIfEnabled(enabledCollections, mainCollectionName + TIME_SERIES_SUFFIX_HOURLY, Duration.ofHours(1), collectionsSettings.getHourlyFlushInterval(), flushSeriesQueueSize, flushAsyncQueueSize, ignoredAttributesForHighResolution, collectionsSettings.isHourlyEnabled()); - addIfEnabled(enabledCollections, mainCollectionName + TIME_SERIES_SUFFIX_DAILY, Duration.ofDays(1), collectionsSettings.getDailyFlushInterval(), flushSeriesQueueSize, flushAsyncQueueSize, ignoredAttributesForHighResolution, collectionsSettings.isDailyEnabled()); - addIfEnabled(enabledCollections, mainCollectionName + TIME_SERIES_SUFFIX_WEEKLY, Duration.ofDays(7), collectionsSettings.getWeeklyFlushInterval(), flushSeriesQueueSize, flushAsyncQueueSize, ignoredAttributesForHighResolution, collectionsSettings.isWeeklyEnabled()); + //Add additional resolutions + for (Resolution resolution: Resolution.values()) { + TimeSeriesCollectionsSettings.ResolutionSettings resolutionSettings = collectionsSettings.getResolutionSettings(resolution); + if (resolutionSettings != null) { + addIfEnabled(enabledCollections, mainCollectionName + 
COLLECTION_NAME_SEPARATOR + resolution.name, + resolution.resolution, resolutionSettings.flushInterval, flushSeriesQueueSize, flushAsyncQueueSize, + (resolution.coarseResolution ? ignoredAttributesForHighResolution : null), resolutionSettings.enabled); + + } + } + return enabledCollections; + } + + public List getSingleTimeSeriesCollections(String mainCollectionName, TimeSeriesCollectionsSettings collectionsSettings, Duration resolution, Long flushInterval) { + List enabledCollections = new ArrayList<>(); + int flushSeriesQueueSize = collectionsSettings.getFlushSeriesQueueSize(); + int flushAsyncQueueSize = collectionsSettings.getFlushAsyncQueueSize(); + addIfEnabled(enabledCollections, mainCollectionName, + resolution, flushInterval, flushSeriesQueueSize, flushAsyncQueueSize, + null, true); + + return enabledCollections; } @@ -63,8 +81,8 @@ private void addIfEnabled(List enabledCollections, String if (enabled) { enabledCollections.add(collection); } else { - // disabled resolutions will be completely dropped from db - collection.drop(); + // disabled resolutions are not dropped automatically + logger.warn("The time-series resolution with name '{}' is disabled. To reclaim space you can delete the corresponding DB table.", collectionName); } } } diff --git a/step-plans/step-plans-core/src/main/java/step/core/timeseries/TimeSeriesCollectionsSettings.java b/step-plans/step-plans-core/src/main/java/step/core/timeseries/TimeSeriesCollectionsSettings.java index cab6c2c055..652279237a 100644 --- a/step-plans/step-plans-core/src/main/java/step/core/timeseries/TimeSeriesCollectionsSettings.java +++ b/step-plans/step-plans-core/src/main/java/step/core/timeseries/TimeSeriesCollectionsSettings.java @@ -21,167 +21,73 @@ import ch.exense.commons.app.Configuration; -import java.time.Duration; -import java.util.concurrent.TimeUnit; +import java.util.Map; +import java.util.TreeMap; public class TimeSeriesCollectionsSettings { - public static final String TIME_SERIES_MAIN_COLLECTION_FLUSH_PERIOD = "{collectionName}.flush.period"; public static final String TIME_SERIES_COLLECTION_FLUSH_ASYNC_QUEUE_SIZE = "{collectionName}.flush.async.queue.size"; public static final String TIME_SERIES_COLLECTION_FLUSH_SERIES_QUEUE_SIZE = "{collectionName}.flush.series.queue.size"; - public static final String TIME_SERIES_MAIN_RESOLUTION = "{collectionName}.resolution"; - public static final String TIME_SERIES_MINUTE_COLLECTION_ENABLED = "{collectionName}.collections.minute.enabled"; - public static final String TIME_SERIES_MINUTE_COLLECTION_FLUSH_PERIOD = "{collectionName}.collections.minute.flush.period"; - public static final String TIME_SERIES_HOUR_COLLECTION_ENABLED = "{collectionName}.collections.hour.enabled"; - public static final String TIME_SERIES_HOUR_COLLECTION_FLUSH_PERIOD = "{collectionName}.collections.hour.flush.period"; - public static final String TIME_SERIES_DAY_COLLECTION_ENABLED = "{collectionName}.collections.day.enabled"; - public static final String TIME_SERIES_DAY_COLLECTION_FLUSH_PERIOD = "{collectionName}.collections.day.flush.period"; - public static final String TIME_SERIES_WEEK_COLLECTION_ENABLED = "{collectionName}.collections.week.enabled"; - public static final String TIME_SERIES_WEEK_COLLECTION_FLUSH_PERIOD = "{collectionName}.collections.week.flush.period"; - - private long mainResolution; - //Define the interval of the flushing job for the main ingestion pipeline (highest resolution) - //Note that flush is only actually performed by the job if the bucket time interval is complete (i.e. 
full resolution interval) or - //if the max series queue size is reached (to limit and control memory usage) - private long mainFlushInterval; + public static final String RESOLUTION_PROPERTY_PREFIX = "{collectionName}.collections."; + public static final String TIME_SERIES_RESOLUTION_ENABLED_SUFFIX = ".enabled"; + public static final String TIME_SERIES_RESOLUTION_FLUSH_PERIOD_SUFFIX = ".flush.period"; + //Define the max queue size for series; if the usage exceeds the limit, a flush is performed even for a partial time interval private int flushSeriesQueueSize; //flushing does not write to the DB directly but to a linked blocking queue in memory which is processed by an asynchronous processor; the queue size is limited to prevent excessive memory usage //While the queue is full, ingesting new buckets is blocked private int flushAsyncQueueSize; - private boolean perMinuteEnabled; - private long perMinuteFlushInterval; - private boolean hourlyEnabled; - private long hourlyFlushInterval; - private boolean dailyEnabled; - private long dailyFlushInterval; - private boolean weeklyEnabled; - private long weeklyFlushInterval; - - public boolean isDailyEnabled() { - return dailyEnabled; - } - - public boolean isPerMinuteEnabled() { - return perMinuteEnabled; - } - - public boolean isHourlyEnabled() { - return hourlyEnabled; - } - - public boolean isWeeklyEnabled() { - return weeklyEnabled; - } - - public long getMainResolution() { - return mainResolution; - } - - public TimeSeriesCollectionsSettings setMainResolution(long mainResolution) { - this.mainResolution = mainResolution; - return this; - } - - public TimeSeriesCollectionsSettings setPerMinuteEnabled(boolean perMinuteEnabled) { - this.perMinuteEnabled = perMinuteEnabled; - return this; - } - - public TimeSeriesCollectionsSettings setHourlyEnabled(boolean hourlyEnabled) { - this.hourlyEnabled = hourlyEnabled; - return this; - } - - public TimeSeriesCollectionsSettings setDailyEnabled(boolean dailyEnabled) { - this.dailyEnabled = dailyEnabled; - return this; - } - - public TimeSeriesCollectionsSettings setWeeklyEnabled(boolean weeklyEnabled) { - this.weeklyEnabled = weeklyEnabled; - return this; - } - - public long getMainFlushInterval() { - return mainFlushInterval; - } - - public TimeSeriesCollectionsSettings setMainFlushInterval(long mainFlushInterval) { - this.mainFlushInterval = mainFlushInterval; - return this; - } - - public long getPerMinuteFlushInterval() { - return perMinuteFlushInterval; + //Define the Map of supported Resolutions associated with their settings read from the configuration. + private final Map resolutionSettings = new TreeMap<>(); + + public static class ResolutionSettings { + //Define the interval of the flushing job for the ingestion pipeline + //Note that flush is only actually performed by the job if the bucket time interval is complete (i.e.
full resolution interval) or + //if the max series queue size is reached (to limit and control memory usage) + public final long flushInterval; + //Flag to completely disable or enable the collection + public final boolean enabled; + + public ResolutionSettings(long flushInterval, boolean enabled) { + this.flushInterval = flushInterval; + this.enabled = enabled; + } } public int getFlushSeriesQueueSize() { return flushSeriesQueueSize; } - public TimeSeriesCollectionsSettings setFlushSeriesQueueSize(int flushSeriesQueueSize) { + public void setFlushSeriesQueueSize(int flushSeriesQueueSize) { this.flushSeriesQueueSize = flushSeriesQueueSize; - return this; } - private TimeSeriesCollectionsSettings setFlushAsyncQueueSize(int flushAsyncQueueSize) { + private void setFlushAsyncQueueSize(int flushAsyncQueueSize) { this.flushAsyncQueueSize = flushAsyncQueueSize; - return this; } public int getFlushAsyncQueueSize() { return flushAsyncQueueSize; } - public TimeSeriesCollectionsSettings setPerMinuteFlushInterval(long perMinuteFlushInterval) { - this.perMinuteFlushInterval = perMinuteFlushInterval; - return this; - } - - public long getHourlyFlushInterval() { - return hourlyFlushInterval; - } - - public TimeSeriesCollectionsSettings setHourlyFlushInterval(long hourlyFlushInterval) { - this.hourlyFlushInterval = hourlyFlushInterval; - return this; - } - - public long getDailyFlushInterval() { - return dailyFlushInterval; - } - - public TimeSeriesCollectionsSettings setDailyFlushInterval(long dailyFlushInterval) { - this.dailyFlushInterval = dailyFlushInterval; - return this; - } - - public long getWeeklyFlushInterval() { - return weeklyFlushInterval; + private void addResolutionSettings(Resolution resolution, ResolutionSettings resolutionSettings) { + this.resolutionSettings.put(resolution, resolutionSettings); } - - public TimeSeriesCollectionsSettings setWeeklyFlushInterval(long weeklyFlushInterval) { - this.weeklyFlushInterval = weeklyFlushInterval; - return this; + public ResolutionSettings getResolutionSettings(Resolution resolution) { + return resolutionSettings.get(resolution); } public static TimeSeriesCollectionsSettings readSettings(Configuration configuration, String collectionName) { - long mainResolution = getPropertyAsLong(configuration, TIME_SERIES_MAIN_RESOLUTION, collectionName, 1000L); - validateMainResolutionParam(mainResolution); - return new TimeSeriesCollectionsSettings() - .setMainResolution(mainResolution) - .setMainFlushInterval(getPropertyAsLong(configuration, TIME_SERIES_MAIN_COLLECTION_FLUSH_PERIOD, collectionName, Duration.ofSeconds(1).toMillis())) - .setFlushSeriesQueueSize(getPropertyAsInteger(configuration, TIME_SERIES_COLLECTION_FLUSH_SERIES_QUEUE_SIZE, collectionName, 20000)) - .setFlushAsyncQueueSize(getPropertyAsInteger(configuration, TIME_SERIES_COLLECTION_FLUSH_ASYNC_QUEUE_SIZE, collectionName, 5000)) - .setPerMinuteEnabled(getPropertyAsBoolean(configuration, TIME_SERIES_MINUTE_COLLECTION_ENABLED, collectionName, true)) - .setPerMinuteFlushInterval(getPropertyAsLong(configuration, TIME_SERIES_MINUTE_COLLECTION_FLUSH_PERIOD, collectionName, Duration.ofMinutes(1).toMillis())) - .setHourlyEnabled(getPropertyAsBoolean(configuration, TIME_SERIES_HOUR_COLLECTION_ENABLED, collectionName, true)) - .setHourlyFlushInterval(getPropertyAsLong(configuration, TIME_SERIES_HOUR_COLLECTION_FLUSH_PERIOD, collectionName, Duration.ofMinutes(5).toMillis())) - .setDailyEnabled(getPropertyAsBoolean(configuration, TIME_SERIES_DAY_COLLECTION_ENABLED, collectionName, true)) - 
.setDailyFlushInterval(getPropertyAsLong(configuration, TIME_SERIES_DAY_COLLECTION_FLUSH_PERIOD, collectionName, Duration.ofHours(1).toMillis())) - .setWeeklyEnabled(getPropertyAsBoolean(configuration, TIME_SERIES_WEEK_COLLECTION_ENABLED, collectionName, true)) - .setWeeklyFlushInterval(getPropertyAsLong(configuration, TIME_SERIES_WEEK_COLLECTION_FLUSH_PERIOD, collectionName, Duration.ofHours(2).toMillis())); + TimeSeriesCollectionsSettings settings = new TimeSeriesCollectionsSettings(); + settings.setFlushSeriesQueueSize(getPropertyAsInteger(configuration, TIME_SERIES_COLLECTION_FLUSH_SERIES_QUEUE_SIZE, collectionName, 20000)); + settings.setFlushAsyncQueueSize(getPropertyAsInteger(configuration, TIME_SERIES_COLLECTION_FLUSH_ASYNC_QUEUE_SIZE, collectionName, 5000)); + //Read settings for additional resolutions + for (Resolution resolution: Resolution.values()) { + boolean resolutionEnabled = getPropertyAsBoolean(configuration, RESOLUTION_PROPERTY_PREFIX + resolution.name + TIME_SERIES_RESOLUTION_ENABLED_SUFFIX, collectionName, true); + Long resolutionFlushInterval = getPropertyAsLong(configuration, RESOLUTION_PROPERTY_PREFIX + resolution.name + TIME_SERIES_RESOLUTION_FLUSH_PERIOD_SUFFIX, collectionName, resolution.defaultFlushPeriod.toMillis()); + settings.addResolutionSettings(resolution, new ResolutionSettings(resolutionFlushInterval, resolutionEnabled)); + } + return settings; } private static Long getPropertyAsLong(Configuration configuration, String property, String collectionName, long defaultValue) { @@ -199,23 +105,4 @@ private static boolean getPropertyAsBoolean(Configuration configuration, String private static String property(String propertyValue, String collectionName) { return propertyValue.replaceAll("\\{collectionName\\}", collectionName); } - - private static void validateMainResolutionParam(long resolution) { - double msInMinute = TimeUnit.MINUTES.toMillis(1); - if (msInMinute % resolution != 0) { - throw new IllegalArgumentException("Invalid interval: " + resolution + " seconds. 
The interval must be a divisor of one minute (60 seconds)."); - } - } - - public static TimeSeriesCollectionsSettings buildSingleResolutionSettings(long mainResolution, long mainFlushInterval) { - TimeSeriesCollectionsSettings timeSeriesCollectionsSettings = new TimeSeriesCollectionsSettings(); - timeSeriesCollectionsSettings.setDailyEnabled(false); - timeSeriesCollectionsSettings.setHourlyEnabled(false); - timeSeriesCollectionsSettings.setPerMinuteEnabled(false); - timeSeriesCollectionsSettings.setWeeklyEnabled(false); - timeSeriesCollectionsSettings.setMainResolution(mainResolution); - timeSeriesCollectionsSettings.setMainFlushInterval(mainFlushInterval); - return timeSeriesCollectionsSettings; - } - } diff --git a/step-plans/step-plans-core/src/test/java/step/core/timeseries/ReportNodeTimeSeriesTest.java b/step-plans/step-plans-core/src/test/java/step/core/timeseries/ReportNodeTimeSeriesTest.java index 39cf2cdfe4..556433449b 100644 --- a/step-plans/step-plans-core/src/test/java/step/core/timeseries/ReportNodeTimeSeriesTest.java +++ b/step-plans/step-plans-core/src/test/java/step/core/timeseries/ReportNodeTimeSeriesTest.java @@ -31,6 +31,7 @@ import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class ReportNodeTimeSeriesTest { @@ -45,7 +46,7 @@ public void reportNodeTimeSeries() { try (ReportNodeTimeSeries reportNodeTimeSeries = new ReportNodeTimeSeries(collectionFactory, configuration)) { TimeSeries timeSeries = reportNodeTimeSeries.getTimeSeries(); List collections = timeSeries.getCollections(); - assertEquals(5, collections.size()); + assertEquals(8, collections.size()); ReportNode reportNode = new ReportNode(); reportNode.setStatus(ReportNodeStatus.PASSED); reportNode.setExecutionID("executionId"); @@ -66,8 +67,11 @@ public void reportNodeTimeSeries() { @Test public void reportNodeTimeSeriesDisabled() { Configuration configuration = new Configuration(); + configuration.putProperty("reportNodeTimeSeries.collections.15_seconds.enabled", "false"); configuration.putProperty("reportNodeTimeSeries.collections.minute.enabled", "false"); + configuration.putProperty("reportNodeTimeSeries.collections.15_minutes.enabled", "false"); configuration.putProperty("reportNodeTimeSeries.collections.hour.enabled", "false"); + configuration.putProperty("reportNodeTimeSeries.collections.6_hours.enabled", "false"); configuration.putProperty("reportNodeTimeSeries.collections.day.enabled", "false"); configuration.putProperty("reportNodeTimeSeries.collections.week.enabled", "false"); CollectionFactory collectionFactory = new InMemoryCollectionFactory(null); @@ -75,6 +79,14 @@ public void reportNodeTimeSeriesDisabled() { TimeSeries timeSeries = reportNodeTimeSeries.getTimeSeries(); List collections = timeSeries.getCollections(); assertEquals(1, collections.size()); + + configuration.putProperty("reportNodeTimeSeries.collections.5_seconds.enabled", "false"); + try { + new ReportNodeTimeSeries(collectionFactory, configuration); + fail("Disabling all resolutions is not allowed"); + } catch (Throwable e) { + assertEquals("At least one time series collection must be registered.", e.getMessage()); + } } @Test @@ -86,10 +98,10 @@ public void reportNodeTimeSeriesFlushPeriods() { configuration.putProperty("reportNodeTimeSeries.collections.week.flush.period", "14"); TimeSeriesCollectionsSettings settings = TimeSeriesCollectionsSettings.readSettings(configuration, "reportNodeTimeSeries"); - assertEquals(11, settings.getPerMinuteFlushInterval()); - 
assertEquals(12, settings.getHourlyFlushInterval()); - assertEquals(13, settings.getDailyFlushInterval()); - assertEquals(14, settings.getWeeklyFlushInterval()); + assertEquals(11, settings.getResolutionSettings(Resolution.ONE_MINUTE).flushInterval); + assertEquals(12, settings.getResolutionSettings(Resolution.ONE_HOUR).flushInterval); + assertEquals(13, settings.getResolutionSettings(Resolution.ONE_DAY).flushInterval); + assertEquals(14, settings.getResolutionSettings(Resolution.ONE_WEEK).flushInterval); } }
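
Reference note (not part of the patch): the V29_1 migration renames the housekeeping TTL setting keys as follows, derived from HOUSEKEEPING_TIME_SERIES_TTL_PREFIX, HOUSEKEEPING_TIME_SERIES_TTL_SUFFIX and the Resolution names introduced above; keys that are absent are simply skipped by updateSettingKeyIfPresent:

housekeeping_time_series_default_ttl     -> housekeeping_time_series_5_seconds_ttl
housekeeping_time_series_per_minute_ttl  -> housekeeping_time_series_minute_ttl
housekeeping_time_series_hourly_ttl      -> housekeeping_time_series_hour_ttl
housekeeping_time_series_daily_ttl       -> housekeeping_time_series_day_ttl
housekeeping_time_series_weekly_ttl      -> housekeeping_time_series_week_ttl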
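With the generic per-resolution settings introduced in TimeSeriesCollectionsSettings, the enable flags and flush periods are read from properties of the form <collectionName>.collections.<resolutionName>.enabled and <collectionName>.collections.<resolutionName>.flush.period (in milliseconds), where <resolutionName> is one of the Resolution names (5_seconds, 15_seconds, minute, 15_minutes, hour, 6_hours, day, week). The following step.properties excerpt is purely illustrative; the values shown are example choices, not shipped defaults unless noted:

# Disable two of the coarser report-node resolutions (example values)
reportNodeTimeSeries.collections.6_hours.enabled=false
reportNodeTimeSeries.collections.week.enabled=false
# Flush period override for the hourly response-time collection (default is 5 minutes = 300000 ms)
timeseries.collections.hour.flush.period=300000
# Queue size properties keep their existing names (defaults 20000 / 5000)
timeseries.flush.series.queue.size=20000
timeseries.flush.async.queue.size=5000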
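To illustrate how the new Resolution enum ties together the collection names, the configuration properties and the housekeeping setting keys, here is a minimal standalone sketch; the ResolutionNamingExample class and its main method are hypothetical helpers, while the naming rules and constants are taken from the diff above:

import step.core.timeseries.Resolution;

// Hypothetical illustration class, not part of the change set
public class ResolutionNamingExample {
    public static void main(String[] args) {
        // "timeseries" is the main collection of the response-time series, "reportNodeTimeSeries" the report-node series
        String mainCollectionName = "reportNodeTimeSeries";
        for (Resolution resolution : Resolution.values()) {
            // Collection name: <mainCollectionName>_<resolution.name>, e.g. reportNodeTimeSeries_5_seconds
            String collectionName = mainCollectionName + "_" + resolution.name;
            // Housekeeping setting key: housekeeping_time_series_<resolution.name>_ttl
            String ttlSettingKey = "housekeeping_time_series_" + resolution.name + "_ttl";
            // Enable flag property: <mainCollectionName>.collections.<resolution.name>.enabled
            String enabledProperty = mainCollectionName + ".collections." + resolution.name + ".enabled";
            System.out.println(collectionName + " | " + ttlSettingKey + " | " + enabledProperty
                    + " | default flush=" + resolution.defaultFlushPeriod.toMillis() + "ms"
                    + " | coarse=" + resolution.coarseResolution);
        }
    }
}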