diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index fc19b428..87628a1d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -490,6 +490,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.JOB_QUEUE_NAME, AlertingSettings.JOB_QUEUE_MESSAGE_GROUP_KEY_NAME, AlertingSettings.EXTERNAL_SCHEDULER_ROLE_ARN, + AlertingSettings.EXTERNAL_SCHEDULER_EXECUTION_ROLE_ARN, AlertingSettings.JOB_QUEUE_ACCOUNT_ID, AlertingSettings.JOB_QUEUE_ACCOUNT_PROVIDER_TYPE ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/ExternalSchedulerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/ExternalSchedulerService.kt index bd183eb2..9c2f780d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/ExternalSchedulerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/ExternalSchedulerService.kt @@ -134,7 +134,7 @@ object ExternalSchedulerService { val universalInput = buildUniversalInput(queueUrl, targetInput, monitor) return Target.builder() .arn(EB_SQS_UNIVERSAL_TARGET_ARN) - .roleArn(routing.roleArn) + .roleArn(routing.executionRoleArn) .input(universalInput) .build() } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/SchedulerRoutingResolver.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/SchedulerRoutingResolver.kt index 0459de0b..d035b54c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/SchedulerRoutingResolver.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/SchedulerRoutingResolver.kt @@ -16,18 +16,20 @@ package org.opensearch.alerting.service object SchedulerRoutingResolver { /** Routing info for external scheduler operations. */ - data class Routing(val accountId: String, val queueName: String, val roleArn: String) + data class Routing(val accountId: String, val queueName: String, val roleArn: String, val executionRoleArn: String) fun resolve( settingsAccountId: String, settingsQueueName: String, settingsRoleArn: String, + settingsExecutionRoleArn: String? = null, threadContextAccountIdOverride: String? ): Routing? { val accountId = pickAccountId(settingsAccountId, threadContextAccountIdOverride) ?: return null val queueName = settingsQueueName.takeIf { it.isNotBlank() } ?: return null val roleArn = settingsRoleArn.takeIf { it.isNotBlank() } ?: return null - return Routing(accountId, queueName, roleArn) + val executionRoleArn = settingsExecutionRoleArn?.takeIf { it.isNotBlank() } ?: return null + return Routing(accountId, queueName, roleArn, executionRoleArn) } /** Delete only needs accountId + roleArn; queueName is set to empty. */ @@ -38,7 +40,7 @@ object SchedulerRoutingResolver { ): Routing? { val accountId = pickAccountId(settingsAccountId, threadContextAccountIdOverride) ?: return null val roleArn = settingsRoleArn.takeIf { it.isNotBlank() } ?: return null - return Routing(accountId, "", roleArn) + return Routing(accountId, "", roleArn, "") } /** ThreadContext override wins; falls back to plugin setting; null if both are blank. */ diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index f5b866e0..dac5acae 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -380,6 +380,12 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + /** IAM role ARN that EventBridge Scheduler assumes at fire time (Target.roleArn). Required when external scheduler is enabled. */ + val EXTERNAL_SCHEDULER_EXECUTION_ROLE_ARN = Setting.simpleString( + "plugins.alerting.external_scheduler.execution_role_arn", + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + /** AWS account ID that hosts the job queues available for polling. */ val JOB_QUEUE_ACCOUNT_ID = Setting.simpleString( "plugins.alerting.external_scheduler.job_queue_account_id", diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 82e5d334..ef104578 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -125,6 +125,7 @@ class TransportIndexMonitorAction @Inject constructor( @Volatile private var externalSchedulerAccountId = AlertingSettings.EXTERNAL_SCHEDULER_ACCOUNT_ID.get(settings) @Volatile private var jobQueueName = AlertingSettings.JOB_QUEUE_NAME.get(settings) @Volatile private var externalSchedulerRoleArn = AlertingSettings.EXTERNAL_SCHEDULER_ROLE_ARN.get(settings) + @Volatile private var externalSchedulerExecutionRoleArn = AlertingSettings.EXTERNAL_SCHEDULER_EXECUTION_ROLE_ARN.get(settings) private val multiTenancyEnabled = AlertingSettings.MULTI_TENANCY_ENABLED.get(settings) @@ -147,6 +148,9 @@ class TransportIndexMonitorAction @Inject constructor( clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.EXTERNAL_SCHEDULER_ROLE_ARN) { externalSchedulerRoleArn = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.EXTERNAL_SCHEDULER_EXECUTION_ROLE_ARN) { + externalSchedulerExecutionRoleArn = it + } listenFilterBySettingChange(clusterService) } @@ -884,6 +888,7 @@ class TransportIndexMonitorAction @Inject constructor( settingsAccountId = externalSchedulerAccountId, settingsQueueName = jobQueueName, settingsRoleArn = externalSchedulerRoleArn, + settingsExecutionRoleArn = externalSchedulerExecutionRoleArn, threadContextAccountIdOverride = client.threadPool().threadContext .getTransient(ExternalSchedulerService.SCHEDULER_ACCOUNT_ID_KEY) ) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/service/ExternalSchedulerServiceTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/service/ExternalSchedulerServiceTests.kt index 73a69641..780e3623 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/service/ExternalSchedulerServiceTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/service/ExternalSchedulerServiceTests.kt @@ -76,7 +76,7 @@ class ExternalSchedulerServiceTests { .put("plugins.alerting.remote_metadata_region", "us-west-2").build() ) val monitor = testMonitor() - val routing = SchedulerRoutingResolver.Routing("111111111111", "queue", "arn:aws:iam::111:role/test") + val routing = SchedulerRoutingResolver.Routing("111111111111", "queue", "arn:aws:iam::111:role/test", "arn:aws:iam::111:role/exec") try { ExternalSchedulerService.createSchedule(monitor, routing, buildPayloadJson(monitor)) throw AssertionError("Expected IllegalArgumentException") @@ -92,7 +92,7 @@ class ExternalSchedulerServiceTests { org.opensearch.common.settings.Settings.builder() .put("plugins.alerting.remote_metadata_region", "us-west-2").build() ) - val routing = SchedulerRoutingResolver.Routing("333333333333", "queue", "arn:aws:iam::333:role/test") + val routing = SchedulerRoutingResolver.Routing("333333333333", "queue", "arn:aws:iam::333:role/test", "arn:aws:iam::333:role/exec") try { ExternalSchedulerService.deleteSchedule("mon-3", routing) throw AssertionError("Expected IllegalArgumentException") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/service/SchedulerRoutingResolverTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/service/SchedulerRoutingResolverTests.kt index fe2d7344..982fc405 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/service/SchedulerRoutingResolverTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/service/SchedulerRoutingResolverTests.kt @@ -15,42 +15,52 @@ class SchedulerRoutingResolverTests { private val override = "999999999999" private val queue = "arn:aws:sqs:us-east-1:111:queue" private val role = "arn:aws:iam::111:role/eb" + private val execRole = "arn:aws:iam::111:role/eb-exec" // ---------- resolve() — create/update path ---------- @Test fun `resolve uses plugin settings when no override`() { - val r = SchedulerRoutingResolver.resolve(acct, queue, role, threadContextAccountIdOverride = null)!! + val r = SchedulerRoutingResolver.resolve(acct, queue, role, execRole, threadContextAccountIdOverride = null)!! assertEquals(acct, r.accountId) assertEquals(queue, r.queueName) assertEquals(role, r.roleArn) + assertEquals(execRole, r.executionRoleArn) } @Test fun `resolve applies ThreadContext override for accountId`() { - val r = SchedulerRoutingResolver.resolve(acct, queue, role, threadContextAccountIdOverride = override)!! + val r = SchedulerRoutingResolver.resolve(acct, queue, role, execRole, threadContextAccountIdOverride = override)!! assertEquals(override, r.accountId) } @Test fun `resolve treats blank override as absent`() { - val r = SchedulerRoutingResolver.resolve(acct, queue, role, threadContextAccountIdOverride = " ")!! + val r = SchedulerRoutingResolver.resolve(acct, queue, role, execRole, threadContextAccountIdOverride = " ")!! assertEquals(acct, r.accountId) } @Test fun `resolve returns null when accountId missing in both setting and override`() { - assertNull(SchedulerRoutingResolver.resolve("", queue, role, threadContextAccountIdOverride = null)) - assertNull(SchedulerRoutingResolver.resolve("", queue, role, threadContextAccountIdOverride = "")) + assertNull(SchedulerRoutingResolver.resolve("", queue, role, execRole, threadContextAccountIdOverride = null)) + assertNull(SchedulerRoutingResolver.resolve("", queue, role, execRole, threadContextAccountIdOverride = "")) } @Test fun `resolve still succeeds when setting blank but override provided`() { - val r = SchedulerRoutingResolver.resolve("", queue, role, threadContextAccountIdOverride = override)!! + val r = SchedulerRoutingResolver.resolve("", queue, role, execRole, threadContextAccountIdOverride = override)!! assertEquals(override, r.accountId) } @Test fun `resolve returns null when queueName blank`() { - assertNull(SchedulerRoutingResolver.resolve(acct, "", role, threadContextAccountIdOverride = null)) + assertNull(SchedulerRoutingResolver.resolve(acct, "", role, execRole, threadContextAccountIdOverride = null)) } @Test fun `resolve returns null when roleArn blank`() { - assertNull(SchedulerRoutingResolver.resolve(acct, queue, "", threadContextAccountIdOverride = null)) + assertNull(SchedulerRoutingResolver.resolve(acct, queue, "", execRole, threadContextAccountIdOverride = null)) + } + + @Test fun `resolve returns null when executionRoleArn blank`() { + assertNull(SchedulerRoutingResolver.resolve(acct, queue, role, " ", threadContextAccountIdOverride = null)) + } + + @Test fun `resolve returns null when executionRoleArn omitted`() { + assertNull(SchedulerRoutingResolver.resolve(acct, queue, role, threadContextAccountIdOverride = null)) } // ---------- resolveForDelete() ----------