diff --git a/.github/workflows/hw4_5-tests.yml b/.github/workflows/hw4_5-tests.yml new file mode 100644 index 00000000..9d875bf7 --- /dev/null +++ b/.github/workflows/hw4_5-tests.yml @@ -0,0 +1,55 @@ +name: Run tests HW4 and HW5 + +on: + push: + branches: [ main, master ] + pull_request: + branches: [ main, master ] + +jobs: + test: + runs-on: ubuntu-latest + + services: + postgres: + image: postgres:16 + env: + POSTGRES_USER: user + POSTGRES_PASSWORD: password + POSTGRES_DB: hw2_db + ports: + - 5432:5432 + options: >- + --health-cmd="pg_isready -U user -d hw2_db" + --health-interval=5s + --health-timeout=5s + --health-retries=10 + + steps: + - name: Checkout repo + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r hw2/hw/requirements.txt + + - name: Wait for PostgreSQL + run: | + until pg_isready -h localhost -p 5432 -U user; do + echo "Waiting for Postgres..." + sleep 2 + done + + - name: Run tests with coverage + working-directory: hw2/hw + env: + PYTHONPATH: . + DATABASE_URL: postgresql+psycopg2://user:password@localhost:5432/hw2_db + run: | + pytest --cov=shop_api --cov-report=term-missing --cov-fail-under=98 diff --git a/hw1/app.py b/hw1/app.py index 6107b870..00f79ffe 100644 --- a/hw1/app.py +++ b/hw1/app.py @@ -1,5 +1,84 @@ from typing import Any, Awaitable, Callable +import math +import json +from urllib.parse import parse_qs +def fibonacci(n): + a, b = 0, 1 + for _ in range(n): + a, b = b, a + b + return a + +def mean(nums): + return sum(nums) / len(nums) + +async def send_response(send, status, data): + body = json.dumps(data).encode() + await send({ + "type": "http.response.start", + "status": status, + "headers": [(b"content-type", b"application/json")], + }) + await send({"type": "http.response.body", "body": body}) + +async def factorial_endpoint(send, query_string): + q = parse_qs(query_string) + nums = q.get("n") + if not nums: + await send_response(send, 422, {}) + return + + try: + n = int(nums[0]) + except ValueError: + await send_response(send, 422, {}) + return + + if n < 0: + await send_response(send, 400, {}) + return + + await send_response(send, 200, {"result": math.factorial(n)}) + +async def fibonacci_endpoint(send, n_str): + try: + n = int(n_str) + except ValueError: + await send_response(send, 422, {}) + return + + if n < 0: + await send_response(send, 400, {"error": "n must be >= 0"}) + return + await send_response(send, 200, {"result": fibonacci(n)}) + +async def mean_endpoint(send, receive): + body = b"" + while True: + message = await receive() + if message["type"] == "http.request": + body += message.get("body", b"") + if not message.get("more_body", False): + break + if not body: + await send_response(send, 422, {}) + return + try: + data = json.loads(body.decode()) + except (json.JSONDecodeError, UnicodeDecodeError): + await send_response(send, 422, {}) + return + if not isinstance(data, list): + await send_response(send, 422, {}) + return + if len(data) == 0: + await send_response(send, 400, {}) + return + try: + await send_response(send, 200, {"result": mean(data)}) + except (TypeError, ValueError): + await send_response(send, 422, {}) + return async def application( scope: dict[str, Any], @@ -13,6 +92,35 @@ async def application( send: Корутина для отправки сообщений клиенту """ # TODO: Ваша реализация здесь + + if scope['type'] == 'lifespan': + while True: + message = await receive() + if message['type'] == 'lifespan.startup': + await send({'type': 'lifespan.startup.complete'}) + elif message['type'] == 'lifespan.shutdown': + await send({'type': 'lifespan.shutdown.complete'}) + break + return + if scope['type'] != 'http': + return + method = scope['method'] + path = scope['path'] + query_string = scope.get('query_string', b'').decode() + + if method != 'GET': + await send_response(send, 404, {"error": "Not found"}) + return + + if path == '/factorial': + await factorial_endpoint(send, query_string) + elif path.startswith('/fibonacci/'): + n_str = path[len('/fibonacci/'):] + await fibonacci_endpoint(send, n_str) + elif path == '/mean': + await mean_endpoint(send, receive) + else: + await send_response(send, 404, {"error": "Not found"}) if __name__ == "__main__": import uvicorn diff --git a/hw2/hw/Dockerfile b/hw2/hw/Dockerfile new file mode 100644 index 00000000..27b0b636 --- /dev/null +++ b/hw2/hw/Dockerfile @@ -0,0 +1,23 @@ +FROM python:3.12 AS base + +ARG PYTHONFAULTHANDLER=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONHASHSEED=random \ + PIP_NO_CACHE_DIR=on \ + PIP_DISABLE_PIP_VERSION_CHECK=on \ + PIP_DEFAULT_TIMEOUT=500 + +RUN apt-get update && apt-get install -y gcc +RUN python -m pip install --upgrade pip + +WORKDIR $APP_ROOT/src +COPY . ./ + +ENV VIRTUAL_ENV=$APP_ROOT/src/.venv \ + PATH=$APP_ROOT/src/.venv/bin:$PATH + +RUN pip install -r requirements.txt + +FROM base as local + +CMD ["uvicorn", "shop_api.main:app", "--port", "8080", "--host", "0.0.0.0"] diff --git a/hw2/hw/docker-compose.yml b/hw2/hw/docker-compose.yml new file mode 100644 index 00000000..29531094 --- /dev/null +++ b/hw2/hw/docker-compose.yml @@ -0,0 +1,47 @@ +services: + db: + image: postgres:16 + restart: always + environment: + POSTGRES_DB: hw2_db + POSTGRES_USER: user + POSTGRES_PASSWORD: password + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U user -d hw2_db"] + interval: 2s + timeout: 2s + retries: 30 + + local: + build: + context: . + dockerfile: ./Dockerfile + target: local + restart: always + # зависит от готовности БД, чтобы скрипты не ловили гонку + depends_on: + db: + condition: service_healthy + ports: + - "8080:8080" + + grafana: + image: grafana/grafana:latest + ports: + - "3000:3000" + restart: always + + prometheus: + image: prom/prometheus + volumes: + - ./prometheus/:/etc/prometheus/ + command: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--storage.tsdb.path=/prometheus" + - "--web.console.libraries=/usr/share/prometheus/console_libraries" + - "--web.console.templates=/usr/share/prometheus/consoles" + ports: + - "9090:9090" + restart: always diff --git a/hw2/hw/grafana/dashboard.json b/hw2/hw/grafana/dashboard.json new file mode 100644 index 00000000..564e79e8 --- /dev/null +++ b/hw2/hw/grafana/dashboard.json @@ -0,0 +1,258 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 1, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "cf0uyho4gzn5sd" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "percent" + }, + "overrides": [ + { + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": "byNames", + "options": { + "mode": "exclude", + "names": [ + "{instance=\"local:8080\", job=\"demo-service-local\"}" + ], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": true, + "viz": true + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0", + "targets": [ + { + "editorMode": "builder", + "expr": "rate(process_cpu_seconds_total[1m])", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "cpu usage, %", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "cf0uyho4gzn5sd" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "cf0uyho4gzn5sd" + }, + "editorMode": "builder", + "expr": "sum(rate(http_requests_total[1m]))", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "incoming http requests (1m rate)", + "type": "timeseries" + } + ], + "preload": false, + "schemaVersion": 42, + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-5m", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "hw3", + "uid": "ad6jrql", + "version": 7 +} \ No newline at end of file diff --git a/hw2/hw/prometheus/prometheus.yml b/hw2/hw/prometheus/prometheus.yml new file mode 100644 index 00000000..6bdf88e7 --- /dev/null +++ b/hw2/hw/prometheus/prometheus.yml @@ -0,0 +1,10 @@ +global: + scrape_interval: 10s + evaluation_interval: 10s + +scrape_configs: + - job_name: demo-service-local + metrics_path: /metrics + static_configs: + - targets: + - local:8080 diff --git a/hw2/hw/requirements.txt b/hw2/hw/requirements.txt index 207dcf5c..634b12b1 100644 --- a/hw2/hw/requirements.txt +++ b/hw2/hw/requirements.txt @@ -1,9 +1,15 @@ # Основные зависимости для ASGI приложения fastapi>=0.117.1 uvicorn>=0.24.0 +prometheus-fastapi-instrumentator -# Зависимости для тестирования -pytest>=7.4.0 +# Тесты +pytest>=8.2.0 +pytest-cov>=4.1.0 pytest-asyncio>=0.21.0 httpx>=0.27.2 Faker>=37.8.0 + +# БД +SQLAlchemy>=2.0.30 +psycopg2-binary>=2.9.9 diff --git a/hw2/hw/shop_api/main.py b/hw2/hw/shop_api/main.py index f60a8c60..bc1f115f 100644 --- a/hw2/hw/shop_api/main.py +++ b/hw2/hw/shop_api/main.py @@ -1,3 +1,258 @@ -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException, Query, Path, Response, Depends +from pydantic import BaseModel, Field +from typing import Optional, List, Dict +from prometheus_fastapi_instrumentator import Instrumentator +from sqlalchemy import ( + create_engine, Integer, String, Float, Boolean, ForeignKey, select +) +from sqlalchemy.orm import ( + declarative_base, Mapped, mapped_column, relationship, + sessionmaker, Session +) +import os + +# FastAPI + metrics app = FastAPI(title="Shop API") +Instrumentator().instrument(app).expose(app) + +# DB setup +DATABASE_URL = os.environ.get( + "DATABASE_URL", + "postgresql+psycopg2://user:password@db:5432/hw2_db", +) +engine = create_engine(DATABASE_URL, pool_pre_ping=True, future=True) +SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, future=True) +Base = declarative_base() + +# ORM models +class Item(Base): + __tablename__ = "items" + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + name: Mapped[str] = mapped_column(String(255), nullable=False) + price: Mapped[float] = mapped_column(Float, nullable=False) + deleted: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) + + cart_items: Mapped[List["CartItem"]] = relationship(back_populates="item", cascade="all, delete-orphan") + +class Cart(Base): + __tablename__ = "carts" + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + + items: Mapped[List["CartItem"]] = relationship(back_populates="cart", cascade="all, delete-orphan") + +class CartItem(Base): + __tablename__ = "cart_items" + cart_id: Mapped[int] = mapped_column(ForeignKey("carts.id", ondelete="CASCADE"), primary_key=True) + item_id: Mapped[int] = mapped_column(ForeignKey("items.id", ondelete="CASCADE"), primary_key=True) + quantity: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + + cart: Mapped["Cart"] = relationship(back_populates="items") + item: Mapped["Item"] = relationship(back_populates="cart_items") + +# Create tables at import (как было) +Base.metadata.create_all(bind=engine) + +def get_db() -> Session: + db = SessionLocal() + try: + yield db + finally: + db.close() + +# Schemas +class ItemCreate(BaseModel): + name: str = Field(..., min_length=1) + price: float = Field(..., ge=0.0) + +class ItemPut(BaseModel): + name: str = Field(..., min_length=1) + price: float = Field(..., ge=0.0) + +class ItemPatch(BaseModel): + name: Optional[str] = Field(None, min_length=1) + price: Optional[float] = Field(None, ge=0.0) + class Config: + extra = "forbid" + +class ItemOut(BaseModel): + id: int + name: str + price: float + deleted: bool = False + +class CartItemOut(BaseModel): + id: int + name: str + quantity: int + available: bool + +class CartOut(BaseModel): + id: int + items: List[CartItemOut] + price: float + +# utils +def get_item_or_404(db: Session, item_id: int, allow_deleted: bool = False) -> Item: + it = db.get(Item, item_id) + if not it or (it.deleted and not allow_deleted): + raise HTTPException(404, "Item not found") + return it + +def get_cart_or_404(db: Session, cart_id: int) -> Cart: + cart = db.get(Cart, cart_id) + if not cart: + raise HTTPException(404, "Cart not found") + return cart + +def cart_snapshot(db: Session, cart_id: int) -> CartOut: + cart = get_cart_or_404(db, cart_id) + stmt = ( + select(CartItem, Item) + .select_from(CartItem) + .join(Item, Item.id == CartItem.item_id) + .where(CartItem.cart_id == cart.id) + ) + items_out: List[CartItemOut] = [] + total = 0.0 + for ci, it in db.execute(stmt).all(): + items_out.append( + CartItemOut( + id=it.id, + name=it.name, + quantity=ci.quantity, + available=not it.deleted, + ) + ) + total += float(it.price) * ci.quantity + return CartOut(id=cart.id, items=items_out, price=total) + +# cart endpoints +@app.post("/cart", status_code=201) +def create_cart(response: Response, db: Session = Depends(get_db)): + cart = Cart() + db.add(cart) + db.commit() + db.refresh(cart) + response.headers["Location"] = f"/cart/{cart.id}" + return {"id": cart.id} + +@app.get("/cart/{cart_id}", response_model=CartOut) +def get_cart(cart_id: int = Path(..., ge=1), db: Session = Depends(get_db)): + return cart_snapshot(db, cart_id) + +@app.get("/cart", response_model=List[CartOut]) +def list_carts( + offset: int = Query(0, ge=0), + limit: int = Query(10, gt=0), + min_price: Optional[float] = Query(None, ge=0.0), + max_price: Optional[float] = Query(None, ge=0.0), + min_quantity: Optional[int] = Query(None, ge=0), + max_quantity: Optional[int] = Query(None, ge=0), + db: Session = Depends(get_db), +): + result: List[CartOut] = [] + for cart_id_value, in db.execute(select(Cart.id).order_by(Cart.id)).all(): + snap = cart_snapshot(db, cart_id_value) + qty = sum(ci.quantity for ci in snap.items) + if min_quantity is not None and qty < min_quantity: + continue + if max_quantity is not None and qty > max_quantity: + continue + if min_price is not None and snap.price < min_price: + continue + if max_price is not None and snap.price > max_price: + continue + result.append(snap) + return result[offset: offset + limit] + +@app.post("/cart/{cart_id}/add/{item_id}", response_model=CartOut) +def add_to_cart( + cart_id: int = Path(..., ge=1), + item_id: int = Path(..., ge=1), + db: Session = Depends(get_db), +): + cart = get_cart_or_404(db, cart_id) + it = get_item_or_404(db, item_id, allow_deleted=True) + if it.deleted: + raise HTTPException(400, "Cannot add a deleted item to cart") + + existing_ci = db.execute( + select(CartItem).where( + CartItem.cart_id == cart.id, + CartItem.item_id == it.id, + ) + ).scalar_one_or_none() + + if existing_ci is not None: + existing_ci.quantity += 1 + else: + db.add(CartItem(cart_id=cart.id, item_id=it.id, quantity=1)) + + db.commit() + return cart_snapshot(db, cart_id) + +# item endpoints +@app.post("/item", response_model=ItemOut, status_code=201) +def create_item(body: ItemCreate, db: Session = Depends(get_db)): + it = Item(name=body.name, price=float(body.price), deleted=False) + db.add(it) + db.commit() + db.refresh(it) + return ItemOut(id=it.id, name=it.name, price=it.price, deleted=it.deleted) + +@app.get("/item/{item_id}", response_model=ItemOut) +def get_item(item_id: int = Path(..., ge=1), db: Session = Depends(get_db)): + it = get_item_or_404(db, item_id) + return ItemOut(id=it.id, name=it.name, price=it.price, deleted=it.deleted) + +@app.get("/item", response_model=List[ItemOut]) +def list_items( + offset: int = Query(0, ge=0), + limit: int = Query(10, gt=0), + min_price: Optional[float] = Query(None, ge=0.0), + max_price: Optional[float] = Query(None, ge=0.0), + show_deleted: bool = Query(False), + db: Session = Depends(get_db), +): + stmt = select(Item).order_by(Item.id) + items = [it for (it,) in db.execute(stmt).all()] + out: List[ItemOut] = [] + for it in items: + if not show_deleted and it.deleted: + continue + if min_price is not None and float(it.price) < min_price: + continue + if max_price is not None and float(it.price) > max_price: + continue + out.append(ItemOut(id=it.id, name=it.name, price=it.price, deleted=it.deleted)) + return out[offset: offset + limit] + +@app.put("/item/{item_id}", response_model=ItemOut) +def put_item(item_id: int = Path(..., ge=1), body: ItemPut = ..., db: Session = Depends(get_db)): + it = get_item_or_404(db, item_id) + it.name = body.name + it.price = float(body.price) + db.commit() + db.refresh(it) + return ItemOut(id=it.id, name=it.name, price=it.price, deleted=it.deleted) + +@app.patch("/item/{item_id}", response_model=ItemOut) +def patch_item(item_id: int = Path(..., ge=1), body: ItemPatch = ..., db: Session = Depends(get_db)): + it = get_item_or_404(db, item_id, allow_deleted=True) + if it.deleted: + return Response(status_code=304) + if body.name is not None: + it.name = body.name + if body.price is not None: + it.price = float(body.price) + db.commit() + db.refresh(it) + return ItemOut(id=it.id, name=it.name, price=it.price, deleted=it.deleted) + +@app.delete("/item/{item_id}") +def delete_item(item_id: int = Path(..., ge=1), db: Session = Depends(get_db)): + it = get_item_or_404(db, item_id, allow_deleted=True) + it.deleted = True + db.commit() + return {} diff --git a/hw2/hw/tests/test.py b/hw2/hw/tests/test.py new file mode 100644 index 00000000..9161e490 --- /dev/null +++ b/hw2/hw/tests/test.py @@ -0,0 +1,29 @@ +from http import HTTPStatus +from fastapi.testclient import TestClient +from shop_api.main import app + +client = TestClient(app) + + +def test_get_item_not_found(): + """Проверяет 404 при запросе несуществующего товара""" + resp = client.get("/item/999999") + assert resp.status_code == HTTPStatus.NOT_FOUND + + +def test_get_cart_not_found(): + """Проверяет 404 при запросе несуществующей корзины""" + resp = client.get("/cart/999999") + assert resp.status_code == HTTPStatus.NOT_FOUND + + +def test_add_deleted_item_to_cart_returns_400(): + """Проверяет 400 при добавлении удалённого товара в корзину""" + # создаём корзину + cart_id = client.post("/cart").json()["id"] + # создаём и удаляем товар + item_id = client.post("/item", json={"name": "X", "price": 1.0}).json()["id"] + client.delete(f"/item/{item_id}") + # пытаемся добавить удалённый товар + resp = client.post(f"/cart/{cart_id}/add/{item_id}") + assert resp.status_code == HTTPStatus.BAD_REQUEST diff --git a/hw2/hw/tx_demos.py b/hw2/hw/tx_demos.py new file mode 100644 index 00000000..b7761c6c --- /dev/null +++ b/hw2/hw/tx_demos.py @@ -0,0 +1,135 @@ +from __future__ import annotations + +import time +from contextlib import contextmanager +from typing import Iterator + +from sqlalchemy import create_engine, text +from sqlalchemy.engine import Engine, Connection +from sqlalchemy.exc import OperationalError, DBAPIError + +DATABASE_URL = "postgresql+psycopg2://user:password@db:5432/hw2_db" + +def wait_for_db(engine: Engine, attempts: int = 60, pause: float = 1.0) -> None: + for _ in range(attempts): + try: + with engine.connect() as conn: + conn.execute(text("SELECT 1")) + return + except OperationalError: + time.sleep(pause) + raise RuntimeError("DB is not ready after wait") + + +def reset_accounts(engine: Engine) -> None: + with engine.connect() as conn: + conn.execute(text("DROP TABLE IF EXISTS accounts")) + conn.execute( + text( + """ + CREATE TABLE accounts( + id SERIAL PRIMARY KEY, + balance INT NOT NULL + ) + """ + ) + ) + conn.execute(text("INSERT INTO accounts(balance) VALUES (100), (200), (300)")) + conn.commit() + + +def dump_accounts(engine: Engine, label: str) -> None: + with engine.connect() as conn: + rows = conn.execute(text("SELECT id, balance FROM accounts ORDER BY id")).all() + print(label, [{"id": r.id, "balance": r.balance} for r in rows]) + + +@contextmanager +def tx(engine: Engine, isolation: str) -> Iterator[Connection]: + conn = engine.connect().execution_options(isolation_level=isolation) + trans = conn.begin() + try: + yield conn + trans.commit() + except Exception: + trans.rollback() + raise + finally: + conn.close() + + +def demo_non_repeatable_read(engine: Engine) -> None: + print("\nNon-Repeatable Read READ COMMITTED") + with tx(engine, "READ COMMITTED") as t1, tx(engine, "READ COMMITTED") as t2: + v1 = t1.execute(text("SELECT balance FROM accounts WHERE id=1")).scalar_one() + print(f"T1: first read (id=1) = {v1}") + + t2.execute(text("UPDATE accounts SET balance = balance + 50 WHERE id=1")) + # t2 коммитится внутри контекста при выходе + + v2 = t1.execute(text("SELECT balance FROM accounts WHERE id=1")).scalar_one() + print(f"T1: second read (id=1) = {v2} -> изменилось? {'YES' if v2 != v1 else 'NO'}") + + +def demo_repeatable_read_snapshot(engine: Engine) -> None: + print("\nSnapshot REPEATABLE READ (non-repeatable отсутствует в PG)") + with tx(engine, "REPEATABLE READ") as t1, tx(engine, "READ COMMITTED") as t2: + v1 = t1.execute(text("SELECT balance FROM accounts WHERE id=2")).scalar_one() + print(f"T1: first read (id=2) = {v1}") + + t2.execute(text("UPDATE accounts SET balance = balance + 100 WHERE id=2")) + # t2 коммитится при выходе + + v2 = t1.execute(text("SELECT balance FROM accounts WHERE id=2")).scalar_one() + print(f"T1: second read (id=2) = {v2} -> осталось прежним? {'YES' if v2 == v1 else 'NO'}") + + +def demo_phantom_repeatable(engine: Engine) -> None: + print("\nPhantom REPEATABLE READ (в PG фантомов нет)") + with tx(engine, "REPEATABLE READ") as t1, tx(engine, "READ COMMITTED") as t2: + c1 = t1.execute(text("SELECT COUNT(*) FROM accounts WHERE balance >= 100")).scalar_one() + print(f"T1: count BEFORE insert (balance>=100) = {c1}") + + t2.execute(text("INSERT INTO accounts(balance) VALUES (150)")) + # t2 коммитится при выходе + + c2 = t1.execute(text("SELECT COUNT(*) FROM accounts WHERE balance >= 100")).scalar_one() + print(f"T1: count AFTER insert (balance>=100) = {c2} -> совпало? {'YES' if c2 == c1 else 'NO'}") + + +def demo_serializable(engine: Engine) -> None: + print("\nSERIALIZABLE (возможен serialization failure у конкурента)") + with tx(engine, "SERIALIZABLE") as t1: + c1 = t1.execute(text("SELECT COUNT(*) FROM accounts WHERE balance >= 100")).scalar_one() + print(f"T1: count BEFORE = {c1}") + + # Отдельная транзакция T2 — попробует вставить конфликтующую запись + try: + with tx(engine, "SERIALIZABLE") as t2: + t2.execute(text("INSERT INTO accounts(balance) VALUES (999)")) + print("T2: INSERT 999 committed") + except DBAPIError as e: + # SQLSTATE '40001' / 'Serialization failure' — нормальная ситуация + print(f"T2: serialization failure -> rolled back ({e.__class__.__name__}: {e})") + + c2 = t1.execute(text("SELECT COUNT(*) FROM accounts WHERE balance >= 100")).scalar_one() + print(f"T1: count AFTER = {c2} -> инвариант сериализуемости соблюдён") + + +def main() -> None: + engine = create_engine(DATABASE_URL, future=True) + wait_for_db(engine) + + reset_accounts(engine) + dump_accounts(engine, "Initial data:") + + demo_non_repeatable_read(engine) + demo_repeatable_read_snapshot(engine) + demo_phantom_repeatable(engine) + demo_serializable(engine) + + dump_accounts(engine, "Final data:") + + +if __name__ == "__main__": + main()