Skip to content

Commit d1626f5

Browse files
committed
fix: removing typealias
1 parent 33146d5 commit d1626f5

File tree

1 file changed

+150
-0
lines changed

1 file changed

+150
-0
lines changed

LoopStructural/utils/observer.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
from __future__ import annotations
2+
3+
from collections.abc import Callable
4+
from contextlib import contextmanager
5+
from typing import Any, Generic, Protocol, TypeVar, runtime_checkable
6+
import threading
7+
import weakref
8+
9+
__all__ = ["Observer", "Observable", "Disposable"]
10+
11+
12+
@runtime_checkable
13+
class Observer(Protocol):
14+
"""Objects implementing an *update* method can subscribe."""
15+
16+
def update(self, observable: "Observable", event: str, *args: Any, **kwargs: Any) -> None:
17+
"""Receive a notification."""
18+
19+
20+
Callback = Callable[["Observable", str, Any], None]
21+
T = TypeVar("T", bound="Observable")
22+
23+
24+
class Disposable:
25+
"""A small helper that detaches an observer when disposed."""
26+
27+
__slots__ = ("_detach",)
28+
29+
def __init__(self, detach: Callable[[], None]):
30+
self._detach = detach
31+
32+
def dispose(self) -> None:
33+
"""Detach the associated observer immediately."""
34+
35+
self._detach()
36+
37+
# Allow use as a context‑manager for temporary subscriptions
38+
def __enter__(self) -> "Disposable":
39+
return self
40+
41+
def __exit__(self, exc_type, exc, tb):
42+
self.dispose()
43+
return False # do not swallow exceptions
44+
45+
46+
class Observable(Generic[T]):
47+
"""Base‑class that provides Observer pattern plumbing."""
48+
49+
#: Internal storage: mapping *event* → WeakSet[Callback]
50+
_observers: dict[str, weakref.WeakSet[Callback]]
51+
_any_observers: weakref.WeakSet[Callback]
52+
53+
def __init__(self) -> None:
54+
self._lock = threading.RLock()
55+
self._observers = {}
56+
self._any_observers = weakref.WeakSet()
57+
self._frozen = 0
58+
self._pending: list[tuple[str, tuple[Any, ...], dict[str, Any]]] = []
59+
60+
# ‑‑‑ subscription api --------------------------------------------------
61+
def attach(self, listener: Observer | Callback, event: str | None = None) -> Disposable:
62+
"""Register *listener* for *event* (all events if *event* is None).
63+
64+
Returns a :class:`Disposable` so the caller can easily detach again.
65+
"""
66+
callback: Callback = (
67+
listener.update # type: ignore[attr‑defined]
68+
if isinstance(listener, Observer) # type: ignore[misc]
69+
else listener # already a callable
70+
)
71+
72+
with self._lock:
73+
if event is None:
74+
self._any_observers.add(callback)
75+
else:
76+
self._observers.setdefault(event, weakref.WeakSet()).add(callback)
77+
78+
return Disposable(lambda: self.detach(listener, event))
79+
80+
def detach(self, listener: Observer | Callback, event: str | None = None) -> None:
81+
"""Unregister a previously attached *listener*."""
82+
83+
callback: Callback = (
84+
listener.update # type: ignore[attr‑defined]
85+
if isinstance(listener, Observer) # type: ignore[misc]
86+
else listener
87+
)
88+
89+
with self._lock:
90+
if event is None:
91+
self._any_observers.discard(callback)
92+
for s in self._observers.values():
93+
s.discard(callback)
94+
else:
95+
self._observers.get(event, weakref.WeakSet()).discard(callback)
96+
def __getstate__(self):
97+
state = self.__dict__.copy()
98+
state.pop('_lock', None) # RLock cannot be pickled
99+
state.pop('_observers', None) # WeakSet cannot be pickled
100+
state.pop('_any_observers', None)
101+
return state
102+
def __setstate__(self, state):
103+
self.__dict__.update(state)
104+
self._lock = threading.RLock()
105+
self._observers = {}
106+
self._any_observers = weakref.WeakSet()
107+
self._frozen = 0
108+
# ‑‑‑ notification api --------------------------------------------------
109+
def notify(self: T, event: str, *args: Any, **kwargs: Any) -> None:
110+
"""Notify observers that *event* happened."""
111+
112+
with self._lock:
113+
if self._frozen:
114+
# defer until freeze_notifications() exits
115+
self._pending.append((event, args, kwargs))
116+
return
117+
118+
observers = list(self._any_observers)
119+
observers.extend(self._observers.get(event, ()))
120+
121+
# Call outside lock — prevent deadlocks if observers trigger other
122+
# notifications.
123+
for cb in observers:
124+
try:
125+
cb(self, event, *args, **kwargs)
126+
except Exception: # pragma: no cover
127+
# Optionally log; never allow an observer error to break flow.
128+
import logging
129+
130+
logging.getLogger(__name__).exception(
131+
"Unhandled error in observer %s for event %s", cb, event
132+
)
133+
134+
# ‑‑‑ batching ----------------------------------------------------------
135+
@contextmanager
136+
def freeze_notifications(self):
137+
"""Context manager that batches notifications until exit."""
138+
139+
with self._lock:
140+
self._frozen += 1
141+
try:
142+
yield self
143+
finally:
144+
with self._lock:
145+
self._frozen -= 1
146+
if self._frozen == 0 and self._pending:
147+
pending = self._pending[:]
148+
self._pending.clear()
149+
for event, args, kw in pending: # type: ignore[has‑type]
150+
self.notify(event, *args, **kw)

0 commit comments

Comments
 (0)