Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 126 additions & 4 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.opensearch.commons.alerting.model
import org.opensearch.Version
import org.opensearch.common.lucene.uid.Versions
import org.opensearch.commons.alerting.alerts.AlertError
import org.opensearch.commons.alerting.model.Monitor.Companion.suppressWarning
import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.commons.alerting.util.instant
import org.opensearch.commons.alerting.util.optionalTimeField
Expand Down Expand Up @@ -45,6 +46,11 @@ data class Alert(
val executionId: String? = null,
val associatedAlertIds: List<String>,
val clusters: List<String>? = null,
// these fields are specifically used for PPL Monitors
// for now, Query-Level Monitors can support including
// query results in Alerts by using these fields
val query: String? = null,
val queryResults: List<Map<String, Any?>> = listOf(),
val target: Target? = null
) : Writeable, ToXContent {

Expand All @@ -56,6 +62,64 @@ data class Alert(
}
}

constructor(
id: String = NO_ID,
version: Long = NO_VERSION,
schemaVersion: Int = NO_SCHEMA_VERSION,
monitorId: String,
workflowId: String,
workflowName: String,
monitorName: String,
monitorVersion: Long,
monitorUser: User?,
triggerId: String,
triggerName: String,
findingIds: List<String>,
relatedDocIds: List<String>,
state: State,
startTime: Instant,
endTime: Instant? = null,
lastNotificationTime: Instant? = null,
acknowledgedTime: Instant? = null,
errorMessage: String? = null,
errorHistory: List<AlertError>,
severity: String,
actionExecutionResults: List<ActionExecutionResult>,
aggregationResultBucket: AggregationResultBucket? = null,
executionId: String? = null,
associatedAlertIds: List<String>,
clusters: List<String>? = null
) : this (
id = id,
version = version,
schemaVersion = schemaVersion,
monitorId = monitorId,
workflowId = workflowId,
workflowName = workflowName,
monitorName = monitorName,
monitorVersion = monitorVersion,
monitorUser = monitorUser,
triggerId = triggerId,
triggerName = triggerName,
findingIds = findingIds,
relatedDocIds = relatedDocIds,
state = state,
startTime = startTime,
endTime = endTime,
lastNotificationTime = lastNotificationTime,
acknowledgedTime = acknowledgedTime,
errorMessage = errorMessage,
errorHistory = errorHistory,
severity = severity,
actionExecutionResults = actionExecutionResults,
aggregationResultBucket = aggregationResultBucket,
executionId = executionId,
associatedAlertIds = associatedAlertIds,
clusters = clusters,
query = null,
queryResults = listOf()
)

constructor(
startTime: Instant,
lastNotificationTime: Instant?,
Expand Down Expand Up @@ -89,12 +153,16 @@ data class Alert(
workflowId = workflow.id,
workflowName = workflow.name,
associatedAlertIds = associatedAlertIds,
clusters = clusters
clusters = clusters,
query = null,
queryResults = listOf()
)

// constructor for Alerts from QueryLevelMonitorRunner
// this monitor runner runs Query-level, Cluster Metrics, and PPL Monitors
constructor(
monitor: Monitor,
trigger: QueryLevelTrigger,
trigger: Trigger,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we changing this? how or why is this in scope of PPL models related changes?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because PPL Monitors will not have their own dedicated PPLMonitorRunner; they will run on QueryLevelMonitorRunner, much like AD and Cluster Metrics Monitors do. However, PPL Triggers need to be their own type because they are fundamentally different from QueryLevelTriggers. As such, the trigger parameter had to be generalized to the Trigger interface so that it can accept either a QueryLevelTrigger or a PPLTrigger.

This constructor is called by composeQueryLevelAlert in Alerting. There are validations in Alerting that the trigger passed into this constructor is only ever type QueryLevelTrigger or PPLTrigger.

startTime: Instant,
lastNotificationTime: Instant?,
state: State = State.ACTIVE,
Expand All @@ -104,7 +172,9 @@ data class Alert(
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
clusters: List<String>? = null,
query: String? = null,
queryResults: List<Map<String, Any?>> = listOf()
) : this(
monitorId = monitor.id,
monitorName = monitor.name,
Expand All @@ -128,6 +198,8 @@ data class Alert(
workflowName = "",
associatedAlertIds = emptyList(),
clusters = clusters,
query = query,
queryResults = queryResults,
target = monitor.target
)

Expand Down Expand Up @@ -168,6 +240,8 @@ data class Alert(
workflowName = "",
associatedAlertIds = emptyList(),
clusters = clusters,
query = null,
queryResults = listOf(),
target = monitor.target
)

Expand Down Expand Up @@ -209,6 +283,8 @@ data class Alert(
workflowName = "",
associatedAlertIds = emptyList(),
clusters = clusters,
query = null,
queryResults = listOf(),
target = monitor.target
)

Expand Down Expand Up @@ -252,6 +328,8 @@ data class Alert(
workflowName = "",
associatedAlertIds = emptyList(),
clusters = clusters,
query = null,
queryResults = listOf(),
target = monitor.target
)

Expand Down Expand Up @@ -292,6 +370,8 @@ data class Alert(
executionId = executionId,
associatedAlertIds = emptyList(),
clusters = clusters,
query = null,
queryResults = listOf(),
target = monitor.target
)

Expand Down Expand Up @@ -337,6 +417,16 @@ data class Alert(
executionId = sin.readOptionalString(),
associatedAlertIds = sin.readStringList(),
clusters = sin.readOptionalStringList(),
query = if (sin.version.onOrAfter(Version.V_3_7_0)) {
sin.readOptionalString()
} else {
null
},
queryResults = if (sin.version.onOrAfter(Version.V_3_7_0)) {
sin.readList { input -> suppressWarning(input.readMap()) }
} else {
listOf()
},
Comment thread
toepkerd marked this conversation as resolved.
target = if (sin.version.onOrAfter(Version.V_3_6_0)) {
if (sin.readBoolean()) Target(sin) else null
} else {
Expand Down Expand Up @@ -380,6 +470,12 @@ data class Alert(
out.writeOptionalString(executionId)
out.writeStringCollection(associatedAlertIds)
out.writeOptionalStringArray(clusters?.toTypedArray())
if (out.version.onOrAfter(Version.V_3_7_0)) {
Comment thread
toepkerd marked this conversation as resolved.
out.writeOptionalString(query)
out.writeCollection(queryResults) { output, map ->
output.writeMap(map)
}
}
if (out.version.onOrAfter(Version.V_3_6_0)) {
out.writeBoolean(target != null)
target?.writeTo(out)
Expand Down Expand Up @@ -415,6 +511,8 @@ data class Alert(
const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS
const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH
const val CLUSTERS_FIELD = "clusters"
const val QUERY_FIELD = "query"
const val QUERY_RESULTS_FIELD = "query_results"
const val TARGET_FIELD = "target"
const val NO_ID = ""
const val NO_VERSION = Versions.NOT_FOUND
Expand Down Expand Up @@ -447,6 +545,8 @@ data class Alert(
var aggAlertBucket: AggregationResultBucket? = null
val associatedAlertIds = mutableListOf<String>()
val clusters = mutableListOf<String>()
var query: String? = null
var queryResults: List<Map<String, Any?>> = listOf()
var target: Target? = null
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -523,6 +623,19 @@ data class Alert(
clusters.add(xcp.text())
}
}
QUERY_FIELD -> {
if (xcp.currentToken() != XContentParser.Token.VALUE_NULL) {
query = xcp.text()
}
}
QUERY_RESULTS_FIELD -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
val resultsList = mutableListOf<Map<String, Any?>>()
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
resultsList.add(xcp.map())
}
queryResults = resultsList
}
TARGET_FIELD -> target = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) {
null
} else {
Expand Down Expand Up @@ -558,6 +671,8 @@ data class Alert(
workflowName = workflowName,
associatedAlertIds = associatedAlertIds,
clusters = if (clusters.size > 0) clusters else null,
query = query,
queryResults = queryResults,
target = target
)
}
Expand Down Expand Up @@ -612,6 +727,11 @@ data class Alert(
if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray())
if (target != null) builder.field(TARGET_FIELD, target)

if (!query.isNullOrEmpty()) builder.field(QUERY_FIELD, query)
if (queryResults.isNotEmpty()) {
builder.field(QUERY_RESULTS_FIELD, queryResults.toTypedArray())
}

builder.endObject()
return builder
}
Expand All @@ -636,7 +756,9 @@ data class Alert(
PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath,
FINDING_IDS to findingIds.joinToString(","),
RELATED_DOC_IDS to relatedDocIds.joinToString(","),
CLUSTERS_FIELD to clusters?.joinToString(",")
CLUSTERS_FIELD to clusters?.joinToString(","),
QUERY_FIELD to query,
QUERY_RESULTS_FIELD to queryResults
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ data class BucketLevelTrigger(
out.writeCollection(actions)
}

fun asTemplateArg(): Map<String, Any> {
override fun asTemplateArg(): Map<String, Any> {
return mapOf(
ID_FIELD to id,
NAME_FIELD to name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ data class ChainedAlertTrigger(
}

/** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */
fun asTemplateArg(): Map<String, Any> {
override fun asTemplateArg(): Map<String, Any> {
return mapOf(
ID_FIELD to id,
NAME_FIELD to name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ data class DocumentLevelTrigger(
}

/** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */
fun asTemplateArg(): Map<String, Any> {
override fun asTemplateArg(): Map<String, Any> {
return mapOf(
ID_FIELD to id,
NAME_FIELD to name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.opensearch.commons.alerting.model

import org.opensearch.commons.alerting.model.ClusterMetricsInput.Companion.URI_FIELD
import org.opensearch.commons.alerting.model.DocLevelMonitorInput.Companion.DOC_LEVEL_INPUT_FIELD
import org.opensearch.commons.alerting.model.PPLInput.Companion.PPL_INPUT_FIELD
import org.opensearch.commons.alerting.model.SearchInput.Companion.SEARCH_FIELD
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput.Companion.REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD
Expand All @@ -20,7 +21,8 @@ interface Input : BaseModel {
CLUSTER_METRICS_INPUT(URI_FIELD),
SEARCH_INPUT(SEARCH_FIELD),
REMOTE_MONITOR_INPUT(REMOTE_MONITOR_INPUT_FIELD),
REMOTE_DOC_LEVEL_MONITOR_INPUT(REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD);
REMOTE_DOC_LEVEL_MONITOR_INPUT(REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD),
PPL_INPUT(PPL_INPUT_FIELD);

override fun toString(): String {
return value
Expand All @@ -42,8 +44,10 @@ interface Input : BaseModel {
DocLevelMonitorInput.parse(xcp)
} else if (xcp.currentName() == Type.REMOTE_MONITOR_INPUT.value) {
RemoteMonitorInput.parse(xcp)
} else {
} else if (xcp.currentName() == Type.REMOTE_DOC_LEVEL_MONITOR_INPUT.value) {
RemoteDocLevelMonitorInput.parse(xcp)
} else {
PPLInput.parseInner(xcp)
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp)
return input
Expand All @@ -58,6 +62,7 @@ interface Input : BaseModel {
Type.SEARCH_INPUT -> SearchInput(sin)
Type.REMOTE_MONITOR_INPUT -> RemoteMonitorInput(sin)
Type.REMOTE_DOC_LEVEL_MONITOR_INPUT -> RemoteDocLevelMonitorInput(sin)
Type.PPL_INPUT -> PPLInput(sin)
// This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns
// enum can be null in Java
else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger")
Expand Down
Loading
Loading