From b4e64af9084baaa98e2334dae881da9c904388dc Mon Sep 17 00:00:00 2001 From: Alex Hall Date: Thu, 16 Apr 2026 13:40:35 +0200 Subject: [PATCH 1/3] Clean up DiskRetryer temp dirs on shutdown --- logfire/_internal/config.py | 2 ++ logfire/_internal/exporters/otlp.py | 51 +++++++++++++++++++++++++-- tests/exporters/test_otlp_session.py | 52 ++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 2 deletions(-) diff --git a/logfire/_internal/config.py b/logfire/_internal/config.py index 613254ecd..b4d673aad 100644 --- a/logfire/_internal/config.py +++ b/logfire/_internal/config.py @@ -96,6 +96,7 @@ QuietLogExporter, QuietSpanExporter, RetryFewerSpansSpanExporter, + cleanup_disk_retryers, ) from .exporters.processor_wrapper import CheckSuppressInstrumentationProcessorWrapper, MainSpanProcessorWrapper from .exporters.quiet_metrics import QuietMetricExporter @@ -1539,6 +1540,7 @@ def patched_os_exit(code: int): # pragma: no cover config = config_ref() if config is not None: config.force_flush() + cleanup_disk_retryers() except: # noqa # weird errors can happen during shutdown, ignore them *all* with a bare except pass return original_os_exit(code) diff --git a/logfire/_internal/exporters/otlp.py b/logfire/_internal/exporters/otlp.py index 9202cd85c..85e539efc 100644 --- a/logfire/_internal/exporters/otlp.py +++ b/logfire/_internal/exporters/otlp.py @@ -1,8 +1,11 @@ from __future__ import annotations +import atexit import random +import shutil import time import uuid +import weakref from collections import deque from collections.abc import Mapping, Sequence from functools import cached_property @@ -24,6 +27,16 @@ from ..utils import logger, platform_is_emscripten from .wrapper import WrapperLogExporter, WrapperSpanExporter +_DISK_RETRYERS: list[weakref.ref[DiskRetryer]] = [] + + +@atexit.register +def cleanup_disk_retryers() -> None: + for retryer_ref in _DISK_RETRYERS: + retryer = retryer_ref() + if retryer is not None: + retryer.close() + class BodySizeCheckingOTLPSpanExporter(OTLPSpanExporter): # 5MB is significantly less than what our backend currently accepts, @@ -94,6 +107,12 @@ def retryer(self) -> DiskRetryer: # and because the full set of headers are only set some time after this session is created. return DiskRetryer(self.headers) + def close(self) -> None: + retryer = self.__dict__.get('retryer') + if retryer is not None: + retryer.close() + super().close() + def raise_for_retryable_status(response: requests.Response): # These are status codes that OTEL should retry. @@ -121,6 +140,7 @@ def __init__(self, headers: Mapping[str, str | bytes]): self.thread: Thread | None = None self.tasks: deque[tuple[Path, dict[str, Any]]] = deque() self.total_size = 0 + self.closed = False # Make a new session rather than using the OTLPExporterHttpSession directly # because thread safety of Session is questionable. @@ -130,12 +150,31 @@ def __init__(self, headers: Mapping[str, str | bytes]): # The directory where the export files are stored. self.dir = Path(mkdtemp(prefix='logfire-retryer-')) + _DISK_RETRYERS.append(weakref.ref(self)) self.last_log_time = -float('inf') + @staticmethod + def _cleanup_dir(path: Path) -> None: + shutil.rmtree(path, ignore_errors=True) + + def close(self) -> None: + with self.lock: + if self.closed: + return + self.closed = True + self.tasks.clear() + self.total_size = 0 + self.thread = None + + self.session.close() + self._cleanup_dir(self.dir) + def add_task(self, data: bytes, kwargs: dict[str, Any]): try: with self.lock: + if self.closed: + return if self.total_size >= self.MAX_TASK_SIZE: # pragma: no cover if self._should_log(): logger.error( @@ -181,6 +220,9 @@ def _run(self): delay = 1 while True: with self.lock: + if self.closed: + self.thread = None + break if not self.tasks: # All done, end the thread. self.thread = None @@ -192,8 +234,12 @@ def _run(self): try: path, kwargs = task + if self.closed: + break data = path.read_bytes() while True: + if self.closed: + return # Exponential backoff with jitter. # The jitter is proportional to the delay, in particular so that if we go down for a while # and then come back up then retry requests will be spread out over a time of MAX_DELAY. @@ -211,9 +257,10 @@ def _run(self): # Success, set the delay to a small value (so that remaining tasks can be done quickly), # remove the file, and move on to the next task. delay = 0.2 - path.unlink() + path.unlink(missing_ok=True) with self.lock: - self.total_size -= len(data) + if not self.closed: + self.total_size -= len(data) break except Exception: # pragma: no cover diff --git a/tests/exporters/test_otlp_session.py b/tests/exporters/test_otlp_session.py index 2a1a32253..beeef6102 100644 --- a/tests/exporters/test_otlp_session.py +++ b/tests/exporters/test_otlp_session.py @@ -1,3 +1,8 @@ +import os +import subprocess +import sys +import textwrap +from pathlib import Path from typing import Any from unittest.mock import Mock @@ -130,3 +135,50 @@ def send(self, request: PreparedRequest, *args: Any, **kwargs: Any) -> Response: # After that the number of failed exports is unpredictable because the main thread is adding to it # at the same time as the retryer thread removes from it. assert caplog.messages[0] == snapshot('Currently retrying 1 failed export(s) (3 bytes)') + + +def test_disk_retryer_cleanup_after_logfire_shutdown(tmp_path: Path) -> None: + retryer_dir = tmp_path / 'retryer-dir' + marker_file = tmp_path / 'retryer-marker.txt' + + code = textwrap.dedent( + """ + import os + from pathlib import Path + + import requests + import logfire + from logfire._internal.exporters import otlp + + retryer_dir = Path(os.environ['LOGFIRE_RETRYER_DIR']) + marker_file = Path(os.environ['LOGFIRE_RETRYER_MARKER']) + + original_mkdtemp = otlp.mkdtemp + original_post = otlp.OTLPExporterHttpSession._post + + def fake_mkdtemp(prefix: str) -> str: + marker_file.write_text(str(retryer_dir)) + retryer_dir.mkdir() + return str(retryer_dir) + + def fail(self, url, data, **kwargs): + raise requests.exceptions.RequestException('boom') + + otlp.mkdtemp = fake_mkdtemp + otlp.OTLPExporterHttpSession._post = fail + + logfire.configure(send_to_logfire=True, token='pyt_foobar', inspect_arguments=False) + logfire.info('hi') + """ + ) + + env = { + **os.environ, + 'LOGFIRE_RETRYER_DIR': str(retryer_dir), + 'LOGFIRE_RETRYER_MARKER': str(marker_file), + } + result = subprocess.run([sys.executable, '-c', code], cwd=Path.cwd(), env=env, capture_output=True, text=True) + + assert result.returncode == 0, result.stderr + assert marker_file.read_text() == str(retryer_dir) + assert not retryer_dir.exists() From 1315637bc6b0eaa06d458daeb52f13dc7df05cc7 Mon Sep 17 00:00:00 2001 From: Alex Hall Date: Thu, 16 Apr 2026 13:52:15 +0200 Subject: [PATCH 2/3] Simplify DiskRetryer shutdown state checks --- logfire/_internal/exporters/otlp.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/logfire/_internal/exporters/otlp.py b/logfire/_internal/exporters/otlp.py index 85e539efc..1356f4172 100644 --- a/logfire/_internal/exporters/otlp.py +++ b/logfire/_internal/exporters/otlp.py @@ -160,10 +160,8 @@ def _cleanup_dir(path: Path) -> None: def close(self) -> None: with self.lock: - if self.closed: - return self.closed = True - self.tasks.clear() + self.tasks = deque(maxlen=0) self.total_size = 0 self.thread = None @@ -220,9 +218,6 @@ def _run(self): delay = 1 while True: with self.lock: - if self.closed: - self.thread = None - break if not self.tasks: # All done, end the thread. self.thread = None @@ -234,8 +229,6 @@ def _run(self): try: path, kwargs = task - if self.closed: - break data = path.read_bytes() while True: if self.closed: @@ -259,8 +252,7 @@ def _run(self): delay = 0.2 path.unlink(missing_ok=True) with self.lock: - if not self.closed: - self.total_size -= len(data) + self.total_size -= len(data) break except Exception: # pragma: no cover From 8c6336b0fbacd449c7b4fc34b59c8a84ef7431bb Mon Sep 17 00:00:00 2001 From: Alex Hall Date: Thu, 16 Apr 2026 14:21:22 +0200 Subject: [PATCH 3/3] Cover DiskRetryer shutdown cleanup branches --- tests/exporters/test_otlp_session.py | 33 ++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/exporters/test_otlp_session.py b/tests/exporters/test_otlp_session.py index beeef6102..e9e8c132d 100644 --- a/tests/exporters/test_otlp_session.py +++ b/tests/exporters/test_otlp_session.py @@ -1,7 +1,9 @@ +import gc import os import subprocess import sys import textwrap +import weakref from pathlib import Path from typing import Any from unittest.mock import Mock @@ -17,7 +19,9 @@ from logfire._internal.exporters.otlp import ( BodySizeCheckingOTLPSpanExporter, BodyTooLargeError, + DiskRetryer, OTLPExporterHttpSession, + cleanup_disk_retryers, ) from tests.exporters.test_retry_fewer_spans import TEST_SPANS @@ -182,3 +186,32 @@ def fail(self, url, data, **kwargs): assert result.returncode == 0, result.stderr assert marker_file.read_text() == str(retryer_dir) assert not retryer_dir.exists() + + +def test_cleanup_disk_retryers_skips_dead_weakrefs(monkeypatch: pytest.MonkeyPatch) -> None: + live_retryer = DiskRetryer({}) + dead_retryer = DiskRetryer({}) + dead_ref = weakref.ref(dead_retryer) + del dead_retryer + gc.collect() + + monkeypatch.setattr( + 'logfire._internal.exporters.otlp._DISK_RETRYERS', + [dead_ref, weakref.ref(live_retryer)], + ) + + cleanup_disk_retryers() + + assert dead_ref() is None + assert not live_retryer.dir.exists() + + +def test_disk_retryer_add_task_after_close_does_nothing() -> None: + retryer = DiskRetryer({}) + retryer.close() + + retryer.add_task(b'123', {'url': 'http://example.com/'}) + + assert retryer.total_size == 0 + assert not retryer.tasks + assert retryer.thread is None