From 383340adf2e2530b8cf09dfd7b4ae7f2605f113d Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Mon, 27 Apr 2026 18:32:44 -0700 Subject: [PATCH 01/11] Making job poller populate thread context with data source info for Oasis interception Signed-off-by: Dennis Toepker --- .../alerting/service/MonitorJobPoller.kt | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt index 1d498c00..7e20ac38 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt @@ -134,6 +134,10 @@ class MonitorJobPoller( } private suspend fun executeMonitor(monitor: Monitor, jobStartTime: Instant) { + // populate thread context for downstream Oasis interception the moment + // Monitor config is in hand + populateThreadContext(monitor) + val request = ExecuteMonitorRequest( dryrun = false, requestEnd = TimeValue(jobStartTime.toEpochMilli()), @@ -180,8 +184,48 @@ class MonitorJobPoller( } } + // populates thread context with KVs that Oasis will need + // when intercepting search or PPL calls to external customer + // data source + private fun populateThreadContext(monitor: Monitor) { + if (monitor.target == null) { + throw AlertingException.wrap( + IllegalStateException("Monitor received by Job Poller did not contain target") + ) + } + + if (region.isBlank()) { + throw AlertingException.wrap( + IllegalStateException("Monitor received by Job Poller did not contain target") + ) + } + + val threadContext = client.threadPool().threadContext + + // Oasis checks for this flag to know that because this is + // a scheduled background monitor execution, there will be + // no user credentials to make the search/ppl call to customer + // data source with, and it must use service credentials + threadContext.putHeader(IS_BACKGROUND_JOB_HEADER, "true") + + // TODO: in long term, may need to generalize to aos data source type + threadContext.putHeader(SERVICE_NAME_HEADER, "aoss") + + // external customer data source endpoint, to run search/ppl against + threadContext.putHeader(OPENSEARCH_ENDPOINT_HEADER, monitor.target!!.endpoint) + + // populated upstream in AlertingPlugin.kt with REMOTE_METADATA_REGION.get(settings) + threadContext.putHeader(REGION_HEADER, region) + } + companion object { const val POLLER_THREAD_COUNT = 10 const val POLL_INTERVAL_MS = 1000L + + // thread context header keys for Oasis interception + const val IS_BACKGROUND_JOB_HEADER = "alerting-is-background-job" + const val SERVICE_NAME_HEADER = "aws-service-name" + const val OPENSEARCH_ENDPOINT_HEADER = "opensearch-url" + const val REGION_HEADER = "aws-region" } } From ac44c348630db3950407b5a7ae9e9d0754fbfa41 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Mon, 27 Apr 2026 18:54:27 -0700 Subject: [PATCH 02/11] added unit test Signed-off-by: Dennis Toepker --- .../alerting/service/MonitorJobPoller.kt | 4 +-- .../alerting/service/MonitorJobPollerTests.kt | 28 +++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt index 7e20ac38..61a8ada8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt @@ -187,7 +187,7 @@ class MonitorJobPoller( // populates thread context with KVs that Oasis will need // when intercepting search or PPL calls to external customer // data source - private fun populateThreadContext(monitor: Monitor) { + internal fun populateThreadContext(monitor: Monitor) { if (monitor.target == null) { throw AlertingException.wrap( IllegalStateException("Monitor received by Job Poller did not contain target") @@ -196,7 +196,7 @@ class MonitorJobPoller( if (region.isBlank()) { throw AlertingException.wrap( - IllegalStateException("Monitor received by Job Poller did not contain target") + IllegalStateException("No region configured when populating thread context from job poller") ) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/service/MonitorJobPollerTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/service/MonitorJobPollerTests.kt index 315f41a1..35848001 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/service/MonitorJobPollerTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/service/MonitorJobPollerTests.kt @@ -8,9 +8,12 @@ package org.opensearch.alerting.service import com.carrotsearch.randomizedtesting.ThreadFilter import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters import org.mockito.Mockito.mock +import org.mockito.Mockito.`when` +import org.opensearch.alerting.randomQueryLevelMonitor import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.model.Target import org.opensearch.commons.utils.scheduler.JobQueueAccountIdProvider import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.search.SearchModule @@ -335,4 +338,29 @@ class MonitorJobPollerTests : OpenSearchTestCase() { } poller.close() } + + fun `test thread context populated correctly`() { + val mockClient = mockClient() + val mockThreadPool = mock(org.opensearch.threadpool.ThreadPool::class.java) + val mockThreadContext = org.opensearch.common.util.concurrent.ThreadContext(Settings.EMPTY) + + `when`(mockClient.threadPool()).thenReturn(mockThreadPool) + `when`(mockThreadPool.threadContext).thenReturn(mockThreadContext) + + val poller = MonitorJobPoller( + testXContentRegistry(), mockClient, true, + testAccountIdProvider(), "us-east-1", "test-queue" + ) + + val monitor = randomQueryLevelMonitor().copy(target = Target(endpoint = "https://test.aoss.amazonaws.com")) + + poller.populateThreadContext(monitor) + + assertEquals("true", mockThreadContext.getHeader(MonitorJobPoller.IS_BACKGROUND_JOB_HEADER)) + assertEquals("aoss", mockThreadContext.getHeader(MonitorJobPoller.SERVICE_NAME_HEADER)) + assertEquals("https://test.aoss.amazonaws.com", mockThreadContext.getHeader(MonitorJobPoller.OPENSEARCH_ENDPOINT_HEADER)) + assertEquals("us-east-1", mockThreadContext.getHeader(MonitorJobPoller.REGION_HEADER)) + + poller.close() + } } From 2a71174aaa9951ffa1e7b6b5f736c96a28367062 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Tue, 28 Apr 2026 09:33:19 -0700 Subject: [PATCH 03/11] adjusting comments Signed-off-by: Dennis Toepker --- .../opensearch/alerting/service/MonitorJobPoller.kt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt index 61a8ada8..65bda7b1 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt @@ -134,7 +134,7 @@ class MonitorJobPoller( } private suspend fun executeMonitor(monitor: Monitor, jobStartTime: Instant) { - // populate thread context for downstream Oasis interception the moment + // populate thread context for downstream request interception the moment // Monitor config is in hand populateThreadContext(monitor) @@ -184,8 +184,8 @@ class MonitorJobPoller( } } - // populates thread context with KVs that Oasis will need - // when intercepting search or PPL calls to external customer + // populates thread context with KVs that downstream interception will + // need when intercepting search or PPL calls to external customer // data source internal fun populateThreadContext(monitor: Monitor) { if (monitor.target == null) { @@ -202,7 +202,7 @@ class MonitorJobPoller( val threadContext = client.threadPool().threadContext - // Oasis checks for this flag to know that because this is + // Request interception checks for this flag to know that because this is // a scheduled background monitor execution, there will be // no user credentials to make the search/ppl call to customer // data source with, and it must use service credentials @@ -222,7 +222,7 @@ class MonitorJobPoller( const val POLLER_THREAD_COUNT = 10 const val POLL_INTERVAL_MS = 1000L - // thread context header keys for Oasis interception + // thread context header keys for request interception const val IS_BACKGROUND_JOB_HEADER = "alerting-is-background-job" const val SERVICE_NAME_HEADER = "aws-service-name" const val OPENSEARCH_ENDPOINT_HEADER = "opensearch-url" From c7ba88f4ab151cb7088032a6a1a669baf4fda2b5 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Tue, 28 Apr 2026 16:44:07 -0700 Subject: [PATCH 04/11] added extra validations and target -> service name mappings Signed-off-by: Dennis Toepker --- .../alerting/service/MonitorJobPoller.kt | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt index 65bda7b1..75ca5fc9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt @@ -5,6 +5,8 @@ package org.opensearch.alerting.service +import java.time.Instant +import java.util.concurrent.atomic.AtomicInteger import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob @@ -30,8 +32,6 @@ import software.amazon.awssdk.services.sqs.SqsClient import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest import software.amazon.awssdk.services.sqs.model.Message import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest -import java.time.Instant -import java.util.concurrent.atomic.AtomicInteger /** * Polls SQS queues for monitor execution messages and dispatches them @@ -65,6 +65,7 @@ class MonitorJobPoller( } val provider = requireNotNull(accountIdProvider) { "accountIdProvider must be set before starting" } val sqs = requireNotNull(sqsClient) { "sqsClient must be set before starting" } + require(region.isNotBlank()) { "region must be set before starting" } logger.info("Starting MonitorJobPoller with $POLLER_THREAD_COUNT workers") repeat(POLLER_THREAD_COUNT) { scope.launch { pollLoop(provider, sqs, region, queueName) } } @@ -194,22 +195,27 @@ class MonitorJobPoller( ) } - if (region.isBlank()) { + if (monitor.target!!.type.isBlank()) { throw AlertingException.wrap( - IllegalStateException("No region configured when populating thread context from job poller") + IllegalStateException("Monitor target received by Job Poller did not contain target type") + ) + } + + if (monitor.target!!.endpoint.isBlank()) { + throw AlertingException.wrap( + IllegalStateException("Monitor target received by Job Poller did not contain endpoint") ) } val threadContext = client.threadPool().threadContext - // Request interception checks for this flag to know that because this is - // a scheduled background monitor execution, there will be + // Request interception checks for this flag to know that this is + // a scheduled background monitor execution, meaning there will be // no user credentials to make the search/ppl call to customer // data source with, and it must use service credentials threadContext.putHeader(IS_BACKGROUND_JOB_HEADER, "true") - // TODO: in long term, may need to generalize to aos data source type - threadContext.putHeader(SERVICE_NAME_HEADER, "aoss") + threadContext.putHeader(SERVICE_NAME_HEADER, mapTargetTypeToServiceName(monitor.target!!.type)) // external customer data source endpoint, to run search/ppl against threadContext.putHeader(OPENSEARCH_ENDPOINT_HEADER, monitor.target!!.endpoint) @@ -218,6 +224,14 @@ class MonitorJobPoller( threadContext.putHeader(REGION_HEADER, region) } + private fun mapTargetTypeToServiceName(targetType: String): String { + return when (targetType) { + "AOSS_COLLECTION" -> "aoss" + "AOS_DOMAIN" -> "es" + else -> throw AlertingException.wrap(IllegalStateException("Received unknown target type in Job Poller: " + targetType)) + } + } + companion object { const val POLLER_THREAD_COUNT = 10 const val POLL_INTERVAL_MS = 1000L From 5eff8a21159c1bd6693a99200a1d95ab909fff3c Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Tue, 28 Apr 2026 16:48:00 -0700 Subject: [PATCH 05/11] putting mappings behind consts Signed-off-by: Dennis Toepker --- .../alerting/service/MonitorJobPoller.kt | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt index 75ca5fc9..fad49649 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt @@ -226,9 +226,10 @@ class MonitorJobPoller( private fun mapTargetTypeToServiceName(targetType: String): String { return when (targetType) { - "AOSS_COLLECTION" -> "aoss" - "AOS_DOMAIN" -> "es" - else -> throw AlertingException.wrap(IllegalStateException("Received unknown target type in Job Poller: " + targetType)) + AOSS_COLLECTION -> AOSS_SERVICE_NAME + AOS_DOMAIN -> AOS_SERVICE_NAME + // default target type of "local" is invalid and will throw exception + else -> throw AlertingException.wrap(IllegalStateException("Received invalid target type in Job Poller: " + targetType)) } } @@ -241,5 +242,13 @@ class MonitorJobPoller( const val SERVICE_NAME_HEADER = "aws-service-name" const val OPENSEARCH_ENDPOINT_HEADER = "opensearch-url" const val REGION_HEADER = "aws-region" + + // target types + const val AOSS_COLLECTION = "AOSS_COLLECTION" + const val AOS_DOMAIN = "AOS_DOMAIN" + + // service names + const val AOSS_SERVICE_NAME = "aoss" + const val AOS_SERVICE_NAME = "es" } } From b779d5ca9508fbb6cf33656f0e0c8e86185b104c Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Tue, 28 Apr 2026 16:54:15 -0700 Subject: [PATCH 06/11] fixing linter issues Signed-off-by: Dennis Toepker --- .../org/opensearch/alerting/service/MonitorJobPoller.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt index fad49649..ec7a3a01 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt @@ -5,8 +5,6 @@ package org.opensearch.alerting.service -import java.time.Instant -import java.util.concurrent.atomic.AtomicInteger import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob @@ -32,6 +30,8 @@ import software.amazon.awssdk.services.sqs.SqsClient import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest import software.amazon.awssdk.services.sqs.model.Message import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest +import java.time.Instant +import java.util.concurrent.atomic.AtomicInteger /** * Polls SQS queues for monitor execution messages and dispatches them From e109e179bb61eadec81a4a92f37b1fe0248c74f6 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Tue, 28 Apr 2026 17:04:11 -0700 Subject: [PATCH 07/11] passing Target directly to helper instead of full Monitor object Signed-off-by: Dennis Toepker --- .../alerting/service/MonitorJobPoller.kt | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt index ec7a3a01..fbb45445 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt @@ -22,6 +22,7 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduleJobPayload +import org.opensearch.commons.alerting.model.Target import org.opensearch.commons.alerting.util.AlertingException import org.opensearch.commons.utils.scheduler.JobQueueAccountIdProvider import org.opensearch.core.xcontent.NamedXContentRegistry @@ -137,7 +138,7 @@ class MonitorJobPoller( private suspend fun executeMonitor(monitor: Monitor, jobStartTime: Instant) { // populate thread context for downstream request interception the moment // Monitor config is in hand - populateThreadContext(monitor) + populateThreadContext(monitor.target) val request = ExecuteMonitorRequest( dryrun = false, @@ -188,20 +189,20 @@ class MonitorJobPoller( // populates thread context with KVs that downstream interception will // need when intercepting search or PPL calls to external customer // data source - internal fun populateThreadContext(monitor: Monitor) { - if (monitor.target == null) { + internal fun populateThreadContext(target: Target?) { + if (target == null) { throw AlertingException.wrap( IllegalStateException("Monitor received by Job Poller did not contain target") ) } - if (monitor.target!!.type.isBlank()) { + if (target.type.isBlank()) { throw AlertingException.wrap( IllegalStateException("Monitor target received by Job Poller did not contain target type") ) } - if (monitor.target!!.endpoint.isBlank()) { + if (target.endpoint.isBlank()) { throw AlertingException.wrap( IllegalStateException("Monitor target received by Job Poller did not contain endpoint") ) @@ -215,10 +216,10 @@ class MonitorJobPoller( // data source with, and it must use service credentials threadContext.putHeader(IS_BACKGROUND_JOB_HEADER, "true") - threadContext.putHeader(SERVICE_NAME_HEADER, mapTargetTypeToServiceName(monitor.target!!.type)) + threadContext.putHeader(SERVICE_NAME_HEADER, mapTargetTypeToServiceName(target.type)) // external customer data source endpoint, to run search/ppl against - threadContext.putHeader(OPENSEARCH_ENDPOINT_HEADER, monitor.target!!.endpoint) + threadContext.putHeader(OPENSEARCH_ENDPOINT_HEADER, target.endpoint) // populated upstream in AlertingPlugin.kt with REMOTE_METADATA_REGION.get(settings) threadContext.putHeader(REGION_HEADER, region) From 0c9eed88d484a42f8a1e1b082b4a06fffd500fb7 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Tue, 28 Apr 2026 17:33:46 -0700 Subject: [PATCH 08/11] adding operation type header and extra tests Signed-off-by: Dennis Toepker --- .../alerting/service/MonitorJobPoller.kt | 6 ++ .../alerting/service/MonitorJobPollerTests.kt | 69 +++++++++++++++++-- 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt index fbb45445..2f1b6ddd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt @@ -210,6 +210,8 @@ class MonitorJobPoller( val threadContext = client.threadPool().threadContext + threadContext.putHeader(OPERATION_NAME_HEADER, ALERTING_OP_TYPE) + // Request interception checks for this flag to know that this is // a scheduled background monitor execution, meaning there will be // no user credentials to make the search/ppl call to customer @@ -239,11 +241,15 @@ class MonitorJobPoller( const val POLL_INTERVAL_MS = 1000L // thread context header keys for request interception + const val OPERATION_NAME_HEADER = "x-amzn-oasis-operation" const val IS_BACKGROUND_JOB_HEADER = "alerting-is-background-job" const val SERVICE_NAME_HEADER = "aws-service-name" const val OPENSEARCH_ENDPOINT_HEADER = "opensearch-url" const val REGION_HEADER = "aws-region" + // operation type + const val ALERTING_OP_TYPE = "Alerting" + // target types const val AOSS_COLLECTION = "AOSS_COLLECTION" const val AOS_DOMAIN = "AOS_DOMAIN" diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/service/MonitorJobPollerTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/service/MonitorJobPollerTests.kt index 35848001..75770481 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/service/MonitorJobPollerTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/service/MonitorJobPollerTests.kt @@ -9,7 +9,11 @@ import com.carrotsearch.randomizedtesting.ThreadFilter import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters import org.mockito.Mockito.mock import org.mockito.Mockito.`when` -import org.opensearch.alerting.randomQueryLevelMonitor +import org.opensearch.alerting.service.MonitorJobPoller.Companion.ALERTING_OP_TYPE +import org.opensearch.alerting.service.MonitorJobPoller.Companion.AOSS_COLLECTION +import org.opensearch.alerting.service.MonitorJobPoller.Companion.AOSS_SERVICE_NAME +import org.opensearch.alerting.service.MonitorJobPoller.Companion.AOS_DOMAIN +import org.opensearch.alerting.service.MonitorJobPoller.Companion.AOS_SERVICE_NAME import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.SearchInput @@ -142,6 +146,17 @@ class MonitorJobPollerTests : OpenSearchTestCase() { poller.close() } + fun `test start throws when region not set`() { + val poller = MonitorJobPoller( + testXContentRegistry(), mockClient(), true, + testAccountIdProvider(), "", "test-queue" + ) + expectThrows(Exception::class.java) { + poller.start() + } + poller.close() + } + fun `test worker polls when provider is set`() { val latch = CountDownLatch(1) val sqsClient = FakeSqsClient( @@ -339,7 +354,7 @@ class MonitorJobPollerTests : OpenSearchTestCase() { poller.close() } - fun `test thread context populated correctly`() { + fun `test thread context populated correctly with aoss endpoint`() { val mockClient = mockClient() val mockThreadPool = mock(org.opensearch.threadpool.ThreadPool::class.java) val mockThreadContext = org.opensearch.common.util.concurrent.ThreadContext(Settings.EMPTY) @@ -352,15 +367,59 @@ class MonitorJobPollerTests : OpenSearchTestCase() { testAccountIdProvider(), "us-east-1", "test-queue" ) - val monitor = randomQueryLevelMonitor().copy(target = Target(endpoint = "https://test.aoss.amazonaws.com")) + val target = Target(type = AOSS_COLLECTION, endpoint = "https://test.aoss.amazonaws.com") - poller.populateThreadContext(monitor) + poller.populateThreadContext(target) + assertEquals(ALERTING_OP_TYPE, mockThreadContext.getHeader(MonitorJobPoller.OPERATION_NAME_HEADER)) assertEquals("true", mockThreadContext.getHeader(MonitorJobPoller.IS_BACKGROUND_JOB_HEADER)) - assertEquals("aoss", mockThreadContext.getHeader(MonitorJobPoller.SERVICE_NAME_HEADER)) + assertEquals(AOSS_SERVICE_NAME, mockThreadContext.getHeader(MonitorJobPoller.SERVICE_NAME_HEADER)) assertEquals("https://test.aoss.amazonaws.com", mockThreadContext.getHeader(MonitorJobPoller.OPENSEARCH_ENDPOINT_HEADER)) assertEquals("us-east-1", mockThreadContext.getHeader(MonitorJobPoller.REGION_HEADER)) poller.close() } + + fun `test thread context populated correctly with aos endpoint`() { + val mockClient = mockClient() + val mockThreadPool = mock(org.opensearch.threadpool.ThreadPool::class.java) + val mockThreadContext = org.opensearch.common.util.concurrent.ThreadContext(Settings.EMPTY) + + `when`(mockClient.threadPool()).thenReturn(mockThreadPool) + `when`(mockThreadPool.threadContext).thenReturn(mockThreadContext) + + val poller = MonitorJobPoller( + testXContentRegistry(), mockClient, true, + testAccountIdProvider(), "us-east-1", "test-queue" + ) + + val target = Target(type = AOS_DOMAIN, endpoint = "https://test.es.amazonaws.com") + + poller.populateThreadContext(target) + + assertEquals(ALERTING_OP_TYPE, mockThreadContext.getHeader(MonitorJobPoller.OPERATION_NAME_HEADER)) + assertEquals("true", mockThreadContext.getHeader(MonitorJobPoller.IS_BACKGROUND_JOB_HEADER)) + assertEquals(AOS_SERVICE_NAME, mockThreadContext.getHeader(MonitorJobPoller.SERVICE_NAME_HEADER)) + assertEquals("https://test.es.amazonaws.com", mockThreadContext.getHeader(MonitorJobPoller.OPENSEARCH_ENDPOINT_HEADER)) + assertEquals("us-east-1", mockThreadContext.getHeader(MonitorJobPoller.REGION_HEADER)) + + poller.close() + } + + fun `test thread context population rejects invalid target type`() { + val mockClient = mockClient() + + val poller = MonitorJobPoller( + testXContentRegistry(), mockClient, true, + testAccountIdProvider(), "us-east-1", "test-queue" + ) + + val target = Target(type = "local", endpoint = "https://test.aoss.amazonaws.com") + + expectThrows(Exception::class.java) { + poller.populateThreadContext(target) + } + + poller.close() + } } From 7dcdcfecd0c0bb8eb3f2c8d5b19e88eabf8287a6 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Tue, 28 Apr 2026 20:50:55 -0700 Subject: [PATCH 09/11] changing name of is background job flag thread context header Signed-off-by: Dennis Toepker --- .../kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt index 2f1b6ddd..2223a122 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt @@ -242,7 +242,7 @@ class MonitorJobPoller( // thread context header keys for request interception const val OPERATION_NAME_HEADER = "x-amzn-oasis-operation" - const val IS_BACKGROUND_JOB_HEADER = "alerting-is-background-job" + const val IS_BACKGROUND_JOB_HEADER = "is-observability-bg-job" const val SERVICE_NAME_HEADER = "aws-service-name" const val OPENSEARCH_ENDPOINT_HEADER = "opensearch-url" const val REGION_HEADER = "aws-region" From 102583b2886faf88576fee18f45c1efb5c7e8fd6 Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Wed, 29 Apr 2026 13:10:44 -0700 Subject: [PATCH 10/11] putting target type -> service name mappings behind setting Signed-off-by: Dennis Toepker --- .../org/opensearch/alerting/AlertingPlugin.kt | 8 +- .../alerting/service/MonitorJobPoller.kt | 31 +++----- .../alerting/settings/AlertingSettings.kt | 5 ++ .../alerting/service/MonitorJobPollerTests.kt | 73 +++++++------------ 4 files changed, 50 insertions(+), 67 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index fc19b428..f4d3a209 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -379,7 +379,10 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R MULTI_TENANCY_ENABLED.get(settings), if (providerType.isNotEmpty()) JobQueueAccountIdProvider.find(providerType, settings) else null, REMOTE_METADATA_REGION.get(settings) ?: "", - AlertingSettings.JOB_QUEUE_NAME.get(settings) ?: "" + AlertingSettings.JOB_QUEUE_NAME.get(settings) ?: "", + AlertingSettings.JOB_QUEUE_TARGET_TYPE_TO_SERVICE_NAME.get(settings).let { + it.keySet().associateWith { key -> it.get(key) } + } ) ExternalSchedulerService.initialize(settings) @@ -491,7 +494,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.JOB_QUEUE_MESSAGE_GROUP_KEY_NAME, AlertingSettings.EXTERNAL_SCHEDULER_ROLE_ARN, AlertingSettings.JOB_QUEUE_ACCOUNT_ID, - AlertingSettings.JOB_QUEUE_ACCOUNT_PROVIDER_TYPE + AlertingSettings.JOB_QUEUE_ACCOUNT_PROVIDER_TYPE, + AlertingSettings.JOB_QUEUE_TARGET_TYPE_TO_SERVICE_NAME ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt index 2223a122..d34c69c0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt @@ -49,7 +49,8 @@ class MonitorJobPoller( private val enabled: Boolean, private val accountIdProvider: JobQueueAccountIdProvider?, private val region: String, - private val queueName: String + private val queueName: String, + private val targetTypeToServiceName: Map ) : AbstractLifecycleComponent() { private val logger = LogManager.getLogger(MonitorJobPoller::class.java) @@ -210,8 +211,6 @@ class MonitorJobPoller( val threadContext = client.threadPool().threadContext - threadContext.putHeader(OPERATION_NAME_HEADER, ALERTING_OP_TYPE) - // Request interception checks for this flag to know that this is // a scheduled background monitor execution, meaning there will be // no user credentials to make the search/ppl call to customer @@ -228,12 +227,16 @@ class MonitorJobPoller( } private fun mapTargetTypeToServiceName(targetType: String): String { - return when (targetType) { - AOSS_COLLECTION -> AOSS_SERVICE_NAME - AOS_DOMAIN -> AOS_SERVICE_NAME - // default target type of "local" is invalid and will throw exception - else -> throw AlertingException.wrap(IllegalStateException("Received invalid target type in Job Poller: " + targetType)) + if (!targetTypeToServiceName.containsKey(targetType)) { + throw AlertingException.wrap( + IllegalStateException( + "Received invalid target type in Job Poller: " + targetType + + ", expected one of: " + targetTypeToServiceName.keys + ) + ) } + + return targetTypeToServiceName[targetType]!! } companion object { @@ -241,21 +244,9 @@ class MonitorJobPoller( const val POLL_INTERVAL_MS = 1000L // thread context header keys for request interception - const val OPERATION_NAME_HEADER = "x-amzn-oasis-operation" const val IS_BACKGROUND_JOB_HEADER = "is-observability-bg-job" const val SERVICE_NAME_HEADER = "aws-service-name" const val OPENSEARCH_ENDPOINT_HEADER = "opensearch-url" const val REGION_HEADER = "aws-region" - - // operation type - const val ALERTING_OP_TYPE = "Alerting" - - // target types - const val AOSS_COLLECTION = "AOSS_COLLECTION" - const val AOS_DOMAIN = "AOS_DOMAIN" - - // service names - const val AOSS_SERVICE_NAME = "aoss" - const val AOS_SERVICE_NAME = "es" } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index f5b866e0..00624485 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -404,5 +404,10 @@ class AlertingSettings { "plugins.alerting.external_scheduler.job_queue_message_group_key_name", Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val JOB_QUEUE_TARGET_TYPE_TO_SERVICE_NAME = Setting.groupSetting( + "plugins.alerting.external_scheduler.type_to_service.", + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/service/MonitorJobPollerTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/service/MonitorJobPollerTests.kt index 75770481..baef77ab 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/service/MonitorJobPollerTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/service/MonitorJobPollerTests.kt @@ -9,11 +9,6 @@ import com.carrotsearch.randomizedtesting.ThreadFilter import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters import org.mockito.Mockito.mock import org.mockito.Mockito.`when` -import org.opensearch.alerting.service.MonitorJobPoller.Companion.ALERTING_OP_TYPE -import org.opensearch.alerting.service.MonitorJobPoller.Companion.AOSS_COLLECTION -import org.opensearch.alerting.service.MonitorJobPoller.Companion.AOSS_SERVICE_NAME -import org.opensearch.alerting.service.MonitorJobPoller.Companion.AOS_DOMAIN -import org.opensearch.alerting.service.MonitorJobPoller.Companion.AOS_SERVICE_NAME import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.SearchInput @@ -63,6 +58,13 @@ class MonitorJobPollerTests : OpenSearchTestCase() { } } + private fun mappingProvider(): Map { + return mapOf( + "target_1" to "service_1", + "target_2" to "service_2" + ) + } + private fun validMessageBody(): String { val monitorConfig = "{\"type\":\"monitor\",\"name\":\"test\"," + "\"monitor_type\":\"query_level_monitor\",\"enabled\":true," + @@ -82,7 +84,8 @@ class MonitorJobPollerTests : OpenSearchTestCase() { ): MonitorJobPoller { return MonitorJobPoller( testXContentRegistry(), mockClient(), enabled, - testAccountIdProvider(), "us-west-2", "test-queue" + testAccountIdProvider(), "us-west-2", "test-queue", + mappingProvider() ).also { it.sqsClient = sqsClient } } @@ -108,7 +111,8 @@ class MonitorJobPollerTests : OpenSearchTestCase() { val sqsClient = FakeSqsClient() val poller = MonitorJobPoller( testXContentRegistry(), mockClient(), true, - testAccountIdProvider(), "us-west-2", "test-queue" + testAccountIdProvider(), "us-west-2", "test-queue", + mappingProvider() ).also { it.sqsClient = sqsClient } poller.start() Thread.sleep(100) @@ -126,7 +130,7 @@ class MonitorJobPollerTests : OpenSearchTestCase() { ) val poller = MonitorJobPoller( testXContentRegistry(), mockClient(), false, - null, "", "" + null, "", "", mappingProvider() ) poller.start() // Should NOT poll since disabled @@ -138,7 +142,7 @@ class MonitorJobPollerTests : OpenSearchTestCase() { fun `test start throws when provider not set`() { val poller = MonitorJobPoller( testXContentRegistry(), mockClient(), true, - null, "us-west-2", "test-queue" + null, "us-west-2", "test-queue", mappingProvider() ) expectThrows(Exception::class.java) { poller.start() @@ -149,7 +153,8 @@ class MonitorJobPollerTests : OpenSearchTestCase() { fun `test start throws when region not set`() { val poller = MonitorJobPoller( testXContentRegistry(), mockClient(), true, - testAccountIdProvider(), "", "test-queue" + testAccountIdProvider(), "", "test-queue", + mappingProvider() ) expectThrows(Exception::class.java) { poller.start() @@ -187,7 +192,8 @@ class MonitorJobPollerTests : OpenSearchTestCase() { } val poller = MonitorJobPoller( testXContentRegistry(), mockClient(), true, - errorProvider, "us-west-2", "test-queue" + errorProvider, "us-west-2", "test-queue", + mappingProvider() ).also { it.sqsClient = FakeSqsClient() } poller.start() assertTrue("Worker should have polled twice", latch.await(5, TimeUnit.SECONDS)) @@ -207,7 +213,8 @@ class MonitorJobPollerTests : OpenSearchTestCase() { } val poller = MonitorJobPoller( testXContentRegistry(), mockClient(), true, - emptyProvider, "us-west-2", "test-queue" + emptyProvider, "us-west-2", "test-queue", + mappingProvider() ).also { it.sqsClient = FakeSqsClient() } poller.start() assertTrue("Worker should have polled multiple times", latch.await(5, TimeUnit.SECONDS)) @@ -354,7 +361,7 @@ class MonitorJobPollerTests : OpenSearchTestCase() { poller.close() } - fun `test thread context populated correctly with aoss endpoint`() { + fun `test thread context populated correctly based on target type`() { val mockClient = mockClient() val mockThreadPool = mock(org.opensearch.threadpool.ThreadPool::class.java) val mockThreadContext = org.opensearch.common.util.concurrent.ThreadContext(Settings.EMPTY) @@ -364,57 +371,33 @@ class MonitorJobPollerTests : OpenSearchTestCase() { val poller = MonitorJobPoller( testXContentRegistry(), mockClient, true, - testAccountIdProvider(), "us-east-1", "test-queue" + testAccountIdProvider(), "us-east-1", "test-queue", + mappingProvider() ) - val target = Target(type = AOSS_COLLECTION, endpoint = "https://test.aoss.amazonaws.com") + val mockTargetType = mappingProvider().entries.first().key + val target = Target(type = mockTargetType, endpoint = "https://test.aoss.amazonaws.com") poller.populateThreadContext(target) - assertEquals(ALERTING_OP_TYPE, mockThreadContext.getHeader(MonitorJobPoller.OPERATION_NAME_HEADER)) assertEquals("true", mockThreadContext.getHeader(MonitorJobPoller.IS_BACKGROUND_JOB_HEADER)) - assertEquals(AOSS_SERVICE_NAME, mockThreadContext.getHeader(MonitorJobPoller.SERVICE_NAME_HEADER)) + assertEquals(mappingProvider()[mockTargetType], mockThreadContext.getHeader(MonitorJobPoller.SERVICE_NAME_HEADER)) assertEquals("https://test.aoss.amazonaws.com", mockThreadContext.getHeader(MonitorJobPoller.OPENSEARCH_ENDPOINT_HEADER)) assertEquals("us-east-1", mockThreadContext.getHeader(MonitorJobPoller.REGION_HEADER)) poller.close() } - fun `test thread context populated correctly with aos endpoint`() { - val mockClient = mockClient() - val mockThreadPool = mock(org.opensearch.threadpool.ThreadPool::class.java) - val mockThreadContext = org.opensearch.common.util.concurrent.ThreadContext(Settings.EMPTY) - - `when`(mockClient.threadPool()).thenReturn(mockThreadPool) - `when`(mockThreadPool.threadContext).thenReturn(mockThreadContext) - - val poller = MonitorJobPoller( - testXContentRegistry(), mockClient, true, - testAccountIdProvider(), "us-east-1", "test-queue" - ) - - val target = Target(type = AOS_DOMAIN, endpoint = "https://test.es.amazonaws.com") - - poller.populateThreadContext(target) - - assertEquals(ALERTING_OP_TYPE, mockThreadContext.getHeader(MonitorJobPoller.OPERATION_NAME_HEADER)) - assertEquals("true", mockThreadContext.getHeader(MonitorJobPoller.IS_BACKGROUND_JOB_HEADER)) - assertEquals(AOS_SERVICE_NAME, mockThreadContext.getHeader(MonitorJobPoller.SERVICE_NAME_HEADER)) - assertEquals("https://test.es.amazonaws.com", mockThreadContext.getHeader(MonitorJobPoller.OPENSEARCH_ENDPOINT_HEADER)) - assertEquals("us-east-1", mockThreadContext.getHeader(MonitorJobPoller.REGION_HEADER)) - - poller.close() - } - fun `test thread context population rejects invalid target type`() { val mockClient = mockClient() val poller = MonitorJobPoller( testXContentRegistry(), mockClient, true, - testAccountIdProvider(), "us-east-1", "test-queue" + testAccountIdProvider(), "us-east-1", "test-queue", + mappingProvider() ) - val target = Target(type = "local", endpoint = "https://test.aoss.amazonaws.com") + val target = Target(type = "non_existent_type", endpoint = "https://test.aoss.amazonaws.com") expectThrows(Exception::class.java) { poller.populateThreadContext(target) From bf6feb04c2c0ef49cc682e5103f07b4a9e0a5b9a Mon Sep 17 00:00:00 2001 From: Dennis Toepker Date: Wed, 29 Apr 2026 14:14:40 -0700 Subject: [PATCH 11/11] renaming target type to service name mappings setting Signed-off-by: Dennis Toepker --- .../main/kotlin/org/opensearch/alerting/AlertingPlugin.kt | 4 ++-- .../org/opensearch/alerting/settings/AlertingSettings.kt | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index f4d3a209..888ddffc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -380,7 +380,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R if (providerType.isNotEmpty()) JobQueueAccountIdProvider.find(providerType, settings) else null, REMOTE_METADATA_REGION.get(settings) ?: "", AlertingSettings.JOB_QUEUE_NAME.get(settings) ?: "", - AlertingSettings.JOB_QUEUE_TARGET_TYPE_TO_SERVICE_NAME.get(settings).let { + AlertingSettings.TARGET_TYPE_TO_SERVICE_NAME.get(settings).let { it.keySet().associateWith { key -> it.get(key) } } ) @@ -495,7 +495,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.EXTERNAL_SCHEDULER_ROLE_ARN, AlertingSettings.JOB_QUEUE_ACCOUNT_ID, AlertingSettings.JOB_QUEUE_ACCOUNT_PROVIDER_TYPE, - AlertingSettings.JOB_QUEUE_TARGET_TYPE_TO_SERVICE_NAME + AlertingSettings.TARGET_TYPE_TO_SERVICE_NAME ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 00624485..40406dd2 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -405,8 +405,10 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) - val JOB_QUEUE_TARGET_TYPE_TO_SERVICE_NAME = Setting.groupSetting( - "plugins.alerting.external_scheduler.type_to_service.", + /** Mappings from Monitor target type to opensearch service name, used in MonitorJobPoller + * to populate thread context with required Monitor target information */ + val TARGET_TYPE_TO_SERVICE_NAME = Setting.groupSetting( + "plugins.alerting.monitor.target_type_to_service_name.", Setting.Property.NodeScope, Setting.Property.Dynamic ) }