Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ dependencies = [
"aiocsv == 1.4.0",
"aiofiles == 25.1.0",
"rich",
"opentelemetry-api >= 1.40.0, < 2.0.0",
"opentelemetry-sdk >= 1.40.0, < 2.0.0",
"opentelemetry-instrumentation-fastapi >= 0.61b0, < 1.0.0",
"opentelemetry-exporter-otlp-proto-http >= 1.40.0, < 2.0.0",
]
requires-python = ">=3.11,<=3.14"

Expand Down
66 changes: 66 additions & 0 deletions tests/test_otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import pytest
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

from .conftest import client

_exporter = InMemorySpanExporter()
_provider = TracerProvider()
_provider.add_span_processor(SimpleSpanProcessor(_exporter))
trace.set_tracer_provider(_provider)


@pytest.fixture(autouse=True)
def span_exporter():
"""Provide the in-memory span exporter, clearing spans between tests."""
_exporter.clear()
yield _exporter


def test_fastapi_instrumentation_creates_spans(span_exporter):
"""Verify that HTTP requests produce OTEL spans."""
response = client.get("/healthz")
assert response.status_code == 200

spans = span_exporter.get_finished_spans()
http_spans = [s for s in spans if "healthz" in s.name or "GET" in s.name]
assert len(http_spans) >= 1, f"Expected HTTP span, got: {[s.name for s in spans]}"


@pytest.mark.asyncio
async def test_search_provider_creates_spans(search_provider, span_exporter):
"""Verify that SearchProvider operations produce OTEL spans."""
from yente import settings

result = await search_provider.search(
index=settings.ENTITY_INDEX,
query={"match_all": {}},
size=1,
)
assert result is not None

spans = span_exporter.get_finished_spans()
search_spans = [s for s in spans if s.name == "SearchProvider.search"]
assert (
len(search_spans) == 1
), f"Expected 1 search span, got: {[s.name for s in spans]}"

span = search_spans[0]
assert span.attributes["db.system.name"] in ("elasticsearch", "opensearch")
assert span.attributes["db.operation.name"] == "search"


@pytest.mark.asyncio
async def test_search_provider_records_errors_in_spans(search_provider, span_exporter):
"""Verify that SearchProvider errors are recorded in OTEL spans."""
from yente.exc import YenteNotFoundError

fake_index = "nonexistent-index-otel-test"
with pytest.raises((YenteNotFoundError, Exception)):
await search_provider.refresh(fake_index)

spans = span_exporter.get_finished_spans()
error_spans = [s for s in spans if s.status.status_code == trace.StatusCode.ERROR]
assert len(error_spans) >= 1, f"Expected error span, got: {[s.name for s in spans]}"
4 changes: 4 additions & 0 deletions yente/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from yente.search.indexer import update_index_threaded
from yente.provider import close_provider
from yente.middleware import RequestLogMiddleware, TraceContextMiddleware
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

log = get_logger("yente")
ExceptionHandler = Callable[[Request, Any], Coroutine[Any, Any, Response]]
Expand Down Expand Up @@ -137,4 +138,7 @@ def create_app() -> FastAPI:
app.include_router(search.router)
app.include_router(reconcile.router)
app.include_router(admin.router)

FastAPIInstrumentor.instrument_app(app)

return app
27 changes: 26 additions & 1 deletion yente/provider/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,29 @@
from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Union
import functools
from typing import Any, AsyncIterable, Callable, Dict, Iterable, List, Optional, Union

from opentelemetry import trace

from yente import settings

_tracer = trace.get_tracer("yente.provider")


def traced(method: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator that wraps a SearchProvider method with an OTEL span."""
name = method.__name__

@_tracer.start_as_current_span(f"SearchProvider.{name}")
@functools.wraps(method)
async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
span = trace.get_current_span()
span.set_attribute(
"db.system.name",
"opensearch" if settings.INDEX_TYPE == "opensearch" else "elasticsearch",
)
span.set_attribute("db.operation.name", name)
return await method(self, *args, **kwargs)

return wrapper


class SearchProvider(object):
Expand Down
14 changes: 13 additions & 1 deletion yente/provider/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from yente import settings
from yente.exc import IndexNotReadyError, YenteIndexError, YenteNotFoundError
from yente.logs import get_logger
from yente.provider.base import SearchProvider
from yente.provider.base import SearchProvider, traced
from yente.middleware.trace_context import get_trace_context

log = get_logger(__name__)
Expand Down Expand Up @@ -72,18 +72,21 @@ def client(self, **kwargs: Any) -> AsyncElasticsearch:
async def close(self) -> None:
await self._client.close()

@traced
async def refresh(self, index: str) -> None:
"""Refresh the index to make changes visible."""
try:
await self.client().indices.refresh(index=index)
except NotFoundError as nfe:
raise YenteNotFoundError(f"Index {index} does not exist.") from nfe

@traced
async def get_all_indices(self) -> List[str]:
"""Get a list of all indices in the ElasticSearch cluster."""
indices: Any = await self.client().cat.indices(format="json")
return [index.get("index") for index in indices]

@traced
async def get_alias_indices(self, alias: str) -> List[str]:
"""Get a list of indices that are aliased to the entity query alias."""
try:
Expand All @@ -94,6 +97,7 @@ async def get_alias_indices(self, alias: str) -> List[str]:
except (ApiError, TransportError) as te:
raise YenteIndexError(f"Could not get alias indices: {te}") from te

@traced
async def rollover_index(self, alias: str, next_index: str, prefix: str) -> None:
"""Remove all existing indices with a given prefix from the alias and
add the new one."""
Expand All @@ -105,6 +109,7 @@ async def rollover_index(self, alias: str, next_index: str, prefix: str) -> None
except (ApiError, TransportError) as te:
raise YenteIndexError(f"Could not rollover index: {te}") from te

@traced
async def clone_index(self, base_version: str, target_version: str) -> None:
"""Create a copy of the index with the given name."""
if base_version == target_version:
Expand Down Expand Up @@ -140,6 +145,7 @@ async def clone_index(self, base_version: str, target_version: str) -> None:
msg = f"Could not clone index {base_version} to {target_version}: {te}"
raise YenteIndexError(msg) from te

@traced
async def create_index(
self, index: str, mappings: Dict[str, Any], settings: Dict[str, Any]
) -> None:
Expand All @@ -156,6 +162,7 @@ async def create_index(
return
raise YenteIndexError(f"Could not create index: {exc}") from exc

@traced
async def delete_index(self, index: str) -> None:
"""Delete a given index if it exists."""
try:
Expand All @@ -165,6 +172,7 @@ async def delete_index(self, index: str) -> None:
except (ApiError, TransportError) as te:
raise YenteIndexError(f"Could not delete index: {te}") from te

@traced
async def exists_index_alias(self, alias: str, index: str) -> bool:
"""Check if an index exists and is linked into the given alias."""
try:
Expand All @@ -175,6 +183,7 @@ async def exists_index_alias(self, alias: str, index: str) -> bool:
except (ApiError, TransportError) as te:
raise YenteIndexError(f"Could not check index alias: {te}") from te

@traced
async def check_health(self, index: str) -> bool:
try:
health = await self.client(request_timeout=5).cluster.health(
Expand All @@ -187,6 +196,7 @@ async def check_health(self, index: str) -> bool:
log.error(f"Search status failure: {te}")
return False

@traced
async def search(
self,
index: str,
Expand Down Expand Up @@ -247,6 +257,7 @@ async def search(
msg = f"Error during search: {str(exc)}"
raise YenteIndexError(msg, status=500) from exc

@traced
async def get_document(self, index: str, doc_id: str) -> Optional[Dict[str, Any]]:
"""Get a document by ID using the GET API.

Expand All @@ -260,6 +271,7 @@ async def get_document(self, index: str, doc_id: str) -> Optional[Dict[str, Any]
except Exception as exc:
raise YenteIndexError(f"Error getting document: {exc}") from exc

@traced
async def bulk_index(
self, actions: Union[Iterable[Dict[str, Any]], AsyncIterable[Dict[str, Any]]]
) -> None:
Expand Down
14 changes: 13 additions & 1 deletion yente/provider/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from yente import settings
from yente.exc import IndexNotReadyError, YenteIndexError, YenteNotFoundError
from yente.logs import get_logger
from yente.provider.base import SearchProvider
from yente.provider.base import SearchProvider, traced

log = get_logger(__name__)
logging.getLogger("opensearch").setLevel(logging.ERROR)
Expand Down Expand Up @@ -81,6 +81,7 @@ def __init__(
async def close(self) -> None:
await self.client.close()

@traced
async def refresh(self, index: str) -> None:
"""Refresh the index to make changes visible."""
if self.service_type == OpenSearchServiceType.AOSS:
Expand All @@ -93,11 +94,13 @@ async def refresh(self, index: str) -> None:
except NotFoundError as nfe:
raise YenteNotFoundError(f"Index {index} does not exist.") from nfe

@traced
async def get_all_indices(self) -> List[str]:
"""Get a list of all indices in the ElasticSearch cluster."""
indices: Any = await self.client.cat.indices(format="json")
return [index.get("index") for index in indices]

@traced
async def get_alias_indices(self, alias: str) -> List[str]:
"""Get a list of indices that are aliased to the entity query alias."""
try:
Expand All @@ -108,6 +111,7 @@ async def get_alias_indices(self, alias: str) -> List[str]:
except TransportError as te:
raise YenteIndexError(f"Could not get alias indices: {te}") from te

@traced
async def rollover_index(self, alias: str, next_index: str, prefix: str) -> None:
"""Remove all existing indices with a given prefix from the alias and
add the new one."""
Expand All @@ -122,6 +126,7 @@ async def rollover_index(self, alias: str, next_index: str, prefix: str) -> None
except TransportError as te:
raise YenteIndexError(f"Could not rollover index: {te}") from te

@traced
async def clone_index(self, base_version: str, target_version: str) -> None:
"""Create a copy of the index with the given name."""
if base_version == target_version:
Expand Down Expand Up @@ -157,6 +162,7 @@ async def clone_index(self, base_version: str, target_version: str) -> None:
msg = f"Could not clone index {base_version} to {target_version}: {te}"
raise YenteIndexError(msg) from te

@traced
async def create_index(
self, index: str, mappings: Dict[str, Any], settings: Dict[str, Any]
) -> None:
Expand All @@ -173,6 +179,7 @@ async def create_index(
return
raise YenteIndexError(f"Could not create index: {exc}") from exc

@traced
async def delete_index(self, index: str) -> None:
"""Delete a given index if it exists."""
try:
Expand All @@ -182,6 +189,7 @@ async def delete_index(self, index: str) -> None:
except TransportError as te:
raise YenteIndexError(f"Could not delete index: {te}") from te

@traced
async def exists_index_alias(self, alias: str, index: str) -> bool:
"""Check if an index exists and is linked into the given alias."""
try:
Expand All @@ -192,6 +200,7 @@ async def exists_index_alias(self, alias: str, index: str) -> bool:
except TransportError as te:
raise YenteIndexError(f"Could not check index alias: {te}") from te

@traced
async def check_health(self, index: str) -> bool:
try:
health = await self.client.cluster.health(index=index, timeout=5)
Expand All @@ -202,6 +211,7 @@ async def check_health(self, index: str) -> bool:
log.error(f"Search status failure: {te}")
return False

@traced
async def search(
self,
index: str,
Expand Down Expand Up @@ -260,6 +270,7 @@ async def search(
msg = f"Error during search: {str(exc)}"
raise YenteIndexError(msg, status=500) from exc

@traced
async def get_document(self, index: str, doc_id: str) -> Optional[Dict[str, Any]]:
"""Get a document by ID using the GET API.

Expand All @@ -273,6 +284,7 @@ async def get_document(self, index: str, doc_id: str) -> Optional[Dict[str, Any]
except Exception as exc:
raise YenteIndexError(f"Error getting document: {exc}") from exc

@traced
async def bulk_index(
self, actions: Union[Iterable[Dict[str, Any]], AsyncIterable[Dict[str, Any]]]
) -> None:
Expand Down