diff --git a/pyproject.toml b/pyproject.toml index f1a7090..6599ed0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,10 +51,14 @@ lerobot = [ sim = [ "robot_descriptions>=1.11.0,<2.0.0", ] +mesh = [ + "eclipse-zenoh>=1.0.0,<2.0.0", +] all = [ "strands-robots[groot-service]", "strands-robots[lerobot]", "strands-robots[sim]", + "strands-robots[mesh]", ] dev = [ "pytest>=6.0,<9.0.0", @@ -128,7 +132,7 @@ ignore_missing_imports = false # Third-party libs without type stubs [[tool.mypy.overrides]] -module = ["lerobot.*", "gr00t.*", "draccus.*", "msgpack.*", "zmq.*", "huggingface_hub.*", "serial.*", "psutil.*", "torch.*", "torchvision.*", "transformers.*", "einops.*", "robot_descriptions.*"] +module = ["lerobot.*", "gr00t.*", "draccus.*", "msgpack.*", "zmq.*", "huggingface_hub.*", "serial.*", "psutil.*", "torch.*", "torchvision.*", "transformers.*", "einops.*", "robot_descriptions.*", "zenoh.*"] ignore_missing_imports = true # @tool decorator injects runtime signatures mypy cannot check diff --git a/strands_robots/mesh_session.py b/strands_robots/mesh_session.py new file mode 100644 index 0000000..57990dd --- /dev/null +++ b/strands_robots/mesh_session.py @@ -0,0 +1,349 @@ +"""Shared Zenoh session and peer registry for the mesh networking layer. + +This module provides a single, ref-counted :func:`zenoh.open` session per process +and a thread-safe registry of discovered peers. It is the lowest layer of the +mesh stack — higher-level constructs (``Mesh``, presence, RPC) build on top. + +The Zenoh dependency is **lazy**: ``import strands_robots.mesh_session`` does not +import ``zenoh`` at module level. The first call to :func:`get_session` triggers +the real import. If ``eclipse-zenoh`` is not installed the function returns +``None`` and all publish helpers become safe no-ops. + +Connection strategy (when no explicit endpoint is configured): + +1. Try to **listen** on ``tcp/127.0.0.1:{STRANDS_MESH_PORT}`` — this makes the + first process the local router. +2. If the port is already bound, fall back to **client** mode and connect to the + same endpoint. +3. Zenoh scouting (multicast) handles LAN discovery automatically. + +Environment variables +--------------------- +``ZENOH_CONNECT`` + Comma-separated remote endpoint(s) — e.g. ``tcp/10.0.0.1:7447``. +``ZENOH_LISTEN`` + Comma-separated listen endpoint(s). +``STRANDS_MESH_PORT`` + Local auto-mesh port (default ``7447``). +``STRANDS_MESH`` + Set to ``false`` to disable mesh globally. +""" + +from __future__ import annotations + +import atexit +import json +import logging +import threading +import time +from dataclasses import dataclass, field +from typing import Any + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Session singleton — one ``zenoh.Session`` per process, ref-counted +# --------------------------------------------------------------------------- + +_SESSION: Any | None = None # zenoh.Session when open, else None +_SESSION_LOCK = threading.Lock() +_SESSION_REFS: int = 0 + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +#: Default heartbeat frequency (Hz). Presence payloads are published at this rate. +HEARTBEAT_HZ: float = 2.0 + +#: Default state-publishing frequency (Hz). +STATE_HZ: float = 10.0 + +#: Seconds without a heartbeat before a peer is considered dead. +PEER_TIMEOUT: float = 10.0 + + +# --------------------------------------------------------------------------- +# PeerInfo +# --------------------------------------------------------------------------- + + +@dataclass +class PeerInfo: + """A discovered peer on the Zenoh mesh. + + Attributes: + peer_id: Unique identifier for this peer (e.g. ``"so100-a1b2"``). + peer_type: One of ``"robot"``, ``"sim"``, or ``"agent"``. + hostname: The hostname the peer reported. + last_seen: :func:`time.time` of the most recent heartbeat. + caps: Arbitrary capability dictionary broadcast in the presence payload. + """ + + peer_id: str + peer_type: str = "robot" + hostname: str = "" + last_seen: float = 0.0 + caps: dict[str, Any] = field(default_factory=dict) + + @property + def age(self) -> float: + """Seconds since the last heartbeat.""" + return time.time() - self.last_seen + + def to_dict(self) -> dict[str, Any]: + """Serialise to a plain dict (JSON-friendly).""" + return { + "peer_id": self.peer_id, + "type": self.peer_type, + "hostname": self.hostname, + "age": round(self.age, 1), + **self.caps, + } + + +# --------------------------------------------------------------------------- +# Peer registry — shared across all Mesh instances in the same process +# --------------------------------------------------------------------------- + +_PEERS: dict[str, PeerInfo] = {} +_PEERS_VERSION: int = 0 +_PEERS_LOCK = threading.Lock() + + +def update_peer(peer_id: str, peer_type: str, hostname: str, caps: dict[str, Any]) -> bool: + """Insert or update a peer. Returns ``True`` when the peer is new.""" + global _PEERS_VERSION # noqa: PLW0603 — module-level singleton by design + with _PEERS_LOCK: + is_new = peer_id not in _PEERS + _PEERS[peer_id] = PeerInfo( + peer_id=peer_id, + peer_type=peer_type, + hostname=hostname, + last_seen=time.time(), + caps=caps, + ) + if is_new: + _PEERS_VERSION += 1 + return is_new + + +def prune_peers(timeout: float = PEER_TIMEOUT) -> list[str]: + """Remove peers that have not sent a heartbeat within *timeout* seconds. + + Returns: + List of pruned peer IDs (may be empty). + """ + global _PEERS_VERSION # noqa: PLW0603 + now = time.time() + pruned: list[str] = [] + with _PEERS_LOCK: + stale = [pid for pid, p in _PEERS.items() if now - p.last_seen > timeout] + for pid in stale: + del _PEERS[pid] + _PEERS_VERSION += 1 + pruned.append(pid) + for pid in pruned: + logger.info("Mesh: peer %s timed out", pid) + return pruned + + +def get_peers() -> list[dict[str, Any]]: + """Return all known peers as plain dicts.""" + with _PEERS_LOCK: + return [p.to_dict() for p in _PEERS.values()] + + +def get_peer(peer_id: str) -> dict[str, Any] | None: + """Return a single peer by *peer_id*, or ``None`` if unknown.""" + with _PEERS_LOCK: + p = _PEERS.get(peer_id) + return p.to_dict() if p else None + + +def peer_count() -> int: + """Number of currently known (non-stale) peers.""" + with _PEERS_LOCK: + return len(_PEERS) + + +def clear_peers() -> None: + """Remove **all** peers. Intended for tests only.""" + global _PEERS_VERSION # noqa: PLW0603 + with _PEERS_LOCK: + _PEERS.clear() + _PEERS_VERSION += 1 + + +# --------------------------------------------------------------------------- +# Session lifecycle +# --------------------------------------------------------------------------- + + +def _build_config() -> Any: + """Create a ``zenoh.Config`` from environment variables. + + Returns: + A ``zenoh.Config`` instance. + + Raises: + ImportError: If ``eclipse-zenoh`` is not installed. + """ + import os + + import zenoh + + config = zenoh.Config() + + connect = os.getenv("ZENOH_CONNECT") + listen = os.getenv("ZENOH_LISTEN") + + if connect: + endpoints = [e.strip() for e in connect.split(",")] + config.insert_json5("connect/endpoints", json.dumps(endpoints)) + if listen: + endpoints = [e.strip() for e in listen.split(",")] + config.insert_json5("listen/endpoints", json.dumps(endpoints)) + + return config + + +def get_session() -> Any | None: + """Acquire the shared Zenoh session (lazy, ref-counted). + + * First call opens the session; subsequent calls increment the refcount. + * If ``eclipse-zenoh`` is not installed, returns ``None``. + * Thread-safe. + + Returns: + An open ``zenoh.Session``, or ``None`` if Zenoh is unavailable. + """ + global _SESSION, _SESSION_REFS # noqa: PLW0603 + + with _SESSION_LOCK: + if _SESSION is not None: + _SESSION_REFS += 1 + return _SESSION + + try: + import zenoh # noqa: F811 — lazy import + except ImportError: + logger.debug("eclipse-zenoh not installed — mesh disabled") + return None + + import os + + mesh_port = int(os.getenv("STRANDS_MESH_PORT", "7447")) + local_ep = f"tcp/127.0.0.1:{mesh_port}" + + connect_env = os.getenv("ZENOH_CONNECT") + listen_env = os.getenv("ZENOH_LISTEN") + + # When no explicit endpoints are set, try to become the local router. + if not connect_env and not listen_env: + try: + cfg = zenoh.Config() + cfg.insert_json5("listen/endpoints", json.dumps([local_ep])) + cfg.insert_json5("connect/endpoints", json.dumps([local_ep])) + _SESSION = zenoh.open(cfg) + _SESSION_REFS = 1 + logger.info("Zenoh mesh session opened (listener on %s)", local_ep) + return _SESSION + except Exception: + # Port already bound — another process is the local router. + pass + + # Fall back to client mode — connect to the existing listener. + try: + cfg = _build_config() + cfg.insert_json5("mode", '"client"') + cfg.insert_json5("connect/endpoints", json.dumps([local_ep])) + _SESSION = zenoh.open(cfg) + _SESSION_REFS = 1 + logger.info("Zenoh mesh session opened (client → %s)", local_ep) + return _SESSION + except Exception as exc: + logger.warning("Zenoh session open failed (client mode): %s", exc) + return None + + # Explicit endpoints provided via env vars. + try: + cfg = _build_config() + _SESSION = zenoh.open(cfg) + _SESSION_REFS = 1 + logger.info("Zenoh mesh session opened") + return _SESSION + except Exception as exc: + logger.warning("Zenoh session open failed: %s", exc) + return None + + +def release_session() -> None: + """Release one reference to the shared session. + + When the refcount reaches zero the underlying ``zenoh.Session`` is closed. + """ + global _SESSION, _SESSION_REFS # noqa: PLW0603 + + with _SESSION_LOCK: + if _SESSION_REFS <= 0: + return + _SESSION_REFS -= 1 + if _SESSION_REFS <= 0 and _SESSION is not None: + try: + _SESSION.close() + except Exception: + pass + _SESSION = None + _SESSION_REFS = 0 + logger.info("Zenoh mesh session closed") + + +def session_alive() -> bool: + """Return ``True`` if a Zenoh session is currently open.""" + with _SESSION_LOCK: + return _SESSION is not None + + +# --------------------------------------------------------------------------- +# Publish helper +# --------------------------------------------------------------------------- + + +def put(key: str, data: dict[str, Any]) -> None: + """Publish a JSON payload to the mesh. + + This is a fire-and-forget helper. If no session is open the call is a + no-op (no exception raised). + + Args: + key: Zenoh key expression (e.g. ``"strands/picker/presence"``). + data: JSON-serialisable dictionary. + """ + if _SESSION is None: + return + try: + _SESSION.put(key, json.dumps(data).encode()) + except Exception as exc: + logger.debug("Zenoh put error on %s: %s", key, exc) + + +# --------------------------------------------------------------------------- +# Process cleanup +# --------------------------------------------------------------------------- + + +def _atexit_cleanup() -> None: + """Best-effort session teardown on process exit.""" + global _SESSION, _SESSION_REFS # noqa: PLW0603 + with _SESSION_LOCK: + if _SESSION is not None: + try: + _SESSION.close() + except Exception: + pass + _SESSION = None + _SESSION_REFS = 0 + + +atexit.register(_atexit_cleanup) diff --git a/tests/mesh/__init__.py b/tests/mesh/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/tests/mesh/__init__.py @@ -0,0 +1 @@ + diff --git a/tests/mesh/test_mesh_session.py b/tests/mesh/test_mesh_session.py new file mode 100644 index 0000000..7541358 --- /dev/null +++ b/tests/mesh/test_mesh_session.py @@ -0,0 +1,447 @@ +"""Tests for strands_robots.mesh_session — session singleton + peer registry. + +All tests mock zenoh so no network or real zenoh install is required. +""" + +from __future__ import annotations + +import json +import threading +import time +from collections.abc import Iterator +from unittest.mock import MagicMock, patch + +import pytest + +from strands_robots.mesh_session import ( + PeerInfo, + clear_peers, + get_peer, + get_peers, + peer_count, + prune_peers, + put, + update_peer, +) + +# --------------------------------------------------------------------------- +# PeerInfo dataclass +# --------------------------------------------------------------------------- + + +class TestPeerInfo: + """PeerInfo stores discovery metadata and exposes age/to_dict.""" + + def test_defaults(self) -> None: + p = PeerInfo(peer_id="arm-1") + assert p.peer_id == "arm-1" + assert p.peer_type == "robot" + assert p.hostname == "" + assert p.caps == {} + + def test_age_increases(self) -> None: + p = PeerInfo(peer_id="arm-1", last_seen=time.time() - 5.0) + assert p.age >= 5.0 + + def test_to_dict_includes_caps(self) -> None: + p = PeerInfo( + peer_id="g1", + peer_type="sim", + hostname="jetson-01", + last_seen=time.time(), + caps={"tool_name": "unitree_g1", "connected": True}, + ) + d = p.to_dict() + assert d["peer_id"] == "g1" + assert d["type"] == "sim" + assert d["hostname"] == "jetson-01" + assert d["tool_name"] == "unitree_g1" + assert d["connected"] is True + assert "age" in d + + def test_to_dict_age_is_rounded(self) -> None: + p = PeerInfo(peer_id="x", last_seen=time.time() - 1.234) + d = p.to_dict() + # age is rounded to 1 decimal + assert isinstance(d["age"], float) + assert d["age"] == round(d["age"], 1) + + +# --------------------------------------------------------------------------- +# Peer registry +# --------------------------------------------------------------------------- + + +class TestPeerRegistry: + """Peer registry: thread-safe upsert, prune, query.""" + + @pytest.fixture(autouse=True) + def _clean_peers(self) -> Iterator[None]: + """Ensure a clean registry for every test.""" + clear_peers() + yield + clear_peers() + + def test_update_peer_new_returns_true(self) -> None: + assert update_peer("arm-1", "robot", "host-a", {}) is True + + def test_update_peer_existing_returns_false(self) -> None: + update_peer("arm-1", "robot", "host-a", {}) + assert update_peer("arm-1", "robot", "host-a", {}) is False + + def test_get_peers_returns_all(self) -> None: + update_peer("arm-1", "robot", "h1", {"hw": "so100"}) + update_peer("arm-2", "sim", "h2", {}) + peers = get_peers() + assert len(peers) == 2 + ids = {p["peer_id"] for p in peers} + assert ids == {"arm-1", "arm-2"} + + def test_get_peer_found(self) -> None: + update_peer("arm-1", "robot", "h1", {}) + p = get_peer("arm-1") + assert p is not None + assert p["peer_id"] == "arm-1" + + def test_get_peer_not_found(self) -> None: + assert get_peer("nonexistent") is None + + def test_peer_count(self) -> None: + assert peer_count() == 0 + update_peer("a", "robot", "", {}) + update_peer("b", "robot", "", {}) + assert peer_count() == 2 + + def test_prune_removes_stale(self) -> None: + update_peer("fresh", "robot", "", {}) + # Manually backdate one peer + from strands_robots.mesh_session import _PEERS, _PEERS_LOCK + + with _PEERS_LOCK: + _PEERS["stale"] = PeerInfo(peer_id="stale", last_seen=time.time() - 30) + + pruned = prune_peers(timeout=10.0) + assert "stale" in pruned + assert get_peer("stale") is None + assert get_peer("fresh") is not None + + def test_prune_returns_empty_when_all_fresh(self) -> None: + update_peer("a", "robot", "", {}) + pruned = prune_peers(timeout=10.0) + assert pruned == [] + + def test_clear_peers(self) -> None: + update_peer("a", "robot", "", {}) + update_peer("b", "robot", "", {}) + clear_peers() + assert peer_count() == 0 + + def test_concurrent_updates(self) -> None: + """Multiple threads updating peers simultaneously don't corrupt state.""" + errors: list[Exception] = [] + + def worker(prefix: str) -> None: + try: + for i in range(50): + update_peer(f"{prefix}-{i}", "robot", "", {}) + except Exception as exc: + errors.append(exc) + + threads = [threading.Thread(target=worker, args=(f"t{n}",)) for n in range(4)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + + assert not errors + assert peer_count() == 200 # 4 threads × 50 peers + + +# --------------------------------------------------------------------------- +# Session lifecycle (mocked zenoh) +# --------------------------------------------------------------------------- + + +class TestSessionLifecycle: + """get_session / release_session with mocked zenoh.""" + + @pytest.fixture(autouse=True) + def _reset_session(self) -> Iterator[None]: + """Reset module-level session state between tests.""" + import strands_robots.mesh_session as mod + + with mod._SESSION_LOCK: + if mod._SESSION is not None: + try: + mod._SESSION.close() + except Exception: + pass + mod._SESSION = None + mod._SESSION_REFS = 0 + yield + with mod._SESSION_LOCK: + if mod._SESSION is not None: + try: + mod._SESSION.close() + except Exception: + pass + mod._SESSION = None + mod._SESSION_REFS = 0 + + def test_returns_none_when_zenoh_missing(self) -> None: + from strands_robots.mesh_session import get_session + + with patch.dict("sys.modules", {"zenoh": None}): + with patch("builtins.__import__", side_effect=ImportError("no zenoh")): + result = get_session() + assert result is None + + def test_session_opened_as_listener(self) -> None: + """First process should try to listen, succeeding makes it the router.""" + mock_zenoh = MagicMock() + mock_session = MagicMock() + mock_zenoh.open.return_value = mock_session + mock_zenoh.Config.return_value = MagicMock() + + from strands_robots.mesh_session import get_session + + with patch.dict("sys.modules", {"zenoh": mock_zenoh}), patch.dict("os.environ", {}, clear=False): + # Remove any env overrides that might interfere + import os + + os.environ.pop("ZENOH_CONNECT", None) + os.environ.pop("ZENOH_LISTEN", None) + + session = get_session() + + assert session is mock_session + mock_zenoh.open.assert_called_once() + + def test_refcount_increments(self) -> None: + """Second call to get_session increments refcount, doesn't re-open.""" + import strands_robots.mesh_session as mod + + mock_session = MagicMock() + with mod._SESSION_LOCK: + mod._SESSION = mock_session + mod._SESSION_REFS = 1 + + s = mod.get_session() + assert s is mock_session + assert mod._SESSION_REFS == 2 + + def test_release_decrements(self) -> None: + import strands_robots.mesh_session as mod + + mock_session = MagicMock() + with mod._SESSION_LOCK: + mod._SESSION = mock_session + mod._SESSION_REFS = 2 + + mod.release_session() + assert mod._SESSION_REFS == 1 + assert mod._SESSION is mock_session # still open + + def test_release_closes_at_zero(self) -> None: + import strands_robots.mesh_session as mod + + mock_session = MagicMock() + with mod._SESSION_LOCK: + mod._SESSION = mock_session + mod._SESSION_REFS = 1 + + mod.release_session() + assert mod._SESSION is None + assert mod._SESSION_REFS == 0 + mock_session.close.assert_called_once() + + def test_release_noop_when_no_session(self) -> None: + """release_session on an already-closed session doesn't crash.""" + import strands_robots.mesh_session as mod + + mod.release_session() # should not raise + assert mod._SESSION_REFS == 0 + + def test_session_alive(self) -> None: + import strands_robots.mesh_session as mod + + assert mod.session_alive() is False + with mod._SESSION_LOCK: + mod._SESSION = MagicMock() + mod._SESSION_REFS = 1 + assert mod.session_alive() is True + + def test_listener_fallback_to_client(self) -> None: + """If listen fails (port taken), should fall back to client mode.""" + import strands_robots.mesh_session as mod + + mock_zenoh = MagicMock() + mock_session = MagicMock() + + call_count = 0 + + def open_side_effect(cfg: object) -> MagicMock: + nonlocal call_count + call_count += 1 + if call_count == 1: + # First call (listener) fails — port taken + raise OSError("Address already in use") + # Second call (client) succeeds + return mock_session + + mock_zenoh.open.side_effect = open_side_effect + mock_zenoh.Config.return_value = MagicMock() + + with patch.dict("sys.modules", {"zenoh": mock_zenoh}), patch.dict("os.environ", {}, clear=False): + import os + + os.environ.pop("ZENOH_CONNECT", None) + os.environ.pop("ZENOH_LISTEN", None) + + session = mod.get_session() + + assert session is mock_session + assert mock_zenoh.open.call_count == 2 + + +# --------------------------------------------------------------------------- +# put() helper +# --------------------------------------------------------------------------- + + +class TestPut: + """put() publishes JSON or is a no-op when session is None.""" + + @pytest.fixture(autouse=True) + def _reset_session(self) -> Iterator[None]: + import strands_robots.mesh_session as mod + + original = mod._SESSION + yield + with mod._SESSION_LOCK: + mod._SESSION = original + + def test_put_noop_when_no_session(self) -> None: + import strands_robots.mesh_session as mod + + with mod._SESSION_LOCK: + mod._SESSION = None + + # Should not raise + put("strands/test/presence", {"peer_id": "test"}) + + def test_put_publishes_json(self) -> None: + import strands_robots.mesh_session as mod + + mock_session = MagicMock() + with mod._SESSION_LOCK: + mod._SESSION = mock_session + + payload = {"peer_id": "arm-1", "t": 1234} + put("strands/arm-1/presence", payload) + + mock_session.put.assert_called_once() + call_args = mock_session.put.call_args + assert call_args[0][0] == "strands/arm-1/presence" + assert json.loads(call_args[0][1].decode()) == payload + + def test_put_swallows_exception(self) -> None: + """put() logs but doesn't raise on publish failure.""" + import strands_robots.mesh_session as mod + + mock_session = MagicMock() + mock_session.put.side_effect = RuntimeError("network down") + with mod._SESSION_LOCK: + mod._SESSION = mock_session + + # Should not raise + put("strands/test/state", {"x": 1}) + + +# --------------------------------------------------------------------------- +# Connection config from env vars +# --------------------------------------------------------------------------- + + +class TestConnectionConfig: + """_build_config reads ZENOH_CONNECT and ZENOH_LISTEN from env.""" + + def test_explicit_connect(self) -> None: + mock_zenoh = MagicMock() + mock_config = MagicMock() + mock_zenoh.Config.return_value = mock_config + + with ( + patch.dict("sys.modules", {"zenoh": mock_zenoh}), + patch.dict("os.environ", {"ZENOH_CONNECT": "tcp/10.0.0.1:7447,tcp/10.0.0.2:7447"}), + ): + from strands_robots.mesh_session import _build_config + + _build_config() + + mock_config.insert_json5.assert_any_call( + "connect/endpoints", + json.dumps(["tcp/10.0.0.1:7447", "tcp/10.0.0.2:7447"]), + ) + + def test_explicit_listen(self) -> None: + mock_zenoh = MagicMock() + mock_config = MagicMock() + mock_zenoh.Config.return_value = mock_config + + with ( + patch.dict("sys.modules", {"zenoh": mock_zenoh}), + patch.dict("os.environ", {"ZENOH_LISTEN": "tcp/0.0.0.0:7448"}), + ): + from strands_robots.mesh_session import _build_config + + _build_config() + + mock_config.insert_json5.assert_any_call( + "listen/endpoints", + json.dumps(["tcp/0.0.0.0:7448"]), + ) + + +# --------------------------------------------------------------------------- +# atexit cleanup +# --------------------------------------------------------------------------- + + +class TestAtexitCleanup: + """_atexit_cleanup closes session without raising.""" + + @pytest.fixture(autouse=True) + def _reset_session(self) -> Iterator[None]: + import strands_robots.mesh_session as mod + + with mod._SESSION_LOCK: + mod._SESSION = None + mod._SESSION_REFS = 0 + yield + with mod._SESSION_LOCK: + mod._SESSION = None + mod._SESSION_REFS = 0 + + def test_cleanup_closes_session(self) -> None: + import strands_robots.mesh_session as mod + + mock_session = MagicMock() + with mod._SESSION_LOCK: + mod._SESSION = mock_session + mod._SESSION_REFS = 3 + + mod._atexit_cleanup() + + assert mod._SESSION is None + assert mod._SESSION_REFS == 0 + mock_session.close.assert_called_once() + + def test_cleanup_noop_when_no_session(self) -> None: + import strands_robots.mesh_session as mod + + with mod._SESSION_LOCK: + mod._SESSION = None + mod._SESSION_REFS = 0 + + mod._atexit_cleanup() # should not raise