Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6d9bdfe
init otel handler
hieheihei Nov 18, 2025
dc8fb84
feat: implement OTel instrumentation with wrapper-based handlers
hieheihei Nov 19, 2025
cf1c82f
refactor: simplify OTel trace_span decorator to accept handler class …
hieheihei Nov 19, 2025
b1fa557
update otel
hieheihei Nov 20, 2025
5979df6
feat: add GenAI and Dify semantic conventions
hieheihei Nov 20, 2025
97063d0
Merge branch 'otel/dev_2' into otel/service
hieheihei Nov 24, 2025
bf2f389
add otel span for app_generate_service
hieheihei Nov 24, 2025
7fd7769
auto fix
hieheihei Nov 24, 2025
2e0bd9d
[autofix.ci] apply automated fixes
autofix-ci[bot] Nov 24, 2025
aae5064
Merge branch 'main' into otel/service
hieheihei Nov 25, 2025
f87e83b
update otel env
hieheihei Nov 25, 2025
d4bc38e
add workflow handler
hieheihei Nov 26, 2025
c179f55
[autofix.ci] apply automated fixes
autofix-ci[bot] Nov 26, 2025
407e3a5
fix otel
hieheihei Nov 26, 2025
86e7207
Merge remote-tracking branch 'hieheihei/otel/service' into otel/service
hieheihei Nov 26, 2025
375f169
Merge branch 'main' into otel/service
hieheihei Nov 26, 2025
773e42c
refactor: add extensions/otel dir
hieheihei Nov 27, 2025
eca5328
move observability to extensions
hieheihei Nov 27, 2025
461dd58
auto fix
hieheihei Nov 27, 2025
0b1258d
rename func
hieheihei Nov 27, 2025
3d28855
rename generate_handler arg
hieheihei Dec 3, 2025
6e22272
add unit tests for otel ext
hieheihei Dec 3, 2025
bfc7090
Merge branch 'main' into otel/service
hieheihei Dec 3, 2025
abc0042
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 3, 2025
159b1b1
fix unit test
hieheihei Dec 3, 2025
2ceaf61
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 3, 2025
a69af96
Apply changes for benchmark PR
tomerqodo Dec 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/core/app/apps/advanced_chat/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.otel import WorkflowAppRunnerHandler, trace_span
from models import Workflow
from models.enums import UserFrom
from models.model import App, Conversation, Message, MessageAnnotation
Expand Down Expand Up @@ -80,6 +81,7 @@ def __init__(
self._workflow_execution_repository = workflow_execution_repository
self._workflow_node_execution_repository = workflow_node_execution_repository

@trace_span(WorkflowAppRunnerHandler)
def run(self):
app_config = self.application_generate_entity.app_config
app_config = cast(AdvancedChatAppConfig, app_config)
Expand Down
2 changes: 2 additions & 0 deletions api/core/app/apps/workflow/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from core.workflow.variable_loader import VariableLoader
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_redis import redis_client
from extensions.otel import WorkflowAppRunnerHandler, trace_span
from libs.datetime_utils import naive_utc_now
from models.enums import UserFrom
from models.workflow import Workflow
Expand Down Expand Up @@ -56,6 +57,7 @@ def __init__(
self._workflow_execution_repository = workflow_execution_repository
self._workflow_node_execution_repository = workflow_node_execution_repository

@trace_span(WorkflowAppRunnerHandler)
def run(self):
"""
Run application
Expand Down
160 changes: 9 additions & 151 deletions api/extensions/ext_otel.py
Original file line number Diff line number Diff line change
@@ -1,148 +1,22 @@
import atexit
import contextlib
import logging
import os
import platform
import socket
import sys
from typing import Union

import flask
from celery.signals import worker_init
from flask_login import user_loaded_from_request, user_logged_in

from configs import dify_config
from dify_app import DifyApp
from libs.helper import extract_tenant_id
from models import Account, EndUser

logger = logging.getLogger(__name__)


@user_logged_in.connect
@user_loaded_from_request.connect
def on_user_loaded(_sender, user: Union["Account", "EndUser"]):
    """Attach tenant and user identifiers to the current OpenTelemetry span.

    Registered on flask-login's ``user_logged_in`` and
    ``user_loaded_from_request`` signals. The handler is a no-op when OTel is
    disabled, when ``user`` is falsy, or when no tenant id can be extracted
    from the user. Any exception raised while tagging the span is logged and
    suppressed.

    :param _sender: Signal sender (unused).
    :param user: The account or end user that was just loaded.
    """
    if not dify_config.ENABLE_OTEL:
        return

    # The OpenTelemetry API is only imported once tracing is known to be enabled.
    from opentelemetry.trace import get_current_span

    if not user:
        return

    try:
        span = get_current_span()
        tenant_id = extract_tenant_id(user)
        # Both a tenant id and a span are required before anything is recorded.
        if tenant_id and span:
            span.set_attribute("service.tenant.id", tenant_id)
            span.set_attribute("service.user.id", user.id)
    except Exception:
        logger.exception("Error setting tenant and user attributes")


def init_app(app: DifyApp):
from opentelemetry.semconv.trace import SpanAttributes

def is_celery_worker():
return "celery" in sys.argv[0].lower()

def instrument_exception_logging():
exception_handler = ExceptionLoggingHandler()
logging.getLogger().addHandler(exception_handler)

def init_flask_instrumentor(app: DifyApp):
meter = get_meter("http_metrics", version=dify_config.project.version)
_http_response_counter = meter.create_counter(
"http.server.response.count",
description="Total number of HTTP responses by status code, method and target",
unit="{response}",
)

def response_hook(span: Span, status: str, response_headers: list):
if span and span.is_recording():
try:
if status.startswith("2"):
span.set_status(StatusCode.OK)
else:
span.set_status(StatusCode.ERROR, status)

status = status.split(" ")[0]
status_code = int(status)
status_class = f"{status_code // 100}xx"
attributes: dict[str, str | int] = {"status_code": status_code, "status_class": status_class}
request = flask.request
if request and request.url_rule:
attributes[SpanAttributes.HTTP_TARGET] = str(request.url_rule.rule)
if request and request.method:
attributes[SpanAttributes.HTTP_METHOD] = str(request.method)
_http_response_counter.add(1, attributes)
except Exception:
logger.exception("Error setting status and attributes")
pass

instrumentor = FlaskInstrumentor()
if dify_config.DEBUG:
logger.info("Initializing Flask instrumentor")
instrumentor.instrument_app(app, response_hook=response_hook)

def init_sqlalchemy_instrumentor(app: DifyApp):
with app.app_context():
engines = list(app.extensions["sqlalchemy"].engines.values())
SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines)

def setup_context_propagation():
# Configure propagators
set_global_textmap(
CompositePropagator(
[
TraceContextTextMapPropagator(), # W3C trace context
B3Format(), # B3 propagation (used by many systems)
]
)
)

def shutdown_tracer():
provider = trace.get_tracer_provider()
if hasattr(provider, "force_flush"):
provider.force_flush()

class ExceptionLoggingHandler(logging.Handler):
"""Custom logging handler that creates spans for logging.exception() calls"""

def emit(self, record: logging.LogRecord):
with contextlib.suppress(Exception):
if record.exc_info:
tracer = get_tracer_provider().get_tracer("dify.exception.logging")
with tracer.start_as_current_span(
"log.exception",
attributes={
"log.level": record.levelname,
"log.message": record.getMessage(),
"log.logger": record.name,
"log.file.path": record.pathname,
"log.file.line": record.lineno,
},
) as span:
span.set_status(StatusCode.ERROR)
if record.exc_info[1]:
span.record_exception(record.exc_info[1])
span.set_attribute("exception.message", str(record.exc_info[1]))
if record.exc_info[0]:
span.set_attribute("exception.type", record.exc_info[0].__name__)

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GRPCMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as HTTPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HTTPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.b3 import B3Format
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
Expand All @@ -153,9 +27,10 @@ def emit(self, record: logging.LogRecord):
)
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.trace.status import StatusCode
from opentelemetry.trace import set_tracer_provider

from extensions.otel.instrumentation import init_instruments
from extensions.otel.runtime import setup_context_propagation, shutdown_tracer

setup_context_propagation()
# Initialize OpenTelemetry
Expand All @@ -177,6 +52,7 @@ def emit(self, record: logging.LogRecord):
)
sampler = ParentBasedTraceIdRatio(dify_config.OTEL_SAMPLING_RATE)
provider = TracerProvider(resource=resource, sampler=sampler)

set_tracer_provider(provider)
exporter: Union[GRPCSpanExporter, HTTPSpanExporter, ConsoleSpanExporter]
metric_exporter: Union[GRPCMetricExporter, HTTPMetricExporter, ConsoleMetricExporter]
Expand Down Expand Up @@ -231,29 +107,11 @@ def emit(self, record: logging.LogRecord):
export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT,
)
set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))
if not is_celery_worker():
init_flask_instrumentor(app)
CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument()
instrument_exception_logging()
init_sqlalchemy_instrumentor(app)
RedisInstrumentor().instrument()
HTTPXClientInstrumentor().instrument()

init_instruments(app)

atexit.register(shutdown_tracer)


def is_enabled():
    """Return the value of ``dify_config.ENABLE_OTEL``."""
    otel_enabled = dify_config.ENABLE_OTEL
    return otel_enabled


@worker_init.connect(weak=False)
def init_celery_worker(*args, **kwargs):
    """Instrument Celery with OpenTelemetry when the ``worker_init`` signal fires.

    Does nothing unless ``dify_config.ENABLE_OTEL`` is truthy. The instrumentor
    is bound to whatever tracer and meter providers are globally registered at
    the time the signal is received.

    :param args: Positional arguments supplied by the signal (unused).
    :param kwargs: Keyword arguments supplied by the signal (unused).
    """
    if not dify_config.ENABLE_OTEL:
        return

    # OpenTelemetry packages are only imported once tracing is known to be enabled.
    from opentelemetry.instrumentation.celery import CeleryInstrumentor
    from opentelemetry.metrics import get_meter_provider
    from opentelemetry.trace import get_tracer_provider

    active_tracer_provider = get_tracer_provider()
    active_meter_provider = get_meter_provider()

    if dify_config.DEBUG:
        logger.info("Initializing OpenTelemetry for Celery worker")

    instrumentor = CeleryInstrumentor(
        tracer_provider=active_tracer_provider,
        meter_provider=active_meter_provider,
    )
    instrumentor.instrument()
11 changes: 11 additions & 0 deletions api/extensions/otel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from extensions.otel.decorators.base import trace_span
from extensions.otel.decorators.handler import SpanHandler
from extensions.otel.decorators.handlers.generate_handler import AppGenerateHandler
from extensions.otel.decorators.handlers.workflow_app_runner_handler import WorkflowAppRunnerHandler

__all__ = [
"AppGenerateHandler",
"SpanHandler",
"WorkflowAppRunnerHandler",
"trace_span",
]
Empty file.
71 changes: 71 additions & 0 deletions api/extensions/otel/decorators/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import functools
import os
from collections.abc import Callable
from typing import Any, TypeVar, cast

from opentelemetry.trace import get_tracer

from configs import dify_config
from extensions.otel.decorators.handler import SpanHandler

# Type variable bound to any callable; lets the decorator return the same
# callable type it was given.
T = TypeVar("T", bound=Callable[..., Any])

# Cache of handler instances keyed by handler class, pre-populated with an
# instance of the default SpanHandler.
_HANDLER_INSTANCES: dict[type[SpanHandler], SpanHandler] = {SpanHandler: SpanHandler()}


def _is_instrument_flag_enabled() -> bool:
    """
    Tell whether the ``ENABLE_OTEL_FOR_INSTRUMENT`` environment flag is switched on.

    The flag is meant to be set by third-party non-invasive instrumentation
    agents so they can coordinate with Dify's manual OpenTelemetry
    instrumentation. The value is compared case-insensitively, ignoring
    surrounding whitespace; only ``"true"`` counts as enabled. An unset
    variable is treated as disabled.
    """
    raw_flag = os.getenv("ENABLE_OTEL_FOR_INSTRUMENT", "")
    normalized_flag = raw_flag.strip().lower()
    return normalized_flag == "true"


def _get_handler_instance(handler_class: type[SpanHandler]) -> SpanHandler:
    """
    Return the cached instance for ``handler_class``, creating it on first use.

    :param handler_class: The handler class whose shared instance is wanted.
    :return: The instance stored in ``_HANDLER_INSTANCES`` for that class.
    """
    try:
        return _HANDLER_INSTANCES[handler_class]
    except KeyError:
        # First request for this class: build it with no arguments and cache it.
        created = handler_class()
        _HANDLER_INSTANCES[handler_class] = created
        return created


class TraceSpanDecorator:
    """Callable object that builds span-tracing decorators for functions."""

    def __init__(self):
        # The tracer is created here, named after this module, rather than injected.
        self.tracer = get_tracer(__name__)

    def __call__(self, handler_class: type[SpanHandler] | None = None) -> Callable[[T], T]:
        """
        Build a decorator that routes calls through a span handler.

        When tracing is active (``dify_config.ENABLE_OTEL`` is truthy or the
        external instrumentation flag is set), each call to the decorated
        function is delegated to the ``wrapper`` method of the shared handler
        instance for ``handler_class``. Otherwise the function is called
        directly, untouched.

        :param handler_class: Optional handler class to use for this span. If None, uses the default SpanHandler.
        :return: A decorator preserving the decorated callable's type.
        """
        # Falling back to the default handler class depends only on the argument,
        # so it is resolved once per decoration; the instance is still fetched lazily.
        selected_class = handler_class or SpanHandler

        def decorator(func: T) -> T:
            @functools.wraps(func)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                # Checked on every call, so toggling the config/env takes effect at runtime.
                tracing_active = dify_config.ENABLE_OTEL or _is_instrument_flag_enabled()
                if tracing_active:
                    span_handler = _get_handler_instance(selected_class)
                    return span_handler.wrapper(
                        tracer=self.tracer,
                        wrapped=func,
                        args=args,
                        kwargs=kwargs,
                    )
                return func(*args, **kwargs)

            return cast(T, wrapper)

        return decorator


# Module-level decorator factory; TraceSpanDecorator obtains its own tracer.
# Apply as `@trace_span(HandlerClass)`, or `@trace_span()` for the default SpanHandler.
trace_span = TraceSpanDecorator()
Loading