Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.JOB_QUEUE_NAME,
AlertingSettings.JOB_QUEUE_MESSAGE_GROUP_KEY_NAME,
AlertingSettings.EXTERNAL_SCHEDULER_ROLE_ARN,
AlertingSettings.EXTERNAL_SCHEDULER_EXECUTION_ROLE_ARN,
AlertingSettings.JOB_QUEUE_ACCOUNT_ID,
AlertingSettings.JOB_QUEUE_ACCOUNT_PROVIDER_TYPE
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ object ExternalSchedulerService {
val universalInput = buildUniversalInput(queueUrl, targetInput, monitor)
return Target.builder()
.arn(EB_SQS_UNIVERSAL_TARGET_ARN)
.roleArn(routing.roleArn)
.roleArn(routing.executionRoleArn)
.input(universalInput)
.build()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@ package org.opensearch.alerting.service
object SchedulerRoutingResolver {

/** Routing info for external scheduler operations. */
data class Routing(val accountId: String, val queueName: String, val roleArn: String)
data class Routing(val accountId: String, val queueName: String, val roleArn: String, val executionRoleArn: String)

fun resolve(
settingsAccountId: String,
settingsQueueName: String,
settingsRoleArn: String,
settingsExecutionRoleArn: String? = null,
threadContextAccountIdOverride: String?
): Routing? {
val accountId = pickAccountId(settingsAccountId, threadContextAccountIdOverride) ?: return null
val queueName = settingsQueueName.takeIf { it.isNotBlank() } ?: return null
val roleArn = settingsRoleArn.takeIf { it.isNotBlank() } ?: return null
return Routing(accountId, queueName, roleArn)
val executionRoleArn = settingsExecutionRoleArn?.takeIf { it.isNotBlank() } ?: return null
return Routing(accountId, queueName, roleArn, executionRoleArn)
}

/** Delete only needs accountId + roleArn; queueName is set to empty. */
Expand All @@ -38,7 +40,7 @@ object SchedulerRoutingResolver {
): Routing? {
val accountId = pickAccountId(settingsAccountId, threadContextAccountIdOverride) ?: return null
val roleArn = settingsRoleArn.takeIf { it.isNotBlank() } ?: return null
return Routing(accountId, "", roleArn)
return Routing(accountId, "", roleArn, "")
}

/** ThreadContext override wins; falls back to plugin setting; null if both are blank. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,12 @@ class AlertingSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/** IAM role ARN that EventBridge Scheduler assumes at fire time (Target.roleArn). Required when external scheduler is enabled. */
val EXTERNAL_SCHEDULER_EXECUTION_ROLE_ARN = Setting.simpleString(
"plugins.alerting.external_scheduler.execution_role_arn",
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/** AWS account ID that hosts the job queues available for polling. */
val JOB_QUEUE_ACCOUNT_ID = Setting.simpleString(
"plugins.alerting.external_scheduler.job_queue_account_id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class TransportIndexMonitorAction @Inject constructor(
@Volatile private var externalSchedulerAccountId = AlertingSettings.EXTERNAL_SCHEDULER_ACCOUNT_ID.get(settings)
@Volatile private var jobQueueName = AlertingSettings.JOB_QUEUE_NAME.get(settings)
@Volatile private var externalSchedulerRoleArn = AlertingSettings.EXTERNAL_SCHEDULER_ROLE_ARN.get(settings)
@Volatile private var externalSchedulerExecutionRoleArn = AlertingSettings.EXTERNAL_SCHEDULER_EXECUTION_ROLE_ARN.get(settings)

private val multiTenancyEnabled = AlertingSettings.MULTI_TENANCY_ENABLED.get(settings)

Expand All @@ -147,6 +148,9 @@ class TransportIndexMonitorAction @Inject constructor(
clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.EXTERNAL_SCHEDULER_ROLE_ARN) {
externalSchedulerRoleArn = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.EXTERNAL_SCHEDULER_EXECUTION_ROLE_ARN) {
externalSchedulerExecutionRoleArn = it
}
listenFilterBySettingChange(clusterService)
}

Expand Down Expand Up @@ -884,6 +888,7 @@ class TransportIndexMonitorAction @Inject constructor(
settingsAccountId = externalSchedulerAccountId,
settingsQueueName = jobQueueName,
settingsRoleArn = externalSchedulerRoleArn,
settingsExecutionRoleArn = externalSchedulerExecutionRoleArn,
threadContextAccountIdOverride = client.threadPool().threadContext
.getTransient<String>(ExternalSchedulerService.SCHEDULER_ACCOUNT_ID_KEY)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class ExternalSchedulerServiceTests {
.put("plugins.alerting.remote_metadata_region", "us-west-2").build()
)
val monitor = testMonitor()
val routing = SchedulerRoutingResolver.Routing("111111111111", "queue", "arn:aws:iam::111:role/test")
val routing = SchedulerRoutingResolver.Routing("111111111111", "queue", "arn:aws:iam::111:role/test", "arn:aws:iam::111:role/exec")
try {
ExternalSchedulerService.createSchedule(monitor, routing, buildPayloadJson(monitor))
throw AssertionError("Expected IllegalArgumentException")
Expand All @@ -92,7 +92,7 @@ class ExternalSchedulerServiceTests {
org.opensearch.common.settings.Settings.builder()
.put("plugins.alerting.remote_metadata_region", "us-west-2").build()
)
val routing = SchedulerRoutingResolver.Routing("333333333333", "queue", "arn:aws:iam::333:role/test")
val routing = SchedulerRoutingResolver.Routing("333333333333", "queue", "arn:aws:iam::333:role/test", "arn:aws:iam::333:role/exec")
try {
ExternalSchedulerService.deleteSchedule("mon-3", routing)
throw AssertionError("Expected IllegalArgumentException")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,52 @@ class SchedulerRoutingResolverTests {
private val override = "999999999999"
private val queue = "arn:aws:sqs:us-east-1:111:queue"
private val role = "arn:aws:iam::111:role/eb"
private val execRole = "arn:aws:iam::111:role/eb-exec"

// ---------- resolve() — create/update path ----------

@Test fun `resolve uses plugin settings when no override`() {
val r = SchedulerRoutingResolver.resolve(acct, queue, role, threadContextAccountIdOverride = null)!!
val r = SchedulerRoutingResolver.resolve(acct, queue, role, execRole, threadContextAccountIdOverride = null)!!
assertEquals(acct, r.accountId)
assertEquals(queue, r.queueName)
assertEquals(role, r.roleArn)
assertEquals(execRole, r.executionRoleArn)
}

@Test fun `resolve applies ThreadContext override for accountId`() {
val r = SchedulerRoutingResolver.resolve(acct, queue, role, threadContextAccountIdOverride = override)!!
val r = SchedulerRoutingResolver.resolve(acct, queue, role, execRole, threadContextAccountIdOverride = override)!!
assertEquals(override, r.accountId)
}

@Test fun `resolve treats blank override as absent`() {
val r = SchedulerRoutingResolver.resolve(acct, queue, role, threadContextAccountIdOverride = " ")!!
val r = SchedulerRoutingResolver.resolve(acct, queue, role, execRole, threadContextAccountIdOverride = " ")!!
assertEquals(acct, r.accountId)
}

@Test fun `resolve returns null when accountId missing in both setting and override`() {
assertNull(SchedulerRoutingResolver.resolve("", queue, role, threadContextAccountIdOverride = null))
assertNull(SchedulerRoutingResolver.resolve("", queue, role, threadContextAccountIdOverride = ""))
assertNull(SchedulerRoutingResolver.resolve("", queue, role, execRole, threadContextAccountIdOverride = null))
assertNull(SchedulerRoutingResolver.resolve("", queue, role, execRole, threadContextAccountIdOverride = ""))
}

@Test fun `resolve still succeeds when setting blank but override provided`() {
val r = SchedulerRoutingResolver.resolve("", queue, role, threadContextAccountIdOverride = override)!!
val r = SchedulerRoutingResolver.resolve("", queue, role, execRole, threadContextAccountIdOverride = override)!!
assertEquals(override, r.accountId)
}

@Test fun `resolve returns null when queueName blank`() {
assertNull(SchedulerRoutingResolver.resolve(acct, "", role, threadContextAccountIdOverride = null))
assertNull(SchedulerRoutingResolver.resolve(acct, "", role, execRole, threadContextAccountIdOverride = null))
}

@Test fun `resolve returns null when roleArn blank`() {
assertNull(SchedulerRoutingResolver.resolve(acct, queue, "", threadContextAccountIdOverride = null))
assertNull(SchedulerRoutingResolver.resolve(acct, queue, "", execRole, threadContextAccountIdOverride = null))
}

@Test fun `resolve returns null when executionRoleArn blank`() {
assertNull(SchedulerRoutingResolver.resolve(acct, queue, role, " ", threadContextAccountIdOverride = null))
}

@Test fun `resolve returns null when executionRoleArn omitted`() {
assertNull(SchedulerRoutingResolver.resolve(acct, queue, role, threadContextAccountIdOverride = null))
}

// ---------- resolveForDelete() ----------
Expand Down
Loading