diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt index 6e10b37f..f0fa69c9 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt @@ -3,6 +3,7 @@ package org.opensearch.commons.alerting.model import org.opensearch.Version import org.opensearch.common.lucene.uid.Versions import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.commons.alerting.model.Monitor.Companion.suppressWarning import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import org.opensearch.commons.alerting.util.instant import org.opensearch.commons.alerting.util.optionalTimeField @@ -45,6 +46,11 @@ data class Alert( val executionId: String? = null, val associatedAlertIds: List, val clusters: List? = null, + // these fields are specifically used for PPL Monitors + // for now, Query-Level Monitors can support including + // query results in Alerts by using these fields + val query: String? = null, + val queryResults: List> = listOf(), val target: Target? = null ) : Writeable, ToXContent { @@ -56,6 +62,64 @@ data class Alert( } } + constructor( + id: String = NO_ID, + version: Long = NO_VERSION, + schemaVersion: Int = NO_SCHEMA_VERSION, + monitorId: String, + workflowId: String, + workflowName: String, + monitorName: String, + monitorVersion: Long, + monitorUser: User?, + triggerId: String, + triggerName: String, + findingIds: List, + relatedDocIds: List, + state: State, + startTime: Instant, + endTime: Instant? = null, + lastNotificationTime: Instant? = null, + acknowledgedTime: Instant? = null, + errorMessage: String? = null, + errorHistory: List, + severity: String, + actionExecutionResults: List, + aggregationResultBucket: AggregationResultBucket? = null, + executionId: String? = null, + associatedAlertIds: List, + clusters: List? = null + ) : this ( + id = id, + version = version, + schemaVersion = schemaVersion, + monitorId = monitorId, + workflowId = workflowId, + workflowName = workflowName, + monitorName = monitorName, + monitorVersion = monitorVersion, + monitorUser = monitorUser, + triggerId = triggerId, + triggerName = triggerName, + findingIds = findingIds, + relatedDocIds = relatedDocIds, + state = state, + startTime = startTime, + endTime = endTime, + lastNotificationTime = lastNotificationTime, + acknowledgedTime = acknowledgedTime, + errorMessage = errorMessage, + errorHistory = errorHistory, + severity = severity, + actionExecutionResults = actionExecutionResults, + aggregationResultBucket = aggregationResultBucket, + executionId = executionId, + associatedAlertIds = associatedAlertIds, + clusters = clusters, + query = null, + queryResults = listOf() + ) + constructor( startTime: Instant, lastNotificationTime: Instant?, @@ -89,12 +153,16 @@ data class Alert( workflowId = workflow.id, workflowName = workflow.name, associatedAlertIds = associatedAlertIds, - clusters = clusters + clusters = clusters, + query = null, + queryResults = listOf() ) + // constructor for Alerts from QueryLevelMonitorRunner + // this monitor runner runs Query-level, Cluster Metrics, and PPL Monitors constructor( monitor: Monitor, - trigger: QueryLevelTrigger, + trigger: Trigger, startTime: Instant, lastNotificationTime: Instant?, state: State = State.ACTIVE, @@ -104,7 +172,9 @@ data class Alert( schemaVersion: Int = NO_SCHEMA_VERSION, executionId: String? = null, workflowId: String? = null, - clusters: List? = null + clusters: List? = null, + query: String? = null, + queryResults: List> = listOf() ) : this( monitorId = monitor.id, monitorName = monitor.name, @@ -128,6 +198,8 @@ data class Alert( workflowName = "", associatedAlertIds = emptyList(), clusters = clusters, + query = query, + queryResults = queryResults, target = monitor.target ) @@ -168,6 +240,8 @@ data class Alert( workflowName = "", associatedAlertIds = emptyList(), clusters = clusters, + query = null, + queryResults = listOf(), target = monitor.target ) @@ -209,6 +283,8 @@ data class Alert( workflowName = "", associatedAlertIds = emptyList(), clusters = clusters, + query = null, + queryResults = listOf(), target = monitor.target ) @@ -252,6 +328,8 @@ data class Alert( workflowName = "", associatedAlertIds = emptyList(), clusters = clusters, + query = null, + queryResults = listOf(), target = monitor.target ) @@ -292,6 +370,8 @@ data class Alert( executionId = executionId, associatedAlertIds = emptyList(), clusters = clusters, + query = null, + queryResults = listOf(), target = monitor.target ) @@ -337,6 +417,16 @@ data class Alert( executionId = sin.readOptionalString(), associatedAlertIds = sin.readStringList(), clusters = sin.readOptionalStringList(), + query = if (sin.version.onOrAfter(Version.V_3_7_0)) { + sin.readOptionalString() + } else { + null + }, + queryResults = if (sin.version.onOrAfter(Version.V_3_7_0)) { + sin.readList { input -> suppressWarning(input.readMap()) } + } else { + listOf() + }, target = if (sin.version.onOrAfter(Version.V_3_6_0)) { if (sin.readBoolean()) Target(sin) else null } else { @@ -380,6 +470,12 @@ data class Alert( out.writeOptionalString(executionId) out.writeStringCollection(associatedAlertIds) out.writeOptionalStringArray(clusters?.toTypedArray()) + if (out.version.onOrAfter(Version.V_3_7_0)) { + out.writeOptionalString(query) + out.writeCollection(queryResults) { output, map -> + output.writeMap(map) + } + } if (out.version.onOrAfter(Version.V_3_6_0)) { out.writeBoolean(target != null) target?.writeTo(out) @@ -415,6 +511,8 @@ data class Alert( const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH const val CLUSTERS_FIELD = "clusters" + const val QUERY_FIELD = "query" + const val QUERY_RESULTS_FIELD = "query_results" const val TARGET_FIELD = "target" const val NO_ID = "" const val NO_VERSION = Versions.NOT_FOUND @@ -447,6 +545,8 @@ data class Alert( var aggAlertBucket: AggregationResultBucket? = null val associatedAlertIds = mutableListOf() val clusters = mutableListOf() + var query: String? = null + var queryResults: List> = listOf() var target: Target? = null ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -523,6 +623,19 @@ data class Alert( clusters.add(xcp.text()) } } + QUERY_FIELD -> { + if (xcp.currentToken() != XContentParser.Token.VALUE_NULL) { + query = xcp.text() + } + } + QUERY_RESULTS_FIELD -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + val resultsList = mutableListOf>() + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + resultsList.add(xcp.map()) + } + queryResults = resultsList + } TARGET_FIELD -> target = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { null } else { @@ -558,6 +671,8 @@ data class Alert( workflowName = workflowName, associatedAlertIds = associatedAlertIds, clusters = if (clusters.size > 0) clusters else null, + query = query, + queryResults = queryResults, target = target ) } @@ -612,6 +727,11 @@ data class Alert( if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray()) if (target != null) builder.field(TARGET_FIELD, target) + if (!query.isNullOrEmpty()) builder.field(QUERY_FIELD, query) + if (queryResults.isNotEmpty()) { + builder.field(QUERY_RESULTS_FIELD, queryResults.toTypedArray()) + } + builder.endObject() return builder } @@ -636,7 +756,9 @@ data class Alert( PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath, FINDING_IDS to findingIds.joinToString(","), RELATED_DOC_IDS to relatedDocIds.joinToString(","), - CLUSTERS_FIELD to clusters?.joinToString(",") + CLUSTERS_FIELD to clusters?.joinToString(","), + QUERY_FIELD to query, + QUERY_RESULTS_FIELD to queryResults ) } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTrigger.kt index bac059f8..337a6e21 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTrigger.kt @@ -63,7 +63,7 @@ data class BucketLevelTrigger( out.writeCollection(actions) } - fun asTemplateArg(): Map { + override fun asTemplateArg(): Map { return mapOf( ID_FIELD to id, NAME_FIELD to name, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt index c56ce856..fc95e9dc 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt @@ -55,7 +55,7 @@ data class ChainedAlertTrigger( } /** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */ - fun asTemplateArg(): Map { + override fun asTemplateArg(): Map { return mapOf( ID_FIELD to id, NAME_FIELD to name, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTrigger.kt index a1f8b617..5d7ec338 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTrigger.kt @@ -55,7 +55,7 @@ data class DocumentLevelTrigger( } /** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */ - fun asTemplateArg(): Map { + override fun asTemplateArg(): Map { return mapOf( ID_FIELD to id, NAME_FIELD to name, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt index 3846cea6..ab62e479 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt @@ -2,6 +2,7 @@ package org.opensearch.commons.alerting.model import org.opensearch.commons.alerting.model.ClusterMetricsInput.Companion.URI_FIELD import org.opensearch.commons.alerting.model.DocLevelMonitorInput.Companion.DOC_LEVEL_INPUT_FIELD +import org.opensearch.commons.alerting.model.PPLInput.Companion.PPL_INPUT_FIELD import org.opensearch.commons.alerting.model.SearchInput.Companion.SEARCH_FIELD import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput.Companion.REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD @@ -20,7 +21,8 @@ interface Input : BaseModel { CLUSTER_METRICS_INPUT(URI_FIELD), SEARCH_INPUT(SEARCH_FIELD), REMOTE_MONITOR_INPUT(REMOTE_MONITOR_INPUT_FIELD), - REMOTE_DOC_LEVEL_MONITOR_INPUT(REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD); + REMOTE_DOC_LEVEL_MONITOR_INPUT(REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD), + PPL_INPUT(PPL_INPUT_FIELD); override fun toString(): String { return value @@ -42,8 +44,10 @@ interface Input : BaseModel { DocLevelMonitorInput.parse(xcp) } else if (xcp.currentName() == Type.REMOTE_MONITOR_INPUT.value) { RemoteMonitorInput.parse(xcp) - } else { + } else if (xcp.currentName() == Type.REMOTE_DOC_LEVEL_MONITOR_INPUT.value) { RemoteDocLevelMonitorInput.parse(xcp) + } else { + PPLInput.parseInner(xcp) } XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) return input @@ -58,6 +62,7 @@ interface Input : BaseModel { Type.SEARCH_INPUT -> SearchInput(sin) Type.REMOTE_MONITOR_INPUT -> RemoteMonitorInput(sin) Type.REMOTE_DOC_LEVEL_MONITOR_INPUT -> RemoteDocLevelMonitorInput(sin) + Type.PPL_INPUT -> PPLInput(sin) // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns // enum can be null in Java else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt index 2ade77f5..293f0d74 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt @@ -10,6 +10,7 @@ import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION import org.opensearch.commons.alerting.util.IndexUtils.Companion.supportedClusterMetricsSettings import org.opensearch.commons.alerting.util.instant import org.opensearch.commons.alerting.util.isBucketLevelMonitor +import org.opensearch.commons.alerting.util.isPPLMonitor import org.opensearch.commons.alerting.util.optionalTimeField import org.opensearch.commons.alerting.util.optionalUserField import org.opensearch.commons.authuser.User @@ -69,6 +70,8 @@ data class Monitor( require(trigger is QueryLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } MonitorType.DOC_LEVEL_MONITOR.value -> require(trigger is DocumentLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } + MonitorType.PPL_MONITOR.value -> + require(trigger is PPLTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } } } if (enabled) { @@ -87,6 +90,27 @@ data class Monitor( } } } + + if (this.isPPLMonitor()) { + require(inputs.size == 1) { "Exactly 1 PPL query must be specified for PPL Monitor" } + + val pplInput = inputs[0] + + require(pplInput is PPLInput) { "Unsupported input [${pplInput.name()}] for PPL Monitor" } + + require(pplInput.queryLanguage == PPLInput.QueryLanguage.PPL) { "SQL queries are not supported. Please use a PPL query." } + + // this is a new check for PPL Alerting specifically, PPL Monitors will enforce + // a max name length, but it won't be enforced on other Monitor types to avoid + // adding a breaking change + require(this.name.length <= ALERTING_MAX_NAME_LENGTH) { + "Monitor name too long, length must be at most $ALERTING_MAX_NAME_LENGTH." + } + + // this is a new check for PPL Alerting specifically, other Monitor types allow + // themselves to be created without any Triggers + require(this.triggers.isNotEmpty()) { "PPL Monitor must include at least 1 trigger." } + } } @Throws(IOException::class) @@ -149,7 +173,8 @@ data class Monitor( QUERY_LEVEL_MONITOR("query_level_monitor"), BUCKET_LEVEL_MONITOR("bucket_level_monitor"), CLUSTER_METRICS_MONITOR("cluster_metrics_monitor"), - DOC_LEVEL_MONITOR("doc_level_monitor"); + DOC_LEVEL_MONITOR("doc_level_monitor"), + PPL_MONITOR("ppl_monitor"); override fun toString(): String { return value @@ -235,6 +260,8 @@ data class Monitor( out.writeEnum(Input.Type.SEARCH_INPUT) } else if (it is DocLevelMonitorInput) { out.writeEnum(Input.Type.DOCUMENT_LEVEL_INPUT) + } else if (it is PPLInput) { + out.writeEnum(Input.Type.PPL_INPUT) } else { out.writeEnum(Input.Type.REMOTE_DOC_LEVEL_MONITOR_INPUT) } @@ -247,6 +274,7 @@ data class Monitor( is BucketLevelTrigger -> out.writeEnum(Trigger.Type.BUCKET_LEVEL_TRIGGER) is DocumentLevelTrigger -> out.writeEnum(Trigger.Type.DOCUMENT_LEVEL_TRIGGER) is RemoteMonitorTrigger -> out.writeEnum(Trigger.Type.REMOTE_MONITOR_TRIGGER) + is PPLTrigger -> out.writeEnum(Trigger.Type.PPL_TRIGGER) else -> out.writeEnum(Trigger.Type.QUERY_LEVEL_TRIGGER) } it.writeTo(out) @@ -299,6 +327,10 @@ data class Monitor( const val TARGET_FIELD = "target" val MONITOR_TYPE_PATTERN = Pattern.compile("[a-zA-Z0-9_]{5,25}") + // hard, nonadjustable limits for PPL Alerting + const val ALERTING_MAX_NAME_LENGTH = 30 // max length of any name for monitors, triggers, notif actions, etc + const val UUID_LENGTH = 20 // the length of a UUID generated by UUIDs.base64UUID() + // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all // the different subclasses and creating circular dependencies val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt index d403313b..c8125394 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt @@ -7,6 +7,7 @@ package org.opensearch.commons.alerting.model import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchException +import org.opensearch.Version import org.opensearch.commons.alerting.alerts.AlertError import org.opensearch.commons.alerting.util.optionalTimeField import org.opensearch.core.common.io.stream.StreamInput @@ -92,12 +93,31 @@ data class MonitorRunResult( data class InputRunResults( val results: List> = listOf(), val error: Exception? = null, - val aggTriggersAfterKey: MutableMap? = null + val aggTriggersAfterKey: MutableMap? = null, + // a PPL Monitor's base query is the query set in the Monitor itself. + // Custom trigger conditions append to the base query to run + // their own queries/evaluations + val pplBaseQueryResults: List> = listOf(), + val pplBaseQueryNumResults: Long? = null ) : Writeable, ToXContent { + constructor( + results: List> = listOf(), + error: Exception? = null, + aggTriggersAfterKey: MutableMap? = null + ) : this( + results = results, + error = error, + aggTriggersAfterKey = aggTriggersAfterKey, + pplBaseQueryResults = listOf(), + pplBaseQueryNumResults = null + ) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { return builder.startObject() .field("results", results) + .field("ppl_query_results", pplBaseQueryResults) + .field("ppl_num_results", pplBaseQueryNumResults) .field("error", error?.message) .endObject() } @@ -108,6 +128,15 @@ data class InputRunResults( for (map in results) { out.writeMap(map) } + + if (out.version.onOrAfter(Version.V_3_7_0)) { + out.writeVInt(pplBaseQueryResults.size) + for (datarow in pplBaseQueryResults) { + out.writeMap(datarow) + } + out.writeOptionalLong(pplBaseQueryNumResults) + } + out.writeException(error) } @@ -120,8 +149,24 @@ data class InputRunResults( for (i in 0 until count) { list.add(suppressWarning(sin.readMap())) // result(map) } + val pplCount = if (sin.version.onOrAfter(Version.V_3_7_0)) { + sin.readVInt() + } else { + 0 + } + val pplList = mutableListOf>() + if (sin.version.onOrAfter(Version.V_3_7_0)) { + for (i in 0 until pplCount) { + pplList.add(suppressWarning(sin.readMap())) // pplResults + } + } + val pplNumResults = if (sin.version.onOrAfter(Version.V_3_7_0)) { + sin.readOptionalLong() + } else { + null + } val error = sin.readException() // error - return InputRunResults(list, error) + return InputRunResults(list, error, null, pplList, pplNumResults) } @Suppress("UNCHECKED_CAST") diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/NoOpTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/NoOpTrigger.kt index 3ffacb6e..90d3d3a4 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/NoOpTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/NoOpTrigger.kt @@ -36,7 +36,7 @@ data class NoOpTrigger( return NOOP_TRIGGER_FIELD } - fun asTemplateArg(): Map { + override fun asTemplateArg(): Map { return mapOf() } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLInput.kt new file mode 100644 index 00000000..d2ec100a --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLInput.kt @@ -0,0 +1,113 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.commons.alerting.util.AlertingException +import org.opensearch.core.ParseField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException + +data class PPLInput( + val query: String, + val queryLanguage: QueryLanguage = QueryLanguage.PPL +) : Input { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // query + sin.readEnum(QueryLanguage::class.java) + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(query) + out.writeEnum(queryLanguage) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .startObject(PPL_INPUT_FIELD) + .field(QUERY_FIELD, query) + .field(QUERY_LANGUAGE_FIELD, queryLanguage.value) + .endObject() + .endObject() + } + + override fun asTemplateArg(): Map = + mapOf( + PPL_INPUT_FIELD to mapOf( + QUERY_FIELD to query, + QUERY_LANGUAGE_FIELD to queryLanguage.value + ) + ) + + override fun name(): String = PPL_INPUT_FIELD + + enum class QueryLanguage(val value: String) { + PPL(PPL_QUERY_LANGUAGE), + SQL(SQL_QUERY_LANGUAGE); + + companion object { + fun enumFromString(value: String): QueryLanguage? = QueryLanguage.entries.firstOrNull { it.value == value } + } + } + + companion object { + // PPL Input field names + const val PPL_INPUT_FIELD = "ppl_input" + const val QUERY_FIELD = "query" + const val QUERY_LANGUAGE_FIELD = "query_language" + + // query languages + const val PPL_QUERY_LANGUAGE = "ppl" + const val SQL_QUERY_LANGUAGE = "sql" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + Input::class.java, + ParseField(PPL_INPUT_FIELD), + CheckedFunction { parseInner(it) } + ) + + @JvmStatic + @Throws(IOException::class) + fun parseInner(xcp: XContentParser): PPLInput { + var query: String? = null + var queryLanguage: QueryLanguage = QueryLanguage.PPL // default to PPL + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + when (fieldName) { + QUERY_FIELD -> query = xcp.text() + QUERY_LANGUAGE_FIELD -> { + val input = xcp.text() + val enumMatchResult = QueryLanguage.enumFromString(input) + ?: throw AlertingException.wrap( + IllegalArgumentException( + "Invalid value for $QUERY_LANGUAGE_FIELD: $input. " + + "Supported values are ${QueryLanguage.entries.map { it.value }}" + ) + ) + queryLanguage = enumMatchResult + } + } + } + + requireNotNull(query) + + return PPLInput(query, queryLanguage) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): PPLInput { + return PPLInput(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt new file mode 100644 index 00000000..ebbff7ee --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt @@ -0,0 +1,326 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.common.UUIDs +import org.opensearch.commons.alerting.model.Monitor.Companion.ALERTING_MAX_NAME_LENGTH +import org.opensearch.commons.alerting.model.Monitor.Companion.UUID_LENGTH +import org.opensearch.commons.alerting.model.Trigger.Companion.ACTIONS_FIELD +import org.opensearch.commons.alerting.model.Trigger.Companion.ID_FIELD +import org.opensearch.commons.alerting.model.Trigger.Companion.NAME_FIELD +import org.opensearch.commons.alerting.model.Trigger.Companion.SEVERITY_FIELD +import org.opensearch.commons.alerting.model.action.Action +import org.opensearch.core.ParseField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException + +/** + * The PPL Trigger for PPL Monitors + * + * There are two types of PPLTrigger conditions: NUMBER_OF_RESULTS and CUSTOM + * NUMBER_OF_RESULTS: triggers based on whether the number of query results returned by the PPLMonitor + * query meets some threshold + * CUSTOM: triggers based on a custom condition that user specifies (a single ppl where statement) + * + * @property id Trigger ID, defaults to a base64 UUID. + * @property name Display name of the Trigger. + * @property severity The severity level of the Trigger. + * @property actions List of notification-sending actions to run when the Trigger condition is met. + * @property conditionType The type of condition to evaluate. + * Can be either [ConditionType.NUMBER_OF_RESULTS] or [ConditionType.CUSTOM]. + * @property numResultsCondition The comparison operator for NUMBER_OF_RESULTS conditions. Required if using NUMBER_OF_RESULTS conditions, + * required to be null otherwise. + * @property numResultsValue The threshold value for NUMBER_OF_RESULTS conditions. Required if using NUMBER_OF_RESULTS conditions, + * required to be null otherwise. + * @property customCondition A custom condition expression. Required if using CUSTOM conditions, + * required to be null otherwise. + */ +data class PPLTrigger( + override val id: String = UUIDs.base64UUID(), + override val name: String, + override val severity: String, + override val actions: List, + val conditionType: ConditionType, // NUMBER_OF_RESULTS or CUSTOM + val numResultsCondition: NumResultsCondition?, + val numResultsValue: Long?, + val customCondition: String? +) : Trigger { + + init { + requireNotNull(this.name) { "Trigger name must be included." } + requireNotNull(this.severity) { "Trigger severity must be included." } + requireNotNull(this.conditionType) { "Trigger condition type must be included." } + + require(this.id.length <= UUID_LENGTH) { + "Trigger ID too long, length must be less than $UUID_LENGTH." + } + + require(this.name.length <= ALERTING_MAX_NAME_LENGTH) { + "Trigger name too long, length must be less than $ALERTING_MAX_NAME_LENGTH." + } + + this.actions.forEach { + require(it.name.length <= ALERTING_MAX_NAME_LENGTH) { + "Name of action with ID ${it.id} too long, length must be less than $ALERTING_MAX_NAME_LENGTH." + } + require(it.destinationId.length <= NOTIFICATIONS_ID_MAX_LENGTH) { + "Channel ID of action with ID ${it.id} too long, length must be less than $NOTIFICATIONS_ID_MAX_LENGTH." + } + require(it.destinationId.isNotEmpty()) { + "Channel ID should not be empty." + } + } + + when (this.conditionType) { + ConditionType.NUMBER_OF_RESULTS -> { + requireNotNull(this.numResultsCondition) { + "if trigger condition is of type ${ConditionType.NUMBER_OF_RESULTS.value}, " + + "$NUM_RESULTS_CONDITION_FIELD must be included." + } + requireNotNull(this.numResultsValue) { + "if trigger condition is of type ${ConditionType.NUMBER_OF_RESULTS.value}, " + + "$NUM_RESULTS_VALUE_FIELD must be included." + } + require(this.customCondition == null) { + "if trigger condition is of type ${ConditionType.NUMBER_OF_RESULTS.value}, " + + "$CUSTOM_CONDITION_FIELD must not be included." + } + } + ConditionType.CUSTOM -> { + requireNotNull(this.customCondition) { + "if trigger condition is of type ${ConditionType.CUSTOM.value}, " + + "$CUSTOM_CONDITION_FIELD must be included." + } + require(this.numResultsCondition == null) { + "if trigger condition is of type ${ConditionType.CUSTOM.value}, " + + "$NUM_RESULTS_CONDITION_FIELD must not be included." + } + require(this.numResultsValue == null) { + "if trigger condition is of type ${ConditionType.CUSTOM.value}, " + + "$NUM_RESULTS_VALUE_FIELD must not be included." + } + } + } + + if (conditionType == ConditionType.NUMBER_OF_RESULTS) { + require(this.numResultsValue!! >= 0L) { "Number of results to check for cannot be negative." } + } + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readString(), // name + sin.readString(), // severity + sin.readList(::Action), // actions + sin.readEnum(ConditionType::class.java), // conditionType + if (sin.readBoolean()) sin.readEnum(NumResultsCondition::class.java) else null, // numResultsCondition + sin.readOptionalLong(), // numResultsValue + sin.readOptionalString() // customCondition + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(name) + out.writeString(severity) + out.writeCollection(actions) + out.writeEnum(conditionType) + + out.writeBoolean(numResultsCondition != null) + numResultsCondition?.let { out.writeEnum(numResultsCondition) } + + out.writeOptionalLong(numResultsValue) + out.writeOptionalString(customCondition) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + builder.startObject(PPL_TRIGGER_FIELD) + builder.field(ID_FIELD, id) + builder.field(NAME_FIELD, name) + builder.field(SEVERITY_FIELD, severity) + builder.field(ACTIONS_FIELD, actions.toTypedArray()) + builder.field(CONDITION_TYPE_FIELD, conditionType.value) + numResultsCondition?.let { builder.field(NUM_RESULTS_CONDITION_FIELD, numResultsCondition.value) } + numResultsValue?.let { builder.field(NUM_RESULTS_VALUE_FIELD, numResultsValue) } + customCondition?.let { builder.field(CUSTOM_CONDITION_FIELD, customCondition) } + builder.endObject() + builder.endObject() + return builder + } + + override fun asTemplateArg(): Map { + val templateArg = mutableMapOf( + ID_FIELD to id, + NAME_FIELD to name, + SEVERITY_FIELD to severity, + ACTIONS_FIELD to actions.map { it.asTemplateArg() }, + CONDITION_TYPE_FIELD to conditionType.value + ) + + if (conditionType == ConditionType.NUMBER_OF_RESULTS) { + templateArg[NUM_RESULTS_CONDITION_FIELD] = numResultsCondition!!.value + templateArg[NUM_RESULTS_VALUE_FIELD] = numResultsValue!! + } else { + templateArg[CUSTOM_CONDITION_FIELD] = customCondition!! + } + + return templateArg + } + + override fun name(): String { + return PPL_TRIGGER_FIELD + } + + enum class ConditionType(val value: String) { + NUMBER_OF_RESULTS("number_of_results"), + CUSTOM("custom"); + + companion object { + fun enumFromString(value: String): ConditionType? = entries.firstOrNull { it.value == value } + } + } + + enum class NumResultsCondition(val value: String) { + GREATER_THAN(">"), + GREATER_THAN_EQUAL(">="), + LESS_THAN("<"), + LESS_THAN_EQUAL("<="), + EQUAL("=="), + NOT_EQUAL("!="); + + companion object { + fun enumFromString(value: String): NumResultsCondition? = entries.firstOrNull { it.value == value } + } + } + + companion object { + // trigger wrapper object field name + const val PPL_TRIGGER_FIELD = "ppl_trigger" + + // field names + const val CONDITION_TYPE_FIELD = "type" + const val NUM_RESULTS_CONDITION_FIELD = "num_results_condition" + const val NUM_RESULTS_VALUE_FIELD = "num_results_value" + const val CUSTOM_CONDITION_FIELD = "custom_condition" + + // hard, nonadjustable limits + const val NOTIFICATIONS_ID_MAX_LENGTH = 512 // length limit for notifications channel custom ID at channel creation time + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + Trigger::class.java, + ParseField(PPL_TRIGGER_FIELD), + CheckedFunction { parseInner(it) } + ) + + @JvmStatic + @Throws(IOException::class) + fun parseInner(xcp: XContentParser): PPLTrigger { + var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified + var name: String? = null + var severity: String? = null + val actions: MutableList = mutableListOf() + var conditionType: ConditionType? = null + var numResultsCondition: NumResultsCondition? = null + var numResultsValue: Long? = null + var customCondition: String? = null + + /* parse */ + if (xcp.currentToken() != XContentParser.Token.START_OBJECT && xcp.currentToken() != XContentParser.Token.FIELD_NAME) { + XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.tokenLocation) + } + + // If the parser began on START_OBJECT, move to the next token so that the while loop enters on + // the fieldName (or END_OBJECT if it's empty). + if (xcp.currentToken() == XContentParser.Token.START_OBJECT) xcp.nextToken() + + while (xcp.currentToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + ID_FIELD -> id = xcp.text() + NAME_FIELD -> name = xcp.text() + SEVERITY_FIELD -> severity = xcp.text() + CONDITION_TYPE_FIELD -> { + val input = xcp.text() + val enumMatchResult = ConditionType.enumFromString(input) + ?: throw IllegalArgumentException( + "Invalid value for $CONDITION_TYPE_FIELD: $input. " + + "Supported values are ${ConditionType.entries.map { it.value }}" + ) + conditionType = enumMatchResult + } + NUM_RESULTS_CONDITION_FIELD -> { + if (xcp.currentToken() != XContentParser.Token.VALUE_NULL) { + val input = xcp.text() + val enumMatchResult = NumResultsCondition.enumFromString(input) + ?: throw IllegalArgumentException( + "Invalid value for $NUM_RESULTS_CONDITION_FIELD: $input. " + + "Supported values are ${NumResultsCondition.entries.map { it.value }}" + ) + numResultsCondition = enumMatchResult + } + } + NUM_RESULTS_VALUE_FIELD -> { + if (xcp.currentToken() != XContentParser.Token.VALUE_NULL) { + numResultsValue = xcp.longValue() + } + } + CUSTOM_CONDITION_FIELD -> { + if (xcp.currentToken() != XContentParser.Token.VALUE_NULL) { + customCondition = xcp.text() + } + } + ACTIONS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + actions.add(Action.parse(xcp)) + } + } + else -> throw IllegalArgumentException("Unexpected field when parsing PPL Trigger: $fieldName") + } + + xcp.nextToken() + } + + /* validations */ + requireNotNull(name) { "Trigger name must be included" } + requireNotNull(severity) { "Trigger severity must be included" } + requireNotNull(conditionType) { "Trigger condition type must be included" } + + // 3. prepare and return PPLTrigger object + return PPLTrigger( + id, + name, + severity, + actions, + conditionType, + numResultsCondition, + numResultsValue, + customCondition + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): PPLTrigger { + return PPLTrigger(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTrigger.kt index a88ef9b6..3713b1d5 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTrigger.kt @@ -55,7 +55,7 @@ data class QueryLevelTrigger( } /** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */ - fun asTemplateArg(): Map { + override fun asTemplateArg(): Map { return mapOf( ID_FIELD to id, NAME_FIELD to name, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt index 101d0067..04e77564 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt @@ -5,6 +5,7 @@ package org.opensearch.commons.alerting.model +import org.opensearch.Version import org.opensearch.commons.alerting.alerts.AlertError import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput @@ -18,7 +19,8 @@ open class QueryLevelTriggerRunResult( override var triggerName: String, open var triggered: Boolean, override var error: Exception?, - open var actionResults: MutableMap = mutableMapOf() + open var actionResults: MutableMap = mutableMapOf(), + open var pplCustomQueryResults: List> = listOf() ) : TriggerRunResult(triggerName, error) { @Throws(IOException::class) @@ -27,7 +29,12 @@ open class QueryLevelTriggerRunResult( triggerName = sin.readString(), error = sin.readException(), triggered = sin.readBoolean(), - actionResults = sin.readMap() as MutableMap + actionResults = sin.readMap() as MutableMap, + pplCustomQueryResults = if (sin.version.onOrAfter(Version.V_3_7_0)) { + sin.readList { it.readMap() } + } else { + listOf() + } ) override fun alertError(): AlertError? { @@ -47,6 +54,7 @@ open class QueryLevelTriggerRunResult( return builder .field("triggered", triggered) .field("action_results", actionResults as Map) + .field("ppl_query_results", pplCustomQueryResults) } @Throws(IOException::class) @@ -54,6 +62,9 @@ open class QueryLevelTriggerRunResult( super.writeTo(out) out.writeBoolean(triggered) out.writeMap(actionResults as Map) + if (out.version.onOrAfter(Version.V_3_7_0)) { + out.writeCollection(pplCustomQueryResults) { stream, map -> stream.writeMap(map) } + } } companion object { diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt index 7cfb9f41..1851d425 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt @@ -16,7 +16,8 @@ interface Trigger : BaseModel { BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD), NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD), CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD), - REMOTE_MONITOR_TRIGGER(RemoteMonitorTrigger.REMOTE_MONITOR_TRIGGER_FIELD); + REMOTE_MONITOR_TRIGGER(RemoteMonitorTrigger.REMOTE_MONITOR_TRIGGER_FIELD), + PPL_TRIGGER(PPLTrigger.PPL_TRIGGER_FIELD); override fun toString(): String { return value @@ -58,6 +59,7 @@ interface Trigger : BaseModel { Type.DOCUMENT_LEVEL_TRIGGER -> DocumentLevelTrigger(sin) Type.CHAINED_ALERT_TRIGGER -> ChainedAlertTrigger(sin) Type.REMOTE_MONITOR_TRIGGER -> RemoteMonitorTrigger(sin) + Type.PPL_TRIGGER -> PPLTrigger(sin) // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns // enum can be null in Java else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") @@ -78,4 +80,6 @@ interface Trigger : BaseModel { val actions: List fun name(): String + + fun asTemplateArg(): Map } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorTrigger.kt index 0e89e5ba..adb3613c 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorTrigger.kt @@ -32,7 +32,7 @@ data class RemoteMonitorTrigger( sin.readBytesReference() ) - fun asTemplateArg(): Map { + override fun asTemplateArg(): Map { val bytes = trigger.toBytesRef().bytes return mapOf( Trigger.ID_FIELD to id, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt index efc8a364..0d4593e2 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt @@ -50,6 +50,10 @@ fun Monitor.isBucketLevelMonitor(): Boolean = isMonitorOfStandardType() && Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.BUCKET_LEVEL_MONITOR +fun Monitor.isPPLMonitor(): Boolean = + isMonitorOfStandardType() && + Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.PPL_MONITOR + fun XContentBuilder.optionalUserField(name: String, user: User?): XContentBuilder { if (user == null) { return nullField(name) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 8177057d..e7ee2e98 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -41,6 +41,8 @@ import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.MonitorRunResult import org.opensearch.commons.alerting.model.NoOpTrigger +import org.opensearch.commons.alerting.model.PPLInput +import org.opensearch.commons.alerting.model.PPLTrigger import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult import org.opensearch.commons.alerting.model.Schedule @@ -174,6 +176,36 @@ fun randomDocumentLevelMonitor( ) } +fun randomPPLMonitor( + name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + user: User = randomUser(), + inputs: List = listOf( + PPLInput( + query = "source=logs | where status > 400", + queryLanguage = PPLInput.QueryLanguage.PPL + ) + ), + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = Random().nextBoolean(), + triggers: List = (1..RandomNumbers.randomIntBetween(Random(), 1, 10)).map { randomPPLTrigger() }, + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + withMetadata: Boolean = false +): Monitor { + return Monitor( + name = name, + monitorType = Monitor.MonitorType.PPL_MONITOR.value, + enabled = enabled, + inputs = inputs, + schedule = schedule, + triggers = triggers, + enabledTime = enabledTime, + lastUpdateTime = lastUpdateTime, + user = user, + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() + ) +} + fun randomWorkflow( name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), user: User? = randomUser(), @@ -325,6 +357,30 @@ fun randomChainedAlertTrigger( ) } +fun randomPPLTrigger( + id: String = UUIDs.base64UUID(), + name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + severity: String = "1", + actions: List = mutableListOf(), + conditionType: PPLTrigger.ConditionType = PPLTrigger.ConditionType.NUMBER_OF_RESULTS, + numResultsCondition: PPLTrigger.NumResultsCondition? = PPLTrigger.NumResultsCondition.GREATER_THAN, + numResultsValue: Long = 0, + customCondition: String? = null +): PPLTrigger { + return PPLTrigger( + id = id, + name = name, + severity = severity, + actions = actions.ifEmpty { + (0..RandomNumbers.randomIntBetween(Random(), 0, 10)).map { randomAction(destinationId = "fake-channel-id") } + }, + conditionType = conditionType, + numResultsCondition = numResultsCondition, + numResultsValue = numResultsValue, + customCondition = customCondition + ) +} + fun randomBucketSelectorExtAggregationBuilder( name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), bucketsPathsMap: MutableMap = mutableMapOf("avg" to "10"), @@ -533,12 +589,14 @@ fun xContentRegistry(): NamedXContentRegistry { listOf( SearchInput.XCONTENT_REGISTRY, DocLevelMonitorInput.XCONTENT_REGISTRY, + PPLInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY, DocumentLevelTrigger.XCONTENT_REGISTRY, ChainedAlertTrigger.XCONTENT_REGISTRY, NoOpTrigger.XCONTENT_REGISTRY, - RemoteMonitorTrigger.XCONTENT_REGISTRY + RemoteMonitorTrigger.XCONTENT_REGISTRY, + PPLTrigger.XCONTENT_REGISTRY ) + SearchModule(Settings.EMPTY, emptyList()).namedXContents ) } @@ -567,6 +625,28 @@ fun randomAlert(monitor: Monitor = randomQueryLevelMonitor()): Alert { ) } +fun randomAlertWithPPLFields(monitor: Monitor = randomQueryLevelMonitor()): Alert { + val trigger = randomQueryLevelTrigger() + val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult()) + val clusterCount = (-1..5).random() + val clusters = if (clusterCount == -1) null else (0..clusterCount).map { "index-$it" } + val pplQuery = "source=logs | where status=200" + val pplQueryResults = listOf( + mapOf("k1" to "v1", "num" to 42, "user" to mapOf("name" to "bob", "age" to 32), "vals" to listOf(1, 2, 3)), + mapOf("k1" to "v2", "num" to 17, "user" to mapOf("name" to "ana", "age" to 45), "vals" to listOf("a", "b", "c")) + ) + return Alert( + monitor, + trigger, + Instant.now().truncatedTo(ChronoUnit.MILLIS), + null, + actionExecutionResults = actionExecutionResults, + clusters = clusters, + query = pplQuery, + queryResults = pplQueryResults + ) +} + fun randomChainedAlert( workflow: Workflow = randomWorkflow(), trigger: ChainedAlertTrigger = randomChainedAlertTrigger() @@ -681,7 +761,39 @@ fun createCorrelationAlertTemplateArgs(correlationAlert: CorrelationAlert): Map< } fun randomInputRunResults(): InputRunResults { - return InputRunResults(listOf(), null) + return InputRunResults( + listOf( + mapOf( + "aggregation_result" to 42, + "bucket_key" to mapOf("nested-key" to "nested-val") + ) + ), + null + ) +} + +fun randomInputRunResultsWithPPLFields(): InputRunResults { + return InputRunResults( + listOf( + mapOf( + "aggregation_result" to 42, + "bucket_key" to mapOf("nested-key" to "nested-val") + ) + ), + null, + null, + listOf>( + mapOf( + "key1" to "val1", + "key2" to 4 + ), + mapOf( + "key3" to listOf(1, 2, 3), + "key4" to mapOf("nested-key" to "nested-val") + ) + ), + 5L + ) } fun randomActionRunResult(): ActionRunResult { @@ -774,7 +886,41 @@ fun randomQueryLevelTriggerRunResult(): QueryLevelTriggerRunResult { val map = mutableMapOf() map.plus(Pair("key1", randomActionRunResult())) map.plus(Pair("key2", randomActionRunResult())) - return QueryLevelTriggerRunResult("trigger-name", true, null, map) + + return QueryLevelTriggerRunResult( + "trigger-name", + true, + null, + map + ) +} + +fun randomQueryLevelTriggerRunResultWithPPLFields(): QueryLevelTriggerRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", randomActionRunResult())) + map.plus(Pair("key2", randomActionRunResult())) + + val queryResultsList = mutableListOf>() + queryResultsList.add( + mapOf( + "key1" to "val1", + "key2" to 4 + ) + ) + queryResultsList.add( + mapOf( + "key3" to listOf(1, 2, 3), + "key4" to mapOf("nested-key" to "nested-val") + ) + ) + + return QueryLevelTriggerRunResult( + "trigger-name", + true, + null, + map, + queryResultsList + ) } fun randomQueryLevelMonitorRunResult(): MonitorRunResult { diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt index 6aecb888..1f3db811 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt @@ -13,6 +13,8 @@ import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger import org.opensearch.commons.alerting.randomAction import org.opensearch.commons.alerting.randomActionExecutionPolicy +import org.opensearch.commons.alerting.randomAlert +import org.opensearch.commons.alerting.randomAlertWithPPLFields import org.opensearch.commons.alerting.randomBucketLevelMonitorRunResult import org.opensearch.commons.alerting.randomBucketLevelTrigger import org.opensearch.commons.alerting.randomBucketLevelTriggerRunResult @@ -21,10 +23,14 @@ import org.opensearch.commons.alerting.randomDocLevelQuery import org.opensearch.commons.alerting.randomDocumentLevelMonitorRunResult import org.opensearch.commons.alerting.randomDocumentLevelTrigger import org.opensearch.commons.alerting.randomInputRunResults +import org.opensearch.commons.alerting.randomInputRunResultsWithPPLFields +import org.opensearch.commons.alerting.randomPPLMonitor +import org.opensearch.commons.alerting.randomPPLTrigger import org.opensearch.commons.alerting.randomQueryLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitorRunResult import org.opensearch.commons.alerting.randomQueryLevelTrigger import org.opensearch.commons.alerting.randomQueryLevelTriggerRunResult +import org.opensearch.commons.alerting.randomQueryLevelTriggerRunResultWithPPLFields import org.opensearch.commons.alerting.randomThrottle import org.opensearch.commons.alerting.randomUser import org.opensearch.commons.alerting.randomUserEmpty @@ -43,6 +49,26 @@ import kotlin.test.assertTrue class WriteableTests { + @Test + fun `test alert as stream`() { + val alert = randomAlert() + val out = BytesStreamOutput() + alert.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newAlert = Alert(sin) + Assertions.assertEquals(alert, newAlert, "Round tripping Alert doesn't work") + } + + @Test + fun `test alert with PPL fields as stream`() { + val alert = randomAlertWithPPLFields() + val out = BytesStreamOutput() + alert.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newAlert = Alert(sin) + Assertions.assertEquals(alert, newAlert, "Round tripping Alert with PPL fields doesn't work") + } + @Test fun `test throttle as stream`() { val throttle = randomThrottle() @@ -103,6 +129,16 @@ class WriteableTests { Assertions.assertEquals(monitor, newMonitor, "Round tripping QueryLevelMonitor doesn't work") } + @Test + fun `test ppl monitor as stream`() { + val monitor = randomPPLMonitor() + val out = BytesStreamOutput() + monitor.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newMonitor = Monitor(sin) + Assertions.assertEquals(monitor, newMonitor, "Round tripping PPLMonitor doesn't work") + } + @Test fun `test workflow as stream`() { val workflow = randomWorkflow(monitorIds = listOf("1", "2", "3", "4")) @@ -143,6 +179,16 @@ class WriteableTests { Assertions.assertEquals(trigger, newTrigger, "Round tripping DocumentLevelTrigger doesn't work") } + @Test + fun `test ppl trigger as stream`() { + val trigger = randomPPLTrigger() + val out = BytesStreamOutput() + trigger.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newTrigger = PPLTrigger.readFrom(sin) + Assertions.assertEquals(trigger, newTrigger, "Round tripping PPLTrigger doesn't work") + } + @Test fun `test doc-level query as stream`() { val dlq = randomDocLevelQuery() @@ -186,6 +232,16 @@ class WriteableTests { Assertions.assertEquals(input, newInput, "Round tripping MonitorRunResult doesn't work") } + @Test + fun `test ppl input as stream`() { + val input = PPLInput("source=some-index") + val out = BytesStreamOutput() + input.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newInput = PPLInput(sin) + Assertions.assertEquals(input, newInput, "Round tripping PPLInput doesn't work") + } + @Test fun `test user as stream`() { val user = randomUser() @@ -247,6 +303,20 @@ class WriteableTests { OpenSearchTestCase.assertEquals(runResult.actionResults, newRunResult.actionResults) } + @Test + fun `test query-level triggerrunresult with PPL fields as stream`() { + val runResult = randomQueryLevelTriggerRunResultWithPPLFields() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = QueryLevelTriggerRunResult(sin) + OpenSearchTestCase.assertEquals(runResult.triggerName, newRunResult.triggerName) + OpenSearchTestCase.assertEquals(runResult.triggered, newRunResult.triggered) + OpenSearchTestCase.assertEquals(runResult.error, newRunResult.error) + OpenSearchTestCase.assertEquals(runResult.actionResults, newRunResult.actionResults) + OpenSearchTestCase.assertEquals(runResult.pplCustomQueryResults, newRunResult.pplCustomQueryResults) + } + @Test fun `test bucket-level triggerrunresult as stream`() { val runResult = randomBucketLevelTriggerRunResult() @@ -277,6 +347,16 @@ class WriteableTests { OpenSearchTestCase.assertEquals("Round tripping InputRunResults doesn't work", runResult, newRunResult) } + @Test + fun `test inputrunresult with PPL fields as stream`() { + val runResult = randomInputRunResultsWithPPLFields() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = InputRunResults.readFrom(sin) + OpenSearchTestCase.assertEquals("Round tripping InputRunResults with PPL fields doesn't work", runResult, newRunResult) + } + @Test fun `test query-level monitorrunresult as stream`() { val runResult = randomQueryLevelMonitorRunResult() diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index d0072ec9..d9898dc5 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -23,9 +23,12 @@ import org.opensearch.commons.alerting.randomAlert import org.opensearch.commons.alerting.randomBucketLevelMonitor import org.opensearch.commons.alerting.randomBucketLevelTrigger import org.opensearch.commons.alerting.randomDocLevelQuery +import org.opensearch.commons.alerting.randomPPLMonitor +import org.opensearch.commons.alerting.randomPPLTrigger import org.opensearch.commons.alerting.randomQueryLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitorWithoutUser import org.opensearch.commons.alerting.randomQueryLevelTrigger +import org.opensearch.commons.alerting.randomSearchInput import org.opensearch.commons.alerting.randomThrottle import org.opensearch.commons.alerting.randomUser import org.opensearch.commons.alerting.randomUserEmpty @@ -69,6 +72,7 @@ class XContentTests { Assertions.assertEquals(action, parsedAction, "Round tripping Action doesn't work") } + @Test fun `test action parsing with throttled enabled and null throttle`() { val action = randomAction().copy(throttle = null).copy(throttleEnabled = true) val actionString = action.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() @@ -127,6 +131,7 @@ class XContentTests { } } + @Test fun `test query-level monitor parsing`() { val monitor = randomQueryLevelMonitor() @@ -222,6 +227,15 @@ class XContentTests { Assertions.assertEquals(workflow, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work") } + @Test + fun `test ppl monitor parsing`() { + val monitor = randomPPLMonitor() + + val monitorString = monitor.toJsonStringWithUser() + val parsedMonitor = Monitor.parse(parser(monitorString)) + Assertions.assertEquals(monitor, parsedMonitor, "Round tripping PPLMonitor doesn't work") + } + @Test fun `test query-level trigger parsing`() { val trigger = randomQueryLevelTrigger() @@ -252,6 +266,16 @@ class XContentTests { Assertions.assertEquals(trigger, parsedTrigger, "Round tripping NoOpTrigger doesn't work") } + @Test + fun `test ppl trigger parsing`() { + val trigger = randomPPLTrigger() + + val triggerString = trigger.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + val parsedTrigger = Trigger.parse(parser(triggerString)) + + Assertions.assertEquals(trigger, parsedTrigger, "Round tripping PPLTrigger doesn't work") + } + @Test fun `test creating a monitor with duplicate trigger ids fails`() { try { @@ -424,6 +448,16 @@ class XContentTests { } } + @Test + fun `test creating a ppl monitor with invalid trigger type fails`() { + try { + val queryLevelTrigger = randomQueryLevelTrigger() + randomPPLMonitor().copy(triggers = listOf(queryLevelTrigger)) + Assertions.fail("Creating a PPL monitor with query-level triggers did not fail.") + } catch (ignored: IllegalArgumentException) { + } + } + @Test fun `test creating an bucket-level monitor with invalid input fails`() { try { @@ -434,6 +468,16 @@ class XContentTests { } } + @Test + fun `test creating a ppl monitor with invalid input type fails`() { + try { + val searchInput = randomSearchInput() + randomPPLMonitor().copy(inputs = listOf(searchInput)) + Assertions.fail("Creating a PPL monitor with search input did not fail.") + } catch (ignored: IllegalArgumentException) { + } + } + @Test fun `test action execution policy`() { val actionExecutionPolicy = randomActionExecutionPolicy()