Merged
BucketLevelMonitorRunner.kt
@@ -84,6 +84,13 @@ object BucketLevelMonitorRunner : MonitorRunner() {
}

var monitorResult = MonitorRunResult<BucketLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)

if (monitorCtx.multiTenantTriggerEvalEnabled && monitor.triggers.size > 1) {
val msg = "Bucket-level monitors only support 1 trigger when remote trigger evaluation is enabled."
logger.error(msg)
return monitorResult.copy(error = IllegalArgumentException(msg))
}

val currentAlerts = try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
@@ -137,12 +144,17 @@ object BucketLevelMonitorRunner : MonitorRunner() {
// in the final output of monitorResult which occurs when all pages have been exhausted.
// If it's favorable to return the last page, will need to check how to accomplish that with multiple aggregation paths
// with different page counts.
//
// When the flag is on, bucket-level monitors are limited to 1 trigger. The standard
// bucket_selector is injected directly into the query so a single search call performs
// both data collection and trigger evaluation — no separate per-trigger queries needed.
val inputResults = monitorCtx.inputService!!.collectInputResults(
monitor,
periodStart,
periodEnd,
monitorResult.inputResults,
- workflowRunContext
+ workflowRunContext,
+ useStandardBucketSelector = monitorCtx.multiTenantTriggerEvalEnabled
)
if (firstIteration) {
firstPageOfInputResults = inputResults
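The comment in the hunk above describes the core optimization: with a standard `bucket_selector` attached to the aggregation, a single search both collects buckets and evaluates the trigger condition server-side. Below is a minimal hand-built sketch of such a combined query; the aggregation names, field, and threshold are illustrative, not from this PR, and `BucketSelectorQueryBuilder.injectBucketSelector` presumably builds something equivalent from the trigger's definition:

```kotlin
import org.opensearch.script.Script
import org.opensearch.search.aggregations.AggregationBuilders
import org.opensearch.search.aggregations.pipeline.BucketSelectorPipelineAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder

// Group documents by service and prune, server-side, every bucket that does
// not satisfy the trigger condition; the response contains only firing buckets.
fun buildCombinedQuery(): SearchSourceBuilder {
    val groupBy = AggregationBuilders.terms("composite_agg").field("service.keyword")
    val triggerCondition = BucketSelectorPipelineAggregationBuilder(
        "trigger_condition",
        mapOf("count" to "_count"),  // buckets_path: the bucket doc count
        Script("params.count > 100") // inline painless trigger condition
    )
    groupBy.subAggregation(triggerCondition)
    return SearchSourceBuilder().size(0).aggregation(groupBy)
}
```

A real bucket-level monitor uses a composite aggregation rather than a plain terms aggregation, but the injection point for the pipeline selector is the same.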
@@ -155,13 +167,15 @@
// The currentAlerts map is formed by iterating over the Monitor's Triggers as keys so null should not be returned here
val currentAlertsForTrigger = currentAlerts[trigger]!!
val triggerCtx = BucketLevelTriggerExecutionContext(
- monitor,
- trigger as BucketLevelTrigger,
- monitorResult,
+ monitor, trigger as BucketLevelTrigger, monitorResult,
clusterSettings = monitorCtx.clusterService!!.clusterSettings
)
triggerContexts[trigger.id] = triggerCtx
- val triggerResult = monitorCtx.triggerService!!.runBucketLevelTrigger(monitor, trigger, triggerCtx)
+ val triggerResult = if (monitorCtx.multiTenantTriggerEvalEnabled) {
+     monitorCtx.triggerService!!.runBucketLevelTriggerFromFilteredResponse(monitor, trigger, triggerCtx)
+ } else {
+     monitorCtx.triggerService!!.runBucketLevelTrigger(monitor, trigger, triggerCtx)
+ }
triggerResults[trigger.id] = triggerResult.getCombinedTriggerRunResult(triggerResults[trigger.id])

/*
InputService.kt
@@ -14,6 +14,7 @@ import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.BucketSelectorQueryBuilder
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.addUserBackendRolesFilter
@@ -30,6 +31,7 @@ import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
@@ -72,7 +74,8 @@ class InputService(
periodStart: Instant,
periodEnd: Instant,
prevResult: InputRunResults? = null,
- workflowRunContext: WorkflowRunContext? = null
+ workflowRunContext: WorkflowRunContext? = null,
+ useStandardBucketSelector: Boolean = false
): InputRunResults {
return try {
val results = mutableListOf<Map<String, Any>>()
@@ -93,7 +96,8 @@
periodEnd = periodEnd,
prevResult = prevResult,
matchingDocIdsPerIndex = matchingDocIdsPerIndex,
- returnSampleDocs = false
+ returnSampleDocs = false,
+ useStandardBucketSelector = useStandardBucketSelector
)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
aggTriggerAfterKey += AggregationQueryRewriter.getAfterKeysFromSearchResponse(
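`collectInputResults` awaits the search through `suspendUntil`, which adapts OpenSearch's callback-based `ActionListener` API to Kotlin coroutines. A sketch of the general shape of that helper, assuming the extension in `org.opensearch.alerting.opensearchapi` follows the common pattern (the `ActionListener` import path varies across OpenSearch versions):

```kotlin
import org.opensearch.core.action.ActionListener
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

// Suspends the coroutine until the listener fires, turning the callback
// into a normal return value or a thrown exception.
suspend fun <C, T> C.suspendUntil(block: C.(ActionListener<T>) -> Unit): T =
    suspendCoroutine { cont ->
        block(object : ActionListener<T> {
            override fun onResponse(response: T) = cont.resume(response)
            override fun onFailure(e: Exception) = cont.resumeWithException(e)
        })
    }
```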
@@ -223,7 +227,8 @@
periodEnd: Instant,
prevResult: InputRunResults?,
matchingDocIdsPerIndex: Map<String, List<String>>?,
- returnSampleDocs: Boolean = false
+ returnSampleDocs: Boolean = false,
+ useStandardBucketSelector: Boolean = false
): SearchRequest {
// TODO: Figure out a way to use SearchTemplateRequest without bringing in the entire TransportClient
val searchParams = mapOf(
@@ -233,11 +238,22 @@

// Deep copying query before passing it to rewriteQuery since otherwise, the monitor.input is modified directly
// which causes a strange bug where the rewritten query persists on the Monitor across executions
val copiedQuery = deepCopyQuery(searchInput.query)

// When using standard bucket_selector, inject it as a sub-agg instead of BucketSelectorExt.
if (useStandardBucketSelector) {
val bucketTriggers = monitor.triggers.filterIsInstance<BucketLevelTrigger>()
if (bucketTriggers.isNotEmpty()) {
BucketSelectorQueryBuilder.injectBucketSelector(copiedQuery, bucketTriggers)
}
}

val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(
- deepCopyQuery(searchInput.query),
+ copiedQuery,
prevResult,
monitor.triggers,
- returnSampleDocs
+ returnSampleDocs,
+ skipBucketSelectorInjection = useStandardBucketSelector
)

// Rewrite query to consider the doc ids per given index
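`deepCopyQuery` itself is outside this diff. The usual way to deep-copy a `SearchSourceBuilder` is an XContent round trip, roughly as below; this is a sketch rather than the plugin's exact implementation, and the parser needs a registry that knows the query and aggregation named objects (`NamedXContentRegistry.EMPTY` would fail on any non-trivial query):

```kotlin
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.search.builder.SearchSourceBuilder

// Serialize to JSON and parse back so the copy shares no mutable state with
// the original; rewriting the copy can no longer leak into monitor.input.
fun deepCopyQuery(query: SearchSourceBuilder, registry: NamedXContentRegistry): SearchSourceBuilder {
    val json = query.toString() // SearchSourceBuilder.toString() renders JSON
    return XContentType.JSON.xContent()
        .createParser(registry, LoggingDeprecationHandler.INSTANCE, json)
        .use { parser -> SearchSourceBuilder.fromXContent(parser) }
}
```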
MonitorMetadataService.kt
@@ -56,7 +56,7 @@ object MonitorMetadataService :
private lateinit var xContentRegistry: NamedXContentRegistry
private lateinit var clusterService: ClusterService
private lateinit var settings: Settings
- private lateinit var sdkClient: SdkClient
+ lateinit var sdkClient: SdkClient

@Volatile
private lateinit var indexTimeout: TimeValue
TriggerService.kt
@@ -12,6 +12,7 @@ import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
import org.opensearch.alerting.util.BucketKeyFilter
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.cluster.service.ClusterService
@@ -181,6 +182,41 @@ class TriggerService(val scriptService: ScriptService) {
}
}

fun runBucketLevelTriggerFromFilteredResponse(
monitor: Monitor,
trigger: BucketLevelTrigger,
ctx: BucketLevelTriggerExecutionContext
): BucketLevelTriggerRunResult {
return try {
val parentBucketPath = trigger.bucketSelector.parentBucketPath
val aggregationPath = AggregationPath.parse(parentBucketPath)
val aggs = ctx.results[0][Aggregations.AGGREGATIONS_FIELD]
require(aggs is Map<*, *>) { "Unexpected aggregations type: ${aggs?.javaClass}" }
var parentAgg: Map<*, *> = aggs
aggregationPath.pathElementsAsStringList.forEach { subAgg ->
val child = parentAgg[subAgg]
require(child is Map<*, *>) { "Unexpected type for agg '$subAgg': ${child?.javaClass}" }
parentAgg = child
}
val buckets = parentAgg[Aggregation.CommonFields.BUCKETS.preferredName]
require(buckets is List<*>) { "Unexpected buckets type: ${buckets?.javaClass}" }
val selectedBuckets = mutableMapOf<String, AggregationResultBucket>()
for (bucket in buckets) {
require(bucket is Map<*, *>) { "Unexpected bucket type: ${bucket?.javaClass}" }
@Suppress("UNCHECKED_CAST")
val bucketDict = bucket as Map<String, Any>
val bucketKeyValuesList = getBucketKeyValuesList(bucketDict)
val aggResultBucket = AggregationResultBucket(parentBucketPath, bucketKeyValuesList, bucketDict)
selectedBuckets[aggResultBucket.getBucketKeysHash()] = aggResultBucket
}
val filteredBuckets = BucketKeyFilter.filterBuckets(selectedBuckets, trigger.bucketSelector.filter)
BucketLevelTriggerRunResult(trigger.name, null, filteredBuckets)
} catch (e: Exception) {
logger.error("Error running trigger [${trigger.id}] for monitor [${monitor.id}]", e)
BucketLevelTriggerRunResult(trigger.name, e, emptyMap())
}
}
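For reference, the path walk above operates on the search response rendered as nested maps. For a trigger whose `parentBucketPath` is `composite_agg`, `ctx.results[0]` is expected to look roughly like this hypothetical value; since the injected bucket_selector already pruned non-matching buckets, every bucket listed becomes an `AggregationResultBucket`:

```kotlin
// Hypothetical filtered search result: only buckets that satisfied the
// injected bucket_selector condition survive in the response.
val sampleResult: Map<String, Any> = mapOf(
    "aggregations" to mapOf(
        "composite_agg" to mapOf(
            "buckets" to listOf(
                mapOf("key" to mapOf("service" to "checkout"), "doc_count" to 142),
                mapOf("key" to mapOf("service" to "payments"), "doc_count" to 317)
            )
        )
    )
)
```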

@Suppress("UNCHECKED_CAST")
fun runBucketLevelTrigger(
monitor: Monitor,
TransportIndexMonitorAction.kt
@@ -38,6 +38,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTION_THROTTLE_VALUE
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_TRIGGERS_PER_MONITOR
import org.opensearch.alerting.settings.AlertingSettings.Companion.MULTI_TENANT_TRIGGER_EVAL_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST
import org.opensearch.alerting.util.DocLevelMonitorQueries
@@ -116,6 +117,7 @@ class TransportIndexMonitorAction @Inject constructor(

@Volatile private var maxMonitors = ALERTING_MAX_MONITORS.get(settings)
@Volatile private var maxTriggersPerMonitor = MAX_TRIGGERS_PER_MONITOR.get(settings)
@Volatile private var multiTenantTriggerEvalEnabled = MULTI_TENANT_TRIGGER_EVAL_ENABLED.get(settings)
@Volatile private var requestTimeout = REQUEST_TIMEOUT.get(settings)
@Volatile private var indexTimeout = INDEX_TIMEOUT.get(settings)
@Volatile private var maxActionThrottle = MAX_ACTION_THROTTLE_VALUE.get(settings)
@@ -131,6 +133,9 @@
init {
clusterService.clusterSettings.addSettingsUpdateConsumer(ALERTING_MAX_MONITORS) { maxMonitors = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(MAX_TRIGGERS_PER_MONITOR) { maxTriggersPerMonitor = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(MULTI_TENANT_TRIGGER_EVAL_ENABLED) {
multiTenantTriggerEvalEnabled = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(REQUEST_TIMEOUT) { requestTimeout = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(INDEX_TIMEOUT) { indexTimeout = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(MAX_ACTION_THROTTLE_VALUE) { maxActionThrottle = it }
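For the consumer registered above to compile, `MULTI_TENANT_TRIGGER_EVAL_ENABLED` must be declared as a dynamic boolean setting in `AlertingSettings`. That declaration is not part of this excerpt; a sketch of the usual pattern follows, with a guessed setting key and default:

```kotlin
import org.opensearch.common.settings.Setting

// Hypothetical declaration; the real key and default live in AlertingSettings.
val MULTI_TENANT_TRIGGER_EVAL_ENABLED: Setting<Boolean> = Setting.boolSetting(
    "plugins.alerting.multi_tenant_trigger_eval_enabled", // assumed key
    false,                       // assumed default: feature disabled
    Setting.Property.NodeScope,
    Setting.Property.Dynamic     // required for addSettingsUpdateConsumer
)
```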
@@ -453,6 +458,14 @@
require(monitor.triggers.size <= maxTriggersPerMonitor) {
"The current cluster settings only allow up to $maxTriggersPerMonitor triggers per monitor."
}
if (multiTenantTriggerEvalEnabled &&
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) ==
Monitor.MonitorType.BUCKET_LEVEL_MONITOR
) {
require(monitor.triggers.size <= 1) {
"Bucket-level monitors only support 1 trigger when remote trigger evaluation is enabled."
}
}
}

private fun validateActionThrottle(monitor: Monitor, maxValue: TimeValue, minValue: TimeValue) {
AggregationQueryRewriter.kt
@@ -41,12 +41,15 @@ class AggregationQueryRewriter {
query: SearchSourceBuilder,
prevResult: InputRunResults?,
triggers: List<Trigger>,
- returnSampleDocs: Boolean = false
+ returnSampleDocs: Boolean = false,
+ skipBucketSelectorInjection: Boolean = false
): SearchSourceBuilder {
triggers.forEach { trigger ->
if (trigger is BucketLevelTrigger) {
// add bucket selector pipeline aggregation for each trigger in query
- query.aggregation(trigger.bucketSelector)
+ if (!skipBucketSelectorInjection) {
+     query.aggregation(trigger.bucketSelector)
+ }

// if this request is processing the subsequent pages of input query result, then add after key
if (prevResult?.aggTriggersAfterKey?.get(trigger.id) != null) {
BucketKeyFilter.kt (new file)
@@ -0,0 +1,86 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.util

import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter
import org.opensearch.commons.alerting.model.AggregationResultBucket
import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.search.aggregations.bucket.terms.IncludeExclude
import java.util.regex.Pattern

/**
* Applies [BucketSelectorExtFilter] include/exclude patterns to bucket results post-response.
*
* In the standard `bucket_selector` approach, the filter cannot be applied server-side since
* `BucketSelectorExt` is a custom plugin aggregation. Instead, the include/exclude filtering
* is applied after the response is received.
*/
object BucketKeyFilter {

fun filterBuckets(
buckets: Map<String, AggregationResultBucket>,
filter: BucketSelectorExtFilter?
): Map<String, AggregationResultBucket> {
if (filter == null) return buckets

return if (filter.isCompositeAggregation) {
filterCompositeKeys(buckets, filter.filtersMap ?: return buckets)
} else {
filterSimpleKeys(buckets, filter.filters ?: return buckets)
}
}

@Suppress("UNCHECKED_CAST")
private fun filterCompositeKeys(
buckets: Map<String, AggregationResultBucket>,
filtersMap: HashMap<String, IncludeExclude>
): Map<String, AggregationResultBucket> {
val patterns = filtersMap.mapValues { (_, ie) -> extractPatterns(ie) }
return buckets.filter { (_, bucket) ->
val keyMap = bucket.bucket?.get("key") as? Map<String, Any> ?: return@filter true
patterns.all { (sourceKey, regexPair) ->
val value = keyMap[sourceKey]?.toString() ?: return@all true
isAccepted(value, regexPair)
}
}
}

private fun filterSimpleKeys(
buckets: Map<String, AggregationResultBucket>,
includeExclude: IncludeExclude
): Map<String, AggregationResultBucket> {
val regexPair = extractPatterns(includeExclude)
return buckets.filter { (_, bucket) ->
val key = bucket.bucketKeys.joinToString("#")
isAccepted(key, regexPair)
}
}

private fun isAccepted(value: String, patterns: Pair<Pattern?, Pattern?>): Boolean {
val (includeRegex, excludeRegex) = patterns
if (includeRegex != null && !includeRegex.matcher(value).matches()) return false
if (excludeRegex != null && excludeRegex.matcher(value).matches()) return false
return true
}

/**
* Extracts include/exclude regex strings from [IncludeExclude] via XContent serialization.
*/
private fun extractPatterns(includeExclude: IncludeExclude): Pair<Pattern?, Pattern?> {
Review comment (Collaborator): Reflection is an anti-pattern and won't be supported in some environments. Is there a way to take a dependency on the IncludeExclude object or recreate it in a way that we can access the fields without reflection?
Reply (Member, author): changed to parse the json
val builder = XContentFactory.jsonBuilder().startObject()
includeExclude.toXContent(builder, ToXContent.EMPTY_PARAMS)
builder.endObject()
// Render the builder to JSON and parse it back into a plain map
// (no named objects involved, so the empty registry is sufficient).
val json = BytesReference.bytes(builder).utf8ToString()
val map = XContentType.JSON.xContent()
    .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json)
    .use { parser -> parser.map() }
val include = (map["include"] as? String)?.let { Pattern.compile(it) }
val exclude = (map["exclude"] as? String)?.let { Pattern.compile(it) }
return Pair(include, exclude)
}
}
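A usage sketch for the new helper, assuming `BucketSelectorExtFilter` exposes a constructor taking a single regex-based `IncludeExclude` (as the `filters` branch above implies):

```kotlin
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter
import org.opensearch.commons.alerting.model.AggregationResultBucket
import org.opensearch.search.aggregations.bucket.terms.IncludeExclude

fun demoFilter(): Map<String, AggregationResultBucket> {
    val buckets = mapOf(
        "svc-a" to AggregationResultBucket("composite_agg", listOf("svc-a"), mapOf("doc_count" to 10)),
        "svc-b" to AggregationResultBucket("composite_agg", listOf("svc-b"), mapOf("doc_count" to 20))
    )
    // Regex-based IncludeExclude: keep keys matching "svc-a.*", exclude none.
    val filter = BucketSelectorExtFilter(IncludeExclude("svc-a.*", null)) // assumed constructor
    return BucketKeyFilter.filterBuckets(buckets, filter) // retains only "svc-a"
}
```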