Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,37 @@ Async agent loop with LiteLLM.
| **`context_manager/`** | Manages conversation history, very rudimentary context engineering support | Implement intelligent context engineering to keep the agent on track |
| **`config.py`** | Loads JSON config for the agent | Support different configs etc. |
| **`main.py`** | Interactive CLI with async queue architecture (submission→agent, agent→events) (simple way to interact with the agent now)| Serve as reference implementation for other UIs (web, API, programmatic) |

## Observability (optional)

LLM calls can additionally be streamed to a [LangFuse](https://langfuse.com)
instance — useful for local development and for self-hosted deployments
that already run LangFuse / Phoenix / Langsmith. The primary
HF-Dataset-based telemetry pipeline (`agent/core/telemetry.py`) is unchanged.

Set the LangFuse host plus both keys to opt in. Either env-var name for
the host works — the Langfuse SDK v4 documentation names the host variable
`LANGFUSE_BASE_URL`, while litellm's callback reads `LANGFUSE_HOST`; this
integration accepts either and mirrors the value through to litellm:

```
LANGFUSE_BASE_URL=https://your-langfuse.example.com # or LANGFUSE_HOST=...
LANGFUSE_PUBLIC_KEY=pk-...
LANGFUSE_SECRET_KEY=sk-...
```

Both self-hosted LangFuse and the SaaS endpoint
(`https://cloud.langfuse.com`) are supported. The host is mandatory so the
destination is always an explicit choice — there is no silent fallback.
If any of the three variables is unset, the integration is a no-op.

Install the optional dependency:

```
pip install ml-intern[observability]
```

**Privacy.** The callback ships the full prompt, tool calls, tool results,
and completions of every LLM turn to the configured host. Pick the
destination deliberately. See
https://github.com/huggingface/ml-intern/issues/196 for full details.
6 changes: 6 additions & 0 deletions agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from dotenv import load_dotenv

from agent.core.observability import setup_langfuse
from agent.messaging.models import MessagingConfig

# Project root: two levels up from this file (agent/config.py -> project root)
Expand Down Expand Up @@ -207,4 +208,9 @@ def load_config(
raw_config = apply_slack_user_defaults(raw_config)

config_with_env = substitute_env_vars(raw_config)

# Opt-in: register litellm's LangFuse callback if the operator set the
# three LANGFUSE_* env vars. No-op otherwise. See agent/core/observability.py.
setup_langfuse()

return Config.model_validate(config_with_env)
55 changes: 55 additions & 0 deletions agent/core/observability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Opt-in third-party observability hooks for litellm calls.

Today ml-intern's primary telemetry pipeline writes ``Event``s to a Hugging
Face Dataset (see ``agent/core/telemetry.py``). This module is a small,
opt-in side channel that lets operators also stream LLM traces to a
LangFuse instance (self-hosted or SaaS) via litellm's OTEL callback.

Activation requires the LangFuse host plus both keys: either
``LANGFUSE_HOST`` or ``LANGFUSE_BASE_URL`` (the SDK v4 docs use the latter),
together with ``LANGFUSE_PUBLIC_KEY`` and ``LANGFUSE_SECRET_KEY``. With any
of them missing, this module is a no-op and behavior is identical to today.
The host is mandatory by design — see issue #196 for the privacy rationale.
"""

from __future__ import annotations

import logging
import os

import litellm

logger = logging.getLogger(__name__)


def setup_langfuse() -> None:
    """Register litellm's LangFuse OTEL callback if host + keys are set.

    Accepts either ``LANGFUSE_HOST`` or ``LANGFUSE_BASE_URL`` for the host:
    Langfuse SDK v4's docs issue credentials as ``LANGFUSE_BASE_URL``, but
    litellm's callback only reads ``LANGFUSE_HOST`` — so we mirror the value
    into ``LANGFUSE_HOST`` when only ``BASE_URL`` was set.

    Uses the ``langfuse_otel`` callback rather than the legacy ``langfuse``
    one because the legacy integration in current litellm releases breaks
    against Langfuse SDK v4 (``module 'langfuse' has no attribute 'version'``).
    The OTEL path works against both v3 and v4.

    Idempotent: ``load_config`` runs multiple times per process (CLI start
    plus backend module-init), so guard against double-registration.
    """
    # Empty strings are treated as unset: ``or`` skips a falsy LANGFUSE_HOST.
    host = os.getenv("LANGFUSE_HOST") or os.getenv("LANGFUSE_BASE_URL")
    if not (
        host
        and os.getenv("LANGFUSE_PUBLIC_KEY")
        and os.getenv("LANGFUSE_SECRET_KEY")
    ):
        return
    # litellm only reads LANGFUSE_HOST, so propagate the chosen value.
    # Plain assignment (not setdefault) so a present-but-empty LANGFUSE_HOST
    # cannot shadow a valid LANGFUSE_BASE_URL; when LANGFUSE_HOST is already
    # non-empty, ``host`` equals it and this rewrites the same value.
    os.environ["LANGFUSE_HOST"] = host
    if "langfuse_otel" not in litellm.success_callback:
        litellm.success_callback.append("langfuse_otel")
    if "langfuse_otel" not in litellm.failure_callback:
        litellm.failure_callback.append("langfuse_otel")
    logger.info("LangFuse observability enabled (host=%s)", host)
9 changes: 7 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,14 @@ dev = [
"pytest-asyncio>=1.2.0",
]

# All dependencies (eval + dev)
# Optional third-party observability backends
observability = [
"langfuse>=2.0.0",
]

# All dependencies (eval + dev + observability)
all = [
"ml-intern[eval,dev]",
"ml-intern[eval,dev,observability]",
]

[project.scripts]
Expand Down
83 changes: 83 additions & 0 deletions tests/unit/test_observability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import os

import litellm
import pytest

from agent.core.observability import setup_langfuse


@pytest.fixture(autouse=True)
def _reset_litellm_callbacks():
    """Snapshot and restore litellm's callback lists so tests don't leak."""
    saved = (list(litellm.success_callback), list(litellm.failure_callback))
    try:
        yield
    finally:
        litellm.success_callback[:], litellm.failure_callback[:] = saved


def _set_all_vars(monkeypatch):
    """Set the canonical LANGFUSE_* trio and clear the BASE_URL alias."""
    for var, value in (
        ("LANGFUSE_HOST", "https://langfuse.example.com"),
        ("LANGFUSE_PUBLIC_KEY", "pk-test"),
        ("LANGFUSE_SECRET_KEY", "sk-test"),
    ):
        monkeypatch.setenv(var, value)
    monkeypatch.delenv("LANGFUSE_BASE_URL", raising=False)


def test_setup_langfuse_registers_callbacks_when_all_vars_set(monkeypatch):
    """Both callback lists gain the OTEL hook once host + keys are present."""
    _set_all_vars(monkeypatch)

    setup_langfuse()

    for callback_list in (litellm.success_callback, litellm.failure_callback):
        assert "langfuse_otel" in callback_list


def test_setup_langfuse_accepts_base_url_alias(monkeypatch):
    """Setting only LANGFUSE_BASE_URL activates and mirrors into HOST."""
    alias_host = "https://langfuse.example.com"
    monkeypatch.delenv("LANGFUSE_HOST", raising=False)
    monkeypatch.setenv("LANGFUSE_BASE_URL", alias_host)
    monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-test")
    monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-test")

    setup_langfuse()

    assert "langfuse_otel" in litellm.success_callback
    # litellm only reads LANGFUSE_HOST; the alias must be mirrored into it.
    assert os.environ["LANGFUSE_HOST"] == alias_host


def test_setup_langfuse_host_takes_precedence_over_base_url(monkeypatch):
    """When both host vars are set, HOST wins over the BASE_URL alias."""
    env = {
        "LANGFUSE_HOST": "https://from-host.example.com",
        "LANGFUSE_BASE_URL": "https://from-base-url.example.com",
        "LANGFUSE_PUBLIC_KEY": "pk-test",
        "LANGFUSE_SECRET_KEY": "sk-test",
    }
    for var, value in env.items():
        monkeypatch.setenv(var, value)

    setup_langfuse()

    assert os.environ["LANGFUSE_HOST"] == "https://from-host.example.com"


@pytest.mark.parametrize(
    "missing", ["LANGFUSE_HOST", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY"]
)
def test_setup_langfuse_is_noop_when_any_var_missing(monkeypatch, missing):
    """Dropping any one of the three required vars leaves callbacks untouched."""
    _set_all_vars(monkeypatch)
    monkeypatch.delenv(missing, raising=False)
    snapshot = (list(litellm.success_callback), list(litellm.failure_callback))

    setup_langfuse()

    assert (litellm.success_callback, litellm.failure_callback) == snapshot


def test_setup_langfuse_is_idempotent(monkeypatch):
    """Repeated calls register the OTEL callback exactly once per list."""
    _set_all_vars(monkeypatch)

    for _ in range(2):
        setup_langfuse()

    for callback_list in (litellm.success_callback, litellm.failure_callback):
        assert callback_list.count("langfuse_otel") == 1
Loading
Loading