diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
index 41640152..dc6af733 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
@@ -84,6 +84,13 @@ object BucketLevelMonitorRunner : MonitorRunner() {
         }
         var monitorResult = MonitorRunResult<BucketLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
+
+        if (monitorCtx.multiTenantTriggerEvalEnabled && monitor.triggers.size > 1) {
+            val msg = "Bucket-level monitors only support 1 trigger when remote trigger evaluation is enabled."
+            logger.error(msg)
+            return monitorResult.copy(error = IllegalArgumentException(msg))
+        }
+
         val currentAlerts = try {
             monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
             monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
@@ -137,12 +144,17 @@ object BucketLevelMonitorRunner : MonitorRunner() {
             // in the final output of monitorResult which occurs when all pages have been exhausted.
             // If it's favorable to return the last page, will need to check how to accomplish that with multiple aggregation paths
             // with different page counts.
+            //
+            // When the flag is on, bucket-level monitors are limited to 1 trigger. The standard
+            // bucket_selector is injected directly into the query so a single search call performs
+            // both data collection and trigger evaluation — no separate per-trigger queries needed.
             val inputResults = monitorCtx.inputService!!.collectInputResults(
                 monitor,
                 periodStart,
                 periodEnd,
                 monitorResult.inputResults,
-                workflowRunContext
+                workflowRunContext,
+                useStandardBucketSelector = monitorCtx.multiTenantTriggerEvalEnabled
             )
             if (firstIteration) {
                 firstPageOfInputResults = inputResults
@@ -155,13 +167,15 @@ object BucketLevelMonitorRunner : MonitorRunner() {
                 // The currentAlerts map is formed by iterating over the Monitor's Triggers as keys so null should not be returned here
                 val currentAlertsForTrigger = currentAlerts[trigger]!!
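For reference, a minimal sketch of the pipeline aggregation that ends up injected when the flag is on. The construction itself lives in BucketSelectorQueryBuilder, added later in this diff; the buckets-path map and script shown in the comments are illustrative values, not part of the change:

    // Sketch only: a standard bucket_selector equivalent to a trigger's condition.
    // PipelineAggregatorBuilders.bucketSelector(name, bucketsPathsMap, script) is the
    // stock OpenSearch API; the name follows the TRIGGER_FILTER_PREFIX convention below.
    fun exampleSelector(trigger: BucketLevelTrigger): BucketSelectorPipelineAggregationBuilder =
        PipelineAggregatorBuilders.bucketSelector(
            "${BucketSelectorQueryBuilder.TRIGGER_FILTER_PREFIX}${trigger.id}",
            trigger.bucketSelector.bucketsPathsMap,   // e.g. mapOf("docCount" to "_count")
            trigger.bucketSelector.script             // e.g. Script("params.docCount > 0")
        )
    // Because bucket_selector prunes non-matching buckets server-side, the single search
    // response already contains only the buckets that satisfy the trigger condition.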
                val triggerCtx = BucketLevelTriggerExecutionContext(
-                    monitor,
-                    trigger as BucketLevelTrigger,
-                    monitorResult,
+                    monitor, trigger as BucketLevelTrigger, monitorResult,
                     clusterSettings = monitorCtx.clusterService!!.clusterSettings
                 )
                 triggerContexts[trigger.id] = triggerCtx
-                val triggerResult = monitorCtx.triggerService!!.runBucketLevelTrigger(monitor, trigger, triggerCtx)
+                val triggerResult = if (monitorCtx.multiTenantTriggerEvalEnabled) {
+                    monitorCtx.triggerService!!.runBucketLevelTriggerFromFilteredResponse(monitor, trigger, triggerCtx)
+                } else {
+                    monitorCtx.triggerService!!.runBucketLevelTrigger(monitor, trigger, triggerCtx)
+                }
                 triggerResults[trigger.id] = triggerResult.getCombinedTriggerRunResult(triggerResults[trigger.id])

                 /*
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
index 3ee6d644..aea2b1a7 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
@@ -14,6 +14,7 @@ import org.opensearch.alerting.opensearchapi.convertToMap
 import org.opensearch.alerting.opensearchapi.suspendUntil
 import org.opensearch.alerting.settings.AlertingSettings
 import org.opensearch.alerting.util.AggregationQueryRewriter
+import org.opensearch.alerting.util.BucketSelectorQueryBuilder
 import org.opensearch.alerting.util.CrossClusterMonitorUtils
 import org.opensearch.alerting.util.IndexUtils
 import org.opensearch.alerting.util.addUserBackendRolesFilter
@@ -30,6 +31,7 @@ import org.opensearch.common.settings.Settings
 import org.opensearch.common.unit.TimeValue
 import org.opensearch.common.xcontent.LoggingDeprecationHandler
 import org.opensearch.common.xcontent.XContentType
+import org.opensearch.commons.alerting.model.BucketLevelTrigger
 import org.opensearch.commons.alerting.model.ClusterMetricsInput
 import org.opensearch.commons.alerting.model.InputRunResults
 import org.opensearch.commons.alerting.model.Monitor
@@ -72,7 +74,8 @@ class InputService(
         periodStart: Instant,
         periodEnd: Instant,
         prevResult: InputRunResults? = null,
-        workflowRunContext: WorkflowRunContext? = null
+        workflowRunContext: WorkflowRunContext? = null,
+        useStandardBucketSelector: Boolean = false
     ): InputRunResults {
         return try {
             val results = mutableListOf<Map<String, Any>>()
@@ -93,7 +96,8 @@ class InputService(
                         periodEnd = periodEnd,
                         prevResult = prevResult,
                         matchingDocIdsPerIndex = matchingDocIdsPerIndex,
-                        returnSampleDocs = false
+                        returnSampleDocs = false,
+                        useStandardBucketSelector = useStandardBucketSelector
                     )
                     val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
                     aggTriggerAfterKey += AggregationQueryRewriter.getAfterKeysFromSearchResponse(
@@ -223,7 +227,8 @@ class InputService(
         periodEnd: Instant,
         prevResult: InputRunResults?,
         matchingDocIdsPerIndex: Map<String, List<String>>?,
-        returnSampleDocs: Boolean = false
+        returnSampleDocs: Boolean = false,
+        useStandardBucketSelector: Boolean = false
     ): SearchRequest {
         // TODO: Figure out a way to use SearchTemplateRequest without bringing in the entire TransportClient
         val searchParams = mapOf(
@@ -233,11 +238,22 @@ class InputService(
         // Deep copying query before passing it to rewriteQuery since otherwise, the monitor.input is modified directly
         // which causes a strange bug where the rewritten query persists on the Monitor across executions
+        val copiedQuery = deepCopyQuery(searchInput.query)
+
+        // When using standard bucket_selector, inject it as a sub-agg instead of BucketSelectorExt.
+        if (useStandardBucketSelector) {
+            val bucketTriggers = monitor.triggers.filterIsInstance<BucketLevelTrigger>()
+            if (bucketTriggers.isNotEmpty()) {
+                BucketSelectorQueryBuilder.injectBucketSelector(copiedQuery, bucketTriggers)
+            }
+        }
+
         val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(
-            deepCopyQuery(searchInput.query),
+            copiedQuery,
             prevResult,
             monitor.triggers,
-            returnSampleDocs
+            returnSampleDocs,
+            skipBucketSelectorInjection = useStandardBucketSelector
         )

         // Rewrite query to consider the doc ids per given index
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt
index aca55c1b..3554452e 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt
@@ -56,7 +56,7 @@ object MonitorMetadataService :
     private lateinit var xContentRegistry: NamedXContentRegistry
     private lateinit var clusterService: ClusterService
     private lateinit var settings: Settings
-    private lateinit var sdkClient: SdkClient
+    lateinit var sdkClient: SdkClient

     @Volatile
     private lateinit var indexTimeout: TimeValue
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
index d4b3d9de..ecbeb308 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
@@ -12,6 +12,7 @@ import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
 import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
 import org.opensearch.alerting.script.TriggerScript
 import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
+import org.opensearch.alerting.util.BucketKeyFilter
 import org.opensearch.alerting.util.CrossClusterMonitorUtils
 import org.opensearch.alerting.util.getBucketKeysHash
 import org.opensearch.cluster.service.ClusterService
@@ -181,6 +182,41 @@ class TriggerService(val scriptService: ScriptService) {
         }
     }

+    fun runBucketLevelTriggerFromFilteredResponse(
+        monitor: Monitor,
+        trigger: BucketLevelTrigger,
+        ctx: BucketLevelTriggerExecutionContext
+    ): BucketLevelTriggerRunResult {
+        return try {
+            val parentBucketPath = trigger.bucketSelector.parentBucketPath
+            val aggregationPath = AggregationPath.parse(parentBucketPath)
+            val aggs = ctx.results[0][Aggregations.AGGREGATIONS_FIELD]
+            require(aggs is Map<*, *>) { "Unexpected aggregations type: ${aggs?.javaClass}" }
+            var parentAgg: Map<*, *> = aggs
+            aggregationPath.pathElementsAsStringList.forEach { subAgg ->
+                val child = parentAgg[subAgg]
+                require(child is Map<*, *>) { "Unexpected type for agg '$subAgg': ${child?.javaClass}" }
+                parentAgg = child
+            }
+            val buckets = parentAgg[Aggregation.CommonFields.BUCKETS.preferredName]
+            require(buckets is List<*>) { "Unexpected buckets type: ${buckets?.javaClass}" }
+            val selectedBuckets = mutableMapOf<String, AggregationResultBucket>()
+            for (bucket in buckets) {
+                require(bucket is Map<*, *>) { "Unexpected bucket type: ${bucket?.javaClass}" }
+                @Suppress("UNCHECKED_CAST")
+                val bucketDict = bucket as Map<String, Any>
+                val bucketKeyValuesList = getBucketKeyValuesList(bucketDict)
+                val aggResultBucket = AggregationResultBucket(parentBucketPath, bucketKeyValuesList, bucketDict)
+                selectedBuckets[aggResultBucket.getBucketKeysHash()] = aggResultBucket
+            }
+            val filteredBuckets = BucketKeyFilter.filterBuckets(selectedBuckets, trigger.bucketSelector.filter)
+            BucketLevelTriggerRunResult(trigger.name, null, filteredBuckets)
+        } catch (e: Exception) {
+            logger.info("Error running trigger [${trigger.id}] for monitor [${monitor.id}]", e)
+            BucketLevelTriggerRunResult(trigger.name, e, emptyMap())
+        }
+    }
+
     @Suppress("UNCHECKED_CAST")
     fun runBucketLevelTrigger(
         monitor: Monitor,
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt
index 82e5d334..8733fcdc 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt
@@ -38,6 +38,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_
 import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
 import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTION_THROTTLE_VALUE
 import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_TRIGGERS_PER_MONITOR
+import org.opensearch.alerting.settings.AlertingSettings.Companion.MULTI_TENANT_TRIGGER_EVAL_ENABLED
 import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
 import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST
 import org.opensearch.alerting.util.DocLevelMonitorQueries
@@ -116,6 +117,7 @@ class TransportIndexMonitorAction @Inject constructor(
     @Volatile private var maxMonitors = ALERTING_MAX_MONITORS.get(settings)
     @Volatile private var maxTriggersPerMonitor = MAX_TRIGGERS_PER_MONITOR.get(settings)
+    @Volatile private var multiTenantTriggerEvalEnabled = MULTI_TENANT_TRIGGER_EVAL_ENABLED.get(settings)
     @Volatile private var requestTimeout = REQUEST_TIMEOUT.get(settings)
     @Volatile private var indexTimeout = INDEX_TIMEOUT.get(settings)
     @Volatile private var maxActionThrottle = MAX_ACTION_THROTTLE_VALUE.get(settings)
@@ -131,6 +133,9 @@ class TransportIndexMonitorAction @Inject constructor(
     init {
         clusterService.clusterSettings.addSettingsUpdateConsumer(ALERTING_MAX_MONITORS) { maxMonitors = it }
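The definition of MULTI_TENANT_TRIGGER_EVAL_ENABLED itself is not part of this diff. For the consumer registered below to work it is presumably a dynamic, node-scope boolean defaulting to false, roughly as follows (the setting key name is assumed; only the Dynamic property is certain, since addSettingsUpdateConsumer requires it):

    // Assumed sketch of the AlertingSettings companion entry; not shown in this diff.
    val MULTI_TENANT_TRIGGER_EVAL_ENABLED: Setting<Boolean> = Setting.boolSetting(
        "plugins.alerting.multi_tenant_trigger_eval_enabled",  // key name assumed
        false,                                                  // regression tests below assert a false default
        Setting.Property.NodeScope,
        Setting.Property.Dynamic
    )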
        clusterService.clusterSettings.addSettingsUpdateConsumer(MAX_TRIGGERS_PER_MONITOR) { maxTriggersPerMonitor = it }
+        clusterService.clusterSettings.addSettingsUpdateConsumer(MULTI_TENANT_TRIGGER_EVAL_ENABLED) {
+            multiTenantTriggerEvalEnabled = it
+        }
         clusterService.clusterSettings.addSettingsUpdateConsumer(REQUEST_TIMEOUT) { requestTimeout = it }
         clusterService.clusterSettings.addSettingsUpdateConsumer(INDEX_TIMEOUT) { indexTimeout = it }
         clusterService.clusterSettings.addSettingsUpdateConsumer(MAX_ACTION_THROTTLE_VALUE) { maxActionThrottle = it }
@@ -453,6 +458,14 @@ class TransportIndexMonitorAction @Inject constructor(
         require(monitor.triggers.size <= maxTriggersPerMonitor) {
             "The current cluster settings only allow up to $maxTriggersPerMonitor triggers per monitor."
         }
+        if (multiTenantTriggerEvalEnabled &&
+            Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) ==
+            Monitor.MonitorType.BUCKET_LEVEL_MONITOR
+        ) {
+            require(monitor.triggers.size <= 1) {
+                "Bucket-level monitors only support 1 trigger when remote trigger evaluation is enabled."
+            }
+        }
     }

     private fun validateActionThrottle(monitor: Monitor, maxValue: TimeValue, minValue: TimeValue) {
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt
index 74cedd59..60ab3c2f 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt
@@ -41,12 +41,15 @@ class AggregationQueryRewriter {
             query: SearchSourceBuilder,
             prevResult: InputRunResults?,
             triggers: List<Trigger>,
-            returnSampleDocs: Boolean = false
+            returnSampleDocs: Boolean = false,
+            skipBucketSelectorInjection: Boolean = false
         ): SearchSourceBuilder {
             triggers.forEach { trigger ->
                 if (trigger is BucketLevelTrigger) {
                     // add bucket selector pipeline aggregation for each trigger in query
-                    query.aggregation(trigger.bucketSelector)
+                    if (!skipBucketSelectorInjection) {
+                        query.aggregation(trigger.bucketSelector)
+                    }
                     // if this request is processing the subsequent pages of input query result, then add after key
                     if (prevResult?.aggTriggersAfterKey?.get(trigger.id) != null) {
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/BucketKeyFilter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/BucketKeyFilter.kt
new file mode 100644
index 00000000..35253abe
--- /dev/null
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/BucketKeyFilter.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.alerting.util
+
+import org.opensearch.common.xcontent.XContentFactory
+import org.opensearch.common.xcontent.XContentType
+import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter
+import org.opensearch.commons.alerting.model.AggregationResultBucket
+import org.opensearch.core.xcontent.DeprecationHandler
+import org.opensearch.core.xcontent.NamedXContentRegistry
+import org.opensearch.core.xcontent.ToXContent
+import org.opensearch.search.aggregations.bucket.terms.IncludeExclude
+import java.util.regex.Pattern
+
+/**
+ * Applies [BucketSelectorExtFilter] include/exclude patterns to bucket results post-response.
+ *
+ * In the standard `bucket_selector` approach, the filter cannot be applied server-side since
+ * `BucketSelectorExt` is a custom plugin aggregation. Instead, the include/exclude filtering
+ * is applied after the response is received.
+ */
+object BucketKeyFilter {
+
+    fun filterBuckets(
+        buckets: Map<String, AggregationResultBucket>,
+        filter: BucketSelectorExtFilter?
+    ): Map<String, AggregationResultBucket> {
+        if (filter == null) return buckets
+
+        return if (filter.isCompositeAggregation) {
+            filterCompositeKeys(buckets, filter.filtersMap ?: return buckets)
+        } else {
+            filterSimpleKeys(buckets, filter.filters ?: return buckets)
+        }
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private fun filterCompositeKeys(
+        buckets: Map<String, AggregationResultBucket>,
+        filtersMap: HashMap<String, IncludeExclude>
+    ): Map<String, AggregationResultBucket> {
+        val patterns = filtersMap.mapValues { (_, ie) -> extractPatterns(ie) }
+        return buckets.filter { (_, bucket) ->
+            val keyMap = bucket.bucket?.get("key") as? Map<String, Any> ?: return@filter true
+            patterns.all { (sourceKey, regexPair) ->
+                val value = keyMap[sourceKey]?.toString() ?: return@all true
+                isAccepted(value, regexPair)
+            }
+        }
+    }
+
+    private fun filterSimpleKeys(
+        buckets: Map<String, AggregationResultBucket>,
+        includeExclude: IncludeExclude
+    ): Map<String, AggregationResultBucket> {
+        val regexPair = extractPatterns(includeExclude)
+        return buckets.filter { (_, bucket) ->
+            val key = bucket.bucketKeys.joinToString("#")
+            isAccepted(key, regexPair)
+        }
+    }
+
+    private fun isAccepted(value: String, patterns: Pair<Pattern?, Pattern?>): Boolean {
+        val (includeRegex, excludeRegex) = patterns
+        if (includeRegex != null && !includeRegex.matcher(value).matches()) return false
+        if (excludeRegex != null && excludeRegex.matcher(value).matches()) return false
+        return true
+    }
+
+    /**
+     * Extracts include/exclude regex strings from [IncludeExclude] via XContent serialization.
+     */
+    private fun extractPatterns(includeExclude: IncludeExclude): Pair<Pattern?, Pattern?> {
+        val builder = XContentFactory.jsonBuilder().startObject()
+        includeExclude.toXContent(builder, ToXContent.EMPTY_PARAMS)
+        builder.endObject()
+        val json = builder.toString()
+        val map = XContentType.JSON.xContent()
+            .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)
+            .use { parser -> parser.map() }
+        val include = (map["include"] as? String)?.let { Pattern.compile(it) }
+        val exclude = (map["exclude"] as? String)?.let { Pattern.compile(it) }
+        return Pair(include, exclude)
+    }
+}
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/BucketSelectorQueryBuilder.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/BucketSelectorQueryBuilder.kt
new file mode 100644
index 00000000..29c127a8
--- /dev/null
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/BucketSelectorQueryBuilder.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.alerting.util
+
+import org.opensearch.commons.alerting.model.BucketLevelTrigger
+import org.opensearch.search.aggregations.AggregationBuilder
+import org.opensearch.search.aggregations.AggregatorFactories
+import org.opensearch.search.aggregations.PipelineAggregatorBuilders
+import org.opensearch.search.aggregations.support.AggregationPath
+import org.opensearch.search.builder.SearchSourceBuilder
+
+/**
+ * Constructs standard `bucket_selector` pipeline aggregations from [BucketLevelTrigger] conditions
+ * and injects them as sub-aggregations of the parent aggregation identified by `parentBucketPath`.
+ *
+ * This replaces the custom `BucketSelectorExt` approach for environments where custom plugin
+ * aggregations are not available (e.g., serverless clusters). Standard `bucket_selector` natively
+ * removes non-matching buckets, so the remaining buckets in the response are the triggered buckets.
+ */
+object BucketSelectorQueryBuilder {
+
+    const val TRIGGER_FILTER_PREFIX = "_trigger_filter_"
+
+    /**
+     * Injects a standard `bucket_selector` pipeline sub-aggregation for each trigger under the
+     * parent aggregation identified by the trigger's `bucketSelector.parentBucketPath`.
+     *
+     * @param query The search source to modify
+     * @param triggers The bucket-level triggers whose conditions should be injected
+     * @return The modified search source
+     * @throws IllegalArgumentException if a trigger's parentBucketPath cannot be resolved
+     */
+    fun injectBucketSelector(query: SearchSourceBuilder, triggers: List<BucketLevelTrigger>): SearchSourceBuilder {
+        for (trigger in triggers) {
+            val selector = trigger.bucketSelector
+            val parentAgg = findParentAgg(
+                query.aggregations() as AggregatorFactories.Builder,
+                selector.parentBucketPath
+            )
+            val pipelineAgg = PipelineAggregatorBuilders.bucketSelector(
+                "$TRIGGER_FILTER_PREFIX${trigger.id}",
+                selector.bucketsPathsMap,
+                selector.script
+            )
+            parentAgg.subAggregation(pipelineAgg)
+        }
+        return query
+    }
+
+    private fun findParentAgg(aggFactories: AggregatorFactories.Builder, parentBucketPath: String): AggregationBuilder {
+        val pathElements = AggregationPath.parse(parentBucketPath).pathElementsAsStringList
+        var aggBuilders = aggFactories.aggregatorFactories
+        var found: AggregationBuilder? = null
+
+        for (element in pathElements) {
+            found = null
+            for (agg in aggBuilders) {
+                if (agg.name == element) {
+                    found = agg
+                    aggBuilders = agg.subAggregations
+                    break
+                }
+            }
+            if (found == null) {
+                throw IllegalArgumentException("ParentBucketPath: $parentBucketPath not found in query aggregations")
+            }
+        }
+
+        return found!!
+    }
+}
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/RemoteBucketLevelTriggerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteBucketLevelTriggerIT.kt
new file mode 100644
index 00000000..6c9f0f41
--- /dev/null
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteBucketLevelTriggerIT.kt
@@ -0,0 +1,495 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.alerting
+
+import org.opensearch.alerting.alerts.AlertIndices
+import org.opensearch.alerting.settings.AlertingSettings
+import org.opensearch.client.ResponseException
+import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
+import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter
+import org.opensearch.commons.alerting.model.Alert.State.ACTIVE
+import org.opensearch.commons.alerting.model.Alert.State.COMPLETED
+import org.opensearch.commons.alerting.model.SearchInput
+import org.opensearch.index.query.QueryBuilders
+import org.opensearch.script.Script
+import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
+import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder
+import org.opensearch.search.aggregations.bucket.terms.IncludeExclude
+import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
+import org.opensearch.search.builder.SearchSourceBuilder
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
+import java.time.temporal.ChronoUnit
+
+/**
+ * Integration tests for bucket-level trigger evaluation with the multi-tenant trigger eval flag enabled.
+ * These tests verify that standard bucket_selector injection and filtered response parsing work
+ * correctly end-to-end.
+ */
+class RemoteBucketLevelTriggerIT : AlertingRestTestCase() {
+
+    private val SETTING_KEY = AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED.key
+
+    private fun enableRemoteTriggerEval() {
+        client().updateSettings(SETTING_KEY, true)
+    }
+
+    private fun disableRemoteTriggerEval() {
+        client().updateSettings(SETTING_KEY, false)
+    }
+
+    private fun buildCompositeInput(index: String): SearchInput {
+        val query = QueryBuilders.rangeQuery("test_strict_date_time")
+            .gt("{{period_end}}||-10d")
+            .lte("{{period_end}}")
+            .format("epoch_millis")
+        val compositeSources = listOf(TermsValuesSourceBuilder("test_field").field("test_field"))
+        val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
+        return SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
+    }
+
+    private fun buildTermsInput(index: String): SearchInput {
+        val query = QueryBuilders.rangeQuery("test_strict_date_time")
+            .gt("{{period_end}}||-10d")
+            .lte("{{period_end}}")
+            .format("epoch_millis")
+        val termsAgg = TermsAggregationBuilder("terms_agg").field("test_field")
+        return SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(termsAgg))
+    }
+
+    private fun buildTrigger(
+        parentBucketPath: String = "composite_agg",
+        script: String = "params.docCount > 0"
+    ): org.opensearch.commons.alerting.model.BucketLevelTrigger {
+        var trigger = randomBucketLevelTrigger()
+        trigger = trigger.copy(
+            bucketSelector = BucketSelectorExtAggregationBuilder(
+                name = trigger.id,
+                bucketsPathsMap = mapOf("docCount" to "_count"),
+                script = Script(script),
+                parentBucketPath = parentBucketPath,
+                filter = null
+            )
+        )
+        return trigger
+    }
+
+    // ---- Tests ----
+
+    fun `test multi tenant bucket trigger composite agg`() {
+        enableRemoteTriggerEval()
+        try {
+            val testIndex = createTestIndex()
+            insertSampleTimeSerializedData(testIndex, listOf("test_value_1", "test_value_1", "test_value_2"))
+
+            val input = buildCompositeInput(testIndex)
+            val trigger = buildTrigger(script = "params.docCount > 0")
+            val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
+
+            val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val output = entityAsMap(response)
+            assertEquals(monitor.name, output["monitor_name"])
+
+            val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id)
+            @Suppress("UNCHECKED_CAST")
+            val buckets = triggerResult["agg_result_buckets"] as Map<String, Any>
+            assertEquals("Both buckets should match", 2, buckets.size)
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    fun `test multi tenant bucket trigger terms agg`() {
+        enableRemoteTriggerEval()
+        try {
+            val testIndex = createTestIndex()
+            insertSampleTimeSerializedData(testIndex, listOf("test_value_1", "test_value_1", "test_value_2"))
+
+            val input = buildTermsInput(testIndex)
+            val trigger = buildTrigger(parentBucketPath = "terms_agg", script = "params.docCount > 1")
+            val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
+
+            val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val output = entityAsMap(response)
+
+            val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id)
+            @Suppress("UNCHECKED_CAST")
+            val buckets = triggerResult["agg_result_buckets"] as Map<String, Any>
+            // Only test_value_1 has docCount > 1
+            assertEquals("Only one bucket should match", 1, buckets.size)
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    fun `test multi tenant bucket trigger alert lifecycle`() {
+        enableRemoteTriggerEval()
+        try {
+            val testIndex = createTestIndex()
+            insertSampleTimeSerializedData(testIndex, listOf("test_value_1", "test_value_1", "test_value_2"))
+
+            val input = buildCompositeInput(testIndex)
+            val trigger = buildTrigger(script = "params.docCount > 0")
+            val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
+
+            // First execution — alerts created
+            executeMonitor(monitor.id)
+            var alerts = searchAlerts(monitor)
+            assertEquals("Alerts not saved", 2, alerts.size)
+            alerts.forEach { assertEquals(ACTIVE, it.state) }
+
+            // Delete docs for one bucket
+            deleteDataWithDocIds(testIndex, listOf("1", "2")) // test_value_1
+
+            // Second execution — one alert completed
+            executeMonitor(monitor.id)
+            alerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN)
+            val activeAlerts = alerts.filter { it.state == ACTIVE }
+            val completedAlerts = alerts.filter { it.state == COMPLETED }
+            assertEquals("Incorrect number of active alerts", 1, activeAlerts.size)
+            assertEquals("Incorrect number of completed alerts", 1, completedAlerts.size)
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    fun `test multi tenant bucket trigger no matching buckets`() {
+        enableRemoteTriggerEval()
+        try {
+            val testIndex = createTestIndex()
+            insertSampleTimeSerializedData(testIndex, listOf("test_value_1"))
+
+            val input = buildCompositeInput(testIndex)
+            val trigger = buildTrigger(script = "params.docCount > 100")
+            val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
+
+            val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val output = entityAsMap(response)
+
+            val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id)
+            @Suppress("UNCHECKED_CAST")
+            val buckets = triggerResult["agg_result_buckets"] as Map<String, Any>
+            assertTrue("No buckets should match", buckets.isEmpty())
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    fun `test multi tenant bucket trigger dry run`() {
+        enableRemoteTriggerEval()
+        try {
+            val testIndex = createTestIndex()
+            insertSampleTimeSerializedData(testIndex, listOf("test_value_1", "test_value_2"))
+
+            val input = buildCompositeInput(testIndex)
+            val trigger = buildTrigger(script = "params.docCount > 0")
+            val monitor = randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))
+
+            // Dry run — no alerts persisted
+            val response = executeMonitor(monitor, params = DRYRUN_MONITOR)
+            val output = entityAsMap(response)
+
+            val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id)
+            @Suppress("UNCHECKED_CAST")
+            val buckets = triggerResult["agg_result_buckets"] as Map<String, Any>
+            assertEquals(2, buckets.size)
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    fun `test multi tenant bucket trigger rejects multiple triggers`() {
+        enableRemoteTriggerEval()
+        try {
+            val testIndex = createTestIndex()
+            val input = buildCompositeInput(testIndex)
+            val trigger1 = buildTrigger(script = "params.docCount > 0")
+            val trigger2 = buildTrigger(script = "params.docCount > 1")
+            try {
+                createMonitor(
+                    randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger1, trigger2))
+                )
+                fail("Expected monitor creation to fail with 2 triggers when flag is on")
+            } catch (e: ResponseException) {
+                assertTrue(e.message!!.contains("only support 1 trigger"))
+            }
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    fun `test multi tenant bucket trigger with painless script in query`() {
+        enableRemoteTriggerEval()
+        try {
+            val testIndex = createTestIndex(
+                randomAlphaOfLength(10).lowercase(),
+                """
+                "properties": {
+                    "test_strict_date_time": { "type": "date", "format": "strict_date_time" },
+                    "test_field": { "type": "keyword" },
+                    "value": { "type": "integer" }
+                }
+                """
+            )
+            val twoMinsAgo = ZonedDateTime.now().minus(2, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MILLIS)
+            val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(twoMinsAgo)
+            indexDoc(testIndex, "1", """{ "test_strict_date_time": "$testTime", "test_field": "a", "value": 10 }""")
+            indexDoc(testIndex, "2", """{ "test_strict_date_time": "$testTime", "test_field": "a", "value": 20 }""")
+            indexDoc(testIndex, "3", """{ "test_strict_date_time": "$testTime", "test_field": "b", "value": 5 }""")
+
+            // Query uses a Painless script filter: only docs where value > 8
+            val query = QueryBuilders.boolQuery()
+                .must(
+                    QueryBuilders.rangeQuery("test_strict_date_time")
+                        .gt("{{period_end}}||-10d").lte("{{period_end}}").format("epoch_millis")
+                )
+                .filter(QueryBuilders.scriptQuery(Script("doc['value'].value > 8")))
+            val compositeSources = listOf(TermsValuesSourceBuilder("test_field").field("test_field"))
+            val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
+            val input = SearchInput(
+                indices = listOf(testIndex),
+                query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)
+            )
+            // Only bucket "a" survives the script filter (values 10, 20); "b" (value 5) is excluded
+            val trigger = buildTrigger(script = "params.docCount > 0")
+            val monitor = createMonitor(
+                randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))
+            )
+
+            val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val output = entityAsMap(response)
+            val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id)
+            @Suppress("UNCHECKED_CAST")
+            val buckets = triggerResult["agg_result_buckets"] as Map<String, Any>
+            assertEquals("Only bucket 'a' should survive the Painless script filter", 1, buckets.size)
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    fun `test multi tenant bucket trigger nested parent path`() {
+        enableRemoteTriggerEval()
+        try {
+            val testIndex = createTestIndex(
+                randomAlphaOfLength(10).lowercase(),
+                """
+                "properties": {
+                    "test_strict_date_time": { "type": "date", "format": "strict_date_time" },
+                    "host": { "type": "keyword" },
+                    "status": { "type": "keyword" }
+                }
+                """
+            )
+            val twoMinsAgo = ZonedDateTime.now().minus(2, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MILLIS)
+            val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(twoMinsAgo)
+            // host_a: 3 docs (status 200, 200, 500), host_b: 1 doc (status 200)
+            indexDoc(testIndex, "1", """{ "test_strict_date_time": "$testTime", "host": "host_a", "status": "200" }""")
+            indexDoc(testIndex, "2", """{ "test_strict_date_time": "$testTime", "host": "host_a", "status": "200" }""")
+            indexDoc(testIndex, "3", """{ "test_strict_date_time": "$testTime", "host": "host_a", "status": "500" }""")
+            indexDoc(testIndex, "4", """{ "test_strict_date_time": "$testTime", "host": "host_b", "status": "200" }""")
+
+            val query = QueryBuilders.rangeQuery("test_strict_date_time")
+                .gt("{{period_end}}||-10d")
+                .lte("{{period_end}}")
+                .format("epoch_millis")
+            // Top-level composite agg on host — this is the parentBucketPath target
+            val compositeSources = listOf(TermsValuesSourceBuilder("host").field("host"))
+            // Nested sub-agg: terms on status under composite
+            val statusAgg = TermsAggregationBuilder("status_breakdown").field("status")
+            val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
+                .subAggregation(statusAgg)
+            val input = SearchInput(
+                indices = listOf(testIndex),
+                query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)
+            )
+
+            // Trigger: host_a(3 docs) matches, host_b(1 doc) doesn't
+            val trigger = buildTrigger(parentBucketPath = "composite_agg", script = "params.docCount > 1")
+            val monitor = createMonitor(
+                randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))
+            )
+
+            val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val output = entityAsMap(response)
+            val triggerResults = output.objectMap("trigger_results")
+
+            @Suppress("UNCHECKED_CAST")
+            val buckets = triggerResults.objectMap(trigger.id)["agg_result_buckets"] as Map<String, Any>
+            assertEquals("Only host_a should match", 1, buckets.size)
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    fun `test multi tenant bucket trigger include exclude filter`() {
+        enableRemoteTriggerEval()
+        try {
+            val testIndex = createTestIndex()
+            // 4 buckets: test_value_1(2 docs), test_value_2(1), test_value_3(1), test_value_4(1)
+            insertSampleTimeSerializedData(
+                testIndex,
+                listOf("test_value_1", "test_value_1", "test_value_2", "test_value_3", "test_value_4")
+            )
+
+            val input = buildCompositeInput(testIndex)
+            // Include filter limits to test_value_1 and test_value_2
+            var trigger = randomBucketLevelTrigger()
+            trigger = trigger.copy(
+                bucketSelector = BucketSelectorExtAggregationBuilder(
+                    name = trigger.id,
+                    bucketsPathsMap = mapOf("docCount" to "_count"),
+                    script = Script("params.docCount > 0"),
+                    parentBucketPath = "composite_agg",
+                    filter = BucketSelectorExtFilter(IncludeExclude("test_value_[12]", null))
+                )
+            )
+            val monitor = createMonitor(
+                randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))
+            )
+
+            val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val output = entityAsMap(response)
+            val triggerResults = output.objectMap("trigger_results")
+
+            @Suppress("UNCHECKED_CAST")
+            val buckets = triggerResults.objectMap(trigger.id)["agg_result_buckets"] as Map<String, Any>
+            // 4 buckets pass script, include filter keeps test_value_1 and test_value_2
+            assertEquals("Should match 2 buckets after include filter", 2, buckets.size)
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    fun `test multi tenant bucket trigger exclude filter`() {
+        enableRemoteTriggerEval()
+        try {
+            val testIndex = createTestIndex()
+            insertSampleTimeSerializedData(
+                testIndex,
+                listOf("test_value_1", "test_value_1", "test_value_2", "test_value_3", "test_value_4")
+            )
+
+            val input = buildCompositeInput(testIndex)
+            // Exclude filter removes test_value_1
+            var trigger = randomBucketLevelTrigger()
+            trigger = trigger.copy(
+                bucketSelector = BucketSelectorExtAggregationBuilder(
+                    name = trigger.id,
+                    bucketsPathsMap = mapOf("docCount" to "_count"),
+                    script = Script("params.docCount > 0"),
+                    parentBucketPath = "composite_agg",
+                    filter = BucketSelectorExtFilter(IncludeExclude(null, "test_value_1"))
+                )
+            )
+            val monitor = createMonitor(
+                randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))
+            )
+
+            val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val output = entityAsMap(response)
+            val triggerResults = output.objectMap("trigger_results")
+
+            @Suppress("UNCHECKED_CAST")
+            val buckets = triggerResults.objectMap(trigger.id)["agg_result_buckets"] as Map<String, Any>
+            // 4 buckets pass script, exclude filter removes test_value_1 → 3 remain
+            assertEquals("Should match 3 buckets after exclude filter", 3, buckets.size)
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    fun `test multi tenant bucket trigger input results populated without base query`() {
+        enableRemoteTriggerEval()
+        try {
+            val testIndex = createTestIndex()
+            insertSampleTimeSerializedData(testIndex, listOf("test_value_1", "test_value_1", "test_value_2"))
+
+            val input = buildCompositeInput(testIndex)
+            val trigger = buildTrigger(script = "params.docCount > 0")
+            val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
+
+            val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val output = entityAsMap(response)
+
+            // Verify input_results is populated even though the base query is skipped
+            @Suppress("UNCHECKED_CAST")
+            val inputResults = output.objectMap("input_results")["results"] as List<Map<String, Any>>
+            assertTrue("input_results should not be empty", inputResults.isNotEmpty())
+            val firstResult = inputResults.first()
+            assertNotNull("Aggregations should be present", firstResult["aggregations"])
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    fun `test multi tenant bucket trigger single trigger matches flag off`() {
+        val testIndex = createTestIndex()
+        insertSampleTimeSerializedData(testIndex, listOf("test_value_1", "test_value_1", "test_value_2"))
+
+        val input = buildCompositeInput(testIndex)
+        val trigger = buildTrigger(script = "params.docCount > 1")
+        val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
+
+        // Execute with flag off
+        client().updateSettings(SETTING_KEY, false)
+        val responseOff = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+        val outputOff = entityAsMap(responseOff)
+        @Suppress("UNCHECKED_CAST")
+        val bucketsOff = outputOff.objectMap("trigger_results").objectMap(trigger.id)["agg_result_buckets"] as Map<String, Any>
+
+        // Execute with flag on
+        enableRemoteTriggerEval()
+        try {
+            val responseOn = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val outputOn = entityAsMap(responseOn)
+            @Suppress("UNCHECKED_CAST")
+            val bucketsOn = outputOn.objectMap("trigger_results").objectMap(trigger.id)["agg_result_buckets"] as Map<String, Any>
+
+            assertEquals("Single trigger should produce same bucket count", bucketsOff.size, bucketsOn.size)
+            assertEquals("Only test_value_1 should match", 1, bucketsOn.size)
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    fun `test multi tenant bucket trigger alert lifecycle single query`() {
+        enableRemoteTriggerEval()
+        try {
+            val testIndex = createTestIndex()
+            insertSampleTimeSerializedData(testIndex, listOf("test_value_1", "test_value_1", "test_value_2"))
+
+            val input = buildCompositeInput(testIndex)
+            val trigger = buildTrigger(script = "params.docCount > 0")
+            val monitor = createMonitor(
+                randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))
+            )
+
+            // First execution — 2 alerts created (test_value_1, test_value_2)
+            executeMonitor(monitor.id)
+            var alerts = searchAlerts(monitor)
+            assertEquals("Should have 2 alerts", 2, alerts.size)
+            alerts.forEach { assertEquals(ACTIVE, it.state) }
+
+            // Delete docs for test_value_1
+            deleteDataWithDocIds(testIndex, listOf("1", "2"))
+
+            // Second execution — test_value_1 completed, test_value_2 still active
+            executeMonitor(monitor.id)
+            alerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN)
+            val active = alerts.filter { it.state == ACTIVE }
+            val completed = alerts.filter { it.state == COMPLETED }
+            assertEquals("Should have 1 active alert", 1, active.size)
+            assertEquals("Should have 1 completed alert", 1, completed.size)
+        } finally {
+            disableRemoteTriggerEval()
+        }
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
+        return this[key] as Map<String, Map<String, Any>>
+    }
+}
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/RemoteBucketLevelTriggerRegressionIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteBucketLevelTriggerRegressionIT.kt
new file mode 100644
index 00000000..1853a7dc
--- /dev/null
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteBucketLevelTriggerRegressionIT.kt
@@ -0,0 +1,182 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.alerting
+
+import org.opensearch.alerting.alerts.AlertIndices
+import org.opensearch.alerting.settings.AlertingSettings
+import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
+import org.opensearch.commons.alerting.model.Alert.State.ACTIVE
+import org.opensearch.commons.alerting.model.Alert.State.COMPLETED
+import org.opensearch.commons.alerting.model.SearchInput
+import org.opensearch.index.query.QueryBuilders
+import org.opensearch.script.Script
+import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
+import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder
+import org.opensearch.search.builder.SearchSourceBuilder
+
+/**
+ * Regression tests verifying that bucket-level monitors work correctly when
+ * multi_tenant_trigger_eval_enabled is false (the default). The existing
+ * BucketSelectorExt path must be completely unchanged.
+ */
+class RemoteBucketLevelTriggerRegressionIT : AlertingRestTestCase() {
+
+    private val SETTING_KEY = AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED.key
+
+    private fun buildCompositeInput(index: String): SearchInput {
+        val query = QueryBuilders.rangeQuery("test_strict_date_time")
+            .gt("{{period_end}}||-10d")
+            .lte("{{period_end}}")
+            .format("epoch_millis")
+        val compositeSources = listOf(TermsValuesSourceBuilder("test_field").field("test_field"))
+        val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
+        return SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
+    }
+
+    private fun buildTrigger(script: String = "params.docCount > 0"): org.opensearch.commons.alerting.model.BucketLevelTrigger {
+        var trigger = randomBucketLevelTrigger()
+        trigger = trigger.copy(
+            bucketSelector = BucketSelectorExtAggregationBuilder(
+                name = trigger.id,
+                bucketsPathsMap = mapOf("docCount" to "_count"),
+                script = Script(script),
+                parentBucketPath = "composite_agg",
+                filter = null
+            )
+        )
+        return trigger
+    }
+
+    fun `test bucket level trigger flag disabled`() {
+        // Explicitly disable — should use BucketSelectorExt path
+        client().updateSettings(SETTING_KEY, false)
+        try {
+            val testIndex = createTestIndex()
+            insertSampleTimeSerializedData(testIndex, listOf("test_value_1", "test_value_1", "test_value_2"))
+
+            val input = buildCompositeInput(testIndex)
+            val trigger = buildTrigger()
+            val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
+
+            executeMonitor(monitor.id)
+            val alerts = searchAlerts(monitor)
+            assertEquals("Alerts not saved", 2, alerts.size)
+            alerts.forEach { assertEquals(ACTIVE, it.state) }
+        } finally {
+            client().updateSettings(SETTING_KEY, false)
+        }
+    }
+
+    fun `test bucket level trigger flag default is false`() {
+        // Don't set the flag at all — verify default is false
+        val settings = getClusterSettings()
+        val flagValue = settings?.get(SETTING_KEY)
+        assertTrue("Flag should default to false or be absent", flagValue == null || flagValue == "false")
+    }
+
+    fun `test bucket level trigger alert lifecycle flag disabled`() {
+        client().updateSettings(SETTING_KEY, false)
+        try {
+            val testIndex = createTestIndex()
+            insertSampleTimeSerializedData(testIndex, listOf("test_value_1", "test_value_1", "test_value_2"))
+
+            val input = buildCompositeInput(testIndex)
+            val trigger = buildTrigger()
+            val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
+
+            // First execution — alerts created
+            executeMonitor(monitor.id)
+            var alerts = searchAlerts(monitor)
+            assertEquals(2, alerts.size)
+            alerts.forEach { assertEquals(ACTIVE, it.state) }
+
+            // Delete docs for one bucket
+            deleteDataWithDocIds(testIndex, listOf("1", "2"))
+
+            // Second execution — one completed
+            executeMonitor(monitor.id)
+            alerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN)
+            val activeAlerts = alerts.filter { it.state == ACTIVE }
+            val completedAlerts = alerts.filter { it.state == COMPLETED }
+            assertEquals(1, activeAlerts.size)
+            assertEquals(1, completedAlerts.size)
+        } finally {
+            client().updateSettings(SETTING_KEY, false)
+        }
+    }
+
+    fun `test bucket level trigger toggle flag during execution`() {
+        try {
+            val testIndex = createTestIndex()
+            insertSampleTimeSerializedData(testIndex, listOf("test_value_1", "test_value_1", "test_value_2"))
+
+            val input = buildCompositeInput(testIndex)
+            val trigger = buildTrigger()
+            val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
+
+            // Execute with flag=false (BucketSelectorExt path)
+            client().updateSettings(SETTING_KEY, false)
+            val response1 = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val output1 = entityAsMap(response1)
+            @Suppress("UNCHECKED_CAST")
+            val buckets1 = output1.objectMap("trigger_results").objectMap(trigger.id)["agg_result_buckets"] as Map<String, Any>
+            assertEquals(2, buckets1.size)
+
+            // Toggle to true (standard bucket_selector path)
+            client().updateSettings(SETTING_KEY, true)
+            val response2 = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val output2 = entityAsMap(response2)
+            @Suppress("UNCHECKED_CAST")
+            val buckets2 = output2.objectMap("trigger_results").objectMap(trigger.id)["agg_result_buckets"] as Map<String, Any>
+            assertEquals(2, buckets2.size)
+        } finally {
+            client().updateSettings(SETTING_KEY, false)
+        }
+    }
+
+    fun `test bucket level trigger toggle flag single trigger`() {
+        try {
+            val testIndex = createTestIndex()
+            insertSampleTimeSerializedData(testIndex, listOf("test_value_1", "test_value_1", "test_value_2"))
+
+            val input = buildCompositeInput(testIndex)
+            val trigger = buildTrigger(script = "params.docCount > 1") // only test_value_1
+            val monitor = createMonitor(
+                randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))
+            )
+
+            // Execute with flag=false (BucketSelectorExt path)
+            client().updateSettings(SETTING_KEY, false)
+            val response1 = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val output1 = entityAsMap(response1)
+            @Suppress("UNCHECKED_CAST")
+            val buckets1 = output1.objectMap("trigger_results").objectMap(trigger.id)["agg_result_buckets"] as Map<String, Any>
+            assertEquals("Flag off: should match 1 bucket", 1, buckets1.size)
+
+            // Toggle to true (standard bucket_selector path)
+            client().updateSettings(SETTING_KEY, true)
+            val response2 = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+            val output2 = entityAsMap(response2)
+            @Suppress("UNCHECKED_CAST")
+            val buckets2 = output2.objectMap("trigger_results").objectMap(trigger.id)["agg_result_buckets"] as Map<String, Any>
+            assertEquals("Flag on: should match 1 bucket", 1, buckets2.size)
+        } finally {
+            client().updateSettings(SETTING_KEY, false)
+        }
+    }
+
+    private fun getClusterSettings(): Map<String, Any>? {
+        val response = client().performRequest(org.opensearch.client.Request("GET", "/_cluster/settings?flat_settings=true"))
+        @Suppress("UNCHECKED_CAST")
+        val settings = entityAsMap(response)["defaults"] as? Map<String, Any>
+        return settings
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
+        return this[key] as Map<String, Map<String, Any>>
+    }
+}
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TriggerServiceTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TriggerServiceTests.kt
index a12742db..6baa016f 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/TriggerServiceTests.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/TriggerServiceTests.kt
@@ -13,6 +13,7 @@ import org.opensearch.common.settings.ClusterSettings
 import org.opensearch.common.settings.Setting
 import org.opensearch.common.settings.Settings
 import org.opensearch.common.xcontent.XContentType
+import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
 import org.opensearch.commons.alerting.model.BucketLevelTriggerRunResult
 import org.opensearch.commons.alerting.model.InputRunResults
 import org.opensearch.commons.alerting.model.MonitorRunResult
@@ -266,4 +267,198 @@ class TriggerServiceTests : OpenSearchTestCase() {
         val bucketLevelTriggerRunResult = triggerService.runBucketLevelTrigger(monitor, trigger, triggerCtx)
         assertNull(bucketLevelTriggerRunResult.error)
     }
+
+    fun `test run bucket level trigger from filtered response with string keys`() {
+        val bucketSelectorExtAggregationBuilder = BucketSelectorExtAggregationBuilder(
+            "test_trigger",
+            mutableMapOf("_count" to "_count"),
+            randomScript(source = "params._count > 1"),
+            "status_code",
+            null
+        )
+        val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder)
+        val monitor = randomBucketLevelMonitor(triggers = listOf(trigger))
+
+        // Standard bucket_selector already removed non-matching buckets.
+        // Only buckets with doc_count > 1 remain.
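+        // For contrast (not asserted here): on the legacy BucketSelectorExt path the response
+        // would still contain every bucket, plus a named result under the trigger id listing
+        // the indices of the matching buckets, which runBucketLevelTrigger reads instead.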
+ val inputResultsStr = """ + { + "hits": { "hits": [], "total": { "value": 4, "relation": "eq" }, "max_score": null }, + "took": 10, "timed_out": false, + "aggregations": { + "status_code": { + "buckets": [ + { "doc_count": 2, "key": "200" }, + { "doc_count": 3, "key": "500" } + ] + } + } + } + """.trimIndent() + + val inputResults = parseInputResults(inputResultsStr) + var monitorRunResult = MonitorRunResult(monitor.name, Instant.now(), Instant.now()) + monitorRunResult = monitorRunResult.copy(inputResults = InputRunResults(listOf(inputResults))) + val triggerCtx = BucketLevelTriggerExecutionContext(monitor, trigger, monitorRunResult, clusterSettings = clusterSettings) + + val result = triggerService.runBucketLevelTriggerFromFilteredResponse(monitor, trigger, triggerCtx) + assertNull(result.error) + assertEquals(2, result.aggregationResultBuckets.size) + assertTrue(result.aggregationResultBuckets.containsKey("200")) + assertTrue(result.aggregationResultBuckets.containsKey("500")) + } + + fun `test run bucket level trigger from filtered response with int keys`() { + val bucketSelectorExtAggregationBuilder = BucketSelectorExtAggregationBuilder( + "test_trigger", + mutableMapOf("_count" to "_count"), + randomScript(source = "params._count > 0"), + "status_code", + null + ) + val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder) + val monitor = randomBucketLevelMonitor(triggers = listOf(trigger)) + + val inputResultsStr = """ + { + "hits": { "hits": [], "total": { "value": 3, "relation": "eq" }, "max_score": null }, + "took": 10, "timed_out": false, + "aggregations": { + "status_code": { + "buckets": [ + { "doc_count": 2, "key": 100 }, + { "doc_count": 1, "key": 201 } + ] + } + } + } + """.trimIndent() + + val inputResults = parseInputResults(inputResultsStr) + var monitorRunResult = MonitorRunResult(monitor.name, Instant.now(), Instant.now()) + monitorRunResult = monitorRunResult.copy(inputResults = InputRunResults(listOf(inputResults))) + val triggerCtx = BucketLevelTriggerExecutionContext(monitor, trigger, monitorRunResult, clusterSettings = clusterSettings) + + val result = triggerService.runBucketLevelTriggerFromFilteredResponse(monitor, trigger, triggerCtx) + assertNull(result.error) + assertEquals(2, result.aggregationResultBuckets.size) + assertTrue(result.aggregationResultBuckets.containsKey("100")) + assertTrue(result.aggregationResultBuckets.containsKey("201")) + } + + fun `test run bucket level trigger from filtered response with composite map keys`() { + val bucketSelectorExtAggregationBuilder = BucketSelectorExtAggregationBuilder( + "test_trigger", + mutableMapOf("_count" to "_count"), + randomScript(source = "params._count > 1"), + "composite_agg", + null + ) + val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder) + val monitor = randomBucketLevelMonitor(triggers = listOf(trigger)) + + val inputResultsStr = """ + { + "hits": { "hits": [], "total": { "value": 5, "relation": "eq" }, "max_score": null }, + "took": 10, "timed_out": false, + "aggregations": { + "composite_agg": { + "buckets": [ + { "doc_count": 3, "key": { "host": "server1", "status": "200" } }, + { "doc_count": 2, "key": { "host": "server2", "status": "500" } } + ] + } + } + } + """.trimIndent() + + val inputResults = parseInputResults(inputResultsStr) + var monitorRunResult = MonitorRunResult(monitor.name, Instant.now(), Instant.now()) + monitorRunResult = monitorRunResult.copy(inputResults = InputRunResults(listOf(inputResults))) + val 
triggerCtx = BucketLevelTriggerExecutionContext(monitor, trigger, monitorRunResult, clusterSettings = clusterSettings) + + val result = triggerService.runBucketLevelTriggerFromFilteredResponse(monitor, trigger, triggerCtx) + assertNull(result.error) + assertEquals(2, result.aggregationResultBuckets.size) + // Composite keys are joined with "#" + assertTrue(result.aggregationResultBuckets.containsKey("server1#200")) + assertTrue(result.aggregationResultBuckets.containsKey("server2#500")) + } + + fun `test run bucket level trigger from filtered response with empty buckets`() { + val bucketSelectorExtAggregationBuilder = BucketSelectorExtAggregationBuilder( + "test_trigger", + mutableMapOf("_count" to "_count"), + randomScript(source = "params._count > 100"), + "status_code", + null + ) + val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder) + val monitor = randomBucketLevelMonitor(triggers = listOf(trigger)) + + val inputResultsStr = """ + { + "hits": { "hits": [], "total": { "value": 0, "relation": "eq" }, "max_score": null }, + "took": 10, "timed_out": false, + "aggregations": { + "status_code": { + "buckets": [] + } + } + } + """.trimIndent() + + val inputResults = parseInputResults(inputResultsStr) + var monitorRunResult = MonitorRunResult(monitor.name, Instant.now(), Instant.now()) + monitorRunResult = monitorRunResult.copy(inputResults = InputRunResults(listOf(inputResults))) + val triggerCtx = BucketLevelTriggerExecutionContext(monitor, trigger, monitorRunResult, clusterSettings = clusterSettings) + + val result = triggerService.runBucketLevelTriggerFromFilteredResponse(monitor, trigger, triggerCtx) + assertNull(result.error) + assertTrue(result.aggregationResultBuckets.isEmpty()) + } + + fun `test run bucket level trigger from filtered response with nested parent path`() { + val bucketSelectorExtAggregationBuilder = BucketSelectorExtAggregationBuilder( + "test_trigger", + mutableMapOf("_count" to "_count"), + randomScript(source = "params._count > 0"), + "outer>inner", + null + ) + val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder) + val monitor = randomBucketLevelMonitor(triggers = listOf(trigger)) + + val inputResultsStr = """ + { + "hits": { "hits": [], "total": { "value": 2, "relation": "eq" }, "max_score": null }, + "took": 10, "timed_out": false, + "aggregations": { + "outer": { + "inner": { + "buckets": [ + { "doc_count": 1, "key": "val1" } + ] + } + } + } + } + """.trimIndent() + + val inputResults = parseInputResults(inputResultsStr) + var monitorRunResult = MonitorRunResult(monitor.name, Instant.now(), Instant.now()) + monitorRunResult = monitorRunResult.copy(inputResults = InputRunResults(listOf(inputResults))) + val triggerCtx = BucketLevelTriggerExecutionContext(monitor, trigger, monitorRunResult, clusterSettings = clusterSettings) + + val result = triggerService.runBucketLevelTriggerFromFilteredResponse(monitor, trigger, triggerCtx) + assertNull(result.error) + assertEquals(1, result.aggregationResultBuckets.size) + assertTrue(result.aggregationResultBuckets.containsKey("val1")) + } + + private fun parseInputResults(json: String): Map { + val parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json) + return parser.map() + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/BucketKeyFilterTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/BucketKeyFilterTests.kt new file mode 
100644 index 00000000..c4d46c70 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/BucketKeyFilterTests.kt @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter +import org.opensearch.commons.alerting.model.AggregationResultBucket +import org.opensearch.search.aggregations.bucket.terms.IncludeExclude +import org.opensearch.test.OpenSearchTestCase + +class BucketKeyFilterTests : OpenSearchTestCase() { + + private fun bucket(parentPath: String, keys: List, docCount: Int = 1): AggregationResultBucket { + val keyField = if (keys.size == 1) keys[0] else keys.associateBy { it } + return AggregationResultBucket(parentPath, keys, mapOf("key" to keyField, "doc_count" to docCount)) + } + + private fun bucketsMap(parentPath: String, vararg keys: String): Map { + return keys.associate { key -> + val b = bucket(parentPath, listOf(key)) + b.getBucketKeysHash() to b + } + } + + fun `test filter with null filter passes all buckets through`() { + val buckets = bucketsMap("agg", "200", "404", "500") + val result = BucketKeyFilter.filterBuckets(buckets, null) + assertEquals(3, result.size) + } + + fun `test filter with include pattern`() { + val filter = BucketSelectorExtFilter(IncludeExclude("2.*", null)) + val buckets = bucketsMap("agg", "200", "201", "404", "500") + val result = BucketKeyFilter.filterBuckets(buckets, filter) + assertEquals(2, result.size) + assertTrue(result.containsKey("200")) + assertTrue(result.containsKey("201")) + } + + fun `test filter with exclude pattern`() { + val filter = BucketSelectorExtFilter(IncludeExclude(null, "4.*")) + val buckets = bucketsMap("agg", "200", "404", "500") + val result = BucketKeyFilter.filterBuckets(buckets, filter) + assertEquals(2, result.size) + assertTrue(result.containsKey("200")) + assertTrue(result.containsKey("500")) + } + + fun `test filter with include and exclude patterns`() { + val filter = BucketSelectorExtFilter(IncludeExclude("2.*", "201")) + val buckets = bucketsMap("agg", "200", "201", "404", "500") + val result = BucketKeyFilter.filterBuckets(buckets, filter) + // include "2.*" matches 200, 201; exclude "201" removes 201 -> only 200 + assertEquals(1, result.size) + assertTrue(result.containsKey("200")) + } + + fun `test filter with no matching buckets`() { + val filter = BucketSelectorExtFilter(IncludeExclude("999", null)) + val buckets = bucketsMap("agg", "200", "404", "500") + val result = BucketKeyFilter.filterBuckets(buckets, filter) + assertTrue(result.isEmpty()) + } + + fun `test filter with composite agg keys`() { + val filtersMap = hashMapOf("host" to IncludeExclude("server1", null)) + val filter = BucketSelectorExtFilter(filtersMap) + val b1 = AggregationResultBucket( + "comp", listOf("server1", "200"), + mapOf("key" to mapOf("host" to "server1", "status" to "200"), "doc_count" to 3) + ) + val b2 = AggregationResultBucket( + "comp", listOf("server2", "500"), + mapOf("key" to mapOf("host" to "server2", "status" to "500"), "doc_count" to 1) + ) + val buckets = mapOf(b1.getBucketKeysHash() to b1, b2.getBucketKeysHash() to b2) + val result = BucketKeyFilter.filterBuckets(buckets, filter) + assertEquals(1, result.size) + assertTrue(result.containsKey(b1.getBucketKeysHash())) + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/BucketSelectorQueryBuilderTests.kt 
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/BucketSelectorQueryBuilderTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/BucketSelectorQueryBuilderTests.kt
new file mode 100644
index 00000000..a6eb85d9
--- /dev/null
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/BucketSelectorQueryBuilderTests.kt
@@ -0,0 +1,132 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.alerting.util
+
+import org.opensearch.alerting.randomBucketLevelTrigger
+import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
+import org.opensearch.script.Script
+import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
+import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder
+import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
+import org.opensearch.search.builder.SearchSourceBuilder
+import org.opensearch.test.OpenSearchTestCase
+
+class BucketSelectorQueryBuilderTests : OpenSearchTestCase() {
+
+    private fun triggerWithSelector(
+        parentBucketPath: String,
+        bucketsPathsMap: Map<String, String> = mapOf("docCount" to "_count"),
+        script: String = "params.docCount > 0"
+    ) = randomBucketLevelTrigger().let { trigger ->
+        trigger.copy(
+            bucketSelector = BucketSelectorExtAggregationBuilder(
+                trigger.id, bucketsPathsMap.toMutableMap(), Script(script), parentBucketPath, null
+            )
+        )
+    }
+
+    /**
+     * Serialize the query to JSON for assertions, since pipeline sub-aggs aren't returned
+     * by getSubAggregations(). SearchSourceBuilder.toString() already renders JSON.
+     */
+    private fun SearchSourceBuilder.toJsonString(): String = toString()
+
+    fun `test inject bucket selector under composite agg`() {
+        val compositeAgg = CompositeAggregationBuilder(
+            "composite_agg", listOf(TermsValuesSourceBuilder("test_field").field("test_field"))
+        )
+        val query = SearchSourceBuilder().size(0).aggregation(compositeAgg)
+        val trigger = triggerWithSelector("composite_agg")
+
+        val result = BucketSelectorQueryBuilder.injectBucketSelector(query, listOf(trigger))
+
+        val json = result.toJsonString()
+        assertTrue("bucket_selector should be present", json.contains("bucket_selector"))
+        assertTrue(
+            "trigger filter name should be present",
+            json.contains("${BucketSelectorQueryBuilder.TRIGGER_FILTER_PREFIX}${trigger.id}")
+        )
+    }
+
+    fun `test inject bucket selector under terms agg`() {
+        val termsAgg = TermsAggregationBuilder("status_codes").field("status")
+        val query = SearchSourceBuilder().size(0).aggregation(termsAgg)
+        val trigger = triggerWithSelector("status_codes")
+
+        val result = BucketSelectorQueryBuilder.injectBucketSelector(query, listOf(trigger))
+
+        val json = result.toJsonString()
+        assertTrue("bucket_selector should be present", json.contains("bucket_selector"))
+        assertTrue(
+            "trigger filter name should be present",
+            json.contains("${BucketSelectorQueryBuilder.TRIGGER_FILTER_PREFIX}${trigger.id}")
+        )
+    }
+
+    fun `test inject bucket selector with nested parent path`() {
+        val innerAgg = TermsAggregationBuilder("inner_agg").field("status")
+        val outerAgg = CompositeAggregationBuilder(
+            "outer_agg", listOf(TermsValuesSourceBuilder("host").field("host"))
+        ).subAggregation(innerAgg)
+        val query = SearchSourceBuilder().size(0).aggregation(outerAgg)
+        val trigger = triggerWithSelector("outer_agg>inner_agg")
+
+        val result = BucketSelectorQueryBuilder.injectBucketSelector(query, listOf(trigger))
+
+        val json = result.toJsonString()
+        assertTrue("bucket_selector should be present under inner_agg", json.contains("bucket_selector"))
+        assertTrue(json.contains("${BucketSelectorQueryBuilder.TRIGGER_FILTER_PREFIX}${trigger.id}"))
+        // Verify it's nested inside inner_agg by checking the JSON structure
+        val innerAggIdx = json.indexOf("\"inner_agg\"")
+        val bucketSelectorIdx = json.indexOf("bucket_selector")
+        assertTrue("bucket_selector should appear after inner_agg", bucketSelectorIdx > innerAggIdx)
+    }
+
+    fun `test inject multiple triggers under same parent`() {
+        val compositeAgg = CompositeAggregationBuilder(
+            "composite_agg", listOf(TermsValuesSourceBuilder("test_field").field("test_field"))
+        )
+        val query = SearchSourceBuilder().size(0).aggregation(compositeAgg)
+        val trigger1 = triggerWithSelector("composite_agg")
+        val trigger2 = triggerWithSelector("composite_agg", script = "params.docCount > 5")
+
+        val result = BucketSelectorQueryBuilder.injectBucketSelector(query, listOf(trigger1, trigger2))
+
+        val json = result.toJsonString()
+        assertTrue(json.contains("${BucketSelectorQueryBuilder.TRIGGER_FILTER_PREFIX}${trigger1.id}"))
+        assertTrue(json.contains("${BucketSelectorQueryBuilder.TRIGGER_FILTER_PREFIX}${trigger2.id}"))
+    }
+
+    fun `test parent agg not found throws exception`() {
+        val query = SearchSourceBuilder().size(0).aggregation(
+            TermsAggregationBuilder("some_agg").field("field")
+        )
+        val trigger = triggerWithSelector("nonexistent_agg")
+
+        expectThrows(IllegalArgumentException::class.java) {
+            BucketSelectorQueryBuilder.injectBucketSelector(query, listOf(trigger))
+        }
+    }
+
+    fun `test preserves existing sub aggs`() {
+        val existingSubAgg = TermsAggregationBuilder("existing_sub").field("sub_field")
+        val compositeAgg = CompositeAggregationBuilder(
+            "composite_agg", listOf(TermsValuesSourceBuilder("test_field").field("test_field"))
+        ).subAggregation(existingSubAgg)
+        val query = SearchSourceBuilder().size(0).aggregation(compositeAgg)
+        val trigger = triggerWithSelector("composite_agg")
+
+        val result = BucketSelectorQueryBuilder.injectBucketSelector(query, listOf(trigger))
+
+        val json = result.toJsonString()
+        assertTrue("existing sub-agg should be preserved", json.contains("existing_sub"))
+        assertTrue("bucket_selector should be added", json.contains("bucket_selector"))
+        assertTrue(json.contains("${BucketSelectorQueryBuilder.TRIGGER_FILTER_PREFIX}${trigger.id}"))
+    }
+}
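BucketSelectorQueryBuilder.injectBucketSelector itself is not shown in this excerpt. A sketch consistent with what these tests assert, attaching one standard bucket_selector per trigger under its parentBucketPath and throwing IllegalArgumentException when the parent is missing (the TRIGGER_FILTER_PREFIX value and the overall shape are assumptions):

    import org.opensearch.commons.alerting.model.BucketLevelTrigger
    import org.opensearch.search.aggregations.AggregationBuilder
    import org.opensearch.search.aggregations.pipeline.BucketSelectorPipelineAggregationBuilder
    import org.opensearch.search.builder.SearchSourceBuilder

    // Sketch: resolve each trigger's parent aggregation by walking "outer>inner"
    // style paths, then append a standard bucket_selector pipeline agg under it.
    fun injectBucketSelectorSketch(
        query: SearchSourceBuilder,
        triggers: List<BucketLevelTrigger>
    ): SearchSourceBuilder {
        val aggBuilder = query.aggregations()
            ?: throw IllegalArgumentException("Query has no aggregations")
        for (trigger in triggers) {
            val path = trigger.bucketSelector.parentBucketPath.split(">")
            var parent: AggregationBuilder = aggBuilder.aggregatorFactories
                .find { it.name == path.first() }
                ?: throw IllegalArgumentException("Aggregation '${path.first()}' not found in query")
            for (name in path.drop(1)) {
                parent = parent.subAggregations.find { it.name == name }
                    ?: throw IllegalArgumentException("Aggregation '$name' not found in query")
            }
            parent.subAggregation(
                BucketSelectorPipelineAggregationBuilder(
                    "filtered_by_${trigger.id}", // assumed TRIGGER_FILTER_PREFIX + trigger.id
                    trigger.bucketSelector.bucketsPathsMap,
                    trigger.bucketSelector.script
                )
            )
        }
        return query
    }

Because subAggregation appends rather than replaces, any pre-existing sub-aggregations such as existing_sub survive, which is what the last test checks.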