Make MonitorJobPoller populate thread context for downstream request interception#2107
Make MonitorJobPoller populate thread context for downstream request interception#2107toepkerd merged 11 commits intoopensearch-project:mainfrom
Conversation
…asis interception Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
| const val POLLER_THREAD_COUNT = 10 | ||
| const val POLL_INTERVAL_MS = 1000L | ||
|
|
||
| // thread context header keys for request interception |
There was a problem hiding this comment.
nit: I think we should follow a consistent naming convention for all these headers but needs to be discussed this across team
There was a problem hiding this comment.
These constants are actually existing header names from other code owners that we are reusing. The only new and original const here is IS_BACKGROUND_JOB_HEADER = "alerting-is-background-job"
| threadContext.putHeader(IS_BACKGROUND_JOB_HEADER, "true") | ||
|
|
||
| // TODO: in long term, may need to generalize to aos data source type | ||
| threadContext.putHeader(SERVICE_NAME_HEADER, "aoss") |
There was a problem hiding this comment.
Is this information in the monitor config already? If so we can pull it from there instead of hardcoding
There was a problem hiding this comment.
I don't think it's anywhere in the Monitor config, but there is a REMOTE_METADATA_SERVICE_NAME setting I could retrieve the value from. Will explore reading from this value instead of hard-coding aoss.
There was a problem hiding this comment.
If reading from REMOTE_METADATA_SERVICE_NAME approach is used, we must be mindful of conventions for naming services. For collection data sources, I think we can trust the service name will always be aoss.
In the long term, if we support domain data sources, we will need to match existing service name conventions in the interception logic. Whether they favor aos or es as the service name, our logic for populating these remote metadata settings (outside the scope of this PR) will need to be ready to match that convention if reading from settings is to work.
There was a problem hiding this comment.
There was a problem hiding this comment.
@riysaxen-amzn @MohammedAghil can help ensure that this header's value will match exactly what the header needs or what a setting can hold as a kv map where key will be target.type and value would be header value
There was a problem hiding this comment.
I see, I misunderstood the monitor.target.type field, thanks for clarifying. I will double check the values that this field can take on (other than the default value of "local"). I will add fail-fast validations for the type.
| ) | ||
| } | ||
|
|
||
| if (region.isBlank()) { |
There was a problem hiding this comment.
this should be a check before poller threads are bootstrapped itself near
val provider = requireNotNull(accountIdProvider) { "accountIdProvider must be set before starting" }
val sqs = requireNotNull(sqsClient) { "sqsClient must be set before starting" }
| threadContext.putHeader(IS_BACKGROUND_JOB_HEADER, "true") | ||
|
|
||
| // TODO: in long term, may need to generalize to aos data source type | ||
| threadContext.putHeader(SERVICE_NAME_HEADER, "aoss") |
There was a problem hiding this comment.
| threadContext.putHeader(IS_BACKGROUND_JOB_HEADER, "true") | ||
|
|
||
| // TODO: in long term, may need to generalize to aos data source type | ||
| threadContext.putHeader(SERVICE_NAME_HEADER, "aoss") |
There was a problem hiding this comment.
@riysaxen-amzn @MohammedAghil can help ensure that this header's value will match exactly what the header needs or what a setting can hold as a kv map where key will be target.type and value would be header value
| threadContext.putHeader(SERVICE_NAME_HEADER, "aoss") | ||
|
|
||
| // external customer data source endpoint, to run search/ppl against | ||
| threadContext.putHeader(OPENSEARCH_ENDPOINT_HEADER, monitor.target!!.endpoint) |
There was a problem hiding this comment.
plz check that it's not null/empty before you set it.
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
| Setting.Property.NodeScope, Setting.Property.Dynamic | ||
| ) | ||
|
|
||
| val JOB_QUEUE_TARGET_TYPE_TO_SERVICE_NAME = Setting.groupSetting( |
There was a problem hiding this comment.
Yes.
If no settings with this prefix is configured anywhere, MonitorJobPoller construction in AlertingPlugin.kt effectively gets an empty keySet() (like get(SETTINGS.Empty)). This means every Monitor Job Poller job will fail at thread context writing with "invalid target type" exception. This is intentional behavior: values for these settings must be configured in the following way in opensearch.yml:
plugins.alerting.monitor.target_type_to_service_name.target_type_1: service_name_1
plugins.alerting.monitor.target_type_to_service_name.target_type_2: service_name_2
| ) | ||
|
|
||
| val JOB_QUEUE_TARGET_TYPE_TO_SERVICE_NAME = Setting.groupSetting( | ||
| "plugins.alerting.external_scheduler.type_to_service.", |
There was a problem hiding this comment.
can this be renamed to plugins.alerting.monitor.target_type_to_service_name.
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
Description
Job poller now populates the thread context with information about the external data source so downstream flows can properly intercept it and make Search and PPL calls to external data source.
Check List
--signoff.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.