From 65caee5d75511ba9c3103bcaef1989cba3ace651 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Thu, 30 Apr 2026 15:57:35 -0700 Subject: [PATCH] PPL Alerting: PPL Dependencies and Utils Signed-off-by: Dennis Toepker --- alerting/build.gradle | 27 ++ .../org/opensearch/alerting/PPLUtils.kt | 256 ++++++++++++++ .../alerting/settings/AlertingSettings.kt | 44 +++ .../alerting/alerts/alert_mapping.json | 9 +- .../org/opensearch/alerting/PPLUtilsTests.kt | 327 ++++++++++++++++++ core/build.gradle | 51 ++- .../alerting/core/ppl/PPLPluginInterface.kt | 43 +++ .../opensearchapi/OpenSearchExtensions.kt | 15 + 8 files changed, 770 insertions(+), 2 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/PPLUtils.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/PPLUtilsTests.kt create mode 100644 core/src/main/kotlin/org/opensearch/alerting/core/ppl/PPLPluginInterface.kt diff --git a/alerting/build.gradle b/alerting/build.gradle index dd6f8ef2f..ee97eaaa1 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -156,6 +156,8 @@ dependencies { // Needed for integ tests zipArchive group: 'org.opensearch.plugin', name:'opensearch-notifications-core', version: "${opensearch_build}" zipArchive group: 'org.opensearch.plugin', name:'notifications', version: "${opensearch_build}" + zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}" + zipArchive group: 'org.opensearch.plugin', name:'opensearch-sql-plugin', version: "${opensearch_build}" // Needed for security tests if (securityEnabled) { @@ -173,7 +175,10 @@ dependencies { implementation "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}" implementation "org.jetbrains:annotations:13.0" + // SQL/PPL plugin dependencies are included in alerting-core api project(":alerting-core") + implementation 'org.json:json:20240303' + implementation "com.github.seancfoley:ipaddress:5.4.1" implementation project(path: 
":alerting-spi", configuration: 'shadow') @@ -263,6 +268,28 @@ testClusters.integTest { } })) + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.matching { + include '**/opensearch-job-scheduler*' + }.singleFile + } + } + })) + + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.matching { + include '**/opensearch-sql-plugin*' + }.singleFile + } + } + })) + if (securityEnabled) { plugin(provider({ new RegularFile() { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/PPLUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/PPLUtils.kt new file mode 100644 index 000000000..4fc3d909c --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/PPLUtils.kt @@ -0,0 +1,256 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import org.json.JSONArray +import org.json.JSONObject +import org.opensearch.alerting.core.ppl.PPLPluginInterface +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.sql.plugin.transport.TransportPPLQueryRequest +import org.opensearch.transport.client.node.NodeClient + +object PPLUtils { + + private val customConditionValidationRegex = """^\s*where\s+.+""".toRegex() + + const val PPL_RESULTS_SIZE_EXCEEDED_MESSAGE = "The PPL Query results were too large and thus excluded." + + /** + * Appends a user-defined custom condition to a PPL query. + * + * This method is used exclusively for custom condition triggers. It concatenates + * the custom condition to the base PPL query using the pipe operator (|), allowing the condition + * to evaluate each query result data row. 
+ * + * @param query The base PPL query string (e.g., "source=logs | where status=error") + * @param customCondition The custom trigger condition to append (e.g., "eval result = avg > 3") + * @return The combined PPL query with the custom condition appended + * + * @example + * ``` + * val baseQuery = "source=logs | stats max(price) as max_price by region" + * val condition = "eval result = max_price > 300" + * val result = appendCustomCondition(baseQuery, condition) + * // Returns: "source=logs | stats max(price) as max_price by region | eval result = max_price > 300" + * ``` + * + * @note This method does not validate the syntax of either the query or custom condition. + * It is assumed that upstream workflows have already validated the base query, + * and that downstream workflows will validate the constructed query + */ + fun appendCustomCondition(query: String, customCondition: String): String { + return "$query | $customCondition" + } + + /** + * Appends a limit on the number of documents/data rows to retrieve from a PPL query. + * + * This method uses the PPL `head` command to restrict the number of rows returned by + * the query. This is used to prevent memory issues and improving performance when + * only a subset of results is needed for alert evaluation. 
+ * + * @param query The base PPL query string + * @param maxDataRows The maximum number of data rows to retrieve + * @return The PPL query with a head limit appended (e.g., "source=logs | head 1000") + * + * @example + * ``` + * val query = "source=logs | where status=error" + * val limitedQuery = appendDataRowsLimit(query, 100) + * // Returns: "source=logs | where status=error | head 100" + * ``` + */ + fun appendDataRowsLimit(query: String, maxDataRows: Long): String { + return "$query | head $maxDataRows" + } + + fun customConditionIsValid(customCondition: String): Boolean { + return customCondition.matches(customConditionValidationRegex) + } + + /** + * Executes a PPL query and returns the response as a parsable JSONObject. + * + * This method calls the PPL Plugin's Execute or Explain API via the transport layer to execute the provided query + * and parses the response into a structured JSON format suitable for trigger evaluation + * + * @param query The PPL query string to execute + * @param explain true if the query should just be explained, false if the query should be executed + * @param localNode The node within which the request will be serviced + * @param transportService The transport service used to run the request + * @return A JSONObject containing the query execution results + * + * @throws Exception if the query execution fails or the response cannot be parsed as JSON + * + * @note The response format follows the PPL plugin's Execute API response structure with + * "schema", "datarows", "total", and "size" fields. 
+ */ + suspend fun executePplQuery( + query: String, + explain: Boolean, + client: NodeClient + ): JSONObject { + val path = if (explain) { + "/_plugins/_ppl/_explain" + } else { + "/_plugins/_ppl" + } + + // call PPL plugin to execute query + val transportPplQueryRequest = TransportPPLQueryRequest( + query, + JSONObject(mapOf("query" to query)), + path + ) + + val transportPplQueryResponse = PPLPluginInterface.suspendUntil { + this.executeQuery( + client, + transportPplQueryRequest, + it + ) + } + + val queryResponseJson = JSONObject(transportPplQueryResponse.result) + + return queryResponseJson + } + + fun capAndReformatPPLQueryResults(rawQueryResults: JSONObject, maxSize: Long): List> { + val cappedQueryResults = capPPLQueryResultsSize(rawQueryResults, maxSize).toMap() + val reformattedQueryResults = constructPPLQueryResultsMap(cappedQueryResults) + return reformattedQueryResults + } + + /** + * Caps the size of PPL query results to prevent memory issues and oversized alert payloads. + * + * Checks if the serialized query results exceed a specified size limit. If the results + * are within the limit, they are returned unchanged. If they exceed the limit, the whole response + * is replaced with an informational message while preserving the original structure of the response. + * This ensures alerts can still be created even when query results are too large. 
+ * + * @param pplQueryResults The PPL query response JSONObject + * @param maxSize The maximum allowed size in bytes (estimated by serialized string length) + * @return The original results if under the limit, or a modified version with datarows replaced by a message + * + * @example + * ``` + * val queryResults = executePplQuery(query, client) + * val cappedResults = capPPLQueryResultsSize(queryResults, maxSize = 5000L) + * + * // If results were too large, datarows will contain: + * // [["The PPL Query results were too large and thus excluded"]] + * // But schema, total, and size fields are preserved + * ``` + * + * @note Size is estimated using `toString().length`, which approximates byte size but may + * not be exact for multi-byte characters + * @note The PPL query results structure includes: + * - `schema`: Array of objects storing data types for each column + * - `datarows`: Array of arrays containing the actual query result rows + * - `total`: Total number of result rows + * - `size`: Same as `total` (redundant field in PPL response) + */ + fun capPPLQueryResultsSize(pplQueryResults: JSONObject, maxSize: Long): JSONObject { + // estimate byte size with serialized string length + // if query results size are already under the limit, do nothing + // and return the query results as is + val pplQueryResultsSize = pplQueryResults.toString().length + if (pplQueryResultsSize <= maxSize) { + return pplQueryResults + } + + // if the query results exceed the limit, we need to replace the query results + // with a message that says the results were too large, but still retain the other + // ppl query response fields like schema, total, and size + val limitExceedMessageQueryResults = JSONObject() + + val schema = JSONArray().put(JSONObject(mapOf("name" to "message", "type" to "string"))) + val datarows = JSONArray().put(JSONArray(listOf(PPL_RESULTS_SIZE_EXCEEDED_MESSAGE))) + + limitExceedMessageQueryResults.put("schema", schema) + 
limitExceedMessageQueryResults.put("datarows", datarows) + limitExceedMessageQueryResults.put("total", 1) + limitExceedMessageQueryResults.put("size", 1) + + return limitExceedMessageQueryResults + } + + /** + * Transforms PPL query results from array-based format (that SQL Plugin Execute API response uses) + * to map-based format for easier template access. + * + * PPL query responses contain a `schema` array that defines field names and types, and a `datarows` array + * that contains the actual data values in positional format. This function combines them into a list of maps + * where each list element represents a row with field names as keys and corresponding values from the datarows. + * + * ### Input Format + * The input should be a PPL query result with this structure: + * ```json + * { + * "schema": [ + * {"name": "abc", "type": "string"}, + * {"name": "number", "type": "integer"} + * ], + * "datarows": [ + * ["xyz", 3], + * ["def", 5] + * ] + * } + * ``` + * + * ### Output Format + * The function returns a list where each element is a map representing a data row: + * ```json + * [ + * {"abc": "xyz", "number": 3}, + * {"abc": "def", "number": 5} + * ] + * ``` + * + * ### Edge Cases + * - If `schema` is missing or empty, returns an empty list + * - If `datarows` is missing or empty, returns an empty list + * - If a schema entry is malformed (not a map or missing "name" field), it is skipped + * - If a datarow has fewer values than schema fields, missing values are set to `null` + * - If a datarow has more values than schema fields, extra values are ignored + * - If a datarow is not a list, that row is skipped + * + * @param rawQueryResults The PPL query results map from SQL Plugin Execute API response + * containing "schema" and "datarows" fields. + * @return A list of maps where each map represents a data row with field names as keys and + * corresponding values from datarows. Returns an empty list if schema or datarows + * are missing, empty, or malformed. 
+ * + * @see org.opensearch.alerting.script.QueryLevelTriggerExecutionContext.asTemplateArg + * @see org.opensearch.alerting.PPLUtils.executePplQuery + */ + fun constructPPLQueryResultsMap(rawQueryResults: Map): List> { + // Extract schema array + val schema = rawQueryResults["schema"] as? List<*> ?: return emptyList() + + // Extract field names from schema + val fieldNames = schema.mapNotNull { schemaEntry -> + (schemaEntry as? Map<*, *>)?.get("name") as? String + } + + if (fieldNames.isEmpty()) return emptyList() + + // Extract datarows array + val datarows = rawQueryResults["datarows"] as? List<*> ?: return emptyList() + + // Transform each row into a map + return datarows.mapNotNull { row -> + val rowList = row as? List<*> ?: return@mapNotNull null + + // Create a map from field names to values + fieldNames.mapIndexed { index, fieldName -> + fieldName to rowList.getOrNull(index) + }.toMap() + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 9e6b321a6..2fab47331 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -309,6 +309,50 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + val PPL_MONITOR_EXECUTION_MAX_DURATION = Setting.positiveTimeSetting( + "plugins.alerting.ppl_monitor_max_execution_duration", + TimeValue(1, TimeUnit.MINUTES), + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + val PPL_MAX_QUERY_LENGTH = Setting.longSetting( + "plugins.alerting.ppl_monitor_max_query_length", + 2000L, + 0L, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + // max data rows to retrieve when executing PPL query against + // SQL/PPL plugin during monitor execution + val PPL_QUERY_RESULTS_MAX_DATAROWS = Setting.longSetting( + 
"plugins.alerting.ppl_query_results_max_datarows", + 10000L, + 1L, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + // max size of query results to store in alerts and notifications + val PPL_QUERY_RESULTS_MAX_SIZE = Setting.longSetting( + "plugins.alerting.ppl_query_results_max_size", + 3000L, + 0L, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + val NOTIFICATION_SUBJECT_SOURCE_MAX_LENGTH = Setting.intSetting( + "plugins.alerting.notification_subject_source_max_length", + 1000, + 100, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + val NOTIFICATION_MESSAGE_SOURCE_MAX_LENGTH = Setting.intSetting( + "plugins.alerting.notification_message_source_max_length", + 3000, + 1000, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + val NOTIFICATION_CONTEXT_RESULTS_ALLOWED_ROLES: Setting> = Setting.listSetting( "plugins.alerting.notification_context_results_allowed_roles", listOf(), diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json index 76e5104cc..12a3d86d2 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json @@ -4,7 +4,7 @@ "required": true }, "_meta" : { - "schema_version": 5 + "schema_version": 6 }, "properties": { "schema_version": { @@ -177,6 +177,13 @@ "type" : "keyword" } } + }, + "query": { + "type": "text" + }, + "query_results": { + "type": "nested", + "dynamic": true } } } \ No newline at end of file diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/PPLUtilsTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/PPLUtilsTests.kt new file mode 100644 index 000000000..1ef5757ff --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/PPLUtilsTests.kt @@ -0,0 +1,327 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: 
class PPLUtilsTests : OpenSearchTestCase() {
    fun `test constructPPLQueryResultsMap with simple types`() {
        // Arrange: Simple query result with basic types
        val rawResults = mapOf(
            "schema" to listOf(
                mapOf("name" to "username", "type" to "string"),
                mapOf("name" to "count", "type" to "integer"),
                mapOf("name" to "active", "type" to "boolean")
            ),
            "datarows" to listOf(
                listOf("alice", 42, true),
                listOf("bob", 17, false),
                listOf("charlie", 99, true)
            ),
            "total" to 3,
            "size" to 3
        )

        // Act
        val result = PPLUtils.constructPPLQueryResultsMap(rawResults)

        // Assert
        assertEquals(3, result.size)

        // First row
        assertEquals("alice", result[0]["username"])
        assertEquals(42, result[0]["count"])
        assertEquals(true, result[0]["active"])

        // Second row
        assertEquals("bob", result[1]["username"])
        assertEquals(17, result[1]["count"])
        assertEquals(false, result[1]["active"])

        // Third row
        assertEquals("charlie", result[2]["username"])
        assertEquals(99, result[2]["count"])
        assertEquals(true, result[2]["active"])
    }

    fun `test constructPPLQueryResultsMap with size exceeded message`() {
        // Arrange: a capped response whose single row carries the size-exceeded message
        val rawResults = mapOf(
            "schema" to listOf(
                mapOf("name" to "message", "type" to "string")
            ),
            "datarows" to listOf(
                listOf(PPL_RESULTS_SIZE_EXCEEDED_MESSAGE)
            ),
            "total" to 3,
            "size" to 3
        )

        // Act
        val result = PPLUtils.constructPPLQueryResultsMap(rawResults)

        // Assert
        assertEquals(1, result.size)

        // First row
        assertEquals(PPL_RESULTS_SIZE_EXCEEDED_MESSAGE, result[0]["message"])
    }

    fun `test constructPPLQueryResultsMap with nested objects and nulls`() {
        // Arrange: Complex query result with nested arrays, objects, and nulls
        val rawResults = mapOf(
            "schema" to listOf(
                mapOf("name" to "list", "type" to "bigint"),
                mapOf("name" to "user", "type" to "struct"),
                mapOf("name" to "abc", "type" to "string")
            ),
            "datarows" to listOf(
                // Row 1: All fields populated with complex types
                listOf(
                    listOf(1, 2, 3), // Nested array
                    mapOf("name" to "bob", "age" to 32), // Nested object
                    "abc" // Simple string
                ),
                // Row 2: First field is null, rest populated
                listOf(
                    null, // Null array
                    mapOf("name" to "bob", "age" to 32), // Nested object
                    "abc" // Simple string
                ),
                // Row 3: Multiple null fields
                listOf(
                    null, // Null array
                    null, // Null object
                    "abc" // Simple string
                )
            ),
            "total" to 3,
            "size" to 3
        )

        // Act
        val result = PPLUtils.constructPPLQueryResultsMap(rawResults)

        // Assert
        assertEquals(3, result.size)

        // Row 1: Nested array and nested object
        val row1 = result[0]
        val list1 = row1["list"] as? List<*>
        assertNotNull(list1)
        assertEquals(3, list1?.size)
        assertEquals(1, list1?.get(0))
        assertEquals(2, list1?.get(1))
        assertEquals(3, list1?.get(2))

        val user1 = row1["user"] as? Map<*, *>
        assertNotNull(user1)
        assertEquals("bob", user1?.get("name"))
        assertEquals(32, user1?.get("age"))

        assertEquals("abc", row1["abc"])

        // Row 2: Null list, populated user
        val row2 = result[1]
        assertNull(row2["list"])

        val user2 = row2["user"] as? Map<*, *>
        assertNotNull(user2)
        assertEquals("bob", user2?.get("name"))
        assertEquals(32, user2?.get("age"))

        assertEquals("abc", row2["abc"])

        // Row 3: Multiple nulls
        val row3 = result[2]
        assertNull(row3["list"])
        assertNull(row3["user"])
        assertEquals("abc", row3["abc"])
    }

    fun `test constructPPLQueryResultsMap with empty schema`() {
        // Arrange: Empty schema
        val rawResults = mapOf(
            "schema" to emptyList<Map<String, Any>>(),
            "datarows" to listOf(
                listOf("value1", "value2")
            ),
            "total" to 1,
            "size" to 1
        )

        // Act
        val result = PPLUtils.constructPPLQueryResultsMap(rawResults)

        // Assert
        assertTrue(result.isEmpty())
    }

    fun `test constructPPLQueryResultsMap with empty datarows`() {
        // Arrange: Empty datarows
        val rawResults = mapOf(
            "schema" to listOf(
                mapOf("name" to "field1", "type" to "string"),
                mapOf("name" to "field2", "type" to "integer")
            ),
            "datarows" to emptyList<List<Any>>(),
            "total" to 0,
            "size" to 0
        )

        // Act
        val result = PPLUtils.constructPPLQueryResultsMap(rawResults)

        // Assert
        assertTrue(result.isEmpty())
    }

    fun `test constructPPLQueryResultsMap with missing schema`() {
        // Arrange: Missing schema field
        val rawResults = mapOf(
            "datarows" to listOf(
                listOf("value1", "value2")
            ),
            "total" to 1,
            "size" to 1
        )

        // Act
        val result = PPLUtils.constructPPLQueryResultsMap(rawResults)

        // Assert
        assertTrue(result.isEmpty())
    }

    fun `test constructPPLQueryResultsMap with missing datarows`() {
        // Arrange: Missing datarows field
        val rawResults = mapOf(
            "schema" to listOf(
                mapOf("name" to "field1", "type" to "string")
            ),
            "total" to 0,
            "size" to 0
        )

        // Act
        val result = PPLUtils.constructPPLQueryResultsMap(rawResults)

        // Assert
        assertTrue(result.isEmpty())
    }

    fun `test constructPPLQueryResultsMap with mismatched row lengths`() {
        // Arrange: More schema fields than datarow values
        val rawResults = mapOf(
            "schema" to listOf(
                mapOf("name" to "field1", "type" to "string"),
                mapOf("name" to "field2", "type" to "integer"),
                mapOf("name" to "field3", "type" to "boolean")
            ),
            "datarows" to listOf(
                listOf("value1", 42), // Missing third field
                listOf("value2") // Missing second and third fields
            ),
            "total" to 2,
            "size" to 2
        )

        // Act
        val result = PPLUtils.constructPPLQueryResultsMap(rawResults)

        // Assert
        assertEquals(2, result.size)

        // First row: third field should be null
        assertEquals("value1", result[0]["field1"])
        assertEquals(42, result[0]["field2"])
        assertNull(result[0]["field3"])

        // Second row: second and third fields should be null
        assertEquals("value2", result[1]["field1"])
        assertNull(result[1]["field2"])
        assertNull(result[1]["field3"])
    }

    fun `test constructPPLQueryResultsMap with extra datarow values`() {
        // Arrange: More datarow values than schema fields
        val rawResults = mapOf(
            "schema" to listOf(
                mapOf("name" to "field1", "type" to "string"),
                mapOf("name" to "field2", "type" to "integer")
            ),
            "datarows" to listOf(
                listOf("value1", 42, true, "extra") // Extra values ignored
            ),
            "total" to 1,
            "size" to 1
        )

        // Act
        val result = PPLUtils.constructPPLQueryResultsMap(rawResults)

        // Assert
        assertEquals(1, result.size)
        assertEquals(2, result[0].size)
        assertEquals("value1", result[0]["field1"])
        assertEquals(42, result[0]["field2"])
        // Extra values are ignored
    }

    fun `test constructPPLQueryResultsMap with deeply nested structures`() {
        // Arrange: Deeply nested objects and arrays
        val rawResults = mapOf(
            "schema" to listOf(
                mapOf("name" to "nested_data", "type" to "struct")
            ),
            "datarows" to listOf(
                listOf(
                    mapOf(
                        "level1" to mapOf(
                            "level2" to mapOf(
                                "level3" to listOf(1, 2, 3)
                            )
                        ),
                        "array_of_objects" to listOf(
                            mapOf("id" to 1, "name" to "item1"),
                            mapOf("id" to 2, "name" to "item2")
                        )
                    )
                )
            ),
            "total" to 1,
            "size" to 1
        )

        // Act
        val result = PPLUtils.constructPPLQueryResultsMap(rawResults)

        // Assert
        assertEquals(1, result.size)

        val nestedData = result[0]["nested_data"] as? Map<*, *>
        assertNotNull(nestedData)

        val level1 = nestedData?.get("level1") as? Map<*, *>
        assertNotNull(level1)

        val level2 = level1?.get("level2") as? Map<*, *>
        assertNotNull(level2)

        val level3 = level2?.get("level3") as? List<*>
        assertNotNull(level3)
        assertEquals(3, level3?.size)

        val arrayOfObjects = nestedData?.get("array_of_objects") as? List<*>
        assertNotNull(arrayOfObjects)
        assertEquals(2, arrayOfObjects?.size)

        val firstObject = arrayOfObjects?.get(0) as? Map<*, *>
        assertEquals(1, firstObject?.get("id"))
        assertEquals("item1", firstObject?.get("name"))
    }
}
include: ["opensearch-sql-thin-${opensearch_build}.jar", "ppl-${opensearch_build}.jar", "protocol-${opensearch_build}.jar", "core-${opensearch_build}.jar"]) + + zipArchive group: 'org.opensearch.plugin', name:'opensearch-sql-plugin', version: "${opensearch_build}" testImplementation "org.opensearch.test:framework:${opensearch_version}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" testImplementation "org.jetbrains.kotlin:kotlin-test-junit:${kotlin_version}" } + +task extractSqlJar(type: Copy) { + mustRunAfter() + from(zipTree(configurations.zipArchive.find { it.name.startsWith("opensearch-sql-plugin") })) + into sqlJarDirectory +} + +task extractSqlClass(type: Copy, dependsOn: [extractSqlJar]) { + from zipTree("${sqlJarDirectory}/opensearch-sql-${opensearch_build}.jar") + into("$buildDir/opensearch-sql") + include 'org/opensearch/sql/**' +} + +task replaceSqlJar(type: Jar, dependsOn: [extractSqlClass]) { + from("$buildDir/opensearch-sql") + archiveFileName = "opensearch-sql-thin-${opensearch_build}.jar" + destinationDirectory = file(sqlJarDirectory) + doLast { + file("${sqlJarDirectory}/opensearch-sql-${opensearch_build}.jar").delete() + } +} + +tasks.addJarsToClasspath.dependsOn(replaceSqlJar) + +compileJava { + dependsOn addJarsToClasspath +} + +compileKotlin { + dependsOn addJarsToClasspath +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/ppl/PPLPluginInterface.kt b/core/src/main/kotlin/org/opensearch/alerting/core/ppl/PPLPluginInterface.kt new file mode 100644 index 000000000..8ff15165c --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/ppl/PPLPluginInterface.kt @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.core.ppl + +import org.opensearch.commons.utils.recreateObject +import org.opensearch.core.action.ActionListener +import org.opensearch.core.action.ActionResponse +import 
/**
 * Transport action plugin interface for the SQL/PPL plugin.
 *
 * NOTE(review): the response is rebuilt with [recreateObject] rather than cast directly —
 * presumably because a cross-plugin transport response can arrive as a generic
 * [ActionResponse] that must be re-serialized into [TransportPPLQueryResponse]; this mirrors
 * the NotificationsPluginInterface pattern. Confirm against the SQL plugin's action registration.
 */
@Suppress("UNCHECKED_CAST")
object PPLPluginInterface {

    /**
     * Executes a PPL query through the SQL/PPL plugin's [PPLQueryAction] transport action.
     *
     * @param client The node client used to dispatch the transport request
     * @param request The PPL query transport request
     * @param listener Receives the recreated [TransportPPLQueryResponse], or the failure
     */
    fun executeQuery(
        client: NodeClient,
        request: TransportPPLQueryRequest,
        listener: ActionListener<TransportPPLQueryResponse>,
    ) {
        // Wrap the caller's listener so the raw ActionResponse can be recreated into the
        // concrete response type. The unchecked cast is needed because client.execute is
        // typed against the action's declared response type.
        val wrappedListener = object : ActionListener<ActionResponse> {
            override fun onResponse(response: ActionResponse) {
                val recreated = recreateObject(response) { TransportPPLQueryResponse(it) }
                listener.onResponse(recreated)
            }

            override fun onFailure(exception: Exception) {
                listener.onFailure(exception)
            }
        } as ActionListener<TransportPPLQueryResponse>

        client.execute(
            PPLQueryAction.INSTANCE,
            request,
            wrappedListener
        )
    }
}
/**
 * Converts [PPLPluginInterface] methods that take a callback into a kotlin suspending function.
 *
 * Resumes the coroutine with the listener's response, or resumes with the exception on
 * failure, matching the other `suspendUntil` extensions in this file.
 *
 * @param block - a block of code that is passed an [ActionListener] that should be passed to the PPLPluginInterface API.
 */
suspend fun <T> PPLPluginInterface.suspendUntil(block: PPLPluginInterface.(ActionListener<T>) -> Unit): T =
    suspendCoroutine { cont ->
        block(object : ActionListener<T> {
            override fun onResponse(response: T) = cont.resume(response)

            override fun onFailure(e: Exception) = cont.resumeWithException(e)
        })
    }