From 75850d5746d2e785b3434e715ca5615e556a157a Mon Sep 17 00:00:00 2001
From: Bas Schoenmaeckers
Date: Sun, 16 Nov 2025 18:45:54 +0100
Subject: [PATCH] Add `ThreadPoolExecutor` metrics to `opentelemetry-instrumentation-threading`

---
 .../instrumentation/threading/__init__.py     | 124 ++++++++++++++----
 1 file changed, 100 insertions(+), 24 deletions(-)

diff --git a/instrumentation/opentelemetry-instrumentation-threading/src/opentelemetry/instrumentation/threading/__init__.py b/instrumentation/opentelemetry-instrumentation-threading/src/opentelemetry/instrumentation/threading/__init__.py
index 6352197465..ecb7e84548 100644
--- a/instrumentation/opentelemetry-instrumentation-threading/src/opentelemetry/instrumentation/threading/__init__.py
+++ b/instrumentation/opentelemetry-instrumentation-threading/src/opentelemetry/instrumentation/threading/__init__.py
@@ -39,8 +39,10 @@
 
 import threading
 from concurrent import futures
+from concurrent.futures import Future
 from typing import TYPE_CHECKING, Any, Callable, Collection
 
+from opentelemetry.metrics import get_meter
 from wrapt import (
     wrap_function_wrapper,  # type: ignore[reportUnknownVariableType]
 )
@@ -48,6 +50,7 @@
 from opentelemetry import context
 from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
 from opentelemetry.instrumentation.threading.package import _instruments
+from opentelemetry.instrumentation.threading.version import __version__
 from opentelemetry.instrumentation.utils import unwrap
 
 if TYPE_CHECKING:
@@ -63,11 +66,40 @@ class ThreadingInstrumentor(BaseInstrumentor):
     __WRAPPER_START_METHOD = "start"
     __WRAPPER_RUN_METHOD = "run"
     __WRAPPER_SUBMIT_METHOD = "submit"
+    __WRAPPER_INIT_METHOD = "__init__"
 
     def instrumentation_dependencies(self) -> Collection[str]:
         return _instruments
 
     def _instrument(self, **kwargs: Any):
+        meter_provider = kwargs.get("meter_provider")
+        meter = get_meter(
+            __name__,
+            __version__,
+            meter_provider,
+            schema_url="https://opentelemetry.io/schemas/1.38.0",
+        )
+
+        self.working_items_count = meter.create_up_down_counter(
+            name="python.threadpool.working_items.count",
+            unit="threads",
+            description="The number of jobs currently being processed by the thread pool",
+        )
+        self.queue_count = meter.create_up_down_counter(
+            name="python.threadpool.queue.length",
+            unit="threads",
+            description="The number of jobs currently queued in the thread pool",
+        )
+        self.thread_count = meter.create_gauge(
+            name="python.threadpool.thread.count",
+            unit="threads",
+            description="The number of threads currently active in the thread pool",
+        )
+        self.max_thread_count = meter.create_gauge(
+            name="python.threadpool.thread.max_count",
+            unit="threads",
+            description="The maximum number of concurrent jobs allowed in the thread pool",
+        )
         self._instrument_thread()
         self._instrument_timer()
         self._instrument_thread_pool()
@@ -103,12 +135,16 @@ def _instrument_timer():
             ThreadingInstrumentor.__wrap_threading_run,
         )
 
-    @staticmethod
-    def _instrument_thread_pool():
+    def _instrument_thread_pool(self):
+        wrap_function_wrapper(
+            futures.ThreadPoolExecutor,
+            ThreadingInstrumentor.__WRAPPER_INIT_METHOD,
+            self.__build_wrap_thread_pool_init(),
+        )
         wrap_function_wrapper(
             futures.ThreadPoolExecutor,
             ThreadingInstrumentor.__WRAPPER_SUBMIT_METHOD,
-            ThreadingInstrumentor.__wrap_thread_pool_submit,
+            self.__build_wrap_thread_pool_submit(),
         )
 
     @staticmethod
@@ -123,6 +159,10 @@ def _uninstrument_timer():
     @staticmethod
     def _uninstrument_thread_pool():
+        unwrap(
+            futures.ThreadPoolExecutor,
+            ThreadingInstrumentor.__WRAPPER_INIT_METHOD,
+        )
         unwrap(
             futures.ThreadPoolExecutor,
             ThreadingInstrumentor.__WRAPPER_SUBMIT_METHOD,
         )
@@ -153,26 +193,62 @@ def __wrap_threading_run(
             if token is not None:
                 context.detach(token)
 
-    @staticmethod
-    def __wrap_thread_pool_submit(
-        call_wrapped: Callable[..., R],
-        instance: futures.ThreadPoolExecutor,
-        args: tuple[Callable[..., Any], ...],
-        kwargs: dict[str, Any],
-    ) -> R:
-        # obtain the original function and wrapped kwargs
-        original_func = args[0]
-        otel_context = context.get_current()
+    def __build_wrap_thread_pool_submit(self) -> Callable[..., Future[R]]:
+        def __wrap_thread_pool_submit(
+            call_wrapped: Callable[..., Future[R]],
+            instance: futures.ThreadPoolExecutor,
+            args: tuple[Callable[..., Any], ...],
+            kwargs: dict[str, Any],
+        ) -> Future[R]:
+            # obtain the original function and wrapped kwargs
+            original_func = args[0]
+            otel_context = context.get_current()
+            attributes = {
+                "threadpool.executor": instance._thread_name_prefix,
+            }
+
+            def wrapped_func(*func_args: Any, **func_kwargs: Any) -> R:
+                token = None
+                try:
+                    token = context.attach(otel_context)
+                    self.queue_count.add(-1, attributes)
+                    self.working_items_count.add(1, attributes)
+                    return original_func(*func_args, **func_kwargs)
+                finally:
+                    if token is not None:
+                        context.detach(token)
+
+            # replace the original function with the wrapped function
+            new_args: tuple[Callable[..., Any], ...] = (wrapped_func,) + args[
+                1:
+            ]
+            self.queue_count.add(1, attributes)
 
-        def wrapped_func(*func_args: Any, **func_kwargs: Any) -> R:
-            token = None
             try:
-                token = context.attach(otel_context)
-                return original_func(*func_args, **func_kwargs)
-            finally:
-                if token is not None:
-                    context.detach(token)
-
-        # replace the original function with the wrapped function
-        new_args: tuple[Callable[..., Any], ...] = (wrapped_func,) + args[1:]
-        return call_wrapped(*new_args, **kwargs)
+                future = call_wrapped(*new_args, **kwargs)
+            except RuntimeError:
+                self.queue_count.add(-1, attributes)
+                raise
+
+            self.thread_count.set(len(instance._threads), attributes)
+            future.add_done_callback(
+                lambda _: self.working_items_count.add(-1, attributes)
+            )
+            return future
+
+        return __wrap_thread_pool_submit
+
+    def __build_wrap_thread_pool_init(self) -> Callable[..., None]:
+        def __wrap_thread_pool_init(
+            call_wrapped: Callable[..., None],
+            instance: futures.ThreadPoolExecutor,
+            args: tuple[Callable[..., Any], ...],
+            kwargs: dict[str, Any],
+        ) -> None:
+            call_wrapped(*args, **kwargs)
+            attributes = {
+                "threadpool.executor": instance._thread_name_prefix,
+            }
+            self.max_thread_count.set(instance._max_workers, attributes)
+
+        return __wrap_thread_pool_init
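
A quick way to exercise the new metrics (a minimal sketch, not part of the patch; the ConsoleMetricExporter setup, export interval, pool size, and thread_name_prefix are illustrative assumptions, not requirements of the instrumentation):

# Not part of the patch: a hedged sketch that drives the new ThreadPoolExecutor
# metrics and prints them via the console exporter.
import time
from concurrent import futures

from opentelemetry import metrics
from opentelemetry.instrumentation.threading import ThreadingInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)

# Wire up an SDK MeterProvider; the instrumentor reads it from the
# meter_provider kwarg handled in _instrument above.
reader = PeriodicExportingMetricReader(
    ConsoleMetricExporter(), export_interval_millis=1000
)
provider = MeterProvider(metric_readers=[reader])
metrics.set_meter_provider(provider)

ThreadingInstrumentor().instrument(meter_provider=provider)


def work(n: int) -> int:
    time.sleep(0.1)
    return n * n


with futures.ThreadPoolExecutor(
    max_workers=4, thread_name_prefix="demo"
) as pool:
    # Each submit() bumps python.threadpool.queue.length; when the wrapped
    # task starts it moves into working_items.count, and the done callback
    # decrements it again.
    done = [pool.submit(work, n) for n in range(16)]
    results = [f.result() for f in done]

provider.shutdown()

With this setup, queue.length rises on submit() and falls when a job starts running, working_items.count tracks in-flight jobs via the future's done callback, thread.count reflects len(executor._threads) after each submit, and thread.max_count is recorded once when the executor is constructed.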