From 29bf942ab558d6e9b11afc31d35a044653a7dd23 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 19 Feb 2026 09:53:17 -0800 Subject: [PATCH 1/3] adds periodic cleanup job to delete doc level monitor percolate query indices Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/AlertingPlugin.kt | 5 + .../opensearch/alerting/QueryIndexCleanup.kt | 459 ++++++++++++++++++ .../alerting/settings/AlertingSettings.kt | 14 + .../alerting/util/ValidationHelpers.kt | 78 +++ .../alerting/QueryIndexCleanupIT.kt | 295 +++++++++++ 5 files changed, 851 insertions(+) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/QueryIndexCleanup.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/util/ValidationHelpers.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/QueryIndexCleanupIT.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 3dc2cd571..f9fcfa74b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -180,6 +180,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R lateinit var docLevelMonitorQueries: DocLevelMonitorQueries lateinit var threadPool: ThreadPool lateinit var alertIndices: AlertIndices + lateinit var queryIndexCleanup: QueryIndexCleanup lateinit var clusterService: ClusterService lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator var monitorTypeToMonitorRunners: MutableMap = mutableMapOf() @@ -285,6 +286,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R val settings = environment.settings() val lockService = LockService(client, clusterService) alertIndices = AlertIndices(settings, client, threadPool, clusterService) + queryIndexCleanup = QueryIndexCleanup(settings, client, threadPool, 
clusterService) val alertService = AlertService(client, xContentRegistry, alertIndices) val triggerService = TriggerService(scriptService) runner = MonitorRunnerService @@ -348,6 +350,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R scheduledJobIndices, commentsIndices, docLevelMonitorQueries, + queryIndexCleanup, destinationMigrationCoordinator, lockService, alertService, @@ -422,6 +425,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE, AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD, + AlertingSettings.QUERY_INDEX_CLEANUP_ENABLED, + AlertingSettings.QUERY_INDEX_CLEANUP_PERIOD, AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE, AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED, AlertingSettings.ALERTING_COMMENTS_ENABLED, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryIndexCleanup.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryIndexCleanup.kt new file mode 100644 index 000000000..bd06952aa --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryIndexCleanup.kt @@ -0,0 +1,459 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.IndicesOptions +import 
org.opensearch.action.support.clustermanager.AcknowledgedResponse +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings.Companion.QUERY_INDEX_CLEANUP_ENABLED +import org.opensearch.alerting.settings.AlertingSettings.Companion.QUERY_INDEX_CLEANUP_PERIOD +import org.opensearch.cluster.ClusterChangedEvent +import org.opensearch.cluster.ClusterStateListener +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.MonitorMetadata +import org.opensearch.core.action.ActionListener +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.query.QueryBuilders +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.threadpool.Scheduler.Cancellable +import org.opensearch.threadpool.ThreadPool +import org.opensearch.transport.client.Client + +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + +private const val SEARCH_QUERY_RESULT_SIZE = 10000 +private const val METADATA_FIELD = "metadata" + +class QueryIndexCleanup( + settings: Settings, + private val client: Client, + private val threadPool: ThreadPool, + private val clusterService: ClusterService, +) : ClusterStateListener { + + private val logger = LogManager.getLogger(javaClass) + + @Volatile private var queryIndexCleanupEnabled = QUERY_INDEX_CLEANUP_ENABLED.get(settings) + @Volatile private var queryIndexCleanupPeriod = QUERY_INDEX_CLEANUP_PERIOD.get(settings) + + private var scheduledCleanup: Cancellable? 
= null + private var isClusterManager = false + + init { + clusterService.addListener(this) + clusterService.clusterSettings.addSettingsUpdateConsumer(QUERY_INDEX_CLEANUP_ENABLED) { + queryIndexCleanupEnabled = it + } + clusterService.clusterSettings.addSettingsUpdateConsumer(QUERY_INDEX_CLEANUP_PERIOD) { + queryIndexCleanupPeriod = it + rescheduleCleanup() + } + } + + fun onClusterManager() { + try { + scheduledCleanup = threadPool.scheduleWithFixedDelay( + { cleanupQueryIndices() }, + queryIndexCleanupPeriod, + ThreadPool.Names.MANAGEMENT + ) + logger.info("Query index cleanup scheduled with period: $queryIndexCleanupPeriod") + } catch (e: Exception) { + logger.error("Error scheduling query index cleanup", e) + } + } + + fun offClusterManager() { + scheduledCleanup?.cancel() + logger.info("Query index cleanup cancelled") + } + + override fun clusterChanged(event: ClusterChangedEvent) { + if (this.isClusterManager != event.localNodeClusterManager()) { + this.isClusterManager = event.localNodeClusterManager() + if (this.isClusterManager) { + onClusterManager() + } else { + offClusterManager() + } + } + } + + private fun rescheduleCleanup() { + if (clusterService.state().nodes.isLocalNodeElectedClusterManager) { + scheduledCleanup?.cancel() + scheduledCleanup = threadPool.scheduleWithFixedDelay( + { cleanupQueryIndices() }, + queryIndexCleanupPeriod, + ThreadPool.Names.MANAGEMENT + ) + logger.info("Query index cleanup rescheduled with period: $queryIndexCleanupPeriod") + } + } + + private fun cleanupQueryIndices() { + if (!queryIndexCleanupEnabled) { + logger.debug("Query index cleanup is disabled") + return + } + + scope.launch { + try { + logger.info("Starting query index cleanup") + val startTime = System.currentTimeMillis() + + val allMetadata = fetchAllMonitorMetadata() + val queryIndexUsageMap = buildQueryIndexUsageMap(allMetadata) + val indicesToDelete = determineIndicesToDelete(queryIndexUsageMap) + + if (indicesToDelete.isNotEmpty()) { + 
cleanupMetadataMappings(allMetadata, indicesToDelete) + deleteQueryIndices(indicesToDelete) + } else { + logger.info("No query indices eligible for deletion") + } + + val duration = System.currentTimeMillis() - startTime + logger.info("Query index cleanup completed in ${duration}ms") + } catch (e: Exception) { + logger.error("Error during query index cleanup", e) + } + } + } + + private suspend fun fetchAllMonitorMetadata(): List { + val configIndex = ".opendistro-alerting-config" + + // Check if index exists first + val indexExists = try { + val response: IndicesExistsResponse = client.suspendUntil { + admin().indices().exists(IndicesExistsRequest(configIndex), it) + } + response.isExists + } catch (e: Exception) { + logger.warn("Failed to check if config index exists", e) + false + } + + if (!indexExists) { + logger.info("Config index does not exist yet, skipping metadata fetch") + return emptyList() + } + + val searchRequest = SearchRequest(configIndex) + .source( + SearchSourceBuilder() + .query(QueryBuilders.existsQuery(METADATA_FIELD)) + .size(SEARCH_QUERY_RESULT_SIZE) + ) + .indicesOptions(IndicesOptions.lenientExpandOpen()) + + return try { + val response: SearchResponse = client.suspendUntil { + search(searchRequest, it) + } + + logger.info("Metadata query returned ${response.hits.hits.size} documents") + + response.hits.hits.mapNotNull { hit -> + try { + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + MonitorMetadata.parse(xcp, hit.id, hit.seqNo, hit.primaryTerm) + } catch (e: Exception) { + logger.warn("Failed to parse monitor metadata: ${hit.id}", e) + null + } + } + } catch (e: Exception) { + logger.error("Failed to fetch monitor metadata", e) + emptyList() + } + } + + data class QueryIndexUsage( + val concreteQueryIndex: String, + val aliasName: String, 
+ val isWriteIndex: Boolean, + val monitorUsages: MutableMap>, + ) + + private suspend fun buildQueryIndexUsageMap(allMetadata: List): Map { + val queryIndexUsageMap = mutableMapOf() + + for (metadata in allMetadata) { + val monitorId = extractMonitorId(metadata.id) + val monitor = getMonitor(monitorId) + + if (monitor == null) { + for ((sourceKey, concreteQueryIndex) in metadata.sourceToQueryIndexMapping) { + val aliasName = extractAliasFromConcreteIndex(concreteQueryIndex) + val isWriteIndex = checkIfWriteIndex(aliasName, concreteQueryIndex) + + queryIndexUsageMap.getOrPut(concreteQueryIndex) { + QueryIndexUsage(concreteQueryIndex, aliasName, isWriteIndex, mutableMapOf()) + }.monitorUsages.getOrPut(monitorId) { mutableSetOf() } + } + continue + } + + for ((sourceKey, concreteQueryIndex) in metadata.sourceToQueryIndexMapping) { + val sourceIndexName = extractSourceIndexName(sourceKey, monitorId) + val aliasName = monitor.dataSources.queryIndex + val isWriteIndex = checkIfWriteIndex(aliasName, concreteQueryIndex) + + logger.info("Monitor $monitorId: sourceKey=$sourceKey, extracted=$sourceIndexName, queryIndex=$concreteQueryIndex") + + queryIndexUsageMap.getOrPut(concreteQueryIndex) { + QueryIndexUsage(concreteQueryIndex, aliasName, isWriteIndex, mutableMapOf()) + }.monitorUsages.getOrPut(monitorId) { mutableSetOf() }.add(sourceIndexName) + } + } + + return queryIndexUsageMap + } + + private fun determineIndicesToDelete(queryIndexUsageMap: Map): List { + val concreteIndicesByAlias = queryIndexUsageMap.values.groupBy { it.aliasName } + val indicesToDelete = mutableListOf() + + logger.info("Determining indices to delete. 
Total aliases: ${concreteIndicesByAlias.size}") + + for ((aliasName, concreteIndices) in concreteIndicesByAlias) { + // Get ALL backing indices from cluster state, not just ones in metadata + val allBackingIndices = try { + val aliasMetadata = clusterService.state().metadata.indicesLookup[aliasName] + aliasMetadata?.indices?.map { it.index.name } ?: emptyList() + } catch (e: Exception) { + logger.warn("Failed to get backing indices for alias $aliasName", e) + emptyList() + } + + logger.info("Processing alias $aliasName: ${concreteIndices.size} in metadata, ${allBackingIndices.size} in cluster") + + // Never delete if there's only one backing index in the cluster + if (allBackingIndices.size == 1) { + logger.info("Retaining only backing index for alias $aliasName: ${allBackingIndices.first()}") + continue + } + + val sortedIndices = concreteIndices.sortedBy { extractIndexNumber(it.concreteQueryIndex) } + + // Determine latest index from ALL backing indices in cluster, not just metadata + val latestIndexInCluster = allBackingIndices.maxByOrNull { extractIndexNumber(it) } + + // Check if alias still exists + val aliasExists = clusterService.state().metadata().indicesLookup?.get(aliasName) != null + if (!aliasExists) { + logger.debug("Alias $aliasName no longer exists, skipping all indices") + continue + } + + for (queryIndexInfo in sortedIndices) { + // Re-check write index status at deletion time + val isCurrentlyWriteIndex = checkIfWriteIndex(queryIndexInfo.aliasName, queryIndexInfo.concreteQueryIndex) + if (isCurrentlyWriteIndex) { + logger.info("Retaining write index: ${queryIndexInfo.concreteQueryIndex}") + continue + } + + // Don't delete the latest index in the cluster + if (queryIndexInfo.concreteQueryIndex == latestIndexInCluster) { + logger.info("Retaining latest index in cluster: ${queryIndexInfo.concreteQueryIndex}") + continue + } + + var hasActiveUsage = false + var retentionReason: String? 
= null + + for ((monitorId, sourceIndices) in queryIndexInfo.monitorUsages) { + if (sourceIndices.isEmpty()) continue + + logger.info("Checking sources for monitor $monitorId: $sourceIndices") + + val firstExistingIndex = sourceIndices.firstOrNull { indexExists(it) } + if (firstExistingIndex != null) { + hasActiveUsage = true + retentionReason = "Source index $firstExistingIndex still exists (monitor: $monitorId)" + break + } else { + logger.info("All source indices deleted for monitor $monitorId: $sourceIndices") + } + } + + if (!hasActiveUsage) { + indicesToDelete.add(queryIndexInfo.concreteQueryIndex) + logger.info("Marking for deletion: ${queryIndexInfo.concreteQueryIndex}") + } else { + logger.debug("Retaining ${queryIndexInfo.concreteQueryIndex}: $retentionReason") + } + } + } + + val retainedCount = queryIndexUsageMap.size - indicesToDelete.size + logger.info("Query index cleanup summary: ${indicesToDelete.size} to delete, $retainedCount retained") + return indicesToDelete + } + + private suspend fun cleanupMetadataMappings(allMetadata: List, indicesToDelete: List) { + for (metadata in allMetadata) { + val monitorId = extractMonitorId(metadata.id) + val entriesToRemove = mutableListOf() + + for ((sourceKey, concreteQueryIndex) in metadata.sourceToQueryIndexMapping) { + val sourceIndexName = extractSourceIndexName(sourceKey, monitorId) + + if (!indexExists(sourceIndexName) || indicesToDelete.contains(concreteQueryIndex)) { + entriesToRemove.add(sourceKey) + } + } + + if (entriesToRemove.isNotEmpty()) { + val updatedMapping = metadata.sourceToQueryIndexMapping.toMutableMap() + entriesToRemove.forEach { updatedMapping.remove(it) } + + try { + MonitorMetadataService.upsertMetadata( + metadata.copy(sourceToQueryIndexMapping = updatedMapping), + true + ) + logger.debug("Cleaned up ${entriesToRemove.size} entries from metadata: ${metadata.id}") + } catch (e: Exception) { + logger.error("Failed to update metadata: ${metadata.id}", e) + } + } + } + } + + private fun 
deleteQueryIndices(indicesToDelete: List) { + if (indicesToDelete.isEmpty()) return + + logger.info("Deleting query indices: $indicesToDelete") + + // Filter to only indices that actually exist + val existingIndices = indicesToDelete.filter { indexExists(it) } + if (existingIndices.isEmpty()) { + logger.info("No existing indices to delete") + return + } + + val deleteIndexRequest = DeleteIndexRequest(*existingIndices.toTypedArray()) + client.admin().indices().delete( + deleteIndexRequest, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + logger.info("Successfully deleted query indices: $existingIndices") + } + + override fun onFailure(e: Exception) { + logger.error("Batch delete failed for: $existingIndices. Retrying individually.", e) + deleteQueryIndicesOneByOne(existingIndices) + } + } + ) + } + + private fun deleteQueryIndicesOneByOne(indicesToDelete: List) { + for (index in indicesToDelete) { + val deleteRequest = DeleteIndexRequest(index) + client.admin().indices().delete( + deleteRequest, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + logger.info("Successfully deleted query index: $index") + } + + override fun onFailure(e: Exception) { + logger.error("Failed to delete query index: $index", e) + } + } + ) + } + } + + /* Returns the monitor id embedded in a metadata doc id: strips the trailing "-metadata", then keeps the text after the last "-metadata-" if one remains (presumably the workflow-scoped form "{workflowMetadataId}-{monitorId}-metadata"; verify against MonitorMetadata.getId). */ internal fun extractMonitorId(metadataId: String): String { + val base = metadataId.removeSuffix("-metadata") + return if (base.contains("-metadata-")) { + base.substringAfterLast("-metadata-") + } else { + base + } + } + + private fun extractSourceIndexName(sourceKey: String, monitorId: String): String { + return sourceKey.removeSuffix(monitorId) + } + + internal fun extractAliasFromConcreteIndex(concreteQueryIndex: String): String { + return concreteQueryIndex.substringBeforeLast("-") + } + + internal fun extractIndexNumber(concreteQueryIndex: String): Int { + return concreteQueryIndex.substringAfterLast("-").toIntOrNull() ?: 0 + } + + 
private fun checkIfWriteIndex(aliasName: String, concreteQueryIndex: String): Boolean { + val indicesLookup = clusterService.state().metadata().indicesLookup ?: return false + val indexAbstraction = indicesLookup.get(aliasName) ?: return false + val writeIndexName = indexAbstraction.writeIndex?.index?.name + logger.debug("Checking write index for alias $aliasName: writeIndex=$writeIndexName, concrete=$concreteQueryIndex") + return writeIndexName == concreteQueryIndex + } + + private suspend fun getMonitor(monitorId: String): Monitor? { + val getRequest = GetRequest(".opendistro-alerting-config", monitorId) + return try { + val response: GetResponse = client.suspendUntil { + get(getRequest, it) + } + if (response.isExists) { + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + response.sourceAsBytesRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + Monitor.parse(xcp, response.id, response.version) + } else null + } catch (e: Exception) { + logger.warn("Error getting monitor: $monitorId", e) + null + } + } + + private fun indexExists(indexName: String): Boolean { + return try { + clusterService.state().metadata().hasIndex(indexName) + } catch (e: Exception) { + logger.warn("Error checking if index exists: $indexName", e) + true + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 2ae09aead..c2a09e5d2 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -196,6 +196,20 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + val QUERY_INDEX_CLEANUP_ENABLED = Setting.boolSetting( + "plugins.alerting.query_index_cleanup.enabled", + true, + 
Setting.Property.NodeScope, + Setting.Property.Dynamic + ) + + val QUERY_INDEX_CLEANUP_PERIOD = Setting.positiveTimeSetting( + "plugins.alerting.query_index_cleanup.period", + TimeValue.timeValueHours(1), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ) + val REQUEST_TIMEOUT = Setting.positiveTimeSetting( "plugins.alerting.request_timeout", LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/ValidationHelpers.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/ValidationHelpers.kt new file mode 100644 index 000000000..a8b84baaf --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/ValidationHelpers.kt @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import inet.ipaddr.IPAddressString +import org.apache.logging.log4j.LogManager +import java.net.InetAddress +import java.net.URL + +object ValidationHelpers { + + private val logger = LogManager.getLogger() + + const val FQDN_REGEX = + "^(?!.*?_.*?)(?!(?:\\w+?\\.)?-[\\w.\\-]*?)(?!\\w+?-\\.[\\w.\\-]+?)" + + "(?=\\w)(?=[\\w.\\-]*?\\.+[\\w.\\-]*?)(?![\\w.\\-]{254})(?!(?:" + + "\\.?[\\w\\-.]*?[\\w\\-]{64,}\\.)+?)[\\w.\\-]+?(? { + try { + // Try parsing as literal IP first + val ip = IPAddressString(host) + if (ip.isValid) return listOf(ip) + } catch (_: Exception) { + // ignore, proceed to DNS + } + + return try { + InetAddress.getAllByName(host).map { inetAddress -> + IPAddressString(inetAddress.hostAddress) + } + } catch (e: Exception) { + logger.error("Unable to resolve host ips for $host: ${e.message}") + emptyList() + } + } + + /** + * Checks if a given URL's host or resolved IPs are in the deny list. 
+ */ + fun isHostInDenylist(urlString: String, hostDenyList: List): Boolean { + val denyNetworks = hostDenyList.map { IPAddressString(it) } + + val hostIps: List = try { + val literal = IPAddressString(urlString) + if (literal.isValid) { + listOf(literal) // treat urlString itself as literal IP + } else { + val url = URL(urlString) + val host = url.host ?: return false + getResolvedIps(host) + } + } catch (e: Exception) { + // If parsing as URL fails, attempt resolving as host literal + getResolvedIps(urlString) + } + + for (ip in hostIps) { + for (deny in denyNetworks) { + if ((ip.isZero && deny.isZero) || deny.contains(ip)) { + LogManager.getLogger() + .error("$urlString is denied by rule $deny (matched $ip)") + return true + } + } + } + + return false + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/QueryIndexCleanupIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/QueryIndexCleanupIT.kt new file mode 100644 index 000000000..91b0574b6 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/QueryIndexCleanupIT.kt @@ -0,0 +1,295 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import org.apache.hc.core5.http.ContentType +import org.apache.hc.core5.http.io.entity.StringEntity +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.commons.alerting.model.DataSources +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery + +class QueryIndexCleanupIT : AlertingRestTestCase() { + + fun `test query index cleanup settings exist`() { + val settings = """ + { + "persistent": { + "${AlertingSettings.QUERY_INDEX_CLEANUP_ENABLED.key}": true, + "${AlertingSettings.QUERY_INDEX_CLEANUP_PERIOD.key}": "1d" + } + } + """.trimIndent() + + val response = client().makeRequest( + "PUT", + "_cluster/settings", + emptyMap(), + StringEntity(settings, 
ContentType.APPLICATION_JSON) + ) + + assertEquals("Setting update should succeed", 200, response.restStatus().status) + } + + fun `test query index created for doc level monitor`() { + val testIndex = createTestIndex() + + val docQuery = DocLevelQuery(query = "test_field:\"test\"", name = "test-query", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val createdMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + + executeMonitor(createdMonitor.id) + + val retrievedMonitor = getMonitor(monitorId = createdMonitor.id) + val queryIndexName = (retrievedMonitor.dataSources as DataSources).queryIndex + + assertTrue("Query index should exist after monitor execution", indexExists(queryIndexName)) + } + + fun `test cleanup disabled by setting`() { + val disableSettings = """ + { + "persistent": { + "${AlertingSettings.QUERY_INDEX_CLEANUP_ENABLED.key}": false + } + } + """.trimIndent() + + val response = client().makeRequest( + "PUT", + "_cluster/settings", + emptyMap(), + StringEntity(disableSettings, ContentType.APPLICATION_JSON) + ) + + assertEquals("Disable setting should succeed", 200, response.restStatus().status) + + val enableSettings = """ + { + "persistent": { + "${AlertingSettings.QUERY_INDEX_CLEANUP_ENABLED.key}": true + } + } + """.trimIndent() + + client().makeRequest( + "PUT", + "_cluster/settings", + emptyMap(), + StringEntity(enableSettings, ContentType.APPLICATION_JSON) + ) + } + + fun `test write index is never deleted`() { + val testIndex = createTestIndex() + + val docQuery = DocLevelQuery(query = "test_field:\"test\"", name = "test-query", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = 
createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + + executeMonitor(monitor.id) + + val retrievedMonitor = getMonitor(monitorId = monitor.id) + val queryIndexName = (retrievedMonitor.dataSources as DataSources).queryIndex + + assertTrue("Query index should exist", indexExists(queryIndexName)) + } + + fun `test query index retained with active source indices`() { + val testIndex = createTestIndex() + + val docQuery = DocLevelQuery(query = "test_field:\"test\"", name = "test-query", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + + executeMonitor(monitor.id) + + val retrievedMonitor = getMonitor(monitorId = monitor.id) + val queryIndexName = (retrievedMonitor.dataSources as DataSources).queryIndex + + assertTrue("Query index should exist", indexExists(queryIndexName)) + + val shortPeriod = """ + { + "persistent": { + "${AlertingSettings.QUERY_INDEX_CLEANUP_PERIOD.key}": "1s" + } + } + """.trimIndent() + + client().makeRequest( + "PUT", + "_cluster/settings", + emptyMap(), + StringEntity(shortPeriod, ContentType.APPLICATION_JSON) + ) + + Thread.sleep(5000) + + assertTrue("Query index should be retained with active source", indexExists(queryIndexName)) + assertTrue("Source index should still exist", indexExists(testIndex)) + } + + fun `test cleanup period can be configured`() { + val settings = """ + { + "persistent": { + "${AlertingSettings.QUERY_INDEX_CLEANUP_PERIOD.key}": "5m" + } + } + """.trimIndent() + + val response = client().makeRequest( + "PUT", + "_cluster/settings", + emptyMap(), + StringEntity(settings, ContentType.APPLICATION_JSON) + ) + + assertEquals("Period setting should succeed", 200, response.restStatus().status) + + val getResponse = 
client().makeRequest("GET", "_cluster/settings") + val responseBody = getResponse.entity.content.readBytes().toString(Charsets.UTF_8) + assertTrue("Period should be set", responseBody.contains("5m")) + } + + fun `test cleanup deletes non-write backing indices when source deleted`() { + val testIndex = createTestIndex() + + val docQuery = DocLevelQuery(query = "test_field:\"test\"", name = "test-query", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + + executeMonitor(monitor.id) + + val retrievedMonitor = getMonitor(monitorId = monitor.id) + val queryIndexAlias = (retrievedMonitor.dataSources as DataSources).queryIndex + + assertTrue("Query index should exist before cleanup", indexExists(queryIndexAlias)) + + // Get the first backing index + val aliasResponse1 = client().makeRequest("GET", "/_cat/aliases/$queryIndexAlias?format=json") + val aliasBody1 = aliasResponse1.entity.content.readBytes().toString(Charsets.UTF_8) + val firstBackingIndex = aliasBody1.substringAfter("\"index\":\"").substringBefore("\"") + + // Force a rollover by creating a new backing index manually + val nextIndexNumber = firstBackingIndex.substringAfterLast("-").toInt() + 1 + val nextIndexName = "$queryIndexAlias-" + String.format("%06d", nextIndexNumber) + + val createIndexRequest = """ + { + "aliases": { + "$queryIndexAlias": { + "is_write_index": true + } + } + } + """.trimIndent() + + client().makeRequest( + "PUT", + "/$nextIndexName", + emptyMap(), + StringEntity(createIndexRequest, ContentType.APPLICATION_JSON) + ) + + // Update the old index to not be write index + val updateAliasRequest = """ + { + "actions": [ + { + "add": { + "index": "$firstBackingIndex", + "alias": "$queryIndexAlias", + "is_write_index": false + } + } + ] + } + 
""".trimIndent() + + client().makeRequest( + "POST", + "/_aliases", + emptyMap(), + StringEntity(updateAliasRequest, ContentType.APPLICATION_JSON) + ) + + // Verify we now have 2 backing indices + val aliasResponse2 = client().makeRequest("GET", "/_cat/aliases/$queryIndexAlias?format=json") + val aliasBody2 = aliasResponse2.entity.content.readBytes().toString(Charsets.UTF_8) + assertTrue("Should have 2 backing indices", aliasBody2.contains(firstBackingIndex) && aliasBody2.contains(nextIndexName)) + + // Delete source index but keep monitor running + deleteIndex(testIndex) + + // Trigger cleanup + val shortPeriod = """ + { + "persistent": { + "${AlertingSettings.QUERY_INDEX_CLEANUP_PERIOD.key}": "1s" + } + } + """.trimIndent() + + client().makeRequest( + "PUT", + "_cluster/settings", + emptyMap(), + StringEntity(shortPeriod, ContentType.APPLICATION_JSON) + ) + + Thread.sleep(10000) + + // Verify old backing index is deleted but write index remains + assertFalse("Old backing index should be deleted", indexExists(firstBackingIndex)) + assertTrue("Write index should be retained", indexExists(nextIndexName)) + assertTrue("Alias should still exist", indexExists(queryIndexAlias)) + } + + fun `test cleanup retains indices when only one backing index exists`() { + val testIndex = createTestIndex() + + val docQuery = DocLevelQuery(query = "test_field:\"test\"", name = "test-query", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + + executeMonitor(monitor.id) + + val retrievedMonitor = getMonitor(monitorId = monitor.id) + val queryIndexAlias = (retrievedMonitor.dataSources as DataSources).queryIndex + + assertTrue("Query index should exist before cleanup", indexExists(queryIndexAlias)) + + deleteIndex(testIndex) + // Don't 
delete monitor - just delete source index + + // Don't delete anything - just verify cleanup runs and doesn't delete active indices + val shortPeriod = """ + { + "persistent": { + "${AlertingSettings.QUERY_INDEX_CLEANUP_PERIOD.key}": "1s" + } + } + """.trimIndent() + + client().makeRequest( + "PUT", + "_cluster/settings", + emptyMap(), + StringEntity(shortPeriod, ContentType.APPLICATION_JSON) + ) + + Thread.sleep(10000) + + assertTrue("Single backing index should be retained", indexExists(queryIndexAlias)) + } +} From 237bccd6209ea16084c8c502b80ec075e81961aa Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 25 Feb 2026 09:35:23 -0800 Subject: [PATCH 2/3] address comments. Signed-off-by: Surya Sashank Nistala --- .../opensearch/alerting/QueryIndexCleanup.kt | 155 +++++++++++------- .../alerting/util/ValidationHelpers.kt | 78 --------- .../alerting/QueryIndexCleanupIT.kt | 2 +- 3 files changed, 93 insertions(+), 142 deletions(-) delete mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/util/ValidationHelpers.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryIndexCleanup.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryIndexCleanup.kt index bd06952aa..f2df9eb86 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryIndexCleanup.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryIndexCleanup.kt @@ -12,8 +12,6 @@ import org.apache.logging.log4j.LogManager import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse -import org.opensearch.action.get.GetRequest -import org.opensearch.action.get.GetResponse import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.IndicesOptions @@ -30,6 +28,7 @@ import org.opensearch.common.xcontent.XContentHelper 
import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.MonitorMetadata +import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.core.action.ActionListener import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser @@ -102,13 +101,8 @@ class QueryIndexCleanup( private fun rescheduleCleanup() { if (clusterService.state().nodes.isLocalNodeElectedClusterManager) { - scheduledCleanup?.cancel() - scheduledCleanup = threadPool.scheduleWithFixedDelay( - { cleanupQueryIndices() }, - queryIndexCleanupPeriod, - ThreadPool.Names.MANAGEMENT - ) - logger.info("Query index cleanup rescheduled with period: $queryIndexCleanupPeriod") + offClusterManager() + onClusterManager() } } @@ -143,12 +137,10 @@ class QueryIndexCleanup( } private suspend fun fetchAllMonitorMetadata(): List { - val configIndex = ".opendistro-alerting-config" - // Check if index exists first val indexExists = try { val response: IndicesExistsResponse = client.suspendUntil { - admin().indices().exists(IndicesExistsRequest(configIndex), it) + admin().indices().exists(IndicesExistsRequest(SCHEDULED_JOBS_INDEX), it) } response.isExists } catch (e: Exception) { @@ -161,7 +153,7 @@ class QueryIndexCleanup( return emptyList() } - val searchRequest = SearchRequest(configIndex) + val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX) .source( SearchSourceBuilder() .query(QueryBuilders.existsQuery(METADATA_FIELD)) @@ -174,7 +166,7 @@ class QueryIndexCleanup( search(searchRequest, it) } - logger.info("Metadata query returned ${response.hits.hits.size} documents") + logger.info("Metadata query returned ${response.hits.hits.size} of ${response.hits.totalHits} documents") response.hits.hits.mapNotNull { hit -> try { @@ -207,13 +199,21 @@ class QueryIndexCleanup( private suspend fun buildQueryIndexUsageMap(allMetadata: List): Map 
{ val queryIndexUsageMap = mutableMapOf() + // Batch-fetch all monitors instead of one-by-one + val monitorIds = allMetadata.map { it.monitorId }.toSet() + val monitors = fetchMonitors(monitorIds) + for (metadata in allMetadata) { - val monitorId = extractMonitorId(metadata.id) - val monitor = getMonitor(monitorId) + val monitorId = metadata.monitorId + val monitor = monitors[monitorId] if (monitor == null) { for ((sourceKey, concreteQueryIndex) in metadata.sourceToQueryIndexMapping) { - val aliasName = extractAliasFromConcreteIndex(concreteQueryIndex) + val aliasName = getAliasForIndex(concreteQueryIndex) + if (aliasName == null) { + logger.debug("Skipping $concreteQueryIndex: not backed by an alias") + continue + } val isWriteIndex = checkIfWriteIndex(aliasName, concreteQueryIndex) queryIndexUsageMap.getOrPut(concreteQueryIndex) { @@ -226,6 +226,13 @@ class QueryIndexCleanup( for ((sourceKey, concreteQueryIndex) in metadata.sourceToQueryIndexMapping) { val sourceIndexName = extractSourceIndexName(sourceKey, monitorId) val aliasName = monitor.dataSources.queryIndex + + // Only process query indices that are part of an alias + if (!isAlias(aliasName)) { + logger.debug("Skipping $concreteQueryIndex: queryIndex $aliasName is not an alias") + continue + } + val isWriteIndex = checkIfWriteIndex(aliasName, concreteQueryIndex) logger.info("Monitor $monitorId: sourceKey=$sourceKey, extracted=$sourceIndexName, queryIndex=$concreteQueryIndex") @@ -263,10 +270,10 @@ class QueryIndexCleanup( continue } - val sortedIndices = concreteIndices.sortedBy { extractIndexNumber(it.concreteQueryIndex) } + val sortedIndices = concreteIndices.sortedBy { getIndexCreationDate(it.concreteQueryIndex) } // Determine latest index from ALL backing indices in cluster, not just metadata - val latestIndexInCluster = allBackingIndices.maxByOrNull { extractIndexNumber(it) } + val latestIndexInCluster = allBackingIndices.maxByOrNull { getIndexCreationDate(it) } // Check if alias still exists val 
aliasExists = clusterService.state().metadata().indicesLookup?.get(aliasName) != null @@ -323,7 +330,7 @@ class QueryIndexCleanup( private suspend fun cleanupMetadataMappings(allMetadata: List, indicesToDelete: List) { for (metadata in allMetadata) { - val monitorId = extractMonitorId(metadata.id) + val monitorId = metadata.monitorId val entriesToRemove = mutableListOf() for ((sourceKey, concreteQueryIndex) in metadata.sourceToQueryIndexMapping) { @@ -368,35 +375,16 @@ class QueryIndexCleanup( deleteIndexRequest, object : ActionListener { override fun onResponse(response: AcknowledgedResponse) { - logger.info("Successfully deleted query indices: $indicesToDelete") + logger.info("Successfully deleted query indices: $existingIndices") } override fun onFailure(e: Exception) { - logger.error("Batch delete failed for: $indicesToDelete. Retrying individually.", e) - deleteQueryIndicesOneByOne(indicesToDelete) + logger.error("Failed to delete query indices: $existingIndices. Will retry on next cleanup run.", e) } } ) } - private fun deleteQueryIndicesOneByOne(indicesToDelete: List) { - for (index in indicesToDelete) { - val deleteRequest = DeleteIndexRequest(index) - client.admin().indices().delete( - deleteRequest, - object : ActionListener { - override fun onResponse(response: AcknowledgedResponse) { - logger.info("Successfully deleted query index: $index") - } - - override fun onFailure(e: Exception) { - logger.error("Failed to delete query index: $index", e) - } - } - ) - } - } - internal fun extractMonitorId(metadataId: String): String { val parts = metadataId.split("-metadata") return if (parts.size > 1 && parts[0].contains("-metadata-")) { @@ -410,12 +398,34 @@ class QueryIndexCleanup( return sourceKey.removeSuffix(monitorId) } - internal fun extractAliasFromConcreteIndex(concreteQueryIndex: String): String { - return concreteQueryIndex.substringBeforeLast("-") + private fun isAlias(name: String): Boolean { + return clusterService.state().metadata().hasAlias(name) 
} - internal fun extractIndexNumber(concreteQueryIndex: String): Int { - return concreteQueryIndex.substringAfterLast("-").toIntOrNull() ?: 0 + /** + * Looks up the alias that a concrete index belongs to from cluster state. + * Returns null if the index is not part of any alias. + */ + private fun getAliasForIndex(concreteIndex: String): String? { + return try { + val indexMetadata = clusterService.state().metadata().index(concreteIndex) ?: return null + val aliases = indexMetadata.aliases + if (aliases.isEmpty()) return null + // Return the first alias — query indices typically belong to exactly one alias + aliases.keys.iterator().next() + } catch (e: Exception) { + logger.warn("Failed to look up alias for index: $concreteIndex", e) + null + } + } + + private fun getIndexCreationDate(indexName: String): Long { + return try { + clusterService.state().metadata().index(indexName)?.settings?.get("index.creation_date")?.toLong() ?: 0L + } catch (e: Exception) { + logger.warn("Failed to get creation date for index: $indexName", e) + 0L + } } private fun checkIfWriteIndex(aliasName: String, concreteQueryIndex: String): Boolean { @@ -426,28 +436,47 @@ class QueryIndexCleanup( return writeIndexName == concreteQueryIndex } - private suspend fun getMonitor(monitorId: String): Monitor? 
{ - val getRequest = GetRequest(".opendistro-alerting-config", monitorId) - return try { - val response: GetResponse = client.suspendUntil { - get(getRequest, it) - } - if (response.isExists) { - val xcp = XContentHelper.createParser( - NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, - response.sourceAsBytesRef, - XContentType.JSON + private suspend fun fetchMonitors(monitorIds: Set): Map { + if (monitorIds.isEmpty()) return emptyMap() + + val monitors = mutableMapOf() + // Batch in chunks to avoid overly large queries + for (batch in monitorIds.chunked(SEARCH_QUERY_RESULT_SIZE)) { + val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX) + .source( + SearchSourceBuilder() + .query(QueryBuilders.termsQuery("_id", batch)) + .size(batch.size) ) - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) - Monitor.parse(xcp, response.id, response.version) - } else null - } catch (e: Exception) { - logger.warn("Error getting monitor: $monitorId", e) - null + .indicesOptions(IndicesOptions.lenientExpandOpen()) + + try { + val response: SearchResponse = client.suspendUntil { + search(searchRequest, it) + } + for (hit in response.hits.hits) { + try { + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + monitors[hit.id] = Monitor.parse(xcp, hit.id, hit.version) + } catch (e: Exception) { + logger.warn("Failed to parse monitor: ${hit.id}", e) + } + } + } catch (e: Exception) { + logger.error("Failed to batch-fetch monitors", e) + } } + logger.info("Fetched ${monitors.size} monitors out of ${monitorIds.size} requested") + return monitors } + // Uses locally cached cluster state — not a remote call to the cluster manager private fun indexExists(indexName: String): Boolean { return try { 
clusterService.state().metadata().hasIndex(indexName) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/ValidationHelpers.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/ValidationHelpers.kt deleted file mode 100644 index a8b84baaf..000000000 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/ValidationHelpers.kt +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.util - -import inet.ipaddr.IPAddressString -import org.apache.logging.log4j.LogManager -import java.net.InetAddress -import java.net.URL - -object ValidationHelpers { - - private val logger = LogManager.getLogger() - - const val FQDN_REGEX = - "^(?!.*?_.*?)(?!(?:\\w+?\\.)?-[\\w.\\-]*?)(?!\\w+?-\\.[\\w.\\-]+?)" + - "(?=\\w)(?=[\\w.\\-]*?\\.+[\\w.\\-]*?)(?![\\w.\\-]{254})(?!(?:" + - "\\.?[\\w\\-.]*?[\\w\\-]{64,}\\.)+?)[\\w.\\-]+?(? { - try { - // Try parsing as literal IP first - val ip = IPAddressString(host) - if (ip.isValid) return listOf(ip) - } catch (_: Exception) { - // ignore, proceed to DNS - } - - return try { - InetAddress.getAllByName(host).map { inetAddress -> - IPAddressString(inetAddress.hostAddress) - } - } catch (e: Exception) { - logger.error("Unable to resolve host ips for $host: ${e.message}") - emptyList() - } - } - - /** - * Checks if a given URL's host or resolved IPs are in the deny list. 
- */ - fun isHostInDenylist(urlString: String, hostDenyList: List): Boolean { - val denyNetworks = hostDenyList.map { IPAddressString(it) } - - val hostIps: List = try { - val literal = IPAddressString(urlString) - if (literal.isValid) { - listOf(literal) // treat urlString itself as literal IP - } else { - val url = URL(urlString) - val host = url.host ?: return false - getResolvedIps(host) - } - } catch (e: Exception) { - // If parsing as URL fails, attempt resolving as host literal - getResolvedIps(urlString) - } - - for (ip in hostIps) { - for (deny in denyNetworks) { - if ((ip.isZero && deny.isZero) || deny.contains(ip)) { - LogManager.getLogger() - .error("$urlString is denied by rule $deny (matched $ip)") - return true - } - } - } - - return false - } -} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/QueryIndexCleanupIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/QueryIndexCleanupIT.kt index 91b0574b6..39528790c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/QueryIndexCleanupIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/QueryIndexCleanupIT.kt @@ -181,7 +181,7 @@ class QueryIndexCleanupIT : AlertingRestTestCase() { // Force a rollover by creating a new backing index manually val nextIndexNumber = firstBackingIndex.substringAfterLast("-").toInt() + 1 - val nextIndexName = "$queryIndexAlias-" + String.format("%06d", nextIndexNumber) + val nextIndexName = "$queryIndexAlias-" + String.format(java.util.Locale.ROOT, "%06d", nextIndexNumber) val createIndexRequest = """ { From bb222bad63f08137e48f25ffc38d43df32d83fc5 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 4 Mar 2026 08:16:48 -0800 Subject: [PATCH 3/3] fix monitor metadata source to query index mapping update --- .../org/opensearch/alerting/AlertingPlugin.kt | 2 +- .../opensearch/alerting/QueryIndexCleanup.kt | 80 +++--- .../alerting/QueryIndexCleanupIT.kt | 228 +++++++++++------- 3 files changed, 192 insertions(+), 118 deletions(-) diff 
--git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index f9fcfa74b..5db8fde3e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -286,7 +286,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R val settings = environment.settings() val lockService = LockService(client, clusterService) alertIndices = AlertIndices(settings, client, threadPool, clusterService) - queryIndexCleanup = QueryIndexCleanup(settings, client, threadPool, clusterService) + queryIndexCleanup = QueryIndexCleanup(settings, client, threadPool, clusterService, xContentRegistry) val alertService = AlertService(client, xContentRegistry, alertIndices) val triggerService = TriggerService(scriptService) runner = MonitorRunnerService diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryIndexCleanup.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryIndexCleanup.kt index f2df9eb86..09bfdac9f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryIndexCleanup.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryIndexCleanup.kt @@ -28,6 +28,7 @@ import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.MonitorMetadata +import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.core.action.ActionListener import org.opensearch.core.xcontent.NamedXContentRegistry @@ -49,6 +50,7 @@ class QueryIndexCleanup( private val client: Client, private val threadPool: ThreadPool, private val clusterService: ClusterService, + private val xContentRegistry: NamedXContentRegistry, ) : ClusterStateListener { 
private val logger = LogManager.getLogger(javaClass) @@ -121,8 +123,10 @@ class QueryIndexCleanup( val queryIndexUsageMap = buildQueryIndexUsageMap(allMetadata) val indicesToDelete = determineIndicesToDelete(queryIndexUsageMap) + // Always clean metadata mappings for dead source indices + cleanupMetadataMappings(allMetadata, indicesToDelete) + if (indicesToDelete.isNotEmpty()) { - cleanupMetadataMappings(allMetadata, indicesToDelete) deleteQueryIndices(indicesToDelete) } else { logger.info("No query indices eligible for deletion") @@ -153,40 +157,56 @@ class QueryIndexCleanup( return emptyList() } - val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX) - .source( - SearchSourceBuilder() + val allMetadata = mutableListOf() + var searchAfterValues: Array? = null + + try { + do { + val searchSourceBuilder = SearchSourceBuilder() .query(QueryBuilders.existsQuery(METADATA_FIELD)) .size(SEARCH_QUERY_RESULT_SIZE) - ) - .indicesOptions(IndicesOptions.lenientExpandOpen()) + .sort("_id") - return try { - val response: SearchResponse = client.suspendUntil { - search(searchRequest, it) - } + if (searchAfterValues != null) { + searchSourceBuilder.searchAfter(searchAfterValues) + } - logger.info("Metadata query returned ${response.hits.hits.size} of ${response.hits.totalHits} documents") + val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX) + .source(searchSourceBuilder) + .indicesOptions(IndicesOptions.lenientExpandOpen()) - response.hits.hits.mapNotNull { hit -> - try { - val xcp = XContentHelper.createParser( - NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, - hit.sourceRef, - XContentType.JSON - ) - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) - MonitorMetadata.parse(xcp, hit.id, hit.seqNo, hit.primaryTerm) - } catch (e: Exception) { - logger.warn("Failed to parse monitor metadata: ${hit.id}", e) - null + val response: SearchResponse = client.suspendUntil { + search(searchRequest, it) } - } + + val 
hits = response.hits.hits + if (allMetadata.isEmpty()) { + logger.info("Metadata query: total ${response.hits.totalHits} documents") + } + + for (hit in hits) { + try { + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + allMetadata.add(MonitorMetadata.parse(xcp, hit.id, hit.seqNo, hit.primaryTerm)) + } catch (e: Exception) { + logger.warn("Failed to parse monitor metadata: ${hit.id}", e) + } + } + + searchAfterValues = if (hits.isNotEmpty()) hits.last().sortValues else null + } while (searchAfterValues != null) } catch (e: Exception) { logger.error("Failed to fetch monitor metadata", e) - emptyList() } + + logger.info("Fetched ${allMetadata.size} monitor metadata documents") + return allMetadata } data class QueryIndexUsage( @@ -457,13 +477,15 @@ class QueryIndexCleanup( for (hit in response.hits.hits) { try { val xcp = XContentHelper.createParser( - NamedXContentRegistry.EMPTY, + xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceRef, XContentType.JSON ) - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) - monitors[hit.id] = Monitor.parse(xcp, hit.id, hit.version) + val job = ScheduledJob.parse(xcp, hit.id, hit.version) + if (job is Monitor) { + monitors[hit.id] = job + } } catch (e: Exception) { logger.warn("Failed to parse monitor: ${hit.id}", e) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/QueryIndexCleanupIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/QueryIndexCleanupIT.kt index 39528790c..6aaa76ad6 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/QueryIndexCleanupIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/QueryIndexCleanupIT.kt @@ -115,21 +115,7 @@ class QueryIndexCleanupIT : AlertingRestTestCase() { assertTrue("Query index should exist", 
indexExists(queryIndexName)) - val shortPeriod = """ - { - "persistent": { - "${AlertingSettings.QUERY_INDEX_CLEANUP_PERIOD.key}": "1s" - } - } - """.trimIndent() - - client().makeRequest( - "PUT", - "_cluster/settings", - emptyMap(), - StringEntity(shortPeriod, ContentType.APPLICATION_JSON) - ) - + setCleanupPeriod("1s") Thread.sleep(5000) assertTrue("Query index should be retained with active source", indexExists(queryIndexName)) @@ -174,84 +160,40 @@ class QueryIndexCleanupIT : AlertingRestTestCase() { assertTrue("Query index should exist before cleanup", indexExists(queryIndexAlias)) - // Get the first backing index - val aliasResponse1 = client().makeRequest("GET", "/_cat/aliases/$queryIndexAlias?format=json") - val aliasBody1 = aliasResponse1.entity.content.readBytes().toString(Charsets.UTF_8) - val firstBackingIndex = aliasBody1.substringAfter("\"index\":\"").substringBefore("\"") - - // Force a rollover by creating a new backing index manually - val nextIndexNumber = firstBackingIndex.substringAfterLast("-").toInt() + 1 - val nextIndexName = "$queryIndexAlias-" + String.format(java.util.Locale.ROOT, "%06d", nextIndexNumber) - - val createIndexRequest = """ - { - "aliases": { - "$queryIndexAlias": { - "is_write_index": true - } - } - } - """.trimIndent() - - client().makeRequest( - "PUT", - "/$nextIndexName", - emptyMap(), - StringEntity(createIndexRequest, ContentType.APPLICATION_JSON) - ) - - // Update the old index to not be write index - val updateAliasRequest = """ - { - "actions": [ - { - "add": { - "index": "$firstBackingIndex", - "alias": "$queryIndexAlias", - "is_write_index": false - } - } - ] - } - """.trimIndent() - - client().makeRequest( - "POST", - "/_aliases", - emptyMap(), - StringEntity(updateAliasRequest, ContentType.APPLICATION_JSON) - ) + val firstBackingIndex = getFirstBackingIndex(queryIndexAlias) + val nextIndexName = createNextBackingIndex(queryIndexAlias, firstBackingIndex) // Verify we now have 2 backing indices val aliasResponse2 
= client().makeRequest("GET", "/_cat/aliases/$queryIndexAlias?format=json") val aliasBody2 = aliasResponse2.entity.content.readBytes().toString(Charsets.UTF_8) assertTrue("Should have 2 backing indices", aliasBody2.contains(firstBackingIndex) && aliasBody2.contains(nextIndexName)) - // Delete source index but keep monitor running - deleteIndex(testIndex) - - // Trigger cleanup - val shortPeriod = """ - { - "persistent": { - "${AlertingSettings.QUERY_INDEX_CLEANUP_PERIOD.key}": "1s" - } - } - """.trimIndent() + // Verify metadata has mapping before deletion + val mappingBefore = getSourceToQueryIndexMapping(monitor.id) + assertTrue("Metadata should have mapping entries before cleanup", mappingBefore.isNotEmpty()) - client().makeRequest( - "PUT", - "_cluster/settings", - emptyMap(), - StringEntity(shortPeriod, ContentType.APPLICATION_JSON) - ) + // Delete source index + deleteIndex(testIndex) + setCleanupPeriod("1s") Thread.sleep(10000) // Verify old backing index is deleted but write index remains assertFalse("Old backing index should be deleted", indexExists(firstBackingIndex)) assertTrue("Write index should be retained", indexExists(nextIndexName)) assertTrue("Alias should still exist", indexExists(queryIndexAlias)) + + // Verify metadata mapping was cleaned up for deleted source index + val mappingAfter = getSourceToQueryIndexMapping(monitor.id) + for ((sourceKey, _) in mappingBefore) { + val sourceIndex = sourceKey.removeSuffix(monitor.id) + if (!indexExists(sourceIndex)) { + assertFalse( + "Mapping for deleted source $sourceIndex should be removed", + mappingAfter.containsKey(sourceKey) + ) + } + } } fun `test cleanup retains indices when only one backing index exists`() { @@ -270,26 +212,136 @@ class QueryIndexCleanupIT : AlertingRestTestCase() { assertTrue("Query index should exist before cleanup", indexExists(queryIndexAlias)) deleteIndex(testIndex) - // Don't delete monitor - just delete source index - // Don't delete anything - just verify cleanup runs and 
doesn't delete active indices - val shortPeriod = """ + setCleanupPeriod("1s") + Thread.sleep(10000) + + assertTrue("Single backing index should be retained", indexExists(queryIndexAlias)) + } + + fun `test cleanup removes dead source entries but retains alive ones in metadata`() { + // Create two concrete source indices (simulating timeseries rollover) + val sourceIndex1 = createTestIndex(index = "log-test-000001") + val sourceIndex2 = createTestIndex(index = "log-test-000002") + + // Create monitor targeting both concrete indices + val docQuery = DocLevelQuery(query = "test_field:\"test\"", name = "test-query", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(sourceIndex1, sourceIndex2), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + + executeMonitor(monitor.id) + + // Verify metadata has entries for both source indices + val mappingBefore = getSourceToQueryIndexMapping(monitor.id) + assertEquals("Should have 2 mapping entries", 2, mappingBefore.size) + + val key1 = "$sourceIndex1${monitor.id}" + val key2 = "$sourceIndex2${monitor.id}" + assertTrue("Should have entry for source index 1", mappingBefore.containsKey(key1)) + assertTrue("Should have entry for source index 2", mappingBefore.containsKey(key2)) + + // Delete only source index 1, keep source index 2 alive + deleteIndex(sourceIndex1) + + setCleanupPeriod("1s") + Thread.sleep(10000) + + // Verify metadata: dead source entry removed, alive source entry retained + val mappingAfter = getSourceToQueryIndexMapping(monitor.id) + assertFalse( + "Mapping for deleted source $sourceIndex1 should be removed", + mappingAfter.containsKey(key1) + ) + assertTrue( + "Mapping for alive source $sourceIndex2 should be retained", + mappingAfter.containsKey(key2) + ) + + // Source index 2 still exists + assertTrue("Source index 2 should 
still exist", indexExists(sourceIndex2)) + } + + // --- Helper methods --- + + @Suppress("UNCHECKED_CAST") + private fun getSourceToQueryIndexMapping(monitorId: String): Map { + val metadataId = "$monitorId-metadata" + val searchRequest = """ { - "persistent": { - "${AlertingSettings.QUERY_INDEX_CLEANUP_PERIOD.key}": "1s" + "query": { + "term": { + "_id": "$metadataId" + } } } """.trimIndent() + val response = client().makeRequest( + "GET", + "/.opendistro-alerting-config/_search", + emptyMap(), + StringEntity(searchRequest, ContentType.APPLICATION_JSON) + ) + + val responseBody = response.entity.content.readBytes().toString(Charsets.UTF_8) + val parser = org.opensearch.common.xcontent.json.JsonXContent.jsonXContent + .createParser( + org.opensearch.core.xcontent.NamedXContentRegistry.EMPTY, + org.opensearch.common.xcontent.LoggingDeprecationHandler.INSTANCE, + responseBody + ) + val responseMap = parser.map() + val hits = (responseMap["hits"] as Map)["hits"] as List> + if (hits.isEmpty()) return emptyMap() + + val source = hits[0]["_source"] as Map + val metadata = source["metadata"] as Map + return metadata.getOrDefault("source_to_query_index_mapping", emptyMap()) as Map + } + + private fun getFirstBackingIndex(queryIndexAlias: String): String { + val aliasResponse = client().makeRequest("GET", "/_cat/aliases/$queryIndexAlias?format=json") + val aliasBody = aliasResponse.entity.content.readBytes().toString(Charsets.UTF_8) + return aliasBody.substringAfter("\"index\":\"").substringBefore("\"") + } + + private fun createNextBackingIndex(queryIndexAlias: String, firstBackingIndex: String): String { + val nextIndexNumber = firstBackingIndex.substringAfterLast("-").toInt() + 1 + val nextIndexName = "$queryIndexAlias-" + String.format(java.util.Locale.ROOT, "%06d", nextIndexNumber) + client().makeRequest( "PUT", - "_cluster/settings", + "/$nextIndexName", emptyMap(), - StringEntity(shortPeriod, ContentType.APPLICATION_JSON) + StringEntity( + """{"aliases": 
{"$queryIndexAlias": {"is_write_index": true}}}""", + ContentType.APPLICATION_JSON + ) ) - Thread.sleep(10000) + client().makeRequest( + "POST", + "/_aliases", + emptyMap(), + StringEntity( + """{"actions": [{"add": {"index": "$firstBackingIndex", "alias": "$queryIndexAlias", "is_write_index": false}}]}""", + ContentType.APPLICATION_JSON + ) + ) - assertTrue("Single backing index should be retained", indexExists(queryIndexAlias)) + return nextIndexName + } + + private fun setCleanupPeriod(period: String) { + client().makeRequest( + "PUT", + "_cluster/settings", + emptyMap(), + StringEntity( + """{"persistent": {"${AlertingSettings.QUERY_INDEX_CLEANUP_PERIOD.key}": "$period"}}""", + ContentType.APPLICATION_JSON + ) + ) } }