Skip to content
Closed

Ppl v1 #2112

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 126 additions & 99 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ dependencies {
// Needed for integ tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-notifications-core', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'notifications', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-sql-plugin', version: "${opensearch_build}"

// Needed for security tests
if (securityEnabled) {
Expand All @@ -173,7 +175,10 @@ dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
implementation "org.jetbrains:annotations:13.0"

// SQL/PPL plugin dependencies are included in alerting-core
api project(":alerting-core")
implementation 'org.json:json:20240303'

implementation "com.github.seancfoley:ipaddress:5.4.1"
implementation project(path: ":alerting-spi", configuration: 'shadow')

Expand Down Expand Up @@ -253,6 +258,28 @@ testClusters.integTest {
}
}))

plugin(provider({
new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.matching {
include '**/opensearch-job-scheduler*'
}.singleFile
}
}
}))

plugin(provider({
new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.matching {
include '**/opensearch-sql-plugin*'
}.singleFile
}
}
}))

if (securityEnabled) {
plugin(provider({
new RegularFile() {
Expand Down Expand Up @@ -418,105 +445,105 @@ task prepareBwcTests {
}
}

// Create two test clusters with 3 nodes of the old version
2.times {i ->
task "${baseName}#oldVersionClusterTask$i"(type: StandaloneRestIntegTestTask) {
dependsOn 'prepareBwcTests'
useCluster testClusters."${baseName}$i"
filter {
includeTestsMatching "org.opensearch.alerting.bwc.*IT"
}
systemProperty 'tests.rest.bwcsuite', 'old_cluster'
systemProperty 'tests.rest.bwcsuite_round', 'old'
systemProperty 'tests.plugin_bwc_version', bwcVersion
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}$i".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}$i".getName()}")
}
}

// Upgrade one node of the old cluster to new OpenSearch version with upgraded plugin version.
// This results in a mixed cluster with 2 nodes on the old version and 1 upgraded node.
// This is also used as a one third upgraded cluster for a rolling upgrade.
task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) {
useCluster testClusters."${baseName}0"
dependsOn "${baseName}#oldVersionClusterTask0"
doFirst {
testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins)
}
filter {
includeTestsMatching "org.opensearch.alerting.bwc.*IT"
}
systemProperty 'tests.rest.bwcsuite', 'mixed_cluster'
systemProperty 'tests.rest.bwcsuite_round', 'first'
systemProperty 'tests.plugin_bwc_version', bwcVersion
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
}

// Upgrade the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded.
// This results in a mixed cluster with 1 node on the old version and 2 upgraded nodes.
// This is used for rolling upgrade.
task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTask) {
dependsOn "${baseName}#mixedClusterTask"
useCluster testClusters."${baseName}0"
doFirst {
testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins)
}
filter {
includeTestsMatching "org.opensearch.alerting.bwc.*IT"
}
systemProperty 'tests.rest.bwcsuite', 'mixed_cluster'
systemProperty 'tests.rest.bwcsuite_round', 'second'
systemProperty 'tests.plugin_bwc_version', bwcVersion
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
}

// Upgrade the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded.
// This results in a fully upgraded cluster.
// This is used for rolling upgrade.
task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask) {
dependsOn "${baseName}#twoThirdsUpgradedClusterTask"
useCluster testClusters."${baseName}0"
doFirst {
testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins)
}
filter {
includeTestsMatching "org.opensearch.alerting.bwc.*IT"
}
mustRunAfter "${baseName}#mixedClusterTask"
systemProperty 'tests.rest.bwcsuite', 'mixed_cluster'
systemProperty 'tests.rest.bwcsuite_round', 'third'
systemProperty 'tests.plugin_bwc_version', bwcVersion
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
}

// Upgrade all the nodes of the old cluster to new OpenSearch version with upgraded plugin version
// at the same time resulting in a fully upgraded cluster.
task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) {
dependsOn "${baseName}#oldVersionClusterTask1"
useCluster testClusters."${baseName}1"
doFirst {
testClusters."${baseName}1".upgradeAllNodesAndPluginsToNextVersion(plugins)
}
filter {
includeTestsMatching "org.opensearch.alerting.bwc.*IT"
}
systemProperty 'tests.rest.bwcsuite', 'upgraded_cluster'
systemProperty 'tests.plugin_bwc_version', bwcVersion
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}1".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}1".getName()}")
}

// A bwc test suite which runs all the bwc tasks combined
task bwcTestSuite(type: StandaloneRestIntegTestTask) {
exclude '**/*Test*'
exclude '**/*IT*'
dependsOn tasks.named("${baseName}#mixedClusterTask")
dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask")
dependsOn tasks.named("${baseName}#fullRestartClusterTask")
}
//// Create two test clusters with 3 nodes of the old version
//2.times {i ->
// task "${baseName}#oldVersionClusterTask$i"(type: StandaloneRestIntegTestTask) {
// dependsOn 'prepareBwcTests'
// useCluster testClusters."${baseName}$i"
// filter {
// includeTestsMatching "org.opensearch.alerting.bwc.*IT"
// }
// systemProperty 'tests.rest.bwcsuite', 'old_cluster'
// systemProperty 'tests.rest.bwcsuite_round', 'old'
// systemProperty 'tests.plugin_bwc_version', bwcVersion
// nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}$i".allHttpSocketURI.join(",")}")
// nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}$i".getName()}")
// }
//}
//
//// Upgrade one node of the old cluster to new OpenSearch version with upgraded plugin version.
//// This results in a mixed cluster with 2 nodes on the old version and 1 upgraded node.
//// This is also used as a one third upgraded cluster for a rolling upgrade.
//task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) {
// useCluster testClusters."${baseName}0"
// dependsOn "${baseName}#oldVersionClusterTask0"
// doFirst {
// testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins)
// }
// filter {
// includeTestsMatching "org.opensearch.alerting.bwc.*IT"
// }
// systemProperty 'tests.rest.bwcsuite', 'mixed_cluster'
// systemProperty 'tests.rest.bwcsuite_round', 'first'
// systemProperty 'tests.plugin_bwc_version', bwcVersion
// nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
// nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
//}
//
//// Upgrade the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded.
//// This results in a mixed cluster with 1 node on the old version and 2 upgraded nodes.
//// This is used for rolling upgrade.
//task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTask) {
// dependsOn "${baseName}#mixedClusterTask"
// useCluster testClusters."${baseName}0"
// doFirst {
// testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins)
// }
// filter {
// includeTestsMatching "org.opensearch.alerting.bwc.*IT"
// }
// systemProperty 'tests.rest.bwcsuite', 'mixed_cluster'
// systemProperty 'tests.rest.bwcsuite_round', 'second'
// systemProperty 'tests.plugin_bwc_version', bwcVersion
// nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
// nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
//}
//
//// Upgrade the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded.
//// This results in a fully upgraded cluster.
//// This is used for rolling upgrade.
//task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask) {
// dependsOn "${baseName}#twoThirdsUpgradedClusterTask"
// useCluster testClusters."${baseName}0"
// doFirst {
// testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins)
// }
// filter {
// includeTestsMatching "org.opensearch.alerting.bwc.*IT"
// }
// mustRunAfter "${baseName}#mixedClusterTask"
// systemProperty 'tests.rest.bwcsuite', 'mixed_cluster'
// systemProperty 'tests.rest.bwcsuite_round', 'third'
// systemProperty 'tests.plugin_bwc_version', bwcVersion
// nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
// nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
//}
//
//// Upgrade all the nodes of the old cluster to new OpenSearch version with upgraded plugin version
//// at the same time resulting in a fully upgraded cluster.
//task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) {
// dependsOn "${baseName}#oldVersionClusterTask1"
// useCluster testClusters."${baseName}1"
// doFirst {
// testClusters."${baseName}1".upgradeAllNodesAndPluginsToNextVersion(plugins)
// }
// filter {
// includeTestsMatching "org.opensearch.alerting.bwc.*IT"
// }
// systemProperty 'tests.rest.bwcsuite', 'upgraded_cluster'
// systemProperty 'tests.plugin_bwc_version', bwcVersion
// nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}1".allHttpSocketURI.join(",")}")
// nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}1".getName()}")
//}
//
//// A bwc test suite which runs all the bwc tasks combined
//task bwcTestSuite(type: StandaloneRestIntegTestTask) {
// exclude '**/*Test*'
// exclude '**/*IT*'
// dependsOn tasks.named("${baseName}#mixedClusterTask")
// dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask")
// dependsOn tasks.named("${baseName}#fullRestartClusterTask")
//}

run {
doFirst {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ import org.opensearch.commons.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.NoOpTrigger
import org.opensearch.commons.alerting.model.PPLInput
import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.util.isPPLMonitor
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.rest.RestStatus
Expand Down Expand Up @@ -201,6 +203,10 @@ class AlertService(
}
}

// populate PPL Monitor specific fields
val query = if (ctx.monitor.isPPLMonitor()) (ctx.monitor.inputs[0] as PPLInput).query else null
val queryResults = if (ctx.monitor.isPPLMonitor()) ctx.pplQueryResults else emptyList()

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
Expand All @@ -211,7 +217,9 @@ class AlertService(
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
clusters = triggeredClusters
clusters = triggeredClusters,
query = query,
queryResults = queryResults
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
Expand All @@ -224,7 +232,9 @@ class AlertService(
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
clusters = triggeredClusters
clusters = triggeredClusters,
query = query,
queryResults = queryResults
)
} else {
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Expand All @@ -237,7 +247,7 @@ class AlertService(
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId,
workflowId = workflorwRunContext?.workflowId ?: "",
clusters = triggeredClusters
clusters = triggeredClusters, query = query, queryResults = queryResults
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocumentLevelTrigger
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.PPLInput
import org.opensearch.commons.alerting.model.PPLTrigger
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
Expand Down Expand Up @@ -207,6 +209,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
nodesInCluster: Supplier<DiscoveryNodes>
): List<RestHandler> {
return listOf(
// Alerting V1
RestGetMonitorAction(),
RestDeleteMonitorAction(),
RestIndexMonitorAction(),
Expand Down Expand Up @@ -236,6 +239,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R

override fun getActions(): List<ActionPlugin.ActionHandler<out ActionRequest, out ActionResponse>> {
return listOf(
// Alerting V1
ActionPlugin.ActionHandler(ScheduledJobsStatsAction.INSTANCE, ScheduledJobsStatsTransportAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_MONITOR_ACTION_TYPE, TransportIndexMonitorAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_MONITOR_ACTION_TYPE, TransportGetMonitorAction::class.java),
Expand All @@ -262,7 +266,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.DELETE_COMMENT_ACTION_TYPE, TransportDeleteAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java)
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java),
)
}

Expand All @@ -271,12 +275,14 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
Monitor.XCONTENT_REGISTRY,
SearchInput.XCONTENT_REGISTRY,
DocLevelMonitorInput.XCONTENT_REGISTRY,
PPLInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY,
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY,
ChainedAlertTrigger.XCONTENT_REGISTRY,
RemoteMonitorTrigger.XCONTENT_REGISTRY,
PPLTrigger.XCONTENT_REGISTRY,
Workflow.XCONTENT_REGISTRY
)
}
Expand Down Expand Up @@ -468,7 +474,13 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.REMOTE_METADATA_ENDPOINT,
AlertingSettings.REMOTE_METADATA_REGION,
AlertingSettings.REMOTE_METADATA_SERVICE_NAME,
AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED
AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED,
AlertingSettings.PPL_MONITOR_EXECUTION_MAX_DURATION,
AlertingSettings.PPL_MAX_QUERY_LENGTH,
AlertingSettings.PPL_QUERY_RESULTS_MAX_DATAROWS,
AlertingSettings.PPL_QUERY_RESULTS_MAX_SIZE,
AlertingSettings.NOTIFICATION_SUBJECT_SOURCE_MAX_LENGTH,
AlertingSettings.NOTIFICATION_MESSAGE_SOURCE_MAX_LENGTH,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean,
manual: Boolean,
workflowRunContext: WorkflowRunContext?,
executionId: String,
transportService: TransportService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean,
manual: Boolean,
workflowRunContext: WorkflowRunContext?,
executionId: String,
transportService: TransportService
Expand Down
Loading
Loading