Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/core/app/apps/advanced_chat/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.otel import WorkflowAppRunnerHandler, trace_span
from models import Workflow
from models.enums import UserFrom
from models.model import App, Conversation, Message, MessageAnnotation
Expand Down Expand Up @@ -80,6 +81,7 @@ def __init__(
self._workflow_execution_repository = workflow_execution_repository
self._workflow_node_execution_repository = workflow_node_execution_repository

@trace_span(WorkflowAppRunnerHandler)
def run(self):
app_config = self.application_generate_entity.app_config
app_config = cast(AdvancedChatAppConfig, app_config)
Expand Down
2 changes: 2 additions & 0 deletions api/core/app/apps/workflow/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from core.workflow.variable_loader import VariableLoader
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_redis import redis_client
from extensions.otel import WorkflowAppRunnerHandler, trace_span
from libs.datetime_utils import naive_utc_now
from models.enums import UserFrom
from models.workflow import Workflow
Expand Down Expand Up @@ -56,6 +57,7 @@ def __init__(
self._workflow_execution_repository = workflow_execution_repository
self._workflow_node_execution_repository = workflow_node_execution_repository

@trace_span(WorkflowAppRunnerHandler)
def run(self):
"""
Run application
Expand Down
160 changes: 9 additions & 151 deletions api/extensions/ext_otel.py
Original file line number Diff line number Diff line change
@@ -1,148 +1,22 @@
import atexit
import contextlib
import logging
import os
import platform
import socket
import sys
from typing import Union

import flask
from celery.signals import worker_init
from flask_login import user_loaded_from_request, user_logged_in

from configs import dify_config
from dify_app import DifyApp
from libs.helper import extract_tenant_id
from models import Account, EndUser

logger = logging.getLogger(__name__)


@user_logged_in.connect
@user_loaded_from_request.connect
def on_user_loaded(_sender, user: Union["Account", "EndUser"]):
if dify_config.ENABLE_OTEL:
from opentelemetry.trace import get_current_span

if user:
try:
current_span = get_current_span()
tenant_id = extract_tenant_id(user)
if not tenant_id:
return
if current_span:
current_span.set_attribute("service.tenant.id", tenant_id)
current_span.set_attribute("service.user.id", user.id)
except Exception:
logger.exception("Error setting tenant and user attributes")
pass


def init_app(app: DifyApp):
from opentelemetry.semconv.trace import SpanAttributes

def is_celery_worker():
return "celery" in sys.argv[0].lower()

def instrument_exception_logging():
exception_handler = ExceptionLoggingHandler()
logging.getLogger().addHandler(exception_handler)

def init_flask_instrumentor(app: DifyApp):
meter = get_meter("http_metrics", version=dify_config.project.version)
_http_response_counter = meter.create_counter(
"http.server.response.count",
description="Total number of HTTP responses by status code, method and target",
unit="{response}",
)

def response_hook(span: Span, status: str, response_headers: list):
if span and span.is_recording():
try:
if status.startswith("2"):
span.set_status(StatusCode.OK)
else:
span.set_status(StatusCode.ERROR, status)

status = status.split(" ")[0]
status_code = int(status)
status_class = f"{status_code // 100}xx"
attributes: dict[str, str | int] = {"status_code": status_code, "status_class": status_class}
request = flask.request
if request and request.url_rule:
attributes[SpanAttributes.HTTP_TARGET] = str(request.url_rule.rule)
if request and request.method:
attributes[SpanAttributes.HTTP_METHOD] = str(request.method)
_http_response_counter.add(1, attributes)
except Exception:
logger.exception("Error setting status and attributes")
pass

instrumentor = FlaskInstrumentor()
if dify_config.DEBUG:
logger.info("Initializing Flask instrumentor")
instrumentor.instrument_app(app, response_hook=response_hook)

def init_sqlalchemy_instrumentor(app: DifyApp):
with app.app_context():
engines = list(app.extensions["sqlalchemy"].engines.values())
SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines)

def setup_context_propagation():
# Configure propagators
set_global_textmap(
CompositePropagator(
[
TraceContextTextMapPropagator(), # W3C trace context
B3Format(), # B3 propagation (used by many systems)
]
)
)

def shutdown_tracer():
provider = trace.get_tracer_provider()
if hasattr(provider, "force_flush"):
provider.force_flush()

class ExceptionLoggingHandler(logging.Handler):
"""Custom logging handler that creates spans for logging.exception() calls"""

def emit(self, record: logging.LogRecord):
with contextlib.suppress(Exception):
if record.exc_info:
tracer = get_tracer_provider().get_tracer("dify.exception.logging")
with tracer.start_as_current_span(
"log.exception",
attributes={
"log.level": record.levelname,
"log.message": record.getMessage(),
"log.logger": record.name,
"log.file.path": record.pathname,
"log.file.line": record.lineno,
},
) as span:
span.set_status(StatusCode.ERROR)
if record.exc_info[1]:
span.record_exception(record.exc_info[1])
span.set_attribute("exception.message", str(record.exc_info[1]))
if record.exc_info[0]:
span.set_attribute("exception.type", record.exc_info[0].__name__)

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GRPCMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as HTTPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HTTPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.b3 import B3Format
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
Expand All @@ -153,9 +27,10 @@ def emit(self, record: logging.LogRecord):
)
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.trace.status import StatusCode
from opentelemetry.trace import set_tracer_provider

from extensions.otel.instrumentation import init_instruments
from extensions.otel.runtime import setup_context_propagation, shutdown_tracer

setup_context_propagation()
# Initialize OpenTelemetry
Expand All @@ -177,6 +52,7 @@ def emit(self, record: logging.LogRecord):
)
sampler = ParentBasedTraceIdRatio(dify_config.OTEL_SAMPLING_RATE)
provider = TracerProvider(resource=resource, sampler=sampler)

set_tracer_provider(provider)
exporter: Union[GRPCSpanExporter, HTTPSpanExporter, ConsoleSpanExporter]
metric_exporter: Union[GRPCMetricExporter, HTTPMetricExporter, ConsoleMetricExporter]
Expand Down Expand Up @@ -231,29 +107,11 @@ def emit(self, record: logging.LogRecord):
export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT,
)
set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))
if not is_celery_worker():
init_flask_instrumentor(app)
CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument()
instrument_exception_logging()
init_sqlalchemy_instrumentor(app)
RedisInstrumentor().instrument()
HTTPXClientInstrumentor().instrument()

init_instruments(app)

atexit.register(shutdown_tracer)


def is_enabled():
return dify_config.ENABLE_OTEL


@worker_init.connect(weak=False)
def init_celery_worker(*args, **kwargs):
if dify_config.ENABLE_OTEL:
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.metrics import get_meter_provider
from opentelemetry.trace import get_tracer_provider

tracer_provider = get_tracer_provider()
metric_provider = get_meter_provider()
if dify_config.DEBUG:
logger.info("Initializing OpenTelemetry for Celery worker")
CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument()
11 changes: 11 additions & 0 deletions api/extensions/otel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from extensions.otel.decorators.base import trace_span
from extensions.otel.decorators.handler import SpanHandler
from extensions.otel.decorators.handlers.generate_handler import AppGenerateHandler
from extensions.otel.decorators.handlers.workflow_app_runner_handler import WorkflowAppRunnerHandler

__all__ = [
"AppGenerateHandler",
"SpanHandler",
"WorkflowAppRunnerHandler",
"trace_span",
]
Empty file.
61 changes: 61 additions & 0 deletions api/extensions/otel/decorators/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import functools
import os
from collections.abc import Callable
from typing import Any, TypeVar, cast

from opentelemetry.trace import get_tracer

from configs import dify_config
from extensions.otel.decorators.handler import SpanHandler

T = TypeVar("T", bound=Callable[..., Any])

_HANDLER_INSTANCES: dict[type[SpanHandler], SpanHandler] = {SpanHandler: SpanHandler()}


def _is_instrument_flag_enabled() -> bool:
"""
Check if external instrumentation is enabled via environment variable.

Third-party non-invasive instrumentation agents set this flag to coordinate
with Dify's manual OpenTelemetry instrumentation.
"""
return os.getenv("ENABLE_OTEL_FOR_INSTRUMENT", "").strip().lower() == "true"


def _get_handler_instance(handler_class: type[SpanHandler]) -> SpanHandler:
"""Get or create a singleton instance of the handler class."""
if handler_class not in _HANDLER_INSTANCES:
_HANDLER_INSTANCES[handler_class] = handler_class()
return _HANDLER_INSTANCES[handler_class]


def trace_span(handler_class: type[SpanHandler] | None = None) -> Callable[[T], T]:
"""
Decorator that traces a function with an OpenTelemetry span.

The decorator uses the provided handler class to create a singleton handler instance
and delegates the wrapper implementation to that handler.

:param handler_class: Optional handler class to use for this span. If None, uses the default SpanHandler.
"""

def decorator(func: T) -> T:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
if not (dify_config.ENABLE_OTEL or _is_instrument_flag_enabled()):
return func(*args, **kwargs)

handler = _get_handler_instance(handler_class or SpanHandler)
tracer = get_tracer(__name__)

return handler.wrapper(
tracer=tracer,
wrapped=func,
args=args,
kwargs=kwargs,
)

return cast(T, wrapper)

return decorator
Loading