Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions logfire/_internal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
QuietLogExporter,
QuietSpanExporter,
RetryFewerSpansSpanExporter,
cleanup_disk_retryers,
)
from .exporters.processor_wrapper import CheckSuppressInstrumentationProcessorWrapper, MainSpanProcessorWrapper
from .exporters.quiet_metrics import QuietMetricExporter
Expand Down Expand Up @@ -1539,6 +1540,7 @@ def patched_os_exit(code: int): # pragma: no cover
config = config_ref()
if config is not None:
config.force_flush()
cleanup_disk_retryers()
except: # noqa # weird errors can happen during shutdown, ignore them *all* with a bare except
pass
return original_os_exit(code)
Expand Down
41 changes: 40 additions & 1 deletion logfire/_internal/exporters/otlp.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from __future__ import annotations

import atexit
import random
import shutil
import time
import uuid
import weakref
from collections import deque
from collections.abc import Mapping, Sequence
from functools import cached_property
Expand All @@ -24,6 +27,16 @@
from ..utils import logger, platform_is_emscripten
from .wrapper import WrapperLogExporter, WrapperSpanExporter

# Weak references to the DiskRetryer instances created so far (each one appends itself in
# `DiskRetryer.__init__`); `cleanup_disk_retryers` walks this list to close those still alive.
_DISK_RETRYERS: list[weakref.ref[DiskRetryer]] = []


@atexit.register
def cleanup_disk_retryers() -> None:
    """Close every registered `DiskRetryer` that is still alive.

    Registered with `atexit`, so it runs at normal interpreter shutdown; it can also be
    called directly. Registry entries whose retryer has already been garbage collected
    are skipped.
    """
    for ref in _DISK_RETRYERS:
        live_retryer = ref()
        if live_retryer is None:
            # The weak reference is dead: the retryer is gone, so there is nothing to close.
            continue
        live_retryer.close()


class BodySizeCheckingOTLPSpanExporter(OTLPSpanExporter):
# 5MB is significantly less than what our backend currently accepts,
Expand Down Expand Up @@ -94,6 +107,12 @@ def retryer(self) -> DiskRetryer:
# and because the full set of headers are only set some time after this session is created.
return DiskRetryer(self.headers)

def close(self) -> None:
    """Close the disk retryer stored on this instance, if any, then close the session itself."""
    # Read straight from the instance dict instead of `self.retryer`. Presumably `retryer`
    # is a cached property, so this avoids creating a retryer only to close it
    # (NOTE(review): confirm against the `retryer` definition).
    existing = vars(self).get('retryer')
    if existing is not None:
        existing.close()
    super().close()


def raise_for_retryable_status(response: requests.Response):
# These are status codes that OTEL should retry.
Expand Down Expand Up @@ -121,6 +140,7 @@ def __init__(self, headers: Mapping[str, str | bytes]):
self.thread: Thread | None = None
self.tasks: deque[tuple[Path, dict[str, Any]]] = deque()
self.total_size = 0
self.closed = False

# Make a new session rather than using the OTLPExporterHttpSession directly
# because thread safety of Session is questionable.
Expand All @@ -130,12 +150,29 @@ def __init__(self, headers: Mapping[str, str | bytes]):

# The directory where the export files are stored.
self.dir = Path(mkdtemp(prefix='logfire-retryer-'))
_DISK_RETRYERS.append(weakref.ref(self))

self.last_log_time = -float('inf')

@staticmethod
def _cleanup_dir(path: Path) -> None:
    """Recursively delete the directory at `path`, ignoring any errors hit while removing it."""
    shutil.rmtree(path, ignore_errors=True)

def close(self) -> None:
    """Shut this retryer down and delete its on-disk directory.

    While holding the lock, the retryer is flagged as closed and its bookkeeping is reset:
    the task queue is swapped for one that can hold no items, and the size counter and
    thread reference are cleared. Afterwards the session is closed and `self.dir` is removed.
    """
    with self.lock:
        # Set the flag first: `add_task` and the retry loop check it to stop doing work.
        self.closed = True
        self.thread = None
        self.total_size = 0
        # A deque with maxlen=0 silently discards anything appended to it.
        self.tasks = deque(maxlen=0)

    self.session.close()
    export_dir = self.dir
    self._cleanup_dir(export_dir)

def add_task(self, data: bytes, kwargs: dict[str, Any]):
try:
with self.lock:
if self.closed:
return
if self.total_size >= self.MAX_TASK_SIZE: # pragma: no cover
if self._should_log():
logger.error(
Expand Down Expand Up @@ -194,6 +231,8 @@ def _run(self):
path, kwargs = task
data = path.read_bytes()
while True:
if self.closed:
return
# Exponential backoff with jitter.
# The jitter is proportional to the delay, in particular so that if we go down for a while
# and then come back up then retry requests will be spread out over a time of MAX_DELAY.
Expand All @@ -211,7 +250,7 @@ def _run(self):
# Success, set the delay to a small value (so that remaining tasks can be done quickly),
# remove the file, and move on to the next task.
delay = 0.2
path.unlink()
path.unlink(missing_ok=True)
with self.lock:
self.total_size -= len(data)
break
Expand Down
85 changes: 85 additions & 0 deletions tests/exporters/test_otlp_session.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
import gc
import os
import subprocess
import sys
import textwrap
import weakref
from pathlib import Path
from typing import Any
from unittest.mock import Mock

Expand All @@ -12,7 +19,9 @@
from logfire._internal.exporters.otlp import (
BodySizeCheckingOTLPSpanExporter,
BodyTooLargeError,
DiskRetryer,
OTLPExporterHttpSession,
cleanup_disk_retryers,
)
from tests.exporters.test_retry_fewer_spans import TEST_SPANS

Expand Down Expand Up @@ -130,3 +139,79 @@ def send(self, request: PreparedRequest, *args: Any, **kwargs: Any) -> Response:
# After that the number of failed exports is unpredictable because the main thread is adding to it
# at the same time as the retryer thread removes from it.
assert caplog.messages[0] == snapshot('Currently retrying 1 failed export(s) (3 bytes)')


def test_disk_retryer_cleanup_after_logfire_shutdown(tmp_path: Path) -> None:
    """The retryer's temp directory must be gone once a logfire process has exited.

    A child Python process is started with `otlp.mkdtemp` patched to create a known
    directory (recording its path in a marker file) and with
    `OTLPExporterHttpSession._post` patched to always fail. The child configures logfire
    and emits one log. After it exits, the test checks that it exited cleanly, that the
    patched `mkdtemp` was called (marker file written), and that the directory no longer exists.
    """
    retryer_dir = tmp_path / 'retryer-dir'
    marker_file = tmp_path / 'retryer-marker.txt'

    # The function bodies inside the script must stay indented relative to their `def`
    # lines; `textwrap.dedent` only strips the indentation common to all lines.
    code = textwrap.dedent(
        """
        import os
        from pathlib import Path

        import requests
        import logfire
        from logfire._internal.exporters import otlp

        retryer_dir = Path(os.environ['LOGFIRE_RETRYER_DIR'])
        marker_file = Path(os.environ['LOGFIRE_RETRYER_MARKER'])

        original_mkdtemp = otlp.mkdtemp
        original_post = otlp.OTLPExporterHttpSession._post

        def fake_mkdtemp(prefix: str) -> str:
            marker_file.write_text(str(retryer_dir))
            retryer_dir.mkdir()
            return str(retryer_dir)

        def fail(self, url, data, **kwargs):
            raise requests.exceptions.RequestException('boom')

        otlp.mkdtemp = fake_mkdtemp
        otlp.OTLPExporterHttpSession._post = fail

        logfire.configure(send_to_logfire=True, token='pyt_foobar', inspect_arguments=False)
        logfire.info('hi')
        """
    )

    # Pass the paths to the child through the environment, on top of the parent's environment.
    env = {
        **os.environ,
        'LOGFIRE_RETRYER_DIR': str(retryer_dir),
        'LOGFIRE_RETRYER_MARKER': str(marker_file),
    }
    result = subprocess.run([sys.executable, '-c', code], cwd=Path.cwd(), env=env, capture_output=True, text=True)

    assert result.returncode == 0, result.stderr
    # The marker proves the patched mkdtemp ran, i.e. a retryer directory was actually created.
    assert marker_file.read_text() == str(retryer_dir)
    assert not retryer_dir.exists()


def test_cleanup_disk_retryers_skips_dead_weakrefs(monkeypatch: pytest.MonkeyPatch) -> None:
    """A garbage-collected retryer is skipped while a live one still gets closed."""
    survivor = DiskRetryer({})

    # Create a second retryer and drop its only strong reference so its weakref goes dead.
    doomed = DiskRetryer({})
    stale_ref = weakref.ref(doomed)
    del doomed
    gc.collect()

    # Swap the module-level registry for one holding exactly a dead and a live reference.
    registry = [stale_ref, weakref.ref(survivor)]
    monkeypatch.setattr('logfire._internal.exporters.otlp._DISK_RETRYERS', registry)

    cleanup_disk_retryers()

    assert stale_ref() is None
    assert not survivor.dir.exists()


def test_disk_retryer_add_task_after_close_does_nothing() -> None:
    """Once closed, a retryer ignores new tasks: nothing is queued, counted, or started."""
    closed_retryer = DiskRetryer({})
    closed_retryer.close()

    closed_retryer.add_task(b'123', {'url': 'http://example.com/'})

    assert closed_retryer.thread is None
    assert not closed_retryer.tasks
    assert closed_retryer.total_size == 0
Loading