diff --git a/hw1/app.py b/hw1/app.py index 6107b870..0b962343 100644 --- a/hw1/app.py +++ b/hw1/app.py @@ -1,4 +1,57 @@ -from typing import Any, Awaitable, Callable +from typing import Any, Awaitable, Callable, List +from http import HTTPStatus +from json import dumps, loads +from json.decoder import JSONDecodeError +from math import factorial + +def fibonacci(n: int) -> int: + if n < 2: + return n + a = 0 + b = 1 + for i in range(2, n + 1): + a, b = b, a + b + return b + +def mean(numbers: List[float]) -> float: + return sum(numbers) / len(numbers) + +async def return_ok(send: Callable[[dict[str, Any]], Awaitable[None]], + payload_bytes: bytes) -> None: + #code 200 + await send({ + "type": "http.response.start", + "status": HTTPStatus.OK, + "headers": [(b"content-type", b"application/json")], + }) + await send({"type": "http.response.body", "body": payload_bytes}) + +async def return_bad_request(send: Callable[[dict[str, Any]], Awaitable[None]]) -> None: + #code 400 + await send({ + "type": "http.response.start", + "status": HTTPStatus.BAD_REQUEST, + "headers": [(b"content-type", b"text/plain")], + }) + await send({"type": "http.response.body", "body": b"Bad Request"}) + +async def return_not_found(send: Callable[[dict[str, Any]], Awaitable[None]]) -> None: + #code 404 + await send({ + "type": "http.response.start", + "status": HTTPStatus.NOT_FOUND, + "headers": [(b"content-type", b"text/plain")], + }) + await send({"type": "http.response.body", "body": b"Not Found"}) + +async def return_unprocessable_entity(send: Callable[[dict[str, Any]], Awaitable[None]]) -> None: + #code 422 + await send({ + "type": "http.response.start", + "status": HTTPStatus.UNPROCESSABLE_ENTITY, + "headers": [(b"content-type", b"text/plain")], + }) + await send({"type": "http.response.body", "body": b"Unprocessable Entity"}) async def application( @@ -14,6 +67,106 @@ async def application( """ # TODO: Ваша реализация здесь + scope_type = scope.get("type") + + + if scope_type == "lifespan": + while True: + event = await receive() + if event["type"] == "lifespan.startup": + await send({"type": "lifespan.startup.complete"}) + elif event["type"] == "lifespan.shutdown": + await send({"type": "lifespan.shutdown.complete"}) + return + return + + if scope_type != "http": + return + + method = scope.get("method", "GET") + path = scope.get("path", "/") + + + if method == "GET": + if "fibonacci" in path: + parts = path.split("/") + if len(parts) == 3 and parts[:2] == ["", "fibonacci"]: + try: + n = int(parts[2]) + except ValueError: + await return_unprocessable_entity(send) + return + + if n < 0: + await return_bad_request(send) + return + + result = fibonacci(n) + payload = dumps({"result": result}).encode() + + await return_ok(send, payload) + return + + if path == "/factorial": + query_string = scope["query_string"].decode() + parts = query_string.split("=") + if len(parts) == 2 and parts[0] == "n": + try: + n = int(parts[1]) + except ValueError: + await return_unprocessable_entity(send) + return + + if n < 0: + await return_bad_request(send) + return + + result = factorial(n) + payload = dumps({"result": result}).encode() + + await return_ok(send, payload) + return + + await return_unprocessable_entity(send) + return + + if path == "/mean": + body = b"" + while True: + event = await receive() + if event["type"] == "http.request": + body += event.get("body", b"") + if not event.get("more_body", False): + break + + try: + body = loads(body.decode()) + except JSONDecodeError: + await return_unprocessable_entity(send) + return + + if not isinstance(body, list): + await return_unprocessable_entity(send) + return + + if len(body) == 0: + await return_bad_request(send) + return + + if all(map(lambda x: isinstance(x, int) or isinstance(x, float), body)): + numbers = body + result = mean(numbers) + payload = dumps({"result": result}).encode() + await return_ok(send, payload) + return + + await return_unprocessable_entity(send) + return + + await return_not_found(send) + return + + if __name__ == "__main__": import uvicorn uvicorn.run("app:application", host="0.0.0.0", port=8000, reload=True) diff --git a/hw2/hw/Dockerfile b/hw2/hw/Dockerfile new file mode 100644 index 00000000..dd053aef --- /dev/null +++ b/hw2/hw/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.12 + +WORKDIR /app + +COPY requirements.txt . + +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +EXPOSE 8000 + +# Run the app with uvicorn +CMD ["uvicorn", "shop_api.main:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/hw2/hw/dashboard1.png b/hw2/hw/dashboard1.png new file mode 100644 index 00000000..4b941f4b Binary files /dev/null and b/hw2/hw/dashboard1.png differ diff --git a/hw2/hw/dashboard2.png b/hw2/hw/dashboard2.png new file mode 100644 index 00000000..7b3c993d Binary files /dev/null and b/hw2/hw/dashboard2.png differ diff --git a/hw2/hw/docker-compose.yml b/hw2/hw/docker-compose.yml new file mode 100644 index 00000000..cba42203 --- /dev/null +++ b/hw2/hw/docker-compose.yml @@ -0,0 +1,51 @@ +version: "3" + +services: + + local: + build: + context: . + dockerfile: ./Dockerfile + restart: always + ports: + - 8000:8000 + depends_on: + - db + + grafana: + image: grafana/grafana:latest + ports: + - 3000:3000 + restart: always + + prometheus: + image: prom/prometheus + volumes: + - ./settings/prometheus/:/etc/prometheus/ + command: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--storage.tsdb.path=/prometheus" + - "--web.console.libraries=/usr/share/prometheus/console_libraries" + - "--web.console.templates=/usr/share/prometheus/consoles" + ports: + - 9090:9090 + restart: always + + db: + image: postgres:15 + environment: + POSTGRES_USER: user + POSTGRES_PASSWORD: pass + POSTGRES_DB: shopdb + ports: + - 5432:5432 + volumes: + - postgres_data:/var/lib/postgresql/data + - ./migrations/init.sql:/docker-entrypoint-initdb.d/init.sql + healthcheck: + test: ["CMD-SHELL", "pg_isready -U user -d shopdb -h localhost"] + interval: 5s + timeout: 5s + retries: 5 +volumes: + postgres_data: \ No newline at end of file diff --git a/hw2/hw/migrations/init.sql b/hw2/hw/migrations/init.sql new file mode 100644 index 00000000..770a101a --- /dev/null +++ b/hw2/hw/migrations/init.sql @@ -0,0 +1,23 @@ +-- Очистка старых таблиц +DROP TABLE IF EXISTS items_in_carts CASCADE; +DROP TABLE IF EXISTS carts CASCADE; +DROP TABLE IF EXISTS items CASCADE; + +-- Создание таблиц заново +CREATE TABLE items ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + price NUMERIC(12,2) NOT NULL, + deleted BOOLEAN NOT NULL DEFAULT FALSE +); + +CREATE TABLE carts ( + id SERIAL PRIMARY KEY +); + +CREATE TABLE items_in_carts ( + cart_id INTEGER NOT NULL REFERENCES carts(id) ON DELETE CASCADE, + item_id INTEGER NOT NULL REFERENCES items(id), + quantity INTEGER NOT NULL DEFAULT 1, + PRIMARY KEY (cart_id, item_id) +); diff --git a/hw2/hw/random_requests.py b/hw2/hw/random_requests.py new file mode 100644 index 00000000..fce65dd0 --- /dev/null +++ b/hw2/hw/random_requests.py @@ -0,0 +1,34 @@ +import requests +import time +import json +import random + +URL = "http://localhost:8000/" # Change this + + +r = requests.post(f"{URL}/item", json={"name": 'testing', 'price': 100}) +id = json.loads(r.text)['id'] + +def send_requests(): + N = random.randint(1, 10) + INTERVAL = 10 / N + for i in range(N): + try: + r = requests.get(f"{URL}/item/{id}") + print(f"[{i+1}/{N}] Status: {r.status_code}") + if random.random() < 0.5: + r = requests.get(f"{URL}/item/{id+1}") + print(f"[{i + 1}/{N}] Status: {r.status_code}") + if random.random() < 0.5: + r = requests.post(f"{URL}/abc") + print(f"[{i + 1}/{N}] Status: {r.status_code}") + except Exception as e: + print(f"[{i+1}/{N}] Error: {e}") + time.sleep(INTERVAL) + +if __name__ == "__main__": + while True: + print("\n--- Sending batch of GET requests ---") + send_requests() + print("Waiting for next 10 seconds...") + time.sleep(10) diff --git a/hw2/hw/requirements.txt b/hw2/hw/requirements.txt index 207dcf5c..e0c9a657 100644 --- a/hw2/hw/requirements.txt +++ b/hw2/hw/requirements.txt @@ -1,6 +1,12 @@ # Основные зависимости для ASGI приложения fastapi>=0.117.1 uvicorn>=0.24.0 +prometheus-fastapi-instrumentator + +sqlalchemy==2.0.25 +psycopg2-binary==2.9.9 +asyncpg +alembic # Зависимости для тестирования pytest>=7.4.0 diff --git a/hw2/hw/settings/prometheus/prometheus.yml b/hw2/hw/settings/prometheus/prometheus.yml new file mode 100644 index 00000000..c5ffcc17 --- /dev/null +++ b/hw2/hw/settings/prometheus/prometheus.yml @@ -0,0 +1,10 @@ +global: + scrape_interval: 10s + evaluation_interval: 10s + +scrape_configs: + - job_name: demo-service-local + metrics_path: /metrics + static_configs: + - targets: + - local:8000 diff --git a/hw2/hw/shop_api/db.py b/hw2/hw/shop_api/db.py new file mode 100644 index 00000000..a1dfa359 --- /dev/null +++ b/hw2/hw/shop_api/db.py @@ -0,0 +1,34 @@ +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column +from sqlalchemy import ForeignKey, Numeric, Integer, Boolean, Text, UniqueConstraint +import os + +DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://user:pass@db:5432/shopdb") + +engine = create_async_engine(DATABASE_URL, echo=False) +SessionLocal = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession) + +class Base(DeclarativeBase): + pass + +class Item(Base): + __tablename__ = "items" + + id: Mapped[int] = mapped_column(primary_key=True) + name: Mapped[str] = mapped_column(Text, nullable=False) + price: Mapped[float] = mapped_column(Numeric(12,2), nullable=False) + deleted: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) + +class Cart(Base): + __tablename__ = "carts" + + id: Mapped[int] = mapped_column(primary_key=True) + +class ItemInCart(Base): + __tablename__ = "items_in_carts" + + __table_args__ = (UniqueConstraint("cart_id", "item_id", name="uq_cart_item"),) + + cart_id: Mapped[int] = mapped_column(ForeignKey("carts.id", ondelete="CASCADE"), primary_key=True) + item_id: Mapped[int] = mapped_column(ForeignKey("items.id"), primary_key=True) + quantity: Mapped[int] = mapped_column(Integer, nullable=False) diff --git a/hw2/hw/shop_api/main.py b/hw2/hw/shop_api/main.py index f60a8c60..f42a8db9 100644 --- a/hw2/hw/shop_api/main.py +++ b/hw2/hw/shop_api/main.py @@ -1,3 +1,307 @@ -from fastapi import FastAPI +from http import HTTPStatus +from typing import List, Optional +from prometheus_fastapi_instrumentator import Instrumentator + +from fastapi import FastAPI, HTTPException, Query, Response, Depends +from sqlalchemy import select, and_, func +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.ext.asyncio import AsyncSession + +from .db import ( + SessionLocal, + Item as ItemORM, + Cart as CartORM, + ItemInCart as ItemInCartORM, +) +from .schemas import ( + ItemCreate, ItemUpdate, ItemPatch, ItemOut, + CartOut, ItemInCartOut, +) app = FastAPI(title="Shop API") + +Instrumentator().instrument(app).expose(app) + +# Dependency that provides a DB session per request +async def get_session() -> AsyncSession: + async with SessionLocal() as session: + yield session + + + +@app.post("/item", response_model=ItemOut, status_code=HTTPStatus.CREATED) +async def create_item( + data: ItemCreate, + session: AsyncSession = Depends(get_session), +): + obj = ItemORM(name=data.name, price=data.price, deleted=False) + + session.add(obj) + + await session.commit() + + await session.refresh(obj) + + return ItemOut.model_validate(obj, from_attributes=True) + + +@app.get("/item/{id}", response_model=ItemOut) +async def get_item( + id: int, + session: AsyncSession = Depends(get_session), +): + res = await session.execute( + select(ItemORM).where(ItemORM.id == id, ItemORM.deleted == False) + ) + obj = res.scalar_one_or_none() + + if not obj: + raise HTTPException(status_code=404, detail="Item not found") + + return ItemOut.model_validate(obj, from_attributes=True) + + +@app.get("/item", response_model=List[ItemOut]) +async def get_items_list( + offset: int = Query(0, ge=0), + limit: int = Query(10, gt=0), + min_price: Optional[float] = Query(None, ge=0), + max_price: Optional[float] = Query(None, ge=0), + show_deleted: bool = Query(False), + session: AsyncSession = Depends(get_session), +): + stmt = select(ItemORM) + + conditions = [] + if not show_deleted: + conditions.append(ItemORM.deleted == False) + if min_price is not None: + conditions.append(ItemORM.price >= min_price) + if max_price is not None: + conditions.append(ItemORM.price <= max_price) + + if conditions: + stmt = stmt.where(and_(*conditions)) + + stmt = stmt.offset(offset).limit(limit) + + result = await session.execute(stmt) + items = result.scalars().all() + + return [ItemOut.model_validate(i, from_attributes=True) for i in items] + + +@app.put("/item/{id}", response_model=ItemOut) +async def update_item( + id: int, + params: ItemUpdate, + session: AsyncSession = Depends(get_session), +): + obj = await session.get(ItemORM, id) + if not obj: + raise HTTPException(status_code=404, detail="Item not found") + + obj.name = params.name + obj.price = params.price + obj.deleted = params.deleted or False + + await session.commit() + await session.refresh(obj) + return ItemOut.model_validate(obj, from_attributes=True) + +@app.patch("/item/{id}", response_model=ItemOut) +async def patch_item( + id: int, + params: ItemPatch, + session: AsyncSession = Depends(get_session), +): + obj = await session.get(ItemORM, id) + if obj is None: + raise HTTPException(status_code=404, detail="Item not found") + + if obj.deleted: + return Response(status_code=304) + + data = params.model_dump(exclude_unset=True) + + if not data: + return ItemOut.model_validate(obj, from_attributes=True) + + for field, value in data.items(): + setattr(obj, field, value) + + await session.commit() + await session.refresh(obj) + + return ItemOut.model_validate(obj, from_attributes=True) + + +@app.delete("/item/{id}") +async def delete_item( + id: int, + session: AsyncSession = Depends(get_session), +): + res = await session.get(ItemORM, id) + if res is None: + return + + res.deleted = True + await session.commit() + return + + +@app.post("/cart", response_model=CartOut, status_code=HTTPStatus.CREATED) +async def create_cart( session: AsyncSession = Depends(get_session), ): + obj = CartORM() + + session.add(obj) + + await session.commit() + + await session.refresh(obj) + + return CartOut(id=obj.id, items=[], price=0.0) + + +@app.get("/cart/{id}", response_model=CartOut) +async def get_cart( + id: int, + session: AsyncSession = Depends(get_session), +): + res = await session.execute(select(CartORM).where(CartORM.id == id)) + cart = res.scalar_one_or_none() + if not cart: + raise HTTPException(status_code=404, detail="Cart not found") + + ci, it = ItemInCartORM, ItemORM + res = await session.execute( + select(ci.item_id, it.name, ci.quantity, it.deleted) + .join(it, it.id == ci.item_id) + .where(ci.cart_id == id) + ) + rows = res.all() + + items = [ + ItemInCartOut( + id=item_id, + name=name, + quantity=quantity, + available=not deleted_flag, + ) + for (item_id, name, quantity, deleted_flag) in rows + ] + + total_q = await session.execute( + select(func.coalesce(func.sum(ci.quantity * it.price), 0)) + .join(it, it.id == ci.item_id) + .where(ci.cart_id == id, it.deleted == False) + ) + total = float(total_q.scalar_one()) + + return CartOut(id=id, items=items, price=total) + + +@app.get("/cart", response_model=List[CartOut]) +async def get_carts_list( + offset: int = Query(0, ge=0), + limit: int = Query(10, gt=0), + min_price: Optional[float] = Query(None, ge=0), + max_price: Optional[float] = Query(None, ge=0), + min_quantity: Optional[int] = Query(None, ge=0), + max_quantity: Optional[int] = Query(None, ge=0), + session: AsyncSession = Depends(get_session), +): + c, ci, it = CartORM, ItemInCartORM, ItemORM + + agg = ( + select( + c.id.label("cart_id"), + func.coalesce(func.sum(ci.quantity), 0).label("qty"), + func.coalesce( + func.sum((ci.quantity * it.price).filter(it.deleted == False)), + 0 + ).label("price"), + ) + .select_from(c) + .join(ci, ci.cart_id == c.id, isouter=True) + .join(it, it.id == ci.item_id, isouter=True) + .group_by(c.id) + ) + + sub = agg.subquery() + + stmt = select(sub.c.cart_id, sub.c.qty, sub.c.price) + + if min_price is not None: + stmt = stmt.where(sub.c.price >= min_price) + if max_price is not None: + stmt = stmt.where(sub.c.price <= max_price) + if min_quantity is not None: + stmt = stmt.where(sub.c.qty >= min_quantity) + if max_quantity is not None: + stmt = stmt.where(sub.c.qty <= max_quantity) + + stmt = stmt.offset(offset).limit(limit) + + res = await session.execute(stmt) + rows = res.mappings().all() + + return [ + CartOut( + id=row["cart_id"], + items=[], + price=float(row["price"]), + ) + for row in rows + ] + +@app.post("/cart/{cart_id}/add/{item_id}") +async def add_item_to_cart( + cart_id: int, + item_id: int, + session: AsyncSession = Depends(get_session), +): + res = await session.execute( + select(ItemORM).where(ItemORM.id == item_id, ItemORM.deleted == False) + ) + obj = res.scalar_one_or_none() + if not obj: + raise HTTPException(status_code=404, detail="Item not found") + + res = await session.execute( + select(CartORM).where(CartORM.id == cart_id) + ) + obj = res.scalar_one_or_none() + if not obj: + raise HTTPException(status_code=404, detail="Cart not found") + + stmt = pg_insert(ItemInCartORM).values( + cart_id=cart_id, + item_id=item_id, + quantity=1, + ).on_conflict_do_update( + index_elements=[ItemInCartORM.cart_id, ItemInCartORM.item_id], + set_={"quantity": ItemInCartORM.quantity + 1}, + ) + await session.execute(stmt) + await session.commit() + + + + + + + + + + + + + + + + + + + + diff --git a/hw2/hw/shop_api/schemas.py b/hw2/hw/shop_api/schemas.py new file mode 100644 index 00000000..4e7e556c --- /dev/null +++ b/hw2/hw/shop_api/schemas.py @@ -0,0 +1,37 @@ +from pydantic import BaseModel, Field, ConfigDict +from typing import Optional, List + +class ItemBase(BaseModel): + name: str = Field(..., min_length=1) + price: float = Field(..., ge=0) + +class ItemCreate(ItemBase): + pass + +class ItemUpdate(BaseModel): + name: str = Field(..., min_length=1) + price: float = Field(..., ge=0) + deleted: Optional[bool] = False + +class ItemPatch(BaseModel): + name: Optional[str] = Field(None, min_length=1) + price: Optional[float] = Field(None, ge=0) + model_config = ConfigDict(extra='forbid') + +class ItemOut(ItemBase): + id: int + deleted: bool + + class Config: + from_attributes = True + +class ItemInCartOut(BaseModel): + id: int + name: str + quantity: int + available: bool + +class CartOut(BaseModel): + id: int + items: List[ItemInCartOut] + price: float diff --git a/hw2/hw/shop_api/transaction_tests/__init__.py b/hw2/hw/shop_api/transaction_tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/hw2/hw/shop_api/transaction_tests/run_tests.py b/hw2/hw/shop_api/transaction_tests/run_tests.py new file mode 100644 index 00000000..fade809b --- /dev/null +++ b/hw2/hw/shop_api/transaction_tests/run_tests.py @@ -0,0 +1,22 @@ +import asyncio + +async def main(): + print("\n=== DIRTY READ: READ UNCOMMITTED vs READ COMMITTED (PostgreSQL: одинаково) ===") + from .test_dirty_read import run as run_dirty + await run_dirty() + + print("\n=== NON-REPEATABLE READ (READ COMMITTED) ===") + from .test_non_repeatable_read import run as run_nonrep + await run_nonrep() + + print("\n=== REPEATABLE READ (нет non-repeatable) ===") + from .test_repeatable_read import run as run_rr + await run_rr() + + print("\n=== PHANTOM READ (READ COMMITTED vs REPEATABLE READ: PostgreSQL: нет phantom в REPEATABLE READ) ===") + from .test_phantom_read import run as run_phantom + await run_phantom() + +if __name__ == "__main__": + asyncio.run(main()) + diff --git a/hw2/hw/shop_api/transaction_tests/test_dirty_read.py b/hw2/hw/shop_api/transaction_tests/test_dirty_read.py new file mode 100644 index 00000000..28baf6df --- /dev/null +++ b/hw2/hw/shop_api/transaction_tests/test_dirty_read.py @@ -0,0 +1,33 @@ +import asyncio +from sqlalchemy import text +from .utils import engine, setup, SELECT_ONE, UPDATE_TO_PENCIL + +async def _writer_hold_then_rollback(): + # T1: UPDATE без коммита, затем ROLLBACK + async with engine.connect() as conn: + await conn.execute(text("BEGIN")) + await conn.execute(text(UPDATE_TO_PENCIL)) + print("T1: UPDATE name='pencil' (не закоммичено)") + await asyncio.sleep(2) + await conn.execute(text("ROLLBACK")) + print("T1: ROLLBACK") + +async def _reader_select(level: str, tag: str): + # T2: чтение под заданным уровнем + async with engine.connect() as conn: + await conn.execute(text(f"BEGIN ISOLATION LEVEL {level}")) + rows = (await conn.execute(text(SELECT_ONE))).fetchall() + print(f"{tag}: SELECT * -> {rows}") + await conn.execute(text("COMMIT")) + +async def run(): + await setup() + t_writer = asyncio.create_task(_writer_hold_then_rollback()) + await asyncio.sleep(1) # дать писателю обновить и удерживать + await _reader_select("READ UNCOMMITTED", "T2 (RU)") # в PG это будет как RC + await _reader_select("READ COMMITTED", "T2 (RC)") + await t_writer + # финальная проверка + async with engine.connect() as conn: + rows = (await conn.execute(text(SELECT_ONE))).fetchall() + print(f"FINAL: SELECT * -> {rows}") diff --git a/hw2/hw/shop_api/transaction_tests/test_non_repeatable_read.py b/hw2/hw/shop_api/transaction_tests/test_non_repeatable_read.py new file mode 100644 index 00000000..9f16298b --- /dev/null +++ b/hw2/hw/shop_api/transaction_tests/test_non_repeatable_read.py @@ -0,0 +1,34 @@ +import asyncio +from sqlalchemy import text +from .utils import engine, setup, SELECT_ONE, UPDATE_TO_PENCIL + +async def _reader_rc_two_selects(): + # T1: в одной транзакции RC дважды читает одну строку + async with engine.connect() as conn: + await conn.execute(text("BEGIN ISOLATION LEVEL READ COMMITTED")) + rows1 = (await conn.execute(text(SELECT_ONE))).fetchall() + print(f"T1: SELECT #1 -> {rows1}") + await asyncio.sleep(1) # ждём, пока писатель закоммитит изменение + rows2 = (await conn.execute(text(SELECT_ONE))).fetchall() + print(f"T1: SELECT #2 -> {rows2}") + await conn.execute(text("COMMIT")) + +async def _writer_commit_update(): + # T2: обновляет и коммитит между двумя SELECT T1 + async with engine.connect() as conn: + await asyncio.sleep(0.5) # чтобы SELECT #1 уже выполнился + await conn.execute(text("BEGIN")) + await conn.execute(text(UPDATE_TO_PENCIL)) + await conn.execute(text("COMMIT")) + print("T2: UPDATE name='pencil' и COMMIT") + +async def run(): + await setup() + await asyncio.gather( + _reader_rc_two_selects(), + _writer_commit_update(), + ) + # итог + async with engine.connect() as conn: + rows = (await conn.execute(text(SELECT_ONE))).fetchall() + print(f"FINAL: SELECT * -> {rows}") diff --git a/hw2/hw/shop_api/transaction_tests/test_phantom_read.py b/hw2/hw/shop_api/transaction_tests/test_phantom_read.py new file mode 100644 index 00000000..7c1664e7 --- /dev/null +++ b/hw2/hw/shop_api/transaction_tests/test_phantom_read.py @@ -0,0 +1,40 @@ +import asyncio +from sqlalchemy import text +from .utils import engine, setup + +SELECT_SQL = "SELECT id, name FROM test_items WHERE name LIKE 'p%' ORDER BY id;" +INSERT_SQL = "INSERT INTO test_items (id, name) VALUES (2, 'paper');" + +async def _reader(level: str): + async with engine.connect() as conn: + await conn.execute(text(f"BEGIN ISOLATION LEVEL {level}")) + rows1 = (await conn.execute(text(SELECT_SQL))).fetchall() + print(f"T1[{level}] SELECT #1 -> {rows1}") + await asyncio.sleep(2) # ждём, пока T2 вставит строку + rows2 = (await conn.execute(text(SELECT_SQL))).fetchall() + print(f"T1[{level}] SELECT #2 -> {rows2}") + await conn.execute(text("COMMIT")) + +async def _writer(): + async with engine.connect() as conn: + await asyncio.sleep(1) + await conn.execute(text("BEGIN")) + await conn.execute(text(INSERT_SQL)) + await conn.execute(text("COMMIT")) + print("T2: INSERT (2, 'paper') + COMMIT") + +async def run(): + # READ COMMITTED → фантом возможен + await setup() + print("\n--- PHANTOM READ on READ COMMITTED ---") + await asyncio.gather(_reader("READ COMMITTED"), _writer()) + + # REPEATABLE READ → фантома нет + await setup() + print("\n--- NO PHANTOM on REPEATABLE READ ---") + await asyncio.gather(_reader("REPEATABLE READ"), _writer()) + + # финальная проверка (вне транзакций) + async with engine.connect() as conn: + final = (await conn.execute(text(SELECT_SQL))).fetchall() + print(f"FINAL outside: {final}") diff --git a/hw2/hw/shop_api/transaction_tests/test_repeatable_read.py b/hw2/hw/shop_api/transaction_tests/test_repeatable_read.py new file mode 100644 index 00000000..2412d757 --- /dev/null +++ b/hw2/hw/shop_api/transaction_tests/test_repeatable_read.py @@ -0,0 +1,40 @@ +import asyncio +from sqlalchemy import text +from .utils import engine, setup, SELECT_ONE, UPDATE_TO_PENCIL + + +async def _rr_reader_same_row(): + async with engine.connect() as conn: + await conn.execute(text("BEGIN ISOLATION LEVEL REPEATABLE READ")) + rows1 = (await conn.execute(text(SELECT_ONE))).fetchall() + print(f"T1[RR] SELECT #1 (by id): {rows1}") + await asyncio.sleep(2) # время на UPDATE другой транзакции + rows2 = (await conn.execute(text(SELECT_ONE))).fetchall() + print(f"T1[RR] SELECT #2 (by id): {rows2}") + await conn.execute(text("COMMIT")) + + +async def _writer_update(): + async with engine.connect() as conn: + await asyncio.sleep(1) + await conn.execute(text("BEGIN")) + await conn.execute(text(UPDATE_TO_PENCIL)) # pen -> pencil + await conn.execute(text("COMMIT")) + + +async def test_rr_no_non_repeatable_read(): + await setup() # [(1,'pen')] + await asyncio.gather( + _rr_reader_same_row(), + _writer_update(), + ) + # Снаружи транзакции T1 изменение уже видно: + async with engine.connect() as conn: + final = (await conn.execute(text(SELECT_ONE))).fetchall() + print(f"FINAL outside (by id): {final}") + + +async def run(): + print("\n--- REPEATABLE READ: no non-repeatable read ---") + await test_rr_no_non_repeatable_read() + diff --git a/hw2/hw/shop_api/transaction_tests/utils.py b/hw2/hw/shop_api/transaction_tests/utils.py new file mode 100644 index 00000000..1f860d8b --- /dev/null +++ b/hw2/hw/shop_api/transaction_tests/utils.py @@ -0,0 +1,30 @@ +import os +from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine +from sqlalchemy import text + +DATABASE_URL = os.getenv( + "DATABASE_URL", + "postgresql+asyncpg://user:pass@db:5432/shopdb" +) + +engine: AsyncEngine = create_async_engine(DATABASE_URL, echo=False) + +DDL = """ +CREATE TABLE IF NOT EXISTS test_items ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL +); +""" + + +SELECT_ONE = "SELECT * FROM test_items WHERE id = 1;" +UPDATE_TO_PENCIL = "UPDATE test_items SET name = 'pencil' WHERE id = 1;" + +async def setup(): + async with engine.begin() as conn: + await conn.execute(text(DDL)) + await conn.execute(text("TRUNCATE test_items")) + await conn.execute( + text("INSERT INTO test_items (id, name) VALUES (:id, 'pen')"), + {"id": 1}, + ) diff --git a/hw2/hw/test_output_1.png b/hw2/hw/test_output_1.png new file mode 100644 index 00000000..178406e9 Binary files /dev/null and b/hw2/hw/test_output_1.png differ diff --git a/hw2/hw/test_output_2.png b/hw2/hw/test_output_2.png new file mode 100644 index 00000000..ed830269 Binary files /dev/null and b/hw2/hw/test_output_2.png differ