diff --git a/logfire/_internal/config.py b/logfire/_internal/config.py index 613254ecd..b4d673aad 100644 --- a/logfire/_internal/config.py +++ b/logfire/_internal/config.py @@ -96,6 +96,7 @@ QuietLogExporter, QuietSpanExporter, RetryFewerSpansSpanExporter, + cleanup_disk_retryers, ) from .exporters.processor_wrapper import CheckSuppressInstrumentationProcessorWrapper, MainSpanProcessorWrapper from .exporters.quiet_metrics import QuietMetricExporter @@ -1539,6 +1540,7 @@ def patched_os_exit(code: int): # pragma: no cover config = config_ref() if config is not None: config.force_flush() + cleanup_disk_retryers() except: # noqa # weird errors can happen during shutdown, ignore them *all* with a bare except pass return original_os_exit(code) diff --git a/logfire/_internal/exporters/otlp.py b/logfire/_internal/exporters/otlp.py index 9202cd85c..1356f4172 100644 --- a/logfire/_internal/exporters/otlp.py +++ b/logfire/_internal/exporters/otlp.py @@ -1,8 +1,11 @@ from __future__ import annotations +import atexit import random +import shutil import time import uuid +import weakref from collections import deque from collections.abc import Mapping, Sequence from functools import cached_property @@ -24,6 +27,16 @@ from ..utils import logger, platform_is_emscripten from .wrapper import WrapperLogExporter, WrapperSpanExporter +_DISK_RETRYERS: list[weakref.ref[DiskRetryer]] = [] + + +@atexit.register +def cleanup_disk_retryers() -> None: + for retryer_ref in _DISK_RETRYERS: + retryer = retryer_ref() + if retryer is not None: + retryer.close() + class BodySizeCheckingOTLPSpanExporter(OTLPSpanExporter): # 5MB is significantly less than what our backend currently accepts, @@ -94,6 +107,12 @@ def retryer(self) -> DiskRetryer: # and because the full set of headers are only set some time after this session is created. return DiskRetryer(self.headers) + def close(self) -> None: + retryer = self.__dict__.get('retryer') + if retryer is not None: + retryer.close() + super().close() + def raise_for_retryable_status(response: requests.Response): # These are status codes that OTEL should retry. @@ -121,6 +140,7 @@ def __init__(self, headers: Mapping[str, str | bytes]): self.thread: Thread | None = None self.tasks: deque[tuple[Path, dict[str, Any]]] = deque() self.total_size = 0 + self.closed = False # Make a new session rather than using the OTLPExporterHttpSession directly # because thread safety of Session is questionable. @@ -130,12 +150,29 @@ def __init__(self, headers: Mapping[str, str | bytes]): # The directory where the export files are stored. self.dir = Path(mkdtemp(prefix='logfire-retryer-')) + _DISK_RETRYERS.append(weakref.ref(self)) self.last_log_time = -float('inf') + @staticmethod + def _cleanup_dir(path: Path) -> None: + shutil.rmtree(path, ignore_errors=True) + + def close(self) -> None: + with self.lock: + self.closed = True + self.tasks = deque(maxlen=0) + self.total_size = 0 + self.thread = None + + self.session.close() + self._cleanup_dir(self.dir) + def add_task(self, data: bytes, kwargs: dict[str, Any]): try: with self.lock: + if self.closed: + return if self.total_size >= self.MAX_TASK_SIZE: # pragma: no cover if self._should_log(): logger.error( @@ -194,6 +231,8 @@ def _run(self): path, kwargs = task data = path.read_bytes() while True: + if self.closed: + return # Exponential backoff with jitter. # The jitter is proportional to the delay, in particular so that if we go down for a while # and then come back up then retry requests will be spread out over a time of MAX_DELAY. @@ -211,7 +250,7 @@ def _run(self): # Success, set the delay to a small value (so that remaining tasks can be done quickly), # remove the file, and move on to the next task. delay = 0.2 - path.unlink() + path.unlink(missing_ok=True) with self.lock: self.total_size -= len(data) break diff --git a/tests/exporters/test_otlp_session.py b/tests/exporters/test_otlp_session.py index 2a1a32253..e9e8c132d 100644 --- a/tests/exporters/test_otlp_session.py +++ b/tests/exporters/test_otlp_session.py @@ -1,3 +1,10 @@ +import gc +import os +import subprocess +import sys +import textwrap +import weakref +from pathlib import Path from typing import Any from unittest.mock import Mock @@ -12,7 +19,9 @@ from logfire._internal.exporters.otlp import ( BodySizeCheckingOTLPSpanExporter, BodyTooLargeError, + DiskRetryer, OTLPExporterHttpSession, + cleanup_disk_retryers, ) from tests.exporters.test_retry_fewer_spans import TEST_SPANS @@ -130,3 +139,79 @@ def send(self, request: PreparedRequest, *args: Any, **kwargs: Any) -> Response: # After that the number of failed exports is unpredictable because the main thread is adding to it # at the same time as the retryer thread removes from it. assert caplog.messages[0] == snapshot('Currently retrying 1 failed export(s) (3 bytes)') + + +def test_disk_retryer_cleanup_after_logfire_shutdown(tmp_path: Path) -> None: + retryer_dir = tmp_path / 'retryer-dir' + marker_file = tmp_path / 'retryer-marker.txt' + + code = textwrap.dedent( + """ + import os + from pathlib import Path + + import requests + import logfire + from logfire._internal.exporters import otlp + + retryer_dir = Path(os.environ['LOGFIRE_RETRYER_DIR']) + marker_file = Path(os.environ['LOGFIRE_RETRYER_MARKER']) + + original_mkdtemp = otlp.mkdtemp + original_post = otlp.OTLPExporterHttpSession._post + + def fake_mkdtemp(prefix: str) -> str: + marker_file.write_text(str(retryer_dir)) + retryer_dir.mkdir() + return str(retryer_dir) + + def fail(self, url, data, **kwargs): + raise requests.exceptions.RequestException('boom') + + otlp.mkdtemp = fake_mkdtemp + otlp.OTLPExporterHttpSession._post = fail + + logfire.configure(send_to_logfire=True, token='pyt_foobar', inspect_arguments=False) + logfire.info('hi') + """ + ) + + env = { + **os.environ, + 'LOGFIRE_RETRYER_DIR': str(retryer_dir), + 'LOGFIRE_RETRYER_MARKER': str(marker_file), + } + result = subprocess.run([sys.executable, '-c', code], cwd=Path.cwd(), env=env, capture_output=True, text=True) + + assert result.returncode == 0, result.stderr + assert marker_file.read_text() == str(retryer_dir) + assert not retryer_dir.exists() + + +def test_cleanup_disk_retryers_skips_dead_weakrefs(monkeypatch: pytest.MonkeyPatch) -> None: + live_retryer = DiskRetryer({}) + dead_retryer = DiskRetryer({}) + dead_ref = weakref.ref(dead_retryer) + del dead_retryer + gc.collect() + + monkeypatch.setattr( + 'logfire._internal.exporters.otlp._DISK_RETRYERS', + [dead_ref, weakref.ref(live_retryer)], + ) + + cleanup_disk_retryers() + + assert dead_ref() is None + assert not live_retryer.dir.exists() + + +def test_disk_retryer_add_task_after_close_does_nothing() -> None: + retryer = DiskRetryer({}) + retryer.close() + + retryer.add_task(b'123', {'url': 'http://example.com/'}) + + assert retryer.total_size == 0 + assert not retryer.tasks + assert retryer.thread is None