From d429761baa7c66c0f6c135a71faf59a1c0094206 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Mon, 9 Feb 2026 15:01:15 -0800 Subject: [PATCH 01/11] PPL Alerting: Adding PPL Related Models Signed-off-by: Dennis Toepker --- .../alerting/action/GetAlertsResponse.kt | 9 +- .../commons/alerting/model/Alert.kt | 118 +++++- .../alerting/model/BucketLevelTrigger.kt | 2 +- .../alerting/model/ChainedAlertTrigger.kt | 2 +- .../alerting/model/DocumentLevelTrigger.kt | 2 +- .../commons/alerting/model/Input.kt | 9 +- .../commons/alerting/model/Monitor.kt | 34 +- .../commons/alerting/model/NoOpTrigger.kt | 2 +- .../commons/alerting/model/PPLSQLInput.kt | 110 ++++++ .../commons/alerting/model/PPLSQLTrigger.kt | 335 ++++++++++++++++++ .../alerting/model/PPLSQLTriggerRunResult.kt | 42 +++ .../alerting/model/QueryLevelTrigger.kt | 2 +- .../commons/alerting/model/Trigger.kt | 5 +- .../remote/monitors/RemoteMonitorTrigger.kt | 2 +- .../commons/alerting/util/IndexUtils.kt | 8 + .../commons/alerting/TestHelpers.kt | 9 +- .../commons/alerting/model/WriteableTests.kt | 11 + .../commons/alerting/model/XContentTests.kt | 1 + 18 files changed, 685 insertions(+), 18 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLInput.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTrigger.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTriggerRunResult.kt diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponse.kt index 50a4ec53..4570d7cf 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponse.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponse.kt @@ -39,9 +39,14 @@ class GetAlertsResponse : BaseResponse { @Throws(IOException::class) override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { builder.startObject() - .field("alerts", alerts) - .field("totalAlerts", totalAlerts) + .field(ALERTS_FIELD, alerts) + .field(TOTAL_ALERTS_FIELD, totalAlerts) return builder.endObject() } + + companion object { + const val ALERTS_FIELD = "alerts" + const val TOTAL_ALERTS_FIELD = "totalAlerts" + } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt index 6e10b37f..a064f06d 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt @@ -3,6 +3,7 @@ package org.opensearch.commons.alerting.model import org.opensearch.Version import org.opensearch.common.lucene.uid.Versions import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.commons.alerting.model.Monitor.Companion.suppressWarning import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import org.opensearch.commons.alerting.util.instant import org.opensearch.commons.alerting.util.optionalTimeField @@ -45,9 +46,10 @@ data class Alert( val executionId: String? = null, val associatedAlertIds: List, val clusters: List? = null, + val pplQuery: String? = null, + val pplQueryResults: List> = listOf(), val target: Target? = null ) : Writeable, ToXContent { - init { if (errorMessage != null) { require(state == State.DELETED || state == State.ERROR || state == State.AUDIT) { @@ -56,6 +58,64 @@ data class Alert( } } + constructor( + id: String = NO_ID, + version: Long = NO_VERSION, + schemaVersion: Int = NO_SCHEMA_VERSION, + monitorId: String, + workflowId: String, + workflowName: String, + monitorName: String, + monitorVersion: Long, + monitorUser: User?, + triggerId: String, + triggerName: String, + findingIds: List, + relatedDocIds: List, + state: State, + startTime: Instant, + endTime: Instant? = null, + lastNotificationTime: Instant? = null, + acknowledgedTime: Instant? = null, + errorMessage: String? = null, + errorHistory: List, + severity: String, + actionExecutionResults: List, + aggregationResultBucket: AggregationResultBucket? = null, + executionId: String? = null, + associatedAlertIds: List, + clusters: List? = null + ) : this ( + id = id, + version = version, + schemaVersion = schemaVersion, + monitorId = monitorId, + workflowId = workflowId, + workflowName = workflowName, + monitorName = monitorName, + monitorVersion = monitorVersion, + monitorUser = monitorUser, + triggerId = triggerId, + triggerName = triggerName, + findingIds = findingIds, + relatedDocIds = relatedDocIds, + state = state, + startTime = startTime, + endTime = endTime, + lastNotificationTime = lastNotificationTime, + acknowledgedTime = acknowledgedTime, + errorMessage = errorMessage, + errorHistory = errorHistory, + severity = severity, + actionExecutionResults = actionExecutionResults, + aggregationResultBucket = aggregationResultBucket, + executionId = executionId, + associatedAlertIds = associatedAlertIds, + clusters = clusters, + pplQuery = null, + pplQueryResults = listOf() + ) + constructor( startTime: Instant, lastNotificationTime: Instant?, @@ -89,12 +149,16 @@ data class Alert( workflowId = workflow.id, workflowName = workflow.name, associatedAlertIds = associatedAlertIds, - clusters = clusters + clusters = clusters, + pplQuery = null, + pplQueryResults = listOf() ) + // constructor for Alerts from QueryLevelMonitorRunner + // this monitor runner runs Query-level, Cluster Metrics, and PPL Monitors constructor( monitor: Monitor, - trigger: QueryLevelTrigger, + trigger: Trigger, startTime: Instant, lastNotificationTime: Instant?, state: State = State.ACTIVE, @@ -104,7 +168,9 @@ data class Alert( schemaVersion: Int = NO_SCHEMA_VERSION, executionId: String? = null, workflowId: String? = null, - clusters: List? = null + clusters: List? = null, + pplQuery: String? = null, + pplQueryResults: List> = listOf() ) : this( monitorId = monitor.id, monitorName = monitor.name, @@ -128,6 +194,8 @@ data class Alert( workflowName = "", associatedAlertIds = emptyList(), clusters = clusters, + pplQuery = pplQuery, + pplQueryResults = pplQueryResults, target = monitor.target ) @@ -168,6 +236,8 @@ data class Alert( workflowName = "", associatedAlertIds = emptyList(), clusters = clusters, + pplQuery = null, + pplQueryResults = listOf(), target = monitor.target ) @@ -209,6 +279,8 @@ data class Alert( workflowName = "", associatedAlertIds = emptyList(), clusters = clusters, + pplQuery = null, + pplQueryResults = listOf(), target = monitor.target ) @@ -252,6 +324,8 @@ data class Alert( workflowName = "", associatedAlertIds = emptyList(), clusters = clusters, + pplQuery = null, + pplQueryResults = listOf(), target = monitor.target ) @@ -292,6 +366,8 @@ data class Alert( executionId = executionId, associatedAlertIds = emptyList(), clusters = clusters, + pplQuery = null, + pplQueryResults = listOf(), target = monitor.target ) @@ -337,6 +413,8 @@ data class Alert( executionId = sin.readOptionalString(), associatedAlertIds = sin.readStringList(), clusters = sin.readOptionalStringList(), + pplQuery = sin.readOptionalString(), + pplQueryResults = sin.readList { input -> suppressWarning(input.readMap()) }, target = if (sin.version.onOrAfter(Version.V_3_6_0)) { if (sin.readBoolean()) Target(sin) else null } else { @@ -380,6 +458,10 @@ data class Alert( out.writeOptionalString(executionId) out.writeStringCollection(associatedAlertIds) out.writeOptionalStringArray(clusters?.toTypedArray()) + out.writeOptionalString(pplQuery) + out.writeCollection(pplQueryResults) { output, map -> + output.writeMap(map) + } if (out.version.onOrAfter(Version.V_3_6_0)) { out.writeBoolean(target != null) target?.writeTo(out) @@ -415,6 +497,8 @@ data class Alert( const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH const val CLUSTERS_FIELD = "clusters" + const val PPL_SQL_QUERY_FIELD = "ppl_query" + const val PPL_SQL_QUERY_RESULTS_FIELD = "ppl_query_results" const val TARGET_FIELD = "target" const val NO_ID = "" const val NO_VERSION = Versions.NOT_FOUND @@ -447,6 +531,8 @@ data class Alert( var aggAlertBucket: AggregationResultBucket? = null val associatedAlertIds = mutableListOf() val clusters = mutableListOf() + var pplQuery: String? = null + var pplQueryResults: List> = listOf() var target: Target? = null ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -523,6 +609,19 @@ data class Alert( clusters.add(xcp.text()) } } + PPL_SQL_QUERY_FIELD -> { + if (xcp.currentToken() != XContentParser.Token.VALUE_NULL) { + pplQuery = xcp.text() + } + } + PPL_SQL_QUERY_RESULTS_FIELD -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + val resultsList = mutableListOf>() + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + resultsList.add(xcp.map()) + } + pplQueryResults = resultsList + } TARGET_FIELD -> target = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { null } else { @@ -558,6 +657,8 @@ data class Alert( workflowName = workflowName, associatedAlertIds = associatedAlertIds, clusters = if (clusters.size > 0) clusters else null, + pplQuery = pplQuery, + pplQueryResults = pplQueryResults, target = target ) } @@ -612,6 +713,11 @@ data class Alert( if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray()) if (target != null) builder.field(TARGET_FIELD, target) + if (!pplQuery.isNullOrEmpty()) builder.field(PPL_SQL_QUERY_FIELD, pplQuery) + if (pplQueryResults.isNotEmpty()) { + builder.field(PPL_SQL_QUERY_RESULTS_FIELD, pplQueryResults.toTypedArray()) + } + builder.endObject() return builder } @@ -636,7 +742,9 @@ data class Alert( PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath, FINDING_IDS to findingIds.joinToString(","), RELATED_DOC_IDS to relatedDocIds.joinToString(","), - CLUSTERS_FIELD to clusters?.joinToString(",") + CLUSTERS_FIELD to clusters?.joinToString(","), + PPL_SQL_QUERY_FIELD to pplQuery, + PPL_SQL_QUERY_RESULTS_FIELD to pplQueryResults ) } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTrigger.kt index bac059f8..337a6e21 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTrigger.kt @@ -63,7 +63,7 @@ data class BucketLevelTrigger( out.writeCollection(actions) } - fun asTemplateArg(): Map { + override fun asTemplateArg(): Map { return mapOf( ID_FIELD to id, NAME_FIELD to name, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt index c56ce856..fc95e9dc 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt @@ -55,7 +55,7 @@ data class ChainedAlertTrigger( } /** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */ - fun asTemplateArg(): Map { + override fun asTemplateArg(): Map { return mapOf( ID_FIELD to id, NAME_FIELD to name, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTrigger.kt index a1f8b617..5d7ec338 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTrigger.kt @@ -55,7 +55,7 @@ data class DocumentLevelTrigger( } /** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */ - fun asTemplateArg(): Map { + override fun asTemplateArg(): Map { return mapOf( ID_FIELD to id, NAME_FIELD to name, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt index 3846cea6..a47376b9 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt @@ -2,6 +2,7 @@ package org.opensearch.commons.alerting.model import org.opensearch.commons.alerting.model.ClusterMetricsInput.Companion.URI_FIELD import org.opensearch.commons.alerting.model.DocLevelMonitorInput.Companion.DOC_LEVEL_INPUT_FIELD +import org.opensearch.commons.alerting.model.PPLSQLInput.Companion.PPL_SQL_INPUT_FIELD import org.opensearch.commons.alerting.model.SearchInput.Companion.SEARCH_FIELD import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput.Companion.REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD @@ -20,7 +21,8 @@ interface Input : BaseModel { CLUSTER_METRICS_INPUT(URI_FIELD), SEARCH_INPUT(SEARCH_FIELD), REMOTE_MONITOR_INPUT(REMOTE_MONITOR_INPUT_FIELD), - REMOTE_DOC_LEVEL_MONITOR_INPUT(REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD); + REMOTE_DOC_LEVEL_MONITOR_INPUT(REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD), + PPL_SQL_INPUT(PPL_SQL_INPUT_FIELD); override fun toString(): String { return value @@ -42,8 +44,10 @@ interface Input : BaseModel { DocLevelMonitorInput.parse(xcp) } else if (xcp.currentName() == Type.REMOTE_MONITOR_INPUT.value) { RemoteMonitorInput.parse(xcp) - } else { + } else if (xcp.currentName() == Type.REMOTE_DOC_LEVEL_MONITOR_INPUT.value) { RemoteDocLevelMonitorInput.parse(xcp) + } else { + PPLSQLInput.parseInner(xcp) } XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) return input @@ -58,6 +62,7 @@ interface Input : BaseModel { Type.SEARCH_INPUT -> SearchInput(sin) Type.REMOTE_MONITOR_INPUT -> RemoteMonitorInput(sin) Type.REMOTE_DOC_LEVEL_MONITOR_INPUT -> RemoteDocLevelMonitorInput(sin) + Type.PPL_SQL_INPUT -> PPLSQLInput(sin) // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns // enum can be null in Java else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt index 53d2ef2e..e036d20e 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt @@ -10,6 +10,7 @@ import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION import org.opensearch.commons.alerting.util.IndexUtils.Companion.supportedClusterMetricsSettings import org.opensearch.commons.alerting.util.instant import org.opensearch.commons.alerting.util.isBucketLevelMonitor +import org.opensearch.commons.alerting.util.isPplSqlMonitor import org.opensearch.commons.alerting.util.optionalTimeField import org.opensearch.commons.alerting.util.optionalUserField import org.opensearch.commons.authuser.User @@ -69,6 +70,8 @@ data class Monitor( require(trigger is QueryLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } MonitorType.DOC_LEVEL_MONITOR.value -> require(trigger is DocumentLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } + MonitorType.PPL_MONITOR.value -> + require(trigger is PPLSQLTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } } } if (enabled) { @@ -87,6 +90,27 @@ data class Monitor( } } } + + if (this.isPplSqlMonitor()) { + require(inputs.size == 1) { "Exactly 1 PPL query must be specified for PPL Monitor" } + + val pplSqlInput = inputs[0] + + require(pplSqlInput is PPLSQLInput) { "Unsupported input [${pplSqlInput.name()}] for PPL Monitor" } + + require(pplSqlInput.queryLanguage == PPLSQLInput.QueryLanguage.PPL) { "SQL queries are not supported. Please use a PPL query." } + + // this is a new check for PPL Alerting specifically, PPL Monitors will enforce + // a max name length, but it won't be enforced on other Monitor types to avoid + // adding a breaking change + require(this.name.length <= ALERTING_V2_MAX_NAME_LENGTH) { + "Monitor name too long, length must be at most $ALERTING_V2_MAX_NAME_LENGTH." + } + + // this is a new check for PPL Alerting specifically, other Monitor types allow + // themselves to be created without any Triggers + require(this.triggers.isNotEmpty()) { "PPL Monitor must include at least 1 trigger." } + } } @Throws(IOException::class) @@ -149,7 +173,8 @@ data class Monitor( QUERY_LEVEL_MONITOR("query_level_monitor"), BUCKET_LEVEL_MONITOR("bucket_level_monitor"), CLUSTER_METRICS_MONITOR("cluster_metrics_monitor"), - DOC_LEVEL_MONITOR("doc_level_monitor"); + DOC_LEVEL_MONITOR("doc_level_monitor"), + PPL_MONITOR("ppl_monitor"); override fun toString(): String { return value @@ -235,6 +260,8 @@ data class Monitor( out.writeEnum(Input.Type.SEARCH_INPUT) } else if (it is DocLevelMonitorInput) { out.writeEnum(Input.Type.DOCUMENT_LEVEL_INPUT) + } else if (it is PPLSQLInput) { + out.writeEnum(Input.Type.PPL_SQL_INPUT) } else { out.writeEnum(Input.Type.REMOTE_DOC_LEVEL_MONITOR_INPUT) } @@ -247,6 +274,7 @@ data class Monitor( is BucketLevelTrigger -> out.writeEnum(Trigger.Type.BUCKET_LEVEL_TRIGGER) is DocumentLevelTrigger -> out.writeEnum(Trigger.Type.DOCUMENT_LEVEL_TRIGGER) is RemoteMonitorTrigger -> out.writeEnum(Trigger.Type.REMOTE_MONITOR_TRIGGER) + is PPLSQLTrigger -> out.writeEnum(Trigger.Type.PPL_SQL_TRIGGER) else -> out.writeEnum(Trigger.Type.QUERY_LEVEL_TRIGGER) } it.writeTo(out) @@ -299,6 +327,10 @@ data class Monitor( const val TARGET_FIELD = "target" val MONITOR_TYPE_PATTERN = Pattern.compile("[a-zA-Z0-9_]{5,25}") + // hard, nonadjustable limits for PPL Alerting + const val ALERTING_V2_MAX_NAME_LENGTH = 30 // max length of any name for monitors, triggers, notif actions, etc + const val UUID_LENGTH = 20 // the length of a UUID generated by UUIDs.base64UUID() + // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all // the different subclasses and creating circular dependencies val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/NoOpTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/NoOpTrigger.kt index 3ffacb6e..90d3d3a4 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/NoOpTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/NoOpTrigger.kt @@ -36,7 +36,7 @@ data class NoOpTrigger( return NOOP_TRIGGER_FIELD } - fun asTemplateArg(): Map { + override fun asTemplateArg(): Map { return mapOf() } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLInput.kt new file mode 100644 index 00000000..3f9e69ac --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLInput.kt @@ -0,0 +1,110 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.commons.alerting.util.AlertingException +import org.opensearch.core.ParseField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException + +data class PPLSQLInput( + val query: String, + val queryLanguage: QueryLanguage = QueryLanguage.PPL +) : Input { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // query + sin.readEnum(QueryLanguage::class.java) + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(query) + out.writeEnum(queryLanguage) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .startObject(PPL_SQL_INPUT_FIELD) + .field(QUERY_FIELD, query) + .field(QUERY_LANGUAGE_FIELD, queryLanguage.value) + .endObject() + .endObject() + } + + override fun asTemplateArg(): Map = + mapOf( + PPL_SQL_INPUT_FIELD to mapOf( + QUERY_FIELD to query, + QUERY_LANGUAGE_FIELD to queryLanguage.value + ) + ) + + override fun name(): String = PPL_SQL_INPUT_FIELD + + enum class QueryLanguage(val value: String) { + PPL(PPL_QUERY_LANGUAGE), + SQL(SQL_QUERY_LANGUAGE); + + companion object { + fun enumFromString(value: String): QueryLanguage? = QueryLanguage.entries.firstOrNull { it.value == value } + } + } + + companion object { + // PPL/SQL Input field names + const val PPL_SQL_INPUT_FIELD = "ppl_input" + const val QUERY_FIELD = "query" + const val QUERY_LANGUAGE_FIELD = "query_language" + + // query languages + const val PPL_QUERY_LANGUAGE = "ppl" + const val SQL_QUERY_LANGUAGE = "sql" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + Input::class.java, + ParseField(PPL_SQL_INPUT_FIELD), + CheckedFunction { parseInner(it) } + ) + + @JvmStatic + @Throws(IOException::class) + fun parseInner(xcp: XContentParser): PPLSQLInput { + lateinit var query: String + var queryLanguage: QueryLanguage = QueryLanguage.PPL // default to PPL + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + when (fieldName) { + QUERY_FIELD -> query = xcp.text() + QUERY_LANGUAGE_FIELD -> { + val input = xcp.text() + val enumMatchResult = QueryLanguage.enumFromString(input) + ?: throw AlertingException.wrap( + IllegalArgumentException( + "Invalid value for $QUERY_LANGUAGE_FIELD: $input. " + + "Supported values are ${QueryLanguage.entries.map { it.value }}" + ) + ) + queryLanguage = enumMatchResult + } + } + } + return PPLSQLInput(query, queryLanguage) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): PPLSQLInput { + return PPLSQLInput(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTrigger.kt new file mode 100644 index 00000000..3c661f1e --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTrigger.kt @@ -0,0 +1,335 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.common.UUIDs +import org.opensearch.commons.alerting.model.Monitor.Companion.ALERTING_V2_MAX_NAME_LENGTH +import org.opensearch.commons.alerting.model.Monitor.Companion.UUID_LENGTH +import org.opensearch.commons.alerting.model.Trigger.Companion.ACTIONS_FIELD +import org.opensearch.commons.alerting.model.Trigger.Companion.ID_FIELD +import org.opensearch.commons.alerting.model.Trigger.Companion.NAME_FIELD +import org.opensearch.commons.alerting.model.Trigger.Companion.SEVERITY_FIELD +import org.opensearch.commons.alerting.model.action.Action +import org.opensearch.core.ParseField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException + +/** + * The PPL/SQL Trigger for PPL/SQL Monitors + * + * There are two types of PPLTrigger conditions: NUMBER_OF_RESULTS and CUSTOM + * NUMBER_OF_RESULTS: triggers based on whether the number of query results returned by the PPLSQLMonitor + * query meets some threshold + * CUSTOM: triggers based on a custom condition that user specifies (a single ppl eval statement) + * + * @property id Trigger ID, defaults to a base64 UUID. + * @property name Display name of the Trigger. + * @property severity The severity level of the Trigger. + * @property actions List of notification-sending actions to run when the Trigger condition is met. + * @property conditionType The type of condition to evaluate. + * Can be either [ConditionType.NUMBER_OF_RESULTS] or [ConditionType.CUSTOM]. + * @property numResultsCondition The comparison operator for NUMBER_OF_RESULTS conditions. Required if using NUMBER_OF_RESULTS conditions, + * required to be null otherwise. + * @property numResultsValue The threshold value for NUMBER_OF_RESULTS conditions. Required if using NUMBER_OF_RESULTS conditions, + * required to be null otherwise. + * @property customCondition A custom condition expression. Required if using CUSTOM conditions, + * required to be null otherwise. + * + * @opensearch.experimental + */ +data class PPLSQLTrigger( + override val id: String = UUIDs.base64UUID(), + override val name: String, + override val severity: String, + override val actions: List, + val conditionType: ConditionType, // NUMBER_OF_RESULTS or CUSTOM + val numResultsCondition: NumResultsCondition?, + val numResultsValue: Long?, + val customCondition: String? +) : Trigger { + + init { + requireNotNull(this.name) { "Trigger name must be included." } + requireNotNull(this.severity) { "Trigger severity must be included." } + requireNotNull(this.conditionType) { "Trigger condition type must be included." } + + require(this.id.length <= UUID_LENGTH) { + "Trigger ID too long, length must be less than $UUID_LENGTH." + } + + require(this.name.length <= ALERTING_V2_MAX_NAME_LENGTH) { + "Trigger name too long, length must be less than $ALERTING_V2_MAX_NAME_LENGTH." + } + + this.actions.forEach { + require(it.name.length <= ALERTING_V2_MAX_NAME_LENGTH) { + "Name of action with ID ${it.id} too long, length must be less than $ALERTING_V2_MAX_NAME_LENGTH." + } + require(it.destinationId.length <= NOTIFICATIONS_ID_MAX_LENGTH) { + "Channel ID of action with ID ${it.id} too long, length must be less than $NOTIFICATIONS_ID_MAX_LENGTH." + } + require(it.destinationId.isNotEmpty()) { + "Channel ID should not be empty." + } + require(it.destinationId.matches(validCharsRegex)) { + "Channel ID should only have alphanumeric characters, dashes, and underscores." + } + } + + when (this.conditionType) { + ConditionType.NUMBER_OF_RESULTS -> { + requireNotNull(this.numResultsCondition) { + "if trigger condition is of type ${ConditionType.NUMBER_OF_RESULTS.value}, " + + "$NUM_RESULTS_CONDITION_FIELD must be included." + } + requireNotNull(this.numResultsValue) { + "if trigger condition is of type ${ConditionType.NUMBER_OF_RESULTS.value}, " + + "$NUM_RESULTS_VALUE_FIELD must be included." + } + require(this.customCondition == null) { + "if trigger condition is of type ${ConditionType.NUMBER_OF_RESULTS.value}, " + + "$CUSTOM_CONDITION_FIELD must not be included." + } + } + ConditionType.CUSTOM -> { + requireNotNull(this.customCondition) { + "if trigger condition is of type ${ConditionType.CUSTOM.value}, " + + "$CUSTOM_CONDITION_FIELD must be included." + } + require(this.numResultsCondition == null) { + "if trigger condition is of type ${ConditionType.CUSTOM.value}, " + + "$NUM_RESULTS_CONDITION_FIELD must not be included." + } + require(this.numResultsValue == null) { + "if trigger condition is of type ${ConditionType.CUSTOM.value}, " + + "$NUM_RESULTS_VALUE_FIELD must not be included." + } + } + } + + if (conditionType == ConditionType.NUMBER_OF_RESULTS) { + require(this.numResultsValue!! >= 0L) { "Number of results to check for cannot be negative." } + } + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readString(), // name + sin.readString(), // severity + sin.readList(::Action), // actions + sin.readEnum(ConditionType::class.java), // conditionType + if (sin.readBoolean()) sin.readEnum(NumResultsCondition::class.java) else null, // numResultsCondition + sin.readOptionalLong(), // numResultsValue + sin.readOptionalString() // customCondition + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(name) + out.writeString(severity) + out.writeCollection(actions) + out.writeEnum(conditionType) + + out.writeBoolean(numResultsCondition != null) + numResultsCondition?.let { out.writeEnum(numResultsCondition) } + + out.writeOptionalLong(numResultsValue) + out.writeOptionalString(customCondition) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + builder.startObject(PPL_SQL_TRIGGER_FIELD) + builder.field(ID_FIELD, id) + builder.field(NAME_FIELD, name) + builder.field(SEVERITY_FIELD, severity) + builder.field(ACTIONS_FIELD, actions.toTypedArray()) + builder.field(CONDITION_TYPE_FIELD, conditionType.value) + numResultsCondition?.let { builder.field(NUM_RESULTS_CONDITION_FIELD, numResultsCondition.value) } + numResultsValue?.let { builder.field(NUM_RESULTS_VALUE_FIELD, numResultsValue) } + customCondition?.let { builder.field(CUSTOM_CONDITION_FIELD, customCondition) } + builder.endObject() + builder.endObject() + return builder + } + + override fun asTemplateArg(): Map { + val templateArg = mutableMapOf( + ID_FIELD to id, + NAME_FIELD to name, + SEVERITY_FIELD to severity, + ACTIONS_FIELD to actions.map { it.asTemplateArg() }, + CONDITION_TYPE_FIELD to conditionType.value + ) + + if (conditionType == ConditionType.NUMBER_OF_RESULTS) { + templateArg[NUM_RESULTS_CONDITION_FIELD] = numResultsCondition!!.value + templateArg[NUM_RESULTS_VALUE_FIELD] = numResultsValue!! + } else { + templateArg[CUSTOM_CONDITION_FIELD] = customCondition!! + } + + return templateArg + } + + override fun name(): String { + return PPL_SQL_TRIGGER_FIELD + } + + enum class ConditionType(val value: String) { + NUMBER_OF_RESULTS("number_of_results"), + CUSTOM("custom"); + + companion object { + fun enumFromString(value: String): ConditionType? = entries.firstOrNull { it.value == value } + } + } + + enum class NumResultsCondition(val value: String) { + GREATER_THAN(">"), + GREATER_THAN_EQUAL(">="), + LESS_THAN("<"), + LESS_THAN_EQUAL("<="), + EQUAL("=="), + NOT_EQUAL("!="); + + companion object { + fun enumFromString(value: String): NumResultsCondition? = entries.firstOrNull { it.value == value } + } + } + + companion object { + // trigger wrapper object field name + const val PPL_SQL_TRIGGER_FIELD = "ppl_trigger" + + // field names + const val CONDITION_TYPE_FIELD = "type" + const val NUM_RESULTS_CONDITION_FIELD = "num_results_condition" + const val NUM_RESULTS_VALUE_FIELD = "num_results_value" + const val CUSTOM_CONDITION_FIELD = "custom_condition" + + // hard, nonadjustable limits + const val NOTIFICATIONS_ID_MAX_LENGTH = 512 // length limit for notifications channel custom ID at channel creation time + + // regular expression for validating that a string contains + // only valid chars (letters, numbers, -, _) + private val validCharsRegex = """^[a-zA-Z0-9_-]+$""".toRegex() + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + Trigger::class.java, + ParseField(PPL_SQL_TRIGGER_FIELD), + CheckedFunction { parseInner(it) } + ) + + @JvmStatic + @Throws(IOException::class) + fun parseInner(xcp: XContentParser): PPLSQLTrigger { + var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified + var name: String? = null + var severity: String? = null + val actions: MutableList = mutableListOf() + var conditionType: ConditionType? = null + var numResultsCondition: NumResultsCondition? = null + var numResultsValue: Long? = null + var customCondition: String? = null + + /* parse */ + if (xcp.currentToken() != XContentParser.Token.START_OBJECT && xcp.currentToken() != XContentParser.Token.FIELD_NAME) { + XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.tokenLocation) + } + + // If the parser began on START_OBJECT, move to the next token so that the while loop enters on + // the fieldName (or END_OBJECT if it's empty). + if (xcp.currentToken() == XContentParser.Token.START_OBJECT) xcp.nextToken() + + while (xcp.currentToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + ID_FIELD -> id = xcp.text() + NAME_FIELD -> name = xcp.text() + SEVERITY_FIELD -> severity = xcp.text() + CONDITION_TYPE_FIELD -> { + val input = xcp.text() + val enumMatchResult = ConditionType.enumFromString(input) + ?: throw IllegalArgumentException( + "Invalid value for $CONDITION_TYPE_FIELD: $input. " + + "Supported values are ${ConditionType.entries.map { it.value }}" + ) + conditionType = enumMatchResult + } + NUM_RESULTS_CONDITION_FIELD -> { + if (xcp.currentToken() != XContentParser.Token.VALUE_NULL) { + val input = xcp.text() + val enumMatchResult = NumResultsCondition.enumFromString(input) + ?: throw IllegalArgumentException( + "Invalid value for $NUM_RESULTS_CONDITION_FIELD: $input. " + + "Supported values are ${NumResultsCondition.entries.map { it.value }}" + ) + numResultsCondition = enumMatchResult + } + } + NUM_RESULTS_VALUE_FIELD -> { + if (xcp.currentToken() != XContentParser.Token.VALUE_NULL) { + numResultsValue = xcp.longValue() + } + } + CUSTOM_CONDITION_FIELD -> { + if (xcp.currentToken() != XContentParser.Token.VALUE_NULL) { + customCondition = xcp.text() + } + } + ACTIONS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + actions.add(Action.parse(xcp)) + } + } + else -> throw IllegalArgumentException("Unexpected field when parsing PPL Trigger: $fieldName") + } + + xcp.nextToken() + } + + /* validations */ + requireNotNull(name) { "Trigger name must be included" } + requireNotNull(severity) { "Trigger severity must be included" } + requireNotNull(conditionType) { "Trigger condition type must be included" } + + // 3. prepare and return PPLTrigger object + return PPLSQLTrigger( + id, + name, + severity, + actions, + conditionType, + numResultsCondition, + numResultsValue, + customCondition + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): PPLSQLTrigger { + return PPLSQLTrigger(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTriggerRunResult.kt new file mode 100644 index 00000000..0a6bdf16 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTriggerRunResult.kt @@ -0,0 +1,42 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +data class PPLSQLTriggerRunResult( + override var triggerName: String, + override var error: Exception?, + var triggered: Boolean +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggered = sin.readBoolean() + ) + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.field(TRIGGERED_FIELD, triggered) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeBoolean(triggered) + } + + companion object { + const val TRIGGERED_FIELD = "triggered" + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return PPLSQLTriggerRunResult(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTrigger.kt index a88ef9b6..3713b1d5 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTrigger.kt @@ -55,7 +55,7 @@ data class QueryLevelTrigger( } /** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */ - fun asTemplateArg(): Map { + override fun asTemplateArg(): Map { return mapOf( ID_FIELD to id, NAME_FIELD to name, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt index 7cfb9f41..ffcad15c 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt @@ -16,7 +16,8 @@ interface Trigger : BaseModel { BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD), NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD), CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD), - REMOTE_MONITOR_TRIGGER(RemoteMonitorTrigger.REMOTE_MONITOR_TRIGGER_FIELD); + REMOTE_MONITOR_TRIGGER(RemoteMonitorTrigger.REMOTE_MONITOR_TRIGGER_FIELD), + PPL_SQL_TRIGGER(PPLSQLTrigger.PPL_SQL_TRIGGER_FIELD); override fun toString(): String { return value @@ -78,4 +79,6 @@ interface Trigger : BaseModel { val actions: List fun name(): String + + fun asTemplateArg(): Map } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorTrigger.kt index 0e89e5ba..adb3613c 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorTrigger.kt @@ -32,7 +32,7 @@ data class RemoteMonitorTrigger( sin.readBytesReference() ) - fun asTemplateArg(): Map { + override fun asTemplateArg(): Map { val bytes = trigger.toBytesRef().bytes return mapOf( Trigger.ID_FIELD to id, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt index efc8a364..f3c2bd0e 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt @@ -50,6 +50,10 @@ fun Monitor.isBucketLevelMonitor(): Boolean = isMonitorOfStandardType() && Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.BUCKET_LEVEL_MONITOR +fun Monitor.isPplSqlMonitor(): Boolean = + isMonitorOfStandardType() && + Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.PPL_MONITOR + fun XContentBuilder.optionalUserField(name: String, user: User?): XContentBuilder { if (user == null) { return nullField(name) @@ -72,6 +76,10 @@ fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContent return this.timeField(name, "${name}_in_millis", instant.toEpochMilli()) } +fun XContentBuilder.nonOptionalTimeField(name: String, instant: Instant): XContentBuilder { + return this.timeField(name, "${name}_in_millis", instant.toEpochMilli()) +} + fun XContentParser.instant(): Instant? { return when { currentToken() == XContentParser.Token.VALUE_NULL -> null diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 8177057d..7d629966 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -557,13 +557,20 @@ fun randomAlert(monitor: Monitor = randomQueryLevelMonitor()): Alert { val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult()) val clusterCount = (-1..5).random() val clusters = if (clusterCount == -1) null else (0..clusterCount).map { "index-$it" } + val pplQuery = "source=logs | where status=200" + val pplQueryResults = listOf( + mapOf("k1" to "v1", "num" to 42, "user" to mapOf("name" to "bob", "age" to 32), "vals" to listOf(1, 2, 3)), + mapOf("k1" to "v2", "num" to 17, "user" to mapOf("name" to "ana", "age" to 45), "vals" to listOf("a", "b", "c")) + ) return Alert( monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, actionExecutionResults = actionExecutionResults, - clusters = clusters + clusters = clusters, + pplQuery = pplQuery, + pplQueryResults = pplQueryResults ) } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt index 6aecb888..31a241b0 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt @@ -13,6 +13,7 @@ import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger import org.opensearch.commons.alerting.randomAction import org.opensearch.commons.alerting.randomActionExecutionPolicy +import org.opensearch.commons.alerting.randomAlert import org.opensearch.commons.alerting.randomBucketLevelMonitorRunResult import org.opensearch.commons.alerting.randomBucketLevelTrigger import org.opensearch.commons.alerting.randomBucketLevelTriggerRunResult @@ -43,6 +44,16 @@ import kotlin.test.assertTrue class WriteableTests { + @Test + fun `test alert as stream`() { + val alert = randomAlert() + val out = BytesStreamOutput() + alert.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newAlert = Alert(sin) + Assertions.assertEquals(alert, newAlert, "Round tripping Alert doesn't work") + } + @Test fun `test throttle as stream`() { val throttle = randomThrottle() diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index d0072ec9..28ed7553 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -69,6 +69,7 @@ class XContentTests { Assertions.assertEquals(action, parsedAction, "Round tripping Action doesn't work") } + @Test fun `test action parsing with throttled enabled and null throttle`() { val action = randomAction().copy(throttle = null).copy(throttleEnabled = true) val actionString = action.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() From f1a2067e70230c949955407861e6e9373ebac576 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Wed, 8 Apr 2026 14:08:36 -0700 Subject: [PATCH 02/11] post load test optimization Signed-off-by: Dennis Toepker --- .../alerting/model/MonitorRunResult.kt | 19 ++++++++++++-- .../model/QueryLevelTriggerRunResult.kt | 8 ++++-- .../commons/alerting/TestHelpers.kt | 25 +++++++++++++++++-- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt index d403313b..fafba50c 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt @@ -92,12 +92,16 @@ data class MonitorRunResult( data class InputRunResults( val results: List> = listOf(), val error: Exception? = null, - val aggTriggersAfterKey: MutableMap? = null + val aggTriggersAfterKey: MutableMap? = null, + val pplBaseQueryResults: List> = listOf(), + val pplBaseQueryNumResults: Long? = null ) : Writeable, ToXContent { override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { return builder.startObject() .field("results", results) + .field("ppl_query_results", pplBaseQueryResults) + .field("ppl_num_results", pplBaseQueryNumResults) .field("error", error?.message) .endObject() } @@ -108,6 +112,11 @@ data class InputRunResults( for (map in results) { out.writeMap(map) } + out.writeVInt(pplBaseQueryResults.size) + for (datarow in pplBaseQueryResults) { + out.writeMap(datarow) + } + out.writeOptionalLong(pplBaseQueryNumResults) out.writeException(error) } @@ -120,8 +129,14 @@ data class InputRunResults( for (i in 0 until count) { list.add(suppressWarning(sin.readMap())) // result(map) } + val pplSqlCount = sin.readVInt() // count + val pplSqlList = mutableListOf>() + for (i in 0 until pplSqlCount) { + pplSqlList.add(suppressWarning(sin.readMap())) // result(map) + } + val pplNumResults = sin.readOptionalLong() val error = sin.readException() // error - return InputRunResults(list, error) + return InputRunResults(list, error, null, pplSqlList, pplNumResults) } @Suppress("UNCHECKED_CAST") diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt index 101d0067..c2821890 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt @@ -18,7 +18,8 @@ open class QueryLevelTriggerRunResult( override var triggerName: String, open var triggered: Boolean, override var error: Exception?, - open var actionResults: MutableMap = mutableMapOf() + open var actionResults: MutableMap = mutableMapOf(), + open var pplCustomQueryResults: List> = listOf() ) : TriggerRunResult(triggerName, error) { @Throws(IOException::class) @@ -27,7 +28,8 @@ open class QueryLevelTriggerRunResult( triggerName = sin.readString(), error = sin.readException(), triggered = sin.readBoolean(), - actionResults = sin.readMap() as MutableMap + actionResults = sin.readMap() as MutableMap, + pplCustomQueryResults = sin.readList { it.readMap() } ) override fun alertError(): AlertError? { @@ -47,6 +49,7 @@ open class QueryLevelTriggerRunResult( return builder .field("triggered", triggered) .field("action_results", actionResults as Map) + .field("ppl_query_results", pplCustomQueryResults) } @Throws(IOException::class) @@ -54,6 +57,7 @@ open class QueryLevelTriggerRunResult( super.writeTo(out) out.writeBoolean(triggered) out.writeMap(actionResults as Map) + out.writeCollection(pplCustomQueryResults) { stream, map -> stream.writeMap(map) } } companion object { diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 7d629966..18879526 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -688,7 +688,7 @@ fun createCorrelationAlertTemplateArgs(correlationAlert: CorrelationAlert): Map< } fun randomInputRunResults(): InputRunResults { - return InputRunResults(listOf(), null) + return InputRunResults(listOf(), null, null, listOf(), 5L) } fun randomActionRunResult(): ActionRunResult { @@ -781,7 +781,28 @@ fun randomQueryLevelTriggerRunResult(): QueryLevelTriggerRunResult { val map = mutableMapOf() map.plus(Pair("key1", randomActionRunResult())) map.plus(Pair("key2", randomActionRunResult())) - return QueryLevelTriggerRunResult("trigger-name", true, null, map) + + val queryResultsList = mutableListOf>() + queryResultsList.add( + mapOf( + "key1" to "val1", + "key2" to 4 + ) + ) + queryResultsList.add( + mapOf( + "key3" to listOf(1, 2, 3), + "key4" to mapOf("nested-key" to "nested-val") + ) + ) + + return QueryLevelTriggerRunResult( + "trigger-name", + true, + null, + map, + queryResultsList + ) } fun randomQueryLevelMonitorRunResult(): MonitorRunResult { From 83afb2ba283a59eaaf553be6ce0bcd479bad327b Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Fri, 10 Apr 2026 00:26:28 -0700 Subject: [PATCH 03/11] adding constructor for bwc Signed-off-by: Dennis Toepker --- .../commons/alerting/model/MonitorRunResult.kt | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt index fafba50c..2ceaa135 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt @@ -97,6 +97,18 @@ data class InputRunResults( val pplBaseQueryNumResults: Long? = null ) : Writeable, ToXContent { + constructor( + results: List> = listOf(), + error: Exception? = null, + aggTriggersAfterKey: MutableMap? = null + ) : this( + results = results, + error = error, + aggTriggersAfterKey = aggTriggersAfterKey, + pplBaseQueryResults = listOf(), + pplBaseQueryNumResults = null + ) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { return builder.startObject() .field("results", results) From ca65c870a9b74c4f1874e01fcc2eade8773761ef Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Thu, 16 Apr 2026 14:28:36 -0700 Subject: [PATCH 04/11] extra tests and cleanup Signed-off-by: Dennis Toepker --- .../alerting/action/GetAlertsResponse.kt | 9 +-- .../commons/alerting/model/Alert.kt | 1 + .../commons/alerting/model/Monitor.kt | 6 +- .../commons/alerting/model/PPLSQLTrigger.kt | 10 ++-- .../alerting/model/PPLSQLTriggerRunResult.kt | 42 ------------- .../commons/alerting/model/Trigger.kt | 1 + .../commons/alerting/util/IndexUtils.kt | 4 -- .../commons/alerting/TestHelpers.kt | 60 ++++++++++++++++++- .../commons/alerting/model/WriteableTests.kt | 32 ++++++++++ .../commons/alerting/model/XContentTests.kt | 43 +++++++++++++ 10 files changed, 146 insertions(+), 62 deletions(-) delete mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTriggerRunResult.kt diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponse.kt index 4570d7cf..50a4ec53 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponse.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponse.kt @@ -39,14 +39,9 @@ class GetAlertsResponse : BaseResponse { @Throws(IOException::class) override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { builder.startObject() - .field(ALERTS_FIELD, alerts) - .field(TOTAL_ALERTS_FIELD, totalAlerts) + .field("alerts", alerts) + .field("totalAlerts", totalAlerts) return builder.endObject() } - - companion object { - const val ALERTS_FIELD = "alerts" - const val TOTAL_ALERTS_FIELD = "totalAlerts" - } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt index a064f06d..b345e7df 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt @@ -50,6 +50,7 @@ data class Alert( val pplQueryResults: List> = listOf(), val target: Target? = null ) : Writeable, ToXContent { + init { if (errorMessage != null) { require(state == State.DELETED || state == State.ERROR || state == State.AUDIT) { diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt index e036d20e..eabd4f07 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt @@ -103,8 +103,8 @@ data class Monitor( // this is a new check for PPL Alerting specifically, PPL Monitors will enforce // a max name length, but it won't be enforced on other Monitor types to avoid // adding a breaking change - require(this.name.length <= ALERTING_V2_MAX_NAME_LENGTH) { - "Monitor name too long, length must be at most $ALERTING_V2_MAX_NAME_LENGTH." + require(this.name.length <= ALERTING_MAX_NAME_LENGTH) { + "Monitor name too long, length must be at most $ALERTING_MAX_NAME_LENGTH." } // this is a new check for PPL Alerting specifically, other Monitor types allow @@ -328,7 +328,7 @@ data class Monitor( val MONITOR_TYPE_PATTERN = Pattern.compile("[a-zA-Z0-9_]{5,25}") // hard, nonadjustable limits for PPL Alerting - const val ALERTING_V2_MAX_NAME_LENGTH = 30 // max length of any name for monitors, triggers, notif actions, etc + const val ALERTING_MAX_NAME_LENGTH = 30 // max length of any name for monitors, triggers, notif actions, etc const val UUID_LENGTH = 20 // the length of a UUID generated by UUIDs.base64UUID() // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTrigger.kt index 3c661f1e..ebbd38da 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTrigger.kt @@ -7,7 +7,7 @@ package org.opensearch.commons.alerting.model import org.opensearch.common.CheckedFunction import org.opensearch.common.UUIDs -import org.opensearch.commons.alerting.model.Monitor.Companion.ALERTING_V2_MAX_NAME_LENGTH +import org.opensearch.commons.alerting.model.Monitor.Companion.ALERTING_MAX_NAME_LENGTH import org.opensearch.commons.alerting.model.Monitor.Companion.UUID_LENGTH import org.opensearch.commons.alerting.model.Trigger.Companion.ACTIONS_FIELD import org.opensearch.commons.alerting.model.Trigger.Companion.ID_FIELD @@ -67,13 +67,13 @@ data class PPLSQLTrigger( "Trigger ID too long, length must be less than $UUID_LENGTH." } - require(this.name.length <= ALERTING_V2_MAX_NAME_LENGTH) { - "Trigger name too long, length must be less than $ALERTING_V2_MAX_NAME_LENGTH." + require(this.name.length <= ALERTING_MAX_NAME_LENGTH) { + "Trigger name too long, length must be less than $ALERTING_MAX_NAME_LENGTH." } this.actions.forEach { - require(it.name.length <= ALERTING_V2_MAX_NAME_LENGTH) { - "Name of action with ID ${it.id} too long, length must be less than $ALERTING_V2_MAX_NAME_LENGTH." + require(it.name.length <= ALERTING_MAX_NAME_LENGTH) { + "Name of action with ID ${it.id} too long, length must be less than $ALERTING_MAX_NAME_LENGTH." } require(it.destinationId.length <= NOTIFICATIONS_ID_MAX_LENGTH) { "Channel ID of action with ID ${it.id} too long, length must be less than $NOTIFICATIONS_ID_MAX_LENGTH." diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTriggerRunResult.kt deleted file mode 100644 index 0a6bdf16..00000000 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTriggerRunResult.kt +++ /dev/null @@ -1,42 +0,0 @@ -package org.opensearch.commons.alerting.model - -import org.opensearch.core.common.io.stream.StreamInput -import org.opensearch.core.common.io.stream.StreamOutput -import org.opensearch.core.xcontent.ToXContent -import org.opensearch.core.xcontent.XContentBuilder -import java.io.IOException - -data class PPLSQLTriggerRunResult( - override var triggerName: String, - override var error: Exception?, - var triggered: Boolean -) : TriggerRunResult(triggerName, error) { - - @Throws(IOException::class) - @Suppress("UNCHECKED_CAST") - constructor(sin: StreamInput) : this( - triggerName = sin.readString(), - error = sin.readException(), - triggered = sin.readBoolean() - ) - - override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - return builder.field(TRIGGERED_FIELD, triggered) - } - - @Throws(IOException::class) - override fun writeTo(out: StreamOutput) { - super.writeTo(out) - out.writeBoolean(triggered) - } - - companion object { - const val TRIGGERED_FIELD = "triggered" - - @JvmStatic - @Throws(IOException::class) - fun readFrom(sin: StreamInput): TriggerRunResult { - return PPLSQLTriggerRunResult(sin) - } - } -} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt index ffcad15c..e6670d42 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt @@ -59,6 +59,7 @@ interface Trigger : BaseModel { Type.DOCUMENT_LEVEL_TRIGGER -> DocumentLevelTrigger(sin) Type.CHAINED_ALERT_TRIGGER -> ChainedAlertTrigger(sin) Type.REMOTE_MONITOR_TRIGGER -> RemoteMonitorTrigger(sin) + Type.PPL_SQL_TRIGGER -> PPLSQLTrigger(sin) // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns // enum can be null in Java else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") diff --git a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt index f3c2bd0e..812ba514 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt @@ -76,10 +76,6 @@ fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContent return this.timeField(name, "${name}_in_millis", instant.toEpochMilli()) } -fun XContentBuilder.nonOptionalTimeField(name: String, instant: Instant): XContentBuilder { - return this.timeField(name, "${name}_in_millis", instant.toEpochMilli()) -} - fun XContentParser.instant(): Instant? { return when { currentToken() == XContentParser.Token.VALUE_NULL -> null diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 18879526..9a067a5b 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -41,6 +41,8 @@ import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.MonitorRunResult import org.opensearch.commons.alerting.model.NoOpTrigger +import org.opensearch.commons.alerting.model.PPLSQLInput +import org.opensearch.commons.alerting.model.PPLSQLTrigger import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult import org.opensearch.commons.alerting.model.Schedule @@ -174,6 +176,36 @@ fun randomDocumentLevelMonitor( ) } +fun randomPPLSQLMonitor( + name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + user: User = randomUser(), + inputs: List = listOf( + PPLSQLInput( + query = "source=logs | where status > 400", + queryLanguage = PPLSQLInput.QueryLanguage.PPL + ) + ), + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = Random().nextBoolean(), + triggers: List = (1..RandomNumbers.randomIntBetween(Random(), 0, 10)).map { randomPPLSQLTrigger() }, + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + withMetadata: Boolean = false +): Monitor { + return Monitor( + name = name, + monitorType = Monitor.MonitorType.PPL_MONITOR.value, + enabled = enabled, + inputs = inputs, + schedule = schedule, + triggers = triggers, + enabledTime = enabledTime, + lastUpdateTime = lastUpdateTime, + user = user, + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() + ) +} + fun randomWorkflow( name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), user: User? = randomUser(), @@ -325,6 +357,30 @@ fun randomChainedAlertTrigger( ) } +fun randomPPLSQLTrigger( + id: String = UUIDs.base64UUID(), + name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + severity: String = "1", + actions: List = mutableListOf(), + conditionType: PPLSQLTrigger.ConditionType = PPLSQLTrigger.ConditionType.NUMBER_OF_RESULTS, + numResultsCondition: PPLSQLTrigger.NumResultsCondition? = PPLSQLTrigger.NumResultsCondition.GREATER_THAN, + numResultsValue: Long = 0, + customCondition: String? = null +): PPLSQLTrigger { + return PPLSQLTrigger( + id = id, + name = name, + severity = severity, + actions = actions.ifEmpty { + (0..RandomNumbers.randomIntBetween(Random(), 0, 10)).map { randomAction(destinationId = "fake-channel-id") } + }, + conditionType = conditionType, + numResultsCondition = numResultsCondition, + numResultsValue = numResultsValue, + customCondition = customCondition + ) +} + fun randomBucketSelectorExtAggregationBuilder( name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), bucketsPathsMap: MutableMap = mutableMapOf("avg" to "10"), @@ -533,12 +589,14 @@ fun xContentRegistry(): NamedXContentRegistry { listOf( SearchInput.XCONTENT_REGISTRY, DocLevelMonitorInput.XCONTENT_REGISTRY, + PPLSQLInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY, DocumentLevelTrigger.XCONTENT_REGISTRY, ChainedAlertTrigger.XCONTENT_REGISTRY, NoOpTrigger.XCONTENT_REGISTRY, - RemoteMonitorTrigger.XCONTENT_REGISTRY + RemoteMonitorTrigger.XCONTENT_REGISTRY, + PPLSQLTrigger.XCONTENT_REGISTRY ) + SearchModule(Settings.EMPTY, emptyList()).namedXContents ) } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt index 31a241b0..5a492a70 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt @@ -22,6 +22,8 @@ import org.opensearch.commons.alerting.randomDocLevelQuery import org.opensearch.commons.alerting.randomDocumentLevelMonitorRunResult import org.opensearch.commons.alerting.randomDocumentLevelTrigger import org.opensearch.commons.alerting.randomInputRunResults +import org.opensearch.commons.alerting.randomPPLSQLMonitor +import org.opensearch.commons.alerting.randomPPLSQLTrigger import org.opensearch.commons.alerting.randomQueryLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitorRunResult import org.opensearch.commons.alerting.randomQueryLevelTrigger @@ -114,6 +116,16 @@ class WriteableTests { Assertions.assertEquals(monitor, newMonitor, "Round tripping QueryLevelMonitor doesn't work") } + @Test + fun `test ppl sql monitor as stream`() { + val monitor = randomPPLSQLMonitor() + val out = BytesStreamOutput() + monitor.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newMonitor = Monitor(sin) + Assertions.assertEquals(monitor, newMonitor, "Round tripping PPLSQLMonitor doesn't work") + } + @Test fun `test workflow as stream`() { val workflow = randomWorkflow(monitorIds = listOf("1", "2", "3", "4")) @@ -154,6 +166,16 @@ class WriteableTests { Assertions.assertEquals(trigger, newTrigger, "Round tripping DocumentLevelTrigger doesn't work") } + @Test + fun `test ppl sql trigger as stream`() { + val trigger = randomPPLSQLTrigger() + val out = BytesStreamOutput() + trigger.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newTrigger = PPLSQLTrigger.readFrom(sin) + Assertions.assertEquals(trigger, newTrigger, "Round tripping PPLSQLTrigger doesn't work") + } + @Test fun `test doc-level query as stream`() { val dlq = randomDocLevelQuery() @@ -197,6 +219,16 @@ class WriteableTests { Assertions.assertEquals(input, newInput, "Round tripping MonitorRunResult doesn't work") } + @Test + fun `test ppl sql input as stream`() { + val input = PPLSQLInput("source=some-index") + val out = BytesStreamOutput() + input.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newInput = PPLSQLInput(sin) + Assertions.assertEquals(input, newInput, "Round tripping PPLSQLInput doesn't work") + } + @Test fun `test user as stream`() { val user = randomUser() diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index 28ed7553..c576362d 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -23,9 +23,12 @@ import org.opensearch.commons.alerting.randomAlert import org.opensearch.commons.alerting.randomBucketLevelMonitor import org.opensearch.commons.alerting.randomBucketLevelTrigger import org.opensearch.commons.alerting.randomDocLevelQuery +import org.opensearch.commons.alerting.randomPPLSQLMonitor +import org.opensearch.commons.alerting.randomPPLSQLTrigger import org.opensearch.commons.alerting.randomQueryLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitorWithoutUser import org.opensearch.commons.alerting.randomQueryLevelTrigger +import org.opensearch.commons.alerting.randomSearchInput import org.opensearch.commons.alerting.randomThrottle import org.opensearch.commons.alerting.randomUser import org.opensearch.commons.alerting.randomUserEmpty @@ -128,6 +131,7 @@ class XContentTests { } } + @Test fun `test query-level monitor parsing`() { val monitor = randomQueryLevelMonitor() @@ -223,6 +227,15 @@ class XContentTests { Assertions.assertEquals(workflow, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work") } + @Test + fun `test ppl sql monitor parsing`() { + val monitor = randomPPLSQLMonitor() + + val monitorString = monitor.toJsonStringWithUser() + val parsedMonitor = Monitor.parse(parser(monitorString)) + Assertions.assertEquals(monitor, parsedMonitor, "Round tripping PPLSQLMonitor doesn't work") + } + @Test fun `test query-level trigger parsing`() { val trigger = randomQueryLevelTrigger() @@ -253,6 +266,16 @@ class XContentTests { Assertions.assertEquals(trigger, parsedTrigger, "Round tripping NoOpTrigger doesn't work") } + @Test + fun `test ppl sql trigger parsing`() { + val trigger = randomPPLSQLTrigger() + + val triggerString = trigger.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + val parsedTrigger = Trigger.parse(parser(triggerString)) + + Assertions.assertEquals(trigger, parsedTrigger, "Round tripping PPLSQLTrigger doesn't work") + } + @Test fun `test creating a monitor with duplicate trigger ids fails`() { try { @@ -425,6 +448,16 @@ class XContentTests { } } + @Test + fun `test creating a ppl sql monitor with invalid trigger type fails`() { + try { + val queryLevelTrigger = randomQueryLevelTrigger() + randomPPLSQLMonitor().copy(triggers = listOf(queryLevelTrigger)) + Assertions.fail("Creating a PPL SQL monitor with query-level triggers did not fail.") + } catch (ignored: IllegalArgumentException) { + } + } + @Test fun `test creating an bucket-level monitor with invalid input fails`() { try { @@ -435,6 +468,16 @@ class XContentTests { } } + @Test + fun `test creating a ppl sql monitor with invalid input type fails`() { + try { + val searchInput = randomSearchInput() + randomPPLSQLMonitor().copy(inputs = listOf(searchInput)) + Assertions.fail("Creating a PPL SQL monitor with search input did not fail.") + } catch (ignored: IllegalArgumentException) { + } + } + @Test fun `test action execution policy`() { val actionExecutionPolicy = randomActionExecutionPolicy() From 697fb29b3873e2d217b6b813352073f3b79f9d21 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Mon, 20 Apr 2026 13:56:07 -0700 Subject: [PATCH 05/11] removing mention of SQL in names, generalizing query and queryResults fields in Alert, add extra serde tests Signed-off-by: Dennis Toepker --- .../commons/alerting/model/Alert.kt | 89 +++++++++++-------- .../commons/alerting/model/Input.kt | 8 +- .../commons/alerting/model/Monitor.kt | 18 ++-- .../alerting/model/MonitorRunResult.kt | 34 ++++--- .../model/{PPLSQLInput.kt => PPLInput.kt} | 22 ++--- .../model/{PPLSQLTrigger.kt => PPLTrigger.kt} | 22 ++--- .../model/QueryLevelTriggerRunResult.kt | 11 ++- .../commons/alerting/model/Trigger.kt | 4 +- .../commons/alerting/util/IndexUtils.kt | 2 +- .../commons/alerting/TestHelpers.kt | 79 ++++++++++++---- .../commons/alerting/model/WriteableTests.kt | 63 ++++++++++--- .../commons/alerting/model/XContentTests.kt | 28 +++--- 12 files changed, 249 insertions(+), 131 deletions(-) rename src/main/kotlin/org/opensearch/commons/alerting/model/{PPLSQLInput.kt => PPLInput.kt} (87%) rename src/main/kotlin/org/opensearch/commons/alerting/model/{PPLSQLTrigger.kt => PPLTrigger.kt} (96%) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt index b345e7df..f0fa69c9 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt @@ -46,8 +46,11 @@ data class Alert( val executionId: String? = null, val associatedAlertIds: List, val clusters: List? = null, - val pplQuery: String? = null, - val pplQueryResults: List> = listOf(), + // these fields are specifically used for PPL Monitors + // for now, Query-Level Monitors can support including + // query results in Alerts by using these fields + val query: String? = null, + val queryResults: List> = listOf(), val target: Target? = null ) : Writeable, ToXContent { @@ -113,8 +116,8 @@ data class Alert( executionId = executionId, associatedAlertIds = associatedAlertIds, clusters = clusters, - pplQuery = null, - pplQueryResults = listOf() + query = null, + queryResults = listOf() ) constructor( @@ -151,8 +154,8 @@ data class Alert( workflowName = workflow.name, associatedAlertIds = associatedAlertIds, clusters = clusters, - pplQuery = null, - pplQueryResults = listOf() + query = null, + queryResults = listOf() ) // constructor for Alerts from QueryLevelMonitorRunner @@ -170,8 +173,8 @@ data class Alert( executionId: String? = null, workflowId: String? = null, clusters: List? = null, - pplQuery: String? = null, - pplQueryResults: List> = listOf() + query: String? = null, + queryResults: List> = listOf() ) : this( monitorId = monitor.id, monitorName = monitor.name, @@ -195,8 +198,8 @@ data class Alert( workflowName = "", associatedAlertIds = emptyList(), clusters = clusters, - pplQuery = pplQuery, - pplQueryResults = pplQueryResults, + query = query, + queryResults = queryResults, target = monitor.target ) @@ -237,8 +240,8 @@ data class Alert( workflowName = "", associatedAlertIds = emptyList(), clusters = clusters, - pplQuery = null, - pplQueryResults = listOf(), + query = null, + queryResults = listOf(), target = monitor.target ) @@ -280,8 +283,8 @@ data class Alert( workflowName = "", associatedAlertIds = emptyList(), clusters = clusters, - pplQuery = null, - pplQueryResults = listOf(), + query = null, + queryResults = listOf(), target = monitor.target ) @@ -325,8 +328,8 @@ data class Alert( workflowName = "", associatedAlertIds = emptyList(), clusters = clusters, - pplQuery = null, - pplQueryResults = listOf(), + query = null, + queryResults = listOf(), target = monitor.target ) @@ -367,8 +370,8 @@ data class Alert( executionId = executionId, associatedAlertIds = emptyList(), clusters = clusters, - pplQuery = null, - pplQueryResults = listOf(), + query = null, + queryResults = listOf(), target = monitor.target ) @@ -414,8 +417,16 @@ data class Alert( executionId = sin.readOptionalString(), associatedAlertIds = sin.readStringList(), clusters = sin.readOptionalStringList(), - pplQuery = sin.readOptionalString(), - pplQueryResults = sin.readList { input -> suppressWarning(input.readMap()) }, + query = if (sin.version.onOrAfter(Version.V_3_7_0)) { + sin.readOptionalString() + } else { + null + }, + queryResults = if (sin.version.onOrAfter(Version.V_3_7_0)) { + sin.readList { input -> suppressWarning(input.readMap()) } + } else { + listOf() + }, target = if (sin.version.onOrAfter(Version.V_3_6_0)) { if (sin.readBoolean()) Target(sin) else null } else { @@ -459,9 +470,11 @@ data class Alert( out.writeOptionalString(executionId) out.writeStringCollection(associatedAlertIds) out.writeOptionalStringArray(clusters?.toTypedArray()) - out.writeOptionalString(pplQuery) - out.writeCollection(pplQueryResults) { output, map -> - output.writeMap(map) + if (out.version.onOrAfter(Version.V_3_7_0)) { + out.writeOptionalString(query) + out.writeCollection(queryResults) { output, map -> + output.writeMap(map) + } } if (out.version.onOrAfter(Version.V_3_6_0)) { out.writeBoolean(target != null) @@ -498,8 +511,8 @@ data class Alert( const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH const val CLUSTERS_FIELD = "clusters" - const val PPL_SQL_QUERY_FIELD = "ppl_query" - const val PPL_SQL_QUERY_RESULTS_FIELD = "ppl_query_results" + const val QUERY_FIELD = "query" + const val QUERY_RESULTS_FIELD = "query_results" const val TARGET_FIELD = "target" const val NO_ID = "" const val NO_VERSION = Versions.NOT_FOUND @@ -532,8 +545,8 @@ data class Alert( var aggAlertBucket: AggregationResultBucket? = null val associatedAlertIds = mutableListOf() val clusters = mutableListOf() - var pplQuery: String? = null - var pplQueryResults: List> = listOf() + var query: String? = null + var queryResults: List> = listOf() var target: Target? = null ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -610,18 +623,18 @@ data class Alert( clusters.add(xcp.text()) } } - PPL_SQL_QUERY_FIELD -> { + QUERY_FIELD -> { if (xcp.currentToken() != XContentParser.Token.VALUE_NULL) { - pplQuery = xcp.text() + query = xcp.text() } } - PPL_SQL_QUERY_RESULTS_FIELD -> { + QUERY_RESULTS_FIELD -> { ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) val resultsList = mutableListOf>() while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { resultsList.add(xcp.map()) } - pplQueryResults = resultsList + queryResults = resultsList } TARGET_FIELD -> target = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { null @@ -658,8 +671,8 @@ data class Alert( workflowName = workflowName, associatedAlertIds = associatedAlertIds, clusters = if (clusters.size > 0) clusters else null, - pplQuery = pplQuery, - pplQueryResults = pplQueryResults, + query = query, + queryResults = queryResults, target = target ) } @@ -714,9 +727,9 @@ data class Alert( if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray()) if (target != null) builder.field(TARGET_FIELD, target) - if (!pplQuery.isNullOrEmpty()) builder.field(PPL_SQL_QUERY_FIELD, pplQuery) - if (pplQueryResults.isNotEmpty()) { - builder.field(PPL_SQL_QUERY_RESULTS_FIELD, pplQueryResults.toTypedArray()) + if (!query.isNullOrEmpty()) builder.field(QUERY_FIELD, query) + if (queryResults.isNotEmpty()) { + builder.field(QUERY_RESULTS_FIELD, queryResults.toTypedArray()) } builder.endObject() @@ -744,8 +757,8 @@ data class Alert( FINDING_IDS to findingIds.joinToString(","), RELATED_DOC_IDS to relatedDocIds.joinToString(","), CLUSTERS_FIELD to clusters?.joinToString(","), - PPL_SQL_QUERY_FIELD to pplQuery, - PPL_SQL_QUERY_RESULTS_FIELD to pplQueryResults + QUERY_FIELD to query, + QUERY_RESULTS_FIELD to queryResults ) } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt index a47376b9..ab62e479 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt @@ -2,7 +2,7 @@ package org.opensearch.commons.alerting.model import org.opensearch.commons.alerting.model.ClusterMetricsInput.Companion.URI_FIELD import org.opensearch.commons.alerting.model.DocLevelMonitorInput.Companion.DOC_LEVEL_INPUT_FIELD -import org.opensearch.commons.alerting.model.PPLSQLInput.Companion.PPL_SQL_INPUT_FIELD +import org.opensearch.commons.alerting.model.PPLInput.Companion.PPL_INPUT_FIELD import org.opensearch.commons.alerting.model.SearchInput.Companion.SEARCH_FIELD import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput.Companion.REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD @@ -22,7 +22,7 @@ interface Input : BaseModel { SEARCH_INPUT(SEARCH_FIELD), REMOTE_MONITOR_INPUT(REMOTE_MONITOR_INPUT_FIELD), REMOTE_DOC_LEVEL_MONITOR_INPUT(REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD), - PPL_SQL_INPUT(PPL_SQL_INPUT_FIELD); + PPL_INPUT(PPL_INPUT_FIELD); override fun toString(): String { return value @@ -47,7 +47,7 @@ interface Input : BaseModel { } else if (xcp.currentName() == Type.REMOTE_DOC_LEVEL_MONITOR_INPUT.value) { RemoteDocLevelMonitorInput.parse(xcp) } else { - PPLSQLInput.parseInner(xcp) + PPLInput.parseInner(xcp) } XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) return input @@ -62,7 +62,7 @@ interface Input : BaseModel { Type.SEARCH_INPUT -> SearchInput(sin) Type.REMOTE_MONITOR_INPUT -> RemoteMonitorInput(sin) Type.REMOTE_DOC_LEVEL_MONITOR_INPUT -> RemoteDocLevelMonitorInput(sin) - Type.PPL_SQL_INPUT -> PPLSQLInput(sin) + Type.PPL_INPUT -> PPLInput(sin) // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns // enum can be null in Java else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt index eabd4f07..9fbcc942 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt @@ -10,7 +10,7 @@ import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION import org.opensearch.commons.alerting.util.IndexUtils.Companion.supportedClusterMetricsSettings import org.opensearch.commons.alerting.util.instant import org.opensearch.commons.alerting.util.isBucketLevelMonitor -import org.opensearch.commons.alerting.util.isPplSqlMonitor +import org.opensearch.commons.alerting.util.isPPLMonitor import org.opensearch.commons.alerting.util.optionalTimeField import org.opensearch.commons.alerting.util.optionalUserField import org.opensearch.commons.authuser.User @@ -71,7 +71,7 @@ data class Monitor( MonitorType.DOC_LEVEL_MONITOR.value -> require(trigger is DocumentLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } MonitorType.PPL_MONITOR.value -> - require(trigger is PPLSQLTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } + require(trigger is PPLTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } } } if (enabled) { @@ -91,14 +91,14 @@ data class Monitor( } } - if (this.isPplSqlMonitor()) { + if (this.isPPLMonitor()) { require(inputs.size == 1) { "Exactly 1 PPL query must be specified for PPL Monitor" } - val pplSqlInput = inputs[0] + val pplInput = inputs[0] - require(pplSqlInput is PPLSQLInput) { "Unsupported input [${pplSqlInput.name()}] for PPL Monitor" } + require(pplInput is PPLInput) { "Unsupported input [${pplInput.name()}] for PPL Monitor" } - require(pplSqlInput.queryLanguage == PPLSQLInput.QueryLanguage.PPL) { "SQL queries are not supported. Please use a PPL query." } + require(pplInput.queryLanguage == PPLInput.QueryLanguage.PPL) { "SQL queries are not supported. Please use a PPL query." } // this is a new check for PPL Alerting specifically, PPL Monitors will enforce // a max name length, but it won't be enforced on other Monitor types to avoid @@ -260,8 +260,8 @@ data class Monitor( out.writeEnum(Input.Type.SEARCH_INPUT) } else if (it is DocLevelMonitorInput) { out.writeEnum(Input.Type.DOCUMENT_LEVEL_INPUT) - } else if (it is PPLSQLInput) { - out.writeEnum(Input.Type.PPL_SQL_INPUT) + } else if (it is PPLInput) { + out.writeEnum(Input.Type.PPL_INPUT) } else { out.writeEnum(Input.Type.REMOTE_DOC_LEVEL_MONITOR_INPUT) } @@ -274,7 +274,7 @@ data class Monitor( is BucketLevelTrigger -> out.writeEnum(Trigger.Type.BUCKET_LEVEL_TRIGGER) is DocumentLevelTrigger -> out.writeEnum(Trigger.Type.DOCUMENT_LEVEL_TRIGGER) is RemoteMonitorTrigger -> out.writeEnum(Trigger.Type.REMOTE_MONITOR_TRIGGER) - is PPLSQLTrigger -> out.writeEnum(Trigger.Type.PPL_SQL_TRIGGER) + is PPLTrigger -> out.writeEnum(Trigger.Type.PPL_TRIGGER) else -> out.writeEnum(Trigger.Type.QUERY_LEVEL_TRIGGER) } it.writeTo(out) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt index 2ceaa135..8b607650 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt @@ -7,6 +7,7 @@ package org.opensearch.commons.alerting.model import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchException +import org.opensearch.Version import org.opensearch.commons.alerting.alerts.AlertError import org.opensearch.commons.alerting.util.optionalTimeField import org.opensearch.core.common.io.stream.StreamInput @@ -124,11 +125,14 @@ data class InputRunResults( for (map in results) { out.writeMap(map) } - out.writeVInt(pplBaseQueryResults.size) - for (datarow in pplBaseQueryResults) { - out.writeMap(datarow) + + if (out.version.onOrAfter(Version.V_3_7_0)) { + out.writeVInt(pplBaseQueryResults.size) + for (datarow in pplBaseQueryResults) { + out.writeMap(datarow) + } + out.writeOptionalLong(pplBaseQueryNumResults) } - out.writeOptionalLong(pplBaseQueryNumResults) out.writeException(error) } @@ -141,14 +145,24 @@ data class InputRunResults( for (i in 0 until count) { list.add(suppressWarning(sin.readMap())) // result(map) } - val pplSqlCount = sin.readVInt() // count - val pplSqlList = mutableListOf>() - for (i in 0 until pplSqlCount) { - pplSqlList.add(suppressWarning(sin.readMap())) // result(map) + val pplCount = if (sin.version.onOrAfter(Version.V_3_7_0)) { + sin.readVInt() + } else { + 0 + } + val pplList = mutableListOf>() + if (sin.version.onOrAfter(Version.V_3_7_0)) { + for (i in 0 until pplCount) { + pplList.add(suppressWarning(sin.readMap())) // pplResults + } + } + val pplNumResults = if (sin.version.onOrAfter(Version.V_3_7_0)) { + sin.readOptionalLong() + } else { + null } - val pplNumResults = sin.readOptionalLong() val error = sin.readException() // error - return InputRunResults(list, error, null, pplSqlList, pplNumResults) + return InputRunResults(list, error, null, pplList, pplNumResults) } @Suppress("UNCHECKED_CAST") diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLInput.kt similarity index 87% rename from src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLInput.kt rename to src/main/kotlin/org/opensearch/commons/alerting/model/PPLInput.kt index 3f9e69ac..ff154283 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLInput.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLInput.kt @@ -12,7 +12,7 @@ import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import java.io.IOException -data class PPLSQLInput( +data class PPLInput( val query: String, val queryLanguage: QueryLanguage = QueryLanguage.PPL ) : Input { @@ -31,7 +31,7 @@ data class PPLSQLInput( override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { return builder.startObject() - .startObject(PPL_SQL_INPUT_FIELD) + .startObject(PPL_INPUT_FIELD) .field(QUERY_FIELD, query) .field(QUERY_LANGUAGE_FIELD, queryLanguage.value) .endObject() @@ -40,13 +40,13 @@ data class PPLSQLInput( override fun asTemplateArg(): Map = mapOf( - PPL_SQL_INPUT_FIELD to mapOf( + PPL_INPUT_FIELD to mapOf( QUERY_FIELD to query, QUERY_LANGUAGE_FIELD to queryLanguage.value ) ) - override fun name(): String = PPL_SQL_INPUT_FIELD + override fun name(): String = PPL_INPUT_FIELD enum class QueryLanguage(val value: String) { PPL(PPL_QUERY_LANGUAGE), @@ -58,8 +58,8 @@ data class PPLSQLInput( } companion object { - // PPL/SQL Input field names - const val PPL_SQL_INPUT_FIELD = "ppl_input" + // PPL Input field names + const val PPL_INPUT_FIELD = "ppl_input" const val QUERY_FIELD = "query" const val QUERY_LANGUAGE_FIELD = "query_language" @@ -69,13 +69,13 @@ data class PPLSQLInput( val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( Input::class.java, - ParseField(PPL_SQL_INPUT_FIELD), + ParseField(PPL_INPUT_FIELD), CheckedFunction { parseInner(it) } ) @JvmStatic @Throws(IOException::class) - fun parseInner(xcp: XContentParser): PPLSQLInput { + fun parseInner(xcp: XContentParser): PPLInput { lateinit var query: String var queryLanguage: QueryLanguage = QueryLanguage.PPL // default to PPL @@ -98,13 +98,13 @@ data class PPLSQLInput( } } } - return PPLSQLInput(query, queryLanguage) + return PPLInput(query, queryLanguage) } @JvmStatic @Throws(IOException::class) - fun readFrom(sin: StreamInput): PPLSQLInput { - return PPLSQLInput(sin) + fun readFrom(sin: StreamInput): PPLInput { + return PPLInput(sin) } } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt similarity index 96% rename from src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTrigger.kt rename to src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt index ebbd38da..4b889257 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLSQLTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt @@ -25,10 +25,10 @@ import org.opensearch.core.xcontent.XContentParserUtils import java.io.IOException /** - * The PPL/SQL Trigger for PPL/SQL Monitors + * The PPL Trigger for PPL Monitors * * There are two types of PPLTrigger conditions: NUMBER_OF_RESULTS and CUSTOM - * NUMBER_OF_RESULTS: triggers based on whether the number of query results returned by the PPLSQLMonitor + * NUMBER_OF_RESULTS: triggers based on whether the number of query results returned by the PPLMonitor * query meets some threshold * CUSTOM: triggers based on a custom condition that user specifies (a single ppl eval statement) * @@ -47,7 +47,7 @@ import java.io.IOException * * @opensearch.experimental */ -data class PPLSQLTrigger( +data class PPLTrigger( override val id: String = UUIDs.base64UUID(), override val name: String, override val severity: String, @@ -151,7 +151,7 @@ data class PPLSQLTrigger( override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { builder.startObject() - builder.startObject(PPL_SQL_TRIGGER_FIELD) + builder.startObject(PPL_TRIGGER_FIELD) builder.field(ID_FIELD, id) builder.field(NAME_FIELD, name) builder.field(SEVERITY_FIELD, severity) @@ -185,7 +185,7 @@ data class PPLSQLTrigger( } override fun name(): String { - return PPL_SQL_TRIGGER_FIELD + return PPL_TRIGGER_FIELD } enum class ConditionType(val value: String) { @@ -212,7 +212,7 @@ data class PPLSQLTrigger( companion object { // trigger wrapper object field name - const val PPL_SQL_TRIGGER_FIELD = "ppl_trigger" + const val PPL_TRIGGER_FIELD = "ppl_trigger" // field names const val CONDITION_TYPE_FIELD = "type" @@ -229,13 +229,13 @@ data class PPLSQLTrigger( val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( Trigger::class.java, - ParseField(PPL_SQL_TRIGGER_FIELD), + ParseField(PPL_TRIGGER_FIELD), CheckedFunction { parseInner(it) } ) @JvmStatic @Throws(IOException::class) - fun parseInner(xcp: XContentParser): PPLSQLTrigger { + fun parseInner(xcp: XContentParser): PPLTrigger { var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified var name: String? = null var severity: String? = null @@ -314,7 +314,7 @@ data class PPLSQLTrigger( requireNotNull(conditionType) { "Trigger condition type must be included" } // 3. prepare and return PPLTrigger object - return PPLSQLTrigger( + return PPLTrigger( id, name, severity, @@ -328,8 +328,8 @@ data class PPLSQLTrigger( @JvmStatic @Throws(IOException::class) - fun readFrom(sin: StreamInput): PPLSQLTrigger { - return PPLSQLTrigger(sin) + fun readFrom(sin: StreamInput): PPLTrigger { + return PPLTrigger(sin) } } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt index c2821890..04e77564 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt @@ -5,6 +5,7 @@ package org.opensearch.commons.alerting.model +import org.opensearch.Version import org.opensearch.commons.alerting.alerts.AlertError import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput @@ -29,7 +30,11 @@ open class QueryLevelTriggerRunResult( error = sin.readException(), triggered = sin.readBoolean(), actionResults = sin.readMap() as MutableMap, - pplCustomQueryResults = sin.readList { it.readMap() } + pplCustomQueryResults = if (sin.version.onOrAfter(Version.V_3_7_0)) { + sin.readList { it.readMap() } + } else { + listOf() + } ) override fun alertError(): AlertError? { @@ -57,7 +62,9 @@ open class QueryLevelTriggerRunResult( super.writeTo(out) out.writeBoolean(triggered) out.writeMap(actionResults as Map) - out.writeCollection(pplCustomQueryResults) { stream, map -> stream.writeMap(map) } + if (out.version.onOrAfter(Version.V_3_7_0)) { + out.writeCollection(pplCustomQueryResults) { stream, map -> stream.writeMap(map) } + } } companion object { diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt index e6670d42..1851d425 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt @@ -17,7 +17,7 @@ interface Trigger : BaseModel { NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD), CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD), REMOTE_MONITOR_TRIGGER(RemoteMonitorTrigger.REMOTE_MONITOR_TRIGGER_FIELD), - PPL_SQL_TRIGGER(PPLSQLTrigger.PPL_SQL_TRIGGER_FIELD); + PPL_TRIGGER(PPLTrigger.PPL_TRIGGER_FIELD); override fun toString(): String { return value @@ -59,7 +59,7 @@ interface Trigger : BaseModel { Type.DOCUMENT_LEVEL_TRIGGER -> DocumentLevelTrigger(sin) Type.CHAINED_ALERT_TRIGGER -> ChainedAlertTrigger(sin) Type.REMOTE_MONITOR_TRIGGER -> RemoteMonitorTrigger(sin) - Type.PPL_SQL_TRIGGER -> PPLSQLTrigger(sin) + Type.PPL_TRIGGER -> PPLTrigger(sin) // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns // enum can be null in Java else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") diff --git a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt index 812ba514..0d4593e2 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt @@ -50,7 +50,7 @@ fun Monitor.isBucketLevelMonitor(): Boolean = isMonitorOfStandardType() && Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.BUCKET_LEVEL_MONITOR -fun Monitor.isPplSqlMonitor(): Boolean = +fun Monitor.isPPLMonitor(): Boolean = isMonitorOfStandardType() && Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.PPL_MONITOR diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 9a067a5b..b67bdc2f 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -41,8 +41,8 @@ import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.MonitorRunResult import org.opensearch.commons.alerting.model.NoOpTrigger -import org.opensearch.commons.alerting.model.PPLSQLInput -import org.opensearch.commons.alerting.model.PPLSQLTrigger +import org.opensearch.commons.alerting.model.PPLInput +import org.opensearch.commons.alerting.model.PPLTrigger import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult import org.opensearch.commons.alerting.model.Schedule @@ -176,18 +176,18 @@ fun randomDocumentLevelMonitor( ) } -fun randomPPLSQLMonitor( +fun randomPPLMonitor( name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), user: User = randomUser(), inputs: List = listOf( - PPLSQLInput( + PPLInput( query = "source=logs | where status > 400", - queryLanguage = PPLSQLInput.QueryLanguage.PPL + queryLanguage = PPLInput.QueryLanguage.PPL ) ), schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), enabled: Boolean = Random().nextBoolean(), - triggers: List = (1..RandomNumbers.randomIntBetween(Random(), 0, 10)).map { randomPPLSQLTrigger() }, + triggers: List = (1..RandomNumbers.randomIntBetween(Random(), 0, 10)).map { randomPPLTrigger() }, enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false @@ -357,17 +357,17 @@ fun randomChainedAlertTrigger( ) } -fun randomPPLSQLTrigger( +fun randomPPLTrigger( id: String = UUIDs.base64UUID(), name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), severity: String = "1", actions: List = mutableListOf(), - conditionType: PPLSQLTrigger.ConditionType = PPLSQLTrigger.ConditionType.NUMBER_OF_RESULTS, - numResultsCondition: PPLSQLTrigger.NumResultsCondition? = PPLSQLTrigger.NumResultsCondition.GREATER_THAN, + conditionType: PPLTrigger.ConditionType = PPLTrigger.ConditionType.NUMBER_OF_RESULTS, + numResultsCondition: PPLTrigger.NumResultsCondition? = PPLTrigger.NumResultsCondition.GREATER_THAN, numResultsValue: Long = 0, customCondition: String? = null -): PPLSQLTrigger { - return PPLSQLTrigger( +): PPLTrigger { + return PPLTrigger( id = id, name = name, severity = severity, @@ -589,14 +589,14 @@ fun xContentRegistry(): NamedXContentRegistry { listOf( SearchInput.XCONTENT_REGISTRY, DocLevelMonitorInput.XCONTENT_REGISTRY, - PPLSQLInput.XCONTENT_REGISTRY, + PPLInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY, DocumentLevelTrigger.XCONTENT_REGISTRY, ChainedAlertTrigger.XCONTENT_REGISTRY, NoOpTrigger.XCONTENT_REGISTRY, RemoteMonitorTrigger.XCONTENT_REGISTRY, - PPLSQLTrigger.XCONTENT_REGISTRY + PPLTrigger.XCONTENT_REGISTRY ) + SearchModule(Settings.EMPTY, emptyList()).namedXContents ) } @@ -611,6 +611,21 @@ fun assertUserNull(monitor: Monitor) { } fun randomAlert(monitor: Monitor = randomQueryLevelMonitor()): Alert { + val trigger = randomQueryLevelTrigger() + val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult()) + val clusterCount = (-1..5).random() + val clusters = if (clusterCount == -1) null else (0..clusterCount).map { "index-$it" } + return Alert( + monitor, + trigger, + Instant.now().truncatedTo(ChronoUnit.MILLIS), + null, + actionExecutionResults = actionExecutionResults, + clusters = clusters + ) +} + +fun randomAlertWithPPLFields(monitor: Monitor = randomQueryLevelMonitor()): Alert { val trigger = randomQueryLevelTrigger() val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult()) val clusterCount = (-1..5).random() @@ -627,8 +642,8 @@ fun randomAlert(monitor: Monitor = randomQueryLevelMonitor()): Alert { null, actionExecutionResults = actionExecutionResults, clusters = clusters, - pplQuery = pplQuery, - pplQueryResults = pplQueryResults + query = pplQuery, + queryResults = pplQueryResults ) } @@ -746,7 +761,26 @@ fun createCorrelationAlertTemplateArgs(correlationAlert: CorrelationAlert): Map< } fun randomInputRunResults(): InputRunResults { - return InputRunResults(listOf(), null, null, listOf(), 5L) + return InputRunResults(listOf(), null, null) +} + +fun randomInputRunResultsWithPPLFields(): InputRunResults { + return InputRunResults( + listOf(), + null, + null, + listOf>( + mapOf( + "key1" to "val1", + "key2" to 4 + ), + mapOf( + "key3" to listOf(1, 2, 3), + "key4" to mapOf("nested-key" to "nested-val") + ) + ), + 5L + ) } fun randomActionRunResult(): ActionRunResult { @@ -840,6 +874,19 @@ fun randomQueryLevelTriggerRunResult(): QueryLevelTriggerRunResult { map.plus(Pair("key1", randomActionRunResult())) map.plus(Pair("key2", randomActionRunResult())) + return QueryLevelTriggerRunResult( + "trigger-name", + true, + null, + map + ) +} + +fun randomQueryLevelTriggerRunResultWithPPLFields(): QueryLevelTriggerRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", randomActionRunResult())) + map.plus(Pair("key2", randomActionRunResult())) + val queryResultsList = mutableListOf>() queryResultsList.add( mapOf( diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt index 5a492a70..1f3db811 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt @@ -14,6 +14,7 @@ import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigge import org.opensearch.commons.alerting.randomAction import org.opensearch.commons.alerting.randomActionExecutionPolicy import org.opensearch.commons.alerting.randomAlert +import org.opensearch.commons.alerting.randomAlertWithPPLFields import org.opensearch.commons.alerting.randomBucketLevelMonitorRunResult import org.opensearch.commons.alerting.randomBucketLevelTrigger import org.opensearch.commons.alerting.randomBucketLevelTriggerRunResult @@ -22,12 +23,14 @@ import org.opensearch.commons.alerting.randomDocLevelQuery import org.opensearch.commons.alerting.randomDocumentLevelMonitorRunResult import org.opensearch.commons.alerting.randomDocumentLevelTrigger import org.opensearch.commons.alerting.randomInputRunResults -import org.opensearch.commons.alerting.randomPPLSQLMonitor -import org.opensearch.commons.alerting.randomPPLSQLTrigger +import org.opensearch.commons.alerting.randomInputRunResultsWithPPLFields +import org.opensearch.commons.alerting.randomPPLMonitor +import org.opensearch.commons.alerting.randomPPLTrigger import org.opensearch.commons.alerting.randomQueryLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitorRunResult import org.opensearch.commons.alerting.randomQueryLevelTrigger import org.opensearch.commons.alerting.randomQueryLevelTriggerRunResult +import org.opensearch.commons.alerting.randomQueryLevelTriggerRunResultWithPPLFields import org.opensearch.commons.alerting.randomThrottle import org.opensearch.commons.alerting.randomUser import org.opensearch.commons.alerting.randomUserEmpty @@ -56,6 +59,16 @@ class WriteableTests { Assertions.assertEquals(alert, newAlert, "Round tripping Alert doesn't work") } + @Test + fun `test alert with PPL fields as stream`() { + val alert = randomAlertWithPPLFields() + val out = BytesStreamOutput() + alert.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newAlert = Alert(sin) + Assertions.assertEquals(alert, newAlert, "Round tripping Alert with PPL fields doesn't work") + } + @Test fun `test throttle as stream`() { val throttle = randomThrottle() @@ -117,13 +130,13 @@ class WriteableTests { } @Test - fun `test ppl sql monitor as stream`() { - val monitor = randomPPLSQLMonitor() + fun `test ppl monitor as stream`() { + val monitor = randomPPLMonitor() val out = BytesStreamOutput() monitor.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) val newMonitor = Monitor(sin) - Assertions.assertEquals(monitor, newMonitor, "Round tripping PPLSQLMonitor doesn't work") + Assertions.assertEquals(monitor, newMonitor, "Round tripping PPLMonitor doesn't work") } @Test @@ -167,13 +180,13 @@ class WriteableTests { } @Test - fun `test ppl sql trigger as stream`() { - val trigger = randomPPLSQLTrigger() + fun `test ppl trigger as stream`() { + val trigger = randomPPLTrigger() val out = BytesStreamOutput() trigger.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) - val newTrigger = PPLSQLTrigger.readFrom(sin) - Assertions.assertEquals(trigger, newTrigger, "Round tripping PPLSQLTrigger doesn't work") + val newTrigger = PPLTrigger.readFrom(sin) + Assertions.assertEquals(trigger, newTrigger, "Round tripping PPLTrigger doesn't work") } @Test @@ -220,13 +233,13 @@ class WriteableTests { } @Test - fun `test ppl sql input as stream`() { - val input = PPLSQLInput("source=some-index") + fun `test ppl input as stream`() { + val input = PPLInput("source=some-index") val out = BytesStreamOutput() input.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) - val newInput = PPLSQLInput(sin) - Assertions.assertEquals(input, newInput, "Round tripping PPLSQLInput doesn't work") + val newInput = PPLInput(sin) + Assertions.assertEquals(input, newInput, "Round tripping PPLInput doesn't work") } @Test @@ -290,6 +303,20 @@ class WriteableTests { OpenSearchTestCase.assertEquals(runResult.actionResults, newRunResult.actionResults) } + @Test + fun `test query-level triggerrunresult with PPL fields as stream`() { + val runResult = randomQueryLevelTriggerRunResultWithPPLFields() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = QueryLevelTriggerRunResult(sin) + OpenSearchTestCase.assertEquals(runResult.triggerName, newRunResult.triggerName) + OpenSearchTestCase.assertEquals(runResult.triggered, newRunResult.triggered) + OpenSearchTestCase.assertEquals(runResult.error, newRunResult.error) + OpenSearchTestCase.assertEquals(runResult.actionResults, newRunResult.actionResults) + OpenSearchTestCase.assertEquals(runResult.pplCustomQueryResults, newRunResult.pplCustomQueryResults) + } + @Test fun `test bucket-level triggerrunresult as stream`() { val runResult = randomBucketLevelTriggerRunResult() @@ -320,6 +347,16 @@ class WriteableTests { OpenSearchTestCase.assertEquals("Round tripping InputRunResults doesn't work", runResult, newRunResult) } + @Test + fun `test inputrunresult with PPL fields as stream`() { + val runResult = randomInputRunResultsWithPPLFields() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = InputRunResults.readFrom(sin) + OpenSearchTestCase.assertEquals("Round tripping InputRunResults with PPL fields doesn't work", runResult, newRunResult) + } + @Test fun `test query-level monitorrunresult as stream`() { val runResult = randomQueryLevelMonitorRunResult() diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index c576362d..d9898dc5 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -23,8 +23,8 @@ import org.opensearch.commons.alerting.randomAlert import org.opensearch.commons.alerting.randomBucketLevelMonitor import org.opensearch.commons.alerting.randomBucketLevelTrigger import org.opensearch.commons.alerting.randomDocLevelQuery -import org.opensearch.commons.alerting.randomPPLSQLMonitor -import org.opensearch.commons.alerting.randomPPLSQLTrigger +import org.opensearch.commons.alerting.randomPPLMonitor +import org.opensearch.commons.alerting.randomPPLTrigger import org.opensearch.commons.alerting.randomQueryLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitorWithoutUser import org.opensearch.commons.alerting.randomQueryLevelTrigger @@ -228,12 +228,12 @@ class XContentTests { } @Test - fun `test ppl sql monitor parsing`() { - val monitor = randomPPLSQLMonitor() + fun `test ppl monitor parsing`() { + val monitor = randomPPLMonitor() val monitorString = monitor.toJsonStringWithUser() val parsedMonitor = Monitor.parse(parser(monitorString)) - Assertions.assertEquals(monitor, parsedMonitor, "Round tripping PPLSQLMonitor doesn't work") + Assertions.assertEquals(monitor, parsedMonitor, "Round tripping PPLMonitor doesn't work") } @Test @@ -267,13 +267,13 @@ class XContentTests { } @Test - fun `test ppl sql trigger parsing`() { - val trigger = randomPPLSQLTrigger() + fun `test ppl trigger parsing`() { + val trigger = randomPPLTrigger() val triggerString = trigger.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() val parsedTrigger = Trigger.parse(parser(triggerString)) - Assertions.assertEquals(trigger, parsedTrigger, "Round tripping PPLSQLTrigger doesn't work") + Assertions.assertEquals(trigger, parsedTrigger, "Round tripping PPLTrigger doesn't work") } @Test @@ -449,11 +449,11 @@ class XContentTests { } @Test - fun `test creating a ppl sql monitor with invalid trigger type fails`() { + fun `test creating a ppl monitor with invalid trigger type fails`() { try { val queryLevelTrigger = randomQueryLevelTrigger() - randomPPLSQLMonitor().copy(triggers = listOf(queryLevelTrigger)) - Assertions.fail("Creating a PPL SQL monitor with query-level triggers did not fail.") + randomPPLMonitor().copy(triggers = listOf(queryLevelTrigger)) + Assertions.fail("Creating a PPL monitor with query-level triggers did not fail.") } catch (ignored: IllegalArgumentException) { } } @@ -469,11 +469,11 @@ class XContentTests { } @Test - fun `test creating a ppl sql monitor with invalid input type fails`() { + fun `test creating a ppl monitor with invalid input type fails`() { try { val searchInput = randomSearchInput() - randomPPLSQLMonitor().copy(inputs = listOf(searchInput)) - Assertions.fail("Creating a PPL SQL monitor with search input did not fail.") + randomPPLMonitor().copy(inputs = listOf(searchInput)) + Assertions.fail("Creating a PPL monitor with search input did not fail.") } catch (ignored: IllegalArgumentException) { } } From 348cb9d76730bb54710ec859b63c5a8b4e4fa462 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Mon, 20 Apr 2026 16:39:23 -0700 Subject: [PATCH 06/11] removing experimental flag from PPLTrigger Signed-off-by: Dennis Toepker --- .../kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt index 4b889257..579baf7d 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt @@ -44,8 +44,6 @@ import java.io.IOException * required to be null otherwise. * @property customCondition A custom condition expression. Required if using CUSTOM conditions, * required to be null otherwise. - * - * @opensearch.experimental */ data class PPLTrigger( override val id: String = UUIDs.base64UUID(), From 995e8d08e49d420715b5e45c1580a7af44500339 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Thu, 23 Apr 2026 15:16:34 -0700 Subject: [PATCH 07/11] random PPL Monitor for tests now always includes at least 1 trigger Signed-off-by: Dennis Toepker --- src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index b67bdc2f..8f0c1e7a 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -187,7 +187,7 @@ fun randomPPLMonitor( ), schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), enabled: Boolean = Random().nextBoolean(), - triggers: List = (1..RandomNumbers.randomIntBetween(Random(), 0, 10)).map { randomPPLTrigger() }, + triggers: List = (1..RandomNumbers.randomIntBetween(Random(), 1, 10)).map { randomPPLTrigger() }, enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false From bbebb2a0ae4733bfe3e53f681b594eeb955b0057 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Thu, 30 Apr 2026 13:04:07 -0700 Subject: [PATCH 08/11] improving serde tests and added comment about base ppl query Signed-off-by: Dennis Toepker --- .../commons/alerting/model/MonitorRunResult.kt | 4 ++++ .../commons/alerting/model/PPLInput.kt | 5 ++++- .../opensearch/commons/alerting/TestHelpers.kt | 18 ++++++++++++++++-- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt index 8b607650..c8125394 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt @@ -94,6 +94,9 @@ data class InputRunResults( val results: List> = listOf(), val error: Exception? = null, val aggTriggersAfterKey: MutableMap? = null, + // a PPL Monitor's base query is the query set in the Monitor itself. + // Custom trigger conditions append to the base query to run + // their own queries/evaluations val pplBaseQueryResults: List> = listOf(), val pplBaseQueryNumResults: Long? = null ) : Writeable, ToXContent { @@ -133,6 +136,7 @@ data class InputRunResults( } out.writeOptionalLong(pplBaseQueryNumResults) } + out.writeException(error) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLInput.kt index ff154283..d2ec100a 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLInput.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLInput.kt @@ -76,7 +76,7 @@ data class PPLInput( @JvmStatic @Throws(IOException::class) fun parseInner(xcp: XContentParser): PPLInput { - lateinit var query: String + var query: String? = null var queryLanguage: QueryLanguage = QueryLanguage.PPL // default to PPL XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) @@ -98,6 +98,9 @@ data class PPLInput( } } } + + requireNotNull(query) + return PPLInput(query, queryLanguage) } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 8f0c1e7a..c813f063 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -761,12 +761,26 @@ fun createCorrelationAlertTemplateArgs(correlationAlert: CorrelationAlert): Map< } fun randomInputRunResults(): InputRunResults { - return InputRunResults(listOf(), null, null) + return InputRunResults( + listOf( + mapOf( + "aggregation_result" to 42, + "bucket_key" to mapOf("nested-key" to "nested-val") + ) + ), + null, + null + ) } fun randomInputRunResultsWithPPLFields(): InputRunResults { return InputRunResults( - listOf(), + listOf( + mapOf( + "aggregation_result" to 42, + "bucket_key" to mapOf("nested-key" to "nested-val") + ) + ), null, null, listOf>( From d44d5cec6f64e1d810cdccba2e9ddda3607c6e5c Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Thu, 30 Apr 2026 13:15:49 -0700 Subject: [PATCH 09/11] small cleanup Signed-off-by: Dennis Toepker --- src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index c813f063..e7ee2e98 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -768,7 +768,6 @@ fun randomInputRunResults(): InputRunResults { "bucket_key" to mapOf("nested-key" to "nested-val") ) ), - null, null ) } From 04c04669e1e6e77018d675221b3afe2e07987e28 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Thu, 30 Apr 2026 13:39:52 -0700 Subject: [PATCH 10/11] minor comments adjustment Signed-off-by: Dennis Toepker --- .../kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt index 579baf7d..ccee8550 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt @@ -30,7 +30,7 @@ import java.io.IOException * There are two types of PPLTrigger conditions: NUMBER_OF_RESULTS and CUSTOM * NUMBER_OF_RESULTS: triggers based on whether the number of query results returned by the PPLMonitor * query meets some threshold - * CUSTOM: triggers based on a custom condition that user specifies (a single ppl eval statement) + * CUSTOM: triggers based on a custom condition that user specifies (a single ppl where statement) * * @property id Trigger ID, defaults to a base64 UUID. * @property name Display name of the Trigger. From 092a85d031f54bc4751e9c6ee69adf9f271cc714 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Thu, 30 Apr 2026 13:55:42 -0700 Subject: [PATCH 11/11] removing notifications ID validation check Signed-off-by: Dennis Toepker --- .../org/opensearch/commons/alerting/model/PPLTrigger.kt | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt index ccee8550..ebbff7ee 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/PPLTrigger.kt @@ -79,9 +79,6 @@ data class PPLTrigger( require(it.destinationId.isNotEmpty()) { "Channel ID should not be empty." } - require(it.destinationId.matches(validCharsRegex)) { - "Channel ID should only have alphanumeric characters, dashes, and underscores." - } } when (this.conditionType) { @@ -221,10 +218,6 @@ data class PPLTrigger( // hard, nonadjustable limits const val NOTIFICATIONS_ID_MAX_LENGTH = 512 // length limit for notifications channel custom ID at channel creation time - // regular expression for validating that a string contains - // only valid chars (letters, numbers, -, _) - private val validCharsRegex = """^[a-zA-Z0-9_-]+$""".toRegex() - val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( Trigger::class.java, ParseField(PPL_TRIGGER_FIELD),