Skip to content

Commit 939742d

Browse files
committed
init project
1 parent 43eb16f commit 939742d

39 files changed

+4690
-1
lines changed

.python-version

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.13

Makefile

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
lint:
2+
ruff format .
3+
mypy .
4+
ruff check --fix .
5+
flake8 .
6+
7+
test:
8+
pytest --cov context_async_sqlalchemy exmaples/fastapi_example/tests --cov-report=term-missing
9+
10+
uv:
11+
uv sync
12+
source .venv/bin/activate

README.md

Lines changed: 185 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,186 @@
11
# context-async-sqlalchemy
2-
A convenient way to configure and interact with a async sqlalchemy session through context in asynchronous applications
2+
3+
ContextVar + async sqlalchemy = happiness.
4+
5+
A convenient way to configure and interact with async sqlalchemy session
6+
through context in asynchronous applications.
7+
8+
## What does usage look like?
9+
10+
```python
11+
from context_async_sqlalchemy import db_session
12+
from sqlalchemy import insert
13+
14+
from ..models import ExampleTable
15+
16+
async def some_func() -> None:
17+
# Created a session (no connection to the database yet)
18+
# If you call db_session again, it will return the same session
19+
# even in child coroutines.
20+
session = await db_session()
21+
22+
stmt = insert(ExampleTable).values(text="example_with_db_session")
23+
24+
# On the first request, a connection and transaction were opened
25+
await session.execute(stmt)
26+
27+
# The commit and closing of the session will occur automatically
28+
```
29+
30+
31+
## how to use
32+
33+
The repository includes na example integration with FastAPI,
34+
which describes numerous workflows.
35+
[FastAPI example](exmaples/fastapi_example/routes)
36+
37+
38+
It also includes two types of test setups you can use in your projects.
39+
40+
[FastAPI tests example](exmaples/fastapi_example/tests)
41+
42+
### The most basic example
43+
44+
#### 1. configure the connection to the database
45+
46+
for example for PostgreSQL:
47+
48+
```python
49+
from sqlalchemy.ext.asyncio import (
50+
async_sessionmaker,
51+
AsyncSession,
52+
create_async_engine,
53+
)
54+
from context_async_sqlalchemy import db_connect
55+
56+
db_connect.engine = create_async_engine(
57+
f"postgresql+asyncpg://"
58+
f"{pg_user}:{pg_password}"
59+
f"@{host}:{pg_port}"
60+
f"/{pg_db}",
61+
future=True,
62+
pool_pre_ping=True,
63+
)
64+
db_connect.session_maker = async_sessionmaker(
65+
db_connect.engine, class_=AsyncSession, expire_on_commit=False
66+
)
67+
68+
```
69+
70+
#### 2. Close the resources at the end of your application's life
71+
72+
Example for FastAPI:
73+
74+
```python
75+
import asyncio
76+
from typing import Any, AsyncGenerator
77+
from contextlib import asynccontextmanager
78+
from fastapi import FastAPI
79+
from context_async_sqlalchemy import db_connect
80+
81+
@asynccontextmanager
82+
async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
83+
"""
84+
It is important to clean up resources at the end of an application's
85+
life.
86+
"""
87+
yield
88+
await asyncio.gather(
89+
db_connect.close(), # Close the engine if it was open
90+
... # other resources in your application
91+
)
92+
```
93+
94+
95+
#### 3. Setup context lifetime
96+
97+
For a contextual session to work, a context needs to be set. This assumes some
98+
kind of middleware.
99+
100+
I'll use FastAPI middleware as an example:
101+
```python
102+
from fastapi import Request
103+
from starlette.middleware.base import ( # type: ignore[attr-defined]
104+
Response,
105+
RequestResponseEndpoint,
106+
)
107+
108+
from context_async_sqlalchemy import (
109+
auto_commit_by_status_code,
110+
init_db_session_ctx,
111+
is_context_initiated,
112+
reset_db_session_ctx,
113+
rollback_db_session,
114+
)
115+
116+
117+
async def fastapi_db_session_middleware(
118+
request: Request, call_next: RequestResponseEndpoint
119+
) -> Response:
120+
"""
121+
Database session lifecycle management.
122+
The session itself is created on demand in db_session().
123+
124+
Transaction auto-commit is implemented if there is no exception and
125+
the response status is < 400. Otherwise, a rollback is performed.
126+
127+
But you can commit or rollback manually in the handler.
128+
"""
129+
# Tests may have different session management rules
130+
# so if the context variable is already set, we do nothing
131+
if is_context_initiated():
132+
return await call_next(request)
133+
134+
# We set the context here, meaning all child coroutines will receive the
135+
# same context. And even if a child coroutine requests the
136+
# session first, the dictionary itself is shared, and this coroutine will
137+
# add the session to dictionary = shared context.
138+
token = init_db_session_ctx()
139+
try:
140+
response = await call_next(request)
141+
await auto_commit_by_status_code(response.status_code)
142+
return response
143+
except Exception:
144+
await rollback_db_session()
145+
raise
146+
finally:
147+
await reset_db_session_ctx(token)
148+
```
149+
150+
151+
You can use ready-made FastAPI middleware:
152+
```python
153+
from context_async_sqlalchemy import fastapi_db_session_middleware
154+
from starlette.middleware.base import BaseHTTPMiddleware
155+
156+
app.add_middleware(
157+
BaseHTTPMiddleware, dispatch=fastapi_db_session_middleware
158+
)
159+
```
160+
161+
162+
#### 4. Write a function that will work with the session
163+
164+
```python
165+
from context_async_sqlalchemy import db_session
166+
from sqlalchemy import insert
167+
168+
from ..models import ExampleTable
169+
170+
async def handler_with_db_session() -> None:
171+
"""
172+
An example of a typical handle that uses a context session to work with
173+
a database.
174+
Autocommit or autorollback occurs automatically at the end of a request
175+
(in middleware).
176+
"""
177+
# Created a session (no connection to the database yet)
178+
# If you call db_session again, it will return the same session
179+
# even in child coroutines.
180+
session = await db_session()
181+
182+
stmt = insert(ExampleTable).values(text="example_with_db_session")
183+
184+
# On the first request, a connection and transaction were opened
185+
await session.execute(stmt)
186+
```
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from .context import (
2+
init_db_session_ctx,
3+
is_context_initiated,
4+
reset_db_session_ctx,
5+
get_db_session_from_context,
6+
put_db_session_to_context,
7+
pop_db_session_from_context,
8+
run_in_new_ctx,
9+
)
10+
from .connect import (
11+
DBConnect,
12+
db_connect,
13+
)
14+
from .session import (
15+
db_session,
16+
atomic_db_session,
17+
run_with_new_db_session,
18+
run_with_new_atomic_db_session,
19+
commit_db_session,
20+
rollback_db_session,
21+
close_db_session,
22+
)
23+
from .auto_commit import auto_commit_by_status_code
24+
from .fastapi_utls.middleware import fastapi_db_session_middleware
25+
26+
__all__ = [
27+
"init_db_session_ctx",
28+
"is_context_initiated",
29+
"reset_db_session_ctx",
30+
"get_db_session_from_context",
31+
"put_db_session_to_context",
32+
"pop_db_session_from_context",
33+
"run_in_new_ctx",
34+
"DBConnect",
35+
"db_connect",
36+
"db_session",
37+
"atomic_db_session",
38+
"run_with_new_db_session",
39+
"run_with_new_atomic_db_session",
40+
"commit_db_session",
41+
"rollback_db_session",
42+
"close_db_session",
43+
"auto_commit_by_status_code",
44+
"fastapi_db_session_middleware",
45+
]
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from http import HTTPStatus
2+
3+
from .context import get_db_session_from_context
4+
5+
6+
async def auto_commit_by_status_code(status_code: int) -> None:
7+
"""
8+
Implements automatic commit or rollback.
9+
It should be used, for example, in the middleware or anywhere else
10+
where you expect session lifecycle management.
11+
"""
12+
session = get_db_session_from_context()
13+
14+
if session and session.in_transaction():
15+
if status_code < HTTPStatus.BAD_REQUEST:
16+
await session.commit()
17+
else:
18+
await session.rollback()
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import asyncio
2+
from sqlalchemy.ext.asyncio import (
3+
async_sessionmaker,
4+
AsyncEngine,
5+
AsyncSession,
6+
)
7+
8+
9+
class DBConnect:
10+
"""stores the database connection parameters"""
11+
12+
def __init__(self) -> None:
13+
self.engine: AsyncEngine | None = None
14+
self.session_maker: async_sessionmaker[AsyncSession] | None = None
15+
self._lock = asyncio.Lock()
16+
17+
async def close(self) -> None:
18+
if self.engine:
19+
await self.engine.dispose()
20+
self.engine = None
21+
22+
23+
db_connect = DBConnect()
24+
25+
26+
def get_session_maker() -> async_sessionmaker[AsyncSession]:
27+
"""Gets the session maker"""
28+
assert db_connect.session_maker
29+
return db_connect.session_maker
30+
31+
32+
def create_session() -> AsyncSession:
33+
"""Creates a new session"""
34+
assert db_connect.session_maker
35+
return db_connect.session_maker()

0 commit comments

Comments
 (0)