Skip to content

Commit 740d3bf

Browse files
committed
LISTEN/NOTIFY funcionality
Signed-off-by: chandr-andr (Kiselev Aleksandr) <chandr@chandr.net>
1 parent 0aabe17 commit 740d3bf

File tree

8 files changed

+412
-19
lines changed

8 files changed

+412
-19
lines changed

python/psqlpy/_internal/__init__.pyi

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1828,7 +1828,7 @@ class Listener:
18281828
self: Self,
18291829
channel: str,
18301830
callback: Callable[
1831-
[str, str, int, Connection],
1831+
[Connection, str, str, int],
18321832
Awaitable[None],
18331833
],
18341834
) -> None:
@@ -1837,12 +1837,14 @@ class Listener:
18371837
Callback must be async function and have signature like this:
18381838
```python
18391839
async def callback(
1840-
channel: str,
1841-
payload: str,
1842-
process_id: str,
18431840
connection: Connection,
1841+
payload: str,
1842+
channel: str,
1843+
process_id: int,
18441844
) -> None: ...
18451845
```
1846+
1847+
Callback parameters are passed as args.
18461848
"""
18471849

18481850
async def clear_channel_callbacks(self, channel: str) -> None:
@@ -1852,12 +1854,14 @@ class Listener:
18521854
- `channel`: name of the channel.
18531855
"""
18541856

1857+
async def clear_all_channels(self) -> None:
1858+
"""Clear all channels callbacks."""
1859+
18551860
def listen(self: Self) -> None:
18561861
"""Start listening.
18571862
18581863
Start actual listening.
18591864
In the background it creates task in Rust event loop.
1860-
You must save returned Future to the array.
18611865
"""
18621866

18631867
def abort_listen(self: Self) -> None:

python/psqlpy/_internal/exceptions.pyi

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,15 @@ class PyToRustValueMappingError(RustPSQLDriverPyBaseError):
7979
You can get this exception when executing queries with parameters.
8080
So, if there are no parameters for the query, don't handle this error.
8181
"""
82+
83+
class BaseListenerError(RustPSQLDriverPyBaseError):
84+
"""Base error for all Listener errors."""
85+
86+
class ListenerStartError(BaseListenerError):
87+
"""Error if listener start failed."""
88+
89+
class ListenerClosedError(BaseListenerError):
90+
"""Error if listener manipulated but it's closed."""
91+
92+
class ListenerCallbackError(BaseListenerError):
93+
"""Error if callback passed to listener isn't a coroutine."""

python/psqlpy/exceptions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
BaseConnectionError,
33
BaseConnectionPoolError,
44
BaseCursorError,
5+
BaseListenerError,
56
BaseTransactionError,
67
ConnectionClosedError,
78
ConnectionExecuteError,
@@ -12,6 +13,9 @@
1213
CursorCloseError,
1314
CursorFetchError,
1415
CursorStartError,
16+
ListenerCallbackError,
17+
ListenerClosedError,
18+
ListenerStartError,
1519
MacAddrConversionError,
1620
PyToRustValueMappingError,
1721
RustPSQLDriverPyBaseError,
@@ -29,6 +33,7 @@
2933
"BaseConnectionError",
3034
"BaseConnectionPoolError",
3135
"BaseCursorError",
36+
"BaseListenerError",
3237
"BaseTransactionError",
3338
"ConnectionClosedError",
3439
"ConnectionExecuteError",
@@ -39,6 +44,9 @@
3944
"CursorClosedError",
4045
"CursorFetchError",
4146
"CursorStartError",
47+
"ListenerCallbackError",
48+
"ListenerClosedError",
49+
"ListenerStartError",
4250
"MacAddrConversionError",
4351
"PyToRustValueMappingError",
4452
"RustPSQLDriverPyBaseError",

python/tests/conftest.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ def table_name() -> str:
6868
return random_string()
6969

7070

71+
@pytest.fixture
72+
def listener_table_name() -> str:
73+
return random_string()
74+
75+
7176
@pytest.fixture
7277
def number_database_records() -> int:
7378
return random.randint(10, 35)
@@ -137,6 +142,23 @@ async def create_default_data_for_tests(
137142
)
138143

139144

145+
@pytest.fixture
146+
async def create_table_for_listener_tests(
147+
psql_pool: ConnectionPool,
148+
listener_table_name: str,
149+
) -> AsyncGenerator[None, None]:
150+
await psql_pool.execute(
151+
f"CREATE TABLE {listener_table_name}"
152+
f"(id SERIAL, payload VARCHAR(255),"
153+
f"channel VARCHAR(255), process_id INT)",
154+
)
155+
156+
yield
157+
await psql_pool.execute(
158+
f"DROP TABLE {listener_table_name}",
159+
)
160+
161+
140162
@pytest.fixture
141163
async def test_cursor(
142164
psql_pool: ConnectionPool,

0 commit comments

Comments
 (0)