diff --git a/.github/workflows/hw2-tests.yml b/.github/workflows/hw2-tests.yml index be7fc297..accd5c3e 100644 --- a/.github/workflows/hw2-tests.yml +++ b/.github/workflows/hw2-tests.yml @@ -16,6 +16,21 @@ jobs: matrix: python-version: ["3.12", "3.13"] + services: + postgres: + image: postgres:16-alpine + env: + POSTGRES_USER: shop_user + POSTGRES_PASSWORD: shop_password + POSTGRES_DB: shop_db + ports: + - 5432:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + steps: - name: Checkout code uses: actions/checkout@v4 @@ -31,9 +46,15 @@ jobs: python -m pip install --upgrade pip pip install -r requirements.txt - - name: Run tests + - name: Run tests with coverage working-directory: hw2/hw env: PYTHONPATH: ${{ github.workspace }}/hw2/hw + DATABASE_URL: postgresql+asyncpg://shop_user:shop_password@localhost:5432/shop_db + run: | + pytest --cov=shop_api --cov-report=term --cov-report=xml test_homework2.py -v + + - name: Check coverage threshold + working-directory: hw2/hw run: | - pytest test_homework2.py -v + coverage report --fail-under=95 diff --git a/hw1/app.py b/hw1/app.py index 6107b870..4a779bb4 100644 --- a/hw1/app.py +++ b/hw1/app.py @@ -1,4 +1,170 @@ from typing import Any, Awaitable, Callable +import json +import math +from urllib.parse import parse_qs + +async def json_response(send, status: int, payload: dict[str, Any]) -> None: + await send( + { + "type": "http.response.start", + "status": status, + "headers": [(b"content-type", b"application/json")], + } + ) + await send( + { + "type": "http.response.body", + "body": json.dumps(payload).encode(), + } + ) + +async def read_body(receive) -> bytes: + """Считывает всё тело (даже если придёт несколькими чанками).""" + chunks: list[bytes] = [] + more = True + while more: + msg = await receive() + if msg["type"] != "http.request": + continue + chunks.append(msg.get("body", b"")) + more = msg.get("more_body", False) + return b"".join(chunks) + +def as_query(scope: dict[str, Any]) -> dict[str, list[str]]: + return parse_qs(scope.get("query_string", b"").decode()) + + +class RouteMatch: + __slots__ = ("handler", "params") + def __init__(self, handler, params: dict[str, str] | None = None): + self.handler = handler + self.params = params or {} + +class Router: + + def __init__(self): + self._static: dict[tuple[str, str], Callable] = {} + self._dynamic: list[tuple[str, Callable]] = [] + + def add(self, method: str, path: str, handler: Callable) -> None: + if path.endswith("/{n}") and path.startswith("/fibonacci"): + self._dynamic.append(("/fibonacci/", handler)) + else: + self._static[(method.upper(), path)] = handler + + def match(self, method: str, path: str) -> RouteMatch | None: + key = (method.upper(), path) + if key in self._static: + return RouteMatch(self._static[key]) + + for prefix, handler in self._dynamic: + if path.startswith(prefix): + param = path[len(prefix):] + if param != "": + return RouteMatch(handler, {"n": param}) + return None + +router = Router() + +async def view_fibonacci( + scope: dict[str, Any], + receive: Callable[[], Awaitable[dict[str, Any]]], + send: Callable[[dict[str, Any]], Awaitable[None]], + *, + n: str, +) -> None: + if scope["method"] != "GET": + return await json_response(send, 422, {"error": "Unsupported method"}) + + try: + ni = int(n) + except ValueError: + return await json_response(send, 422, {"error": "Invalid n"}) + if ni < 0: + return await json_response(send, 400, {"error": "n must be non-negative"}) + + a, b = 0, 1 + for _ in range(ni): + a, b = b, a + b + return await json_response(send, 200, {"result": a}) + +async def view_factorial( + scope: dict[str, Any], + receive: Callable[[], Awaitable[dict[str, Any]]], + send: Callable[[dict[str, Any]], Awaitable[None]], +) -> None: + if scope["method"] != "GET": + return await json_response(send, 422, {"error": "Unsupported method"}) + + query = as_query(scope) + raw = query.get("n") + if not raw or raw[0] == "": + return await json_response(send, 422, {"error": "Invalid n"}) + try: + n = int(raw[0]) + except ValueError: + return await json_response(send, 422, {"error": "Invalid n"}) + if n < 0: + return await json_response(send, 400, {"error": "n must be non-negative"}) + + return await json_response(send, 200, {"result": math.factorial(n)}) + +async def view_mean( + scope: dict[str, Any], + receive: Callable[[], Awaitable[dict[str, Any]]], + send: Callable[[dict[str, Any]], Awaitable[None]], +) -> None: + if scope["method"] != "GET": + return await json_response(send, 422, {"error": "Unsupported method"}) + + body = await read_body(receive) + if body: + try: + data = json.loads(body.decode() or "null") + except json.JSONDecodeError: + return await json_response(send, 422, {"error": "Invalid JSON body"}) + if not isinstance(data, list): + return await json_response(send, 422, {"error": "Invalid JSON body"}) + if len(data) == 0: + return await json_response(send, 400, {"error": "numbers must be non-empty list"}) + nums: list[float] = [] + for v in data: + if isinstance(v, (int, float)): + nums.append(float(v)) + else: + return await json_response(send, 422, {"error": "All items must be numbers"}) + return await json_response(send, 200, {"result": sum(nums) / len(nums)}) + + query = as_query(scope) + param = (query.get("numbers") or [None])[0] + if param is None: + return await json_response(send, 422, {"error": "Invalid JSON body"}) + parts = [p.strip() for p in param.split(",") if p.strip()] + if not parts: + return await json_response(send, 400, {"error": "numbers must be non-empty list"}) + try: + nums = [float(p) for p in parts] + except ValueError: + return await json_response(send, 422, {"error": "All items must be numbers"}) + return await json_response(send, 200, {"result": sum(nums) / len(nums)}) + + +router.add("GET", "/factorial", view_factorial) +router.add("GET", "/mean", view_mean) +router.add("GET", "/fibonacci/{n}", view_fibonacci) + +async def lifespan_app( + receive: Callable[[], Awaitable[dict[str, Any]]], + send: Callable[[dict[str, Any]], Awaitable[None]], +) -> None: + while True: + msg = await receive() + t = msg.get("type") + if t == "lifespan.startup": + await send({"type": "lifespan.startup.complete"}) + elif t == "lifespan.shutdown": + await send({"type": "lifespan.shutdown.complete"}) + return async def application( @@ -12,7 +178,24 @@ async def application( receive: Корутина для получения сообщений от клиента send: Корутина для отправки сообщений клиенту """ - # TODO: Ваша реализация здесь + stype = scope.get("type") + if stype == "lifespan": + return await lifespan_app(receive, send) + + if stype != "http": + return + + method = scope["method"] + path = scope["path"] + + match = router.match(method, path) + if match is None: + return await json_response(send, 404, {"error": "Not found"}) + + if match.params: + return await match.handler(scope, receive, send, **match.params) + else: + return await match.handler(scope, receive, send) if __name__ == "__main__": import uvicorn diff --git a/hw2/hw/.coveragerc b/hw2/hw/.coveragerc new file mode 100644 index 00000000..9e0decf1 --- /dev/null +++ b/hw2/hw/.coveragerc @@ -0,0 +1,19 @@ +[run] +source = shop_api +concurrency = thread,greenlet +omit = + */tests/* + */test_*.py + */.venv/* + */conftest.py + +[report] +precision = 2 +exclude_lines = + pragma: no cover + def __repr__ + raise AssertionError + raise NotImplementedError + if __name__ == .__main__.: + pass + diff --git a/hw2/hw/Dockerfile b/hw2/hw/Dockerfile new file mode 100644 index 00000000..0d69c828 --- /dev/null +++ b/hw2/hw/Dockerfile @@ -0,0 +1,23 @@ +FROM python:3.13 AS base + +ARG PYTHONFAULTHANDLER=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONHASHSEED=random \ + PIP_NO_CACHE_DIR=on \ + PIP_DISABLE_PIP_VERSION_CHECK=on \ + PIP_DEFAULT_TIMEOUT=500 + +RUN apt-get update && apt-get install -y gcc +RUN python -m pip install --upgrade pip + +WORKDIR /app/src +COPY . ./ + +ENV VIRTUAL_ENV=/app/src/.venv \ + PATH=/app/src/.venv/bin:$PATH + +RUN pip install -r requirements.txt + +FROM base as local + +CMD ["uvicorn", "shop_api.main:app", "--port", "8080", "--host", "0.0.0.0"] \ No newline at end of file diff --git a/hw2/hw/conftest.py b/hw2/hw/conftest.py new file mode 100644 index 00000000..b520b300 --- /dev/null +++ b/hw2/hw/conftest.py @@ -0,0 +1,29 @@ +import pytest +import asyncio + + +@pytest.fixture(scope="session", autouse=True) +def setup_database(): + from shop_api.database import engine, Base + + async def init_db(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await conn.run_sync(Base.metadata.create_all) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(init_db()) + loop.close() + + yield + + async def cleanup_db(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(cleanup_db()) + loop.close() + diff --git a/hw2/hw/docker-compose.yml b/hw2/hw/docker-compose.yml new file mode 100644 index 00000000..70af3e95 --- /dev/null +++ b/hw2/hw/docker-compose.yml @@ -0,0 +1,69 @@ +services: + + postgres: + image: postgres:16-alpine + restart: always + environment: + POSTGRES_USER: shop_user + POSTGRES_PASSWORD: shop_password + POSTGRES_DB: shop_db + ports: + - 5432:5432 + networks: + - monitoring + volumes: + - postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U shop_user -d shop_db"] + interval: 5s + timeout: 5s + retries: 5 + + shop-api: + build: + context: . + dockerfile: ./Dockerfile + target: local + restart: always + ports: + - 8080:8080 + networks: + - monitoring + environment: + DATABASE_URL: postgresql+asyncpg://shop_user:shop_password@postgres:5432/shop_db + depends_on: + postgres: + condition: service_healthy + + grafana: + image: grafana/grafana:latest + ports: + - 3000:3000 + restart: always + networks: + - monitoring + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin + - GF_SECURITY_ADMIN_USER=admin + + prometheus: + image: prom/prometheus + volumes: + - ./settings/prometheus/:/etc/prometheus/ + command: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--storage.tsdb.path=/prometheus" + - "--web.console.libraries=/usr/share/prometheus/console_libraries" + - "--web.console.templates=/usr/share/prometheus/consoles" + ports: + - 9090:9090 + restart: always + networks: + - monitoring + +networks: + monitoring: + driver: bridge + +volumes: + postgres_data: \ No newline at end of file diff --git a/hw2/hw/requirements.txt b/hw2/hw/requirements.txt index 207dcf5c..8ff17d54 100644 --- a/hw2/hw/requirements.txt +++ b/hw2/hw/requirements.txt @@ -2,8 +2,16 @@ fastapi>=0.117.1 uvicorn>=0.24.0 +# Работа с БД +sqlalchemy>=2.0.0 +asyncpg>=0.29.0 + # Зависимости для тестирования pytest>=7.4.0 pytest-asyncio>=0.21.0 httpx>=0.27.2 +pytest-cov>=4.0.0 Faker>=37.8.0 + +# Мониторинг +prometheus-fastapi-instrumentator>=6.1.0 diff --git a/hw2/hw/settings/prometheus/prometheus.yml b/hw2/hw/settings/prometheus/prometheus.yml new file mode 100644 index 00000000..ee5c3c81 --- /dev/null +++ b/hw2/hw/settings/prometheus/prometheus.yml @@ -0,0 +1,10 @@ +global: + scrape_interval: 10s + evaluation_interval: 10s + +scrape_configs: + - job_name: shop-api + metrics_path: /metrics + static_configs: + - targets: + - shop-api:8080 \ No newline at end of file diff --git a/hw2/hw/shop_api/database.py b/hw2/hw/shop_api/database.py new file mode 100644 index 00000000..1f459ec8 --- /dev/null +++ b/hw2/hw/shop_api/database.py @@ -0,0 +1,28 @@ +import os +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy.orm import DeclarativeBase +from sqlalchemy.pool import NullPool + +DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://shop_user:shop_password@localhost:5432/shop_db") + +engine = create_async_engine( + DATABASE_URL, + echo=False, + poolclass=NullPool +) +async_session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + +class Base(DeclarativeBase): + pass + + +async def get_session() -> AsyncSession: + async with async_session_maker() as session: + yield session + + +async def init_db(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + diff --git a/hw2/hw/shop_api/main.py b/hw2/hw/shop_api/main.py index f60a8c60..6eba731d 100644 --- a/hw2/hw/shop_api/main.py +++ b/hw2/hw/shop_api/main.py @@ -1,3 +1,314 @@ -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException, Query, status, Response, WebSocket, WebSocketDisconnect, Depends +from pydantic import BaseModel, Field, ConfigDict, computed_field +from typing import List, Optional, Dict +from sqlalchemy import select, and_ +from sqlalchemy.ext.asyncio import AsyncSession -app = FastAPI(title="Shop API") +import uuid +from collections import defaultdict +from contextlib import asynccontextmanager + +from prometheus_fastapi_instrumentator import Instrumentator + +from .database import get_session, init_db +from .models import ItemModel, CartModel, CartItemModel + + +@asynccontextmanager +async def lifespan(app: FastAPI): + await init_db() + yield + + +app = FastAPI(title="Shop API", lifespan=lifespan) + +Instrumentator().instrument(app).expose(app) + + +class Item(BaseModel): + id: int + name: str = Field(..., min_length=1) + price: float = Field(..., gt=0) + deleted: bool = False + +class ItemCreate(BaseModel): + name: str = Field(..., min_length=1) + price: float = Field(..., gt=0) + +class ItemUpdate(BaseModel): + model_config = ConfigDict(extra="forbid") + name: Optional[str] = Field(None, min_length=1) + price: Optional[float] = Field(None, gt=0) + +class CartItem(BaseModel): + id: int + name: str + quantity: int = Field(..., ge=0) + available: bool = True + +class Cart(BaseModel): + id: int + items: List[CartItem] = [] + price: float = 0.0 + + +@app.post("/item", response_model=Item, status_code=status.HTTP_201_CREATED) +async def create_item(item: ItemCreate, session: AsyncSession = Depends(get_session)): + new_item = ItemModel(name=item.name, price=item.price, deleted=False) + session.add(new_item) + await session.commit() + await session.refresh(new_item) + return Item(id=new_item.id, name=new_item.name, price=new_item.price, deleted=new_item.deleted) + +@app.get("/item/{item_id}", response_model=Item) +async def get_item(item_id: int, session: AsyncSession = Depends(get_session)): + result = await session.execute( + select(ItemModel).where(and_(ItemModel.id == item_id, ItemModel.deleted == False)) + ) + item = result.scalar_one_or_none() + if item is None: + raise HTTPException(status_code=404, detail="Item not found") + return Item(id=item.id, name=item.name, price=item.price, deleted=item.deleted) + +@app.get("/item", response_model=List[Item]) +async def list_items( + offset: int = Query(0, ge=0), + limit: int = Query(10, gt=0), + min_price: Optional[float] = Query(None, ge=0), + max_price: Optional[float] = Query(None, ge=0), + show_deleted: bool = Query(False), + session: AsyncSession = Depends(get_session) +): + query = select(ItemModel) + + if not show_deleted: + query = query.where(ItemModel.deleted == False) + + if min_price is not None: + query = query.where(ItemModel.price >= min_price) + + if max_price is not None: + query = query.where(ItemModel.price <= max_price) + + query = query.offset(offset).limit(limit) + + result = await session.execute(query) + items = result.scalars().all() + + return [Item(id=item.id, name=item.name, price=item.price, deleted=item.deleted) for item in items] + +@app.put("/item/{item_id}", response_model=Item) +async def replace_item(item_id: int, new_item: ItemCreate, session: AsyncSession = Depends(get_session)): + result = await session.execute(select(ItemModel).where(ItemModel.id == item_id)) + item = result.scalar_one_or_none() + + if item is None: + raise HTTPException(status_code=404, detail="Item not found") + + item.name = new_item.name + item.price = new_item.price + await session.commit() + await session.refresh(item) + + return Item(id=item.id, name=item.name, price=item.price, deleted=item.deleted) + +@app.patch("/item/{item_id}", response_model=Item) +async def patch_item(item_id: int, upd: ItemUpdate, session: AsyncSession = Depends(get_session)): + result = await session.execute(select(ItemModel).where(ItemModel.id == item_id)) + item = result.scalar_one_or_none() + + if item is None: + raise HTTPException(status_code=404, detail="Item not found") + + if item.deleted: + raise HTTPException(status_code=304, detail="Item not modified") + + update_data = upd.model_dump(exclude_unset=True) + + for key, value in update_data.items(): + setattr(item, key, value) + + await session.commit() + await session.refresh(item) + + return Item(id=item.id, name=item.name, price=item.price, deleted=item.deleted) + +@app.delete("/item/{item_id}", status_code=status.HTTP_200_OK) +async def delete_item(item_id: int, session: AsyncSession = Depends(get_session)): + result = await session.execute(select(ItemModel).where(ItemModel.id == item_id)) + item = result.scalar_one_or_none() + + if item is not None: + item.deleted = True + await session.commit() + + return Response(status_code=status.HTTP_200_OK) + + +@app.post("/cart", response_model=Cart, status_code=status.HTTP_201_CREATED) +async def create_cart(response: Response, session: AsyncSession = Depends(get_session)): + new_cart = CartModel() + session.add(new_cart) + await session.commit() + await session.refresh(new_cart) + + response.headers["Location"] = f"/cart/{new_cart.id}" + return Cart(id=new_cart.id, items=[], price=0.0) + +@app.get("/cart/{cart_id}", response_model=Cart) +async def get_cart(cart_id: int, session: AsyncSession = Depends(get_session)): + result = await session.execute( + select(CartModel).where(CartModel.id == cart_id) + ) + cart = result.scalar_one_or_none() + + if cart is None: + raise HTTPException(status_code=404, detail="Cart not found") + + # Загружаем cart_items + cart_items_result = await session.execute( + select(CartItemModel).where(CartItemModel.cart_id == cart_id) + ) + cart_items = cart_items_result.scalars().all() + + items = [] + total_price = 0.0 + + for cart_item in cart_items: + # Загружаем информацию о товаре + item_result = await session.execute( + select(ItemModel).where(ItemModel.id == cart_item.item_id) + ) + item = item_result.scalar_one_or_none() + + if item: + items.append(CartItem( + id=item.id, + name=item.name, + quantity=cart_item.quantity, + available=not item.deleted + )) + total_price += item.price * cart_item.quantity + + return Cart(id=cart.id, items=items, price=total_price) + +@app.get("/cart", response_model=List[Cart]) +async def list_carts( + offset: int = Query(0, ge=0), + limit: int = Query(10, gt=0), + min_price: Optional[float] = Query(None, ge=0), + max_price: Optional[float] = Query(None, ge=0), + min_quantity: Optional[int] = Query(None, ge=0), + max_quantity: Optional[int] = Query(None, ge=0), + session: AsyncSession = Depends(get_session) +): + # Получаем все корзины + result = await session.execute(select(CartModel)) + all_carts = result.scalars().all() + + carts = [] + + for cart in all_carts: + # Загружаем cart_items для каждой корзины + cart_items_result = await session.execute( + select(CartItemModel).where(CartItemModel.cart_id == cart.id) + ) + cart_items = cart_items_result.scalars().all() + + items = [] + total_price = 0.0 + total_quantity = 0 + + for cart_item in cart_items: + # Загружаем информацию о товаре + item_result = await session.execute( + select(ItemModel).where(ItemModel.id == cart_item.item_id) + ) + item = item_result.scalar_one_or_none() + + if item: + items.append(CartItem( + id=item.id, + name=item.name, + quantity=cart_item.quantity, + available=not item.deleted + )) + total_price += item.price * cart_item.quantity + total_quantity += cart_item.quantity + + # Фильтрация + if min_price is not None and total_price < min_price: + continue + if max_price is not None and total_price > max_price: + continue + if min_quantity is not None and total_quantity < min_quantity: + continue + if max_quantity is not None and total_quantity > max_quantity: + continue + + carts.append(Cart(id=cart.id, items=items, price=total_price)) + + # Применяем offset и limit + return carts[offset : offset + limit] + +@app.post("/cart/{cart_id}/add/{item_id}", response_model=Cart, status_code=status.HTTP_200_OK) +async def add_to_cart(cart_id: int, item_id: int, session: AsyncSession = Depends(get_session)): + # Проверяем существование корзины + cart_result = await session.execute(select(CartModel).where(CartModel.id == cart_id)) + cart = cart_result.scalar_one_or_none() + + if cart is None: + raise HTTPException(status_code=404, detail="Cart not found") + + # Проверяем существование товара + item_result = await session.execute(select(ItemModel).where(ItemModel.id == item_id)) + item = item_result.scalar_one_or_none() + + if item is None: + raise HTTPException(status_code=404, detail="Item not found") + + # Проверяем, есть ли уже такой товар в корзине + cart_item_result = await session.execute( + select(CartItemModel).where( + and_(CartItemModel.cart_id == cart_id, CartItemModel.item_id == item_id) + ) + ) + cart_item = cart_item_result.scalar_one_or_none() + + if cart_item: + cart_item.quantity += 1 + else: + new_cart_item = CartItemModel(cart_id=cart_id, item_id=item_id, quantity=1) + session.add(new_cart_item) + + await session.commit() + return await get_cart(cart_id, session) + + +rooms: Dict[str, Dict[WebSocket, str]] = defaultdict(dict) + +@app.websocket("/chat/{chat_name}") +async def chat_ws(websocket: WebSocket, chat_name: str): + await websocket.accept() + username = f"user-{uuid.uuid4().hex[:8]}" + rooms[chat_name][websocket] = username + + try: + while True: + message = await websocket.receive_text() + formatted = f"{username} :: {message}" + + for ws, _uname in list(rooms[chat_name].items()): + if ws is websocket: + continue + try: + await ws.send_text(formatted) + except Exception: + rooms[chat_name].pop(ws, None) + + except WebSocketDisconnect: + pass + finally: + rooms[chat_name].pop(websocket, None) + if not rooms[chat_name]: + rooms.pop(chat_name, None) diff --git a/hw2/hw/shop_api/models.py b/hw2/hw/shop_api/models.py new file mode 100644 index 00000000..82ecf58c --- /dev/null +++ b/hw2/hw/shop_api/models.py @@ -0,0 +1,36 @@ +from sqlalchemy import Integer, String, Float, Boolean, ForeignKey +from sqlalchemy.orm import Mapped, mapped_column, relationship +from typing import List + +from .database import Base + + +class ItemModel(Base): + __tablename__ = "items" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + name: Mapped[str] = mapped_column(String, nullable=False) + price: Mapped[float] = mapped_column(Float, nullable=False) + deleted: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) + + +class CartModel(Base): + __tablename__ = "carts" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + + cart_items: Mapped[List["CartItemModel"]] = relationship("CartItemModel", back_populates="cart", cascade="all, delete-orphan") + + +class CartItemModel(Base): + __tablename__ = "cart_items" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + cart_id: Mapped[int] = mapped_column(Integer, ForeignKey("carts.id", ondelete="CASCADE"), nullable=False) + item_id: Mapped[int] = mapped_column(Integer, ForeignKey("items.id", ondelete="CASCADE"), nullable=False) + quantity: Mapped[int] = mapped_column(Integer, default=1, nullable=False) + + cart: Mapped["CartModel"] = relationship("CartModel", back_populates="cart_items") + item: Mapped["ItemModel"] = relationship("ItemModel") + + diff --git a/hw2/hw/test_homework2.py b/hw2/hw/test_homework2.py index 60a1f36a..0bae4478 100644 --- a/hw2/hw/test_homework2.py +++ b/hw2/hw/test_homework2.py @@ -282,3 +282,172 @@ def test_delete_item(existing_item: dict[str, Any]) -> None: response = client.delete(f"/item/{item_id}") assert response.status_code == HTTPStatus.OK + + +def test_add_to_cart_new_item( + existing_empty_cart_id: int, + existing_items: list[int], +) -> None: + cart_id = existing_empty_cart_id + item_id = existing_items[0] + + response = client.post(f"/cart/{cart_id}/add/{item_id}") + + assert response.status_code == HTTPStatus.OK + cart_data = response.json() + assert cart_data["id"] == cart_id + assert len(cart_data["items"]) == 1 + assert cart_data["items"][0]["id"] == item_id + assert cart_data["items"][0]["quantity"] == 1 + + +def test_add_to_cart_existing_item( + existing_empty_cart_id: int, + existing_items: list[int], +) -> None: + cart_id = existing_empty_cart_id + item_id = existing_items[0] + + client.post(f"/cart/{cart_id}/add/{item_id}") + response = client.post(f"/cart/{cart_id}/add/{item_id}") + + assert response.status_code == HTTPStatus.OK + cart_data = response.json() + assert len(cart_data["items"]) == 1 + assert cart_data["items"][0]["quantity"] == 2 + + +def test_add_to_cart_not_found_cart() -> None: + response = client.post("/cart/99999/add/1") + assert response.status_code == HTTPStatus.NOT_FOUND + + +def test_add_to_cart_not_found_item(existing_empty_cart_id: int) -> None: + response = client.post(f"/cart/{existing_empty_cart_id}/add/99999") + assert response.status_code == HTTPStatus.NOT_FOUND + + +def test_get_item_not_found() -> None: + response = client.get("/item/99999") + assert response.status_code == HTTPStatus.NOT_FOUND + + +def test_put_item_not_found() -> None: + response = client.put("/item/99999", json={"name": "test", "price": 10.0}) + assert response.status_code == HTTPStatus.NOT_FOUND + + +def test_patch_item_not_found() -> None: + response = client.patch("/item/99999", json={"price": 10.0}) + assert response.status_code == HTTPStatus.NOT_FOUND + + +def test_get_cart_not_found() -> None: + response = client.get("/cart/99999") + assert response.status_code == HTTPStatus.NOT_FOUND + + +def test_list_carts_with_filters( + existing_not_empty_carts: list[int], +) -> None: + response = client.get("/cart", params={"min_quantity": 5}) + assert response.status_code == HTTPStatus.OK + carts = response.json() + for cart in carts: + total_quantity = sum(item["quantity"] for item in cart["items"]) + assert total_quantity >= 5 + + response = client.get("/cart", params={"max_quantity": 3}) + assert response.status_code == HTTPStatus.OK + carts = response.json() + for cart in carts: + total_quantity = sum(item["quantity"] for item in cart["items"]) + assert total_quantity <= 3 + + +def test_list_items_with_deleted(existing_item: dict[str, Any]) -> None: + item_id = existing_item["id"] + + client.delete(f"/item/{item_id}") + + response = client.get("/item", params={"limit": 100}) + assert response.status_code == HTTPStatus.OK + items = response.json() + item_ids = [item["id"] for item in items] + assert item_id not in item_ids + + response = client.get("/item", params={"show_deleted": True, "limit": 100}) + assert response.status_code == HTTPStatus.OK + items = response.json() + item_ids = [item["id"] for item in items] + assert item_id in item_ids + + +def test_cart_price_calculation( + existing_empty_cart_id: int, + existing_items: list[int], +) -> None: + cart_id = existing_empty_cart_id + + for item_id in existing_items[:3]: + client.post(f"/cart/{cart_id}/add/{item_id}") + + response = client.get(f"/cart/{cart_id}") + assert response.status_code == HTTPStatus.OK + cart = response.json() + + calculated_price = 0.0 + for item in cart["items"]: + item_response = client.get(f"/item/{item['id']}") + item_data = item_response.json() + calculated_price += item_data["price"] * item["quantity"] + + assert cart["price"] == pytest.approx(calculated_price, 1e-8) + + +def test_create_item_validation() -> None: + response = client.post("/item", json={"name": "test", "price": -10.0}) + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY + + response = client.post("/item", json={"name": "test", "price": 0.0}) + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY + + response = client.post("/item", json={"name": "", "price": 10.0}) + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY + + +def test_list_items_offset_limit() -> None: + response1 = client.get("/item", params={"offset": 0, "limit": 5}) + response2 = client.get("/item", params={"offset": 5, "limit": 5}) + + assert response1.status_code == HTTPStatus.OK + assert response2.status_code == HTTPStatus.OK + + items1 = response1.json() + items2 = response2.json() + + ids1 = [item["id"] for item in items1] + ids2 = [item["id"] for item in items2] + + assert len(set(ids1) & set(ids2)) == 0 + + +def test_websocket_chat() -> None: + with client.websocket_connect("/chat/test-room") as websocket1: + with client.websocket_connect("/chat/test-room") as websocket2: + websocket1.send_text("Hello from client 1") + + data = websocket2.receive_text() + assert "Hello from client 1" in data + + websocket2.send_text("Hello from client 2") + + data = websocket1.receive_text() + assert "Hello from client 2" in data + + +def test_websocket_multiple_rooms() -> None: + with client.websocket_connect("/chat/room1") as ws_room1: + with client.websocket_connect("/chat/room2") as ws_room2: + ws_room1.send_text("Message for room1") + ws_room2.send_text("Message for room2") diff --git a/hw2/hw/transaction_demo/01_dirty_read.py b/hw2/hw/transaction_demo/01_dirty_read.py new file mode 100644 index 00000000..20d532de --- /dev/null +++ b/hw2/hw/transaction_demo/01_dirty_read.py @@ -0,0 +1,96 @@ +""" +ДЕМОНСТРАЦИЯ: Dirty Read (Грязное чтение) + +Dirty Read возникает когда транзакция читает данные, которые были изменены +другой транзакцией, но еще не закоммичены (uncommitted). + +Проблема: если вторая транзакция откатится (ROLLBACK), первая транзакция +прочитала "несуществующие" данные. + +Уровни изоляции: +- READ UNCOMMITTED: допускает Dirty Read +- READ COMMITTED: предотвращает Dirty Read (минимальный уровень в PostgreSQL) + +ВАЖНО: PostgreSQL НЕ ПОДДЕРЖИВАЕТ READ UNCOMMITTED! +Даже если вы установите READ UNCOMMITTED, PostgreSQL использует READ COMMITTED. +Мы продемонстрируем это поведение. +""" +import asyncio +from sqlalchemy import select +from db_setup import Account, get_async_session, init_database, set_isolation_level + + +async def transaction_1_read_uncommitted(): + async with get_async_session() as session: + async with session.begin(): + await set_isolation_level(session, "READ UNCOMMITTED") + print("[T1] Начало транзакции (READ UNCOMMITTED)") + await asyncio.sleep(0.5) + + result = await session.execute(select(Account).where(Account.id == 1)) + account = result.scalar_one() + print(f"[T1] Баланс Alice: ${account.balance}") + print("[T1] PostgreSQL использует READ COMMITTED вместо READ UNCOMMITTED") + + +async def transaction_2_modify_and_rollback(): + async with get_async_session() as session: + async with session.begin(): + print("[T2] Начало транзакции") + + result = await session.execute(select(Account).where(Account.id == 1)) + account = result.scalar_one() + old_balance = account.balance + account.balance = 9999.99 + + print(f"[T2] Изменили баланс: ${old_balance} -> ${account.balance} (не закоммичено)") + await asyncio.sleep(1) + + await session.rollback() + print("[T2] ROLLBACK") + + +async def transaction_1_read_committed(): + async with get_async_session() as session: + async with session.begin(): + await set_isolation_level(session, "READ COMMITTED") + print("[T1] Начало транзакции (READ COMMITTED)") + await asyncio.sleep(0.5) + + result = await session.execute(select(Account).where(Account.id == 1)) + account = result.scalar_one() + print(f"[T1] Баланс Alice: ${account.balance}") + print("[T1] READ COMMITTED предотвращает Dirty Read") + + +async def demo_dirty_read_attempt(): + print("\n" + "="*70) + print("СЦЕНАРИЙ 1: READ UNCOMMITTED (PostgreSQL не поддерживает)") + print("="*70 + "\n") + + await init_database() + await asyncio.gather( + transaction_2_modify_and_rollback(), + transaction_1_read_uncommitted() + ) + + +async def demo_read_committed(): + print("\n" + "="*70) + print("СЦЕНАРИЙ 2: READ COMMITTED предотвращает Dirty Read") + print("="*70 + "\n") + + await asyncio.gather( + transaction_2_modify_and_rollback(), + transaction_1_read_committed() + ) + + +async def main(): + await demo_dirty_read_attempt() + await asyncio.sleep(1) + await demo_read_committed() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/hw2/hw/transaction_demo/02_non_repeatable_read.py b/hw2/hw/transaction_demo/02_non_repeatable_read.py new file mode 100644 index 00000000..f56e361f --- /dev/null +++ b/hw2/hw/transaction_demo/02_non_repeatable_read.py @@ -0,0 +1,112 @@ +""" +ДЕМОНСТРАЦИЯ: Non-Repeatable Read (Неповторяющееся чтение) + +Non-Repeatable Read возникает когда транзакция читает одни и те же данные дважды, +но между чтениями другая транзакция изменяет и коммитит эти данные. + +Проблема: один и тот же SELECT в рамках транзакции возвращает разные результаты. + +Уровни изоляции: +- READ COMMITTED: допускает Non-Repeatable Read +- REPEATABLE READ: предотвращает Non-Repeatable Read +""" +import asyncio +from sqlalchemy import select +from db_setup import Account, get_async_session, init_database, set_isolation_level + + +async def transaction_1_read_committed(): + async with get_async_session() as session: + async with session.begin(): + await set_isolation_level(session, "READ COMMITTED") + print("[T1] Начало транзакции (READ COMMITTED)") + + result = await session.execute(select(Account).where(Account.id == 1)) + account = result.scalar_one() + balance_1 = account.balance + print(f"[T1] Первое чтение: Alice = ${balance_1}") + + await asyncio.sleep(1) + + session.expire_all() + result = await session.execute(select(Account).where(Account.id == 1)) + account = result.scalar_one() + balance_2 = account.balance + print(f"[T1] Второе чтение: Alice = ${balance_2}") + + if balance_1 != balance_2: + print(f"[T1] NON-REPEATABLE READ: значение изменилось") + + +async def transaction_2_modify_and_commit(): + await asyncio.sleep(0.5) + + async with get_async_session() as session: + async with session.begin(): + print("[T2] Начало транзакции") + + result = await session.execute(select(Account).where(Account.id == 1)) + account = result.scalar_one() + old_balance = account.balance + account.balance = old_balance + 500 + + print(f"[T2] Изменили баланс: ${old_balance} -> ${account.balance}") + await session.commit() + print("[T2] COMMIT") + + +async def transaction_1_repeatable_read(): + async with get_async_session() as session: + async with session.begin(): + await set_isolation_level(session, "REPEATABLE READ") + print("[T1] Начало транзакции (REPEATABLE READ)") + + result = await session.execute(select(Account).where(Account.id == 1)) + account = result.scalar_one() + balance_1 = account.balance + print(f"[T1] Первое чтение: Alice = ${balance_1}") + + await asyncio.sleep(1) + + session.expire_all() + result = await session.execute(select(Account).where(Account.id == 1)) + account = result.scalar_one() + balance_2 = account.balance + print(f"[T1] Второе чтение: Alice = ${balance_2}") + + if balance_1 == balance_2: + print("[T1] REPEATABLE READ работает: данные не изменились") + + +async def demo_non_repeatable_read(): + print("\n" + "="*70) + print("СЦЕНАРИЙ 1: READ COMMITTED допускает Non-Repeatable Read") + print("="*70 + "\n") + + await init_database() + await asyncio.gather( + transaction_1_read_committed(), + transaction_2_modify_and_commit() + ) + + +async def demo_repeatable_read(): + print("\n" + "="*70) + print("СЦЕНАРИЙ 2: REPEATABLE READ предотвращает Non-Repeatable Read") + print("="*70 + "\n") + + await init_database() + await asyncio.gather( + transaction_1_repeatable_read(), + transaction_2_modify_and_commit() + ) + + +async def main(): + await demo_non_repeatable_read() + await asyncio.sleep(1) + await demo_repeatable_read() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/hw2/hw/transaction_demo/03_phantom_reads.py b/hw2/hw/transaction_demo/03_phantom_reads.py new file mode 100644 index 00000000..63794a9c --- /dev/null +++ b/hw2/hw/transaction_demo/03_phantom_reads.py @@ -0,0 +1,140 @@ +""" +ДЕМОНСТРАЦИЯ: Phantom Reads (Фантомное чтение) + +Phantom Read возникает когда транзакция выполняет один и тот же запрос дважды, +но между выполнениями другая транзакция добавляет или удаляет строки, +которые удовлетворяют условию запроса. + +Проблема: количество строк в результате запроса изменяется в рамках одной транзакции. + +Уровни изоляции: +- REPEATABLE READ: в PostgreSQL предотвращает Phantom Reads (в отличие от SQL стандарта!) +- SERIALIZABLE: гарантированно предотвращает Phantom Reads + +ВАЖНО: В PostgreSQL REPEATABLE READ уже предотвращает Phantom Reads благодаря MVCC! +Это отличается от SQL стандарта. +""" +import asyncio +from sqlalchemy import select, func +from db_setup import Account, get_async_session, init_database, set_isolation_level + + +async def transaction_1_repeatable_read(): + async with get_async_session() as session: + async with session.begin(): + await set_isolation_level(session, "REPEATABLE READ") + print("[T1] Начало транзакции (REPEATABLE READ)") + + result = await session.execute( + select(func.count(Account.id)).where(Account.balance > 500) + ) + count_1 = result.scalar() + print(f"[T1] Первое чтение: найдено {count_1} счетов") + + await asyncio.sleep(1) + + result = await session.execute( + select(func.count(Account.id)).where(Account.balance > 500) + ) + count_2 = result.scalar() + print(f"[T1] Второе чтение: найдено {count_2} счетов") + + if count_1 == count_2: + print("[T1] REPEATABLE READ предотвращает Phantom Reads") + + +async def transaction_2_insert_account(): + await asyncio.sleep(0.5) + + async with get_async_session() as session: + async with session.begin(): + print("[T2] Начало транзакции") + + new_account = Account(id=4, name="David", balance=800.00) + session.add(new_account) + print("[T2] Добавили новый счет David") + + await session.commit() + print("[T2] COMMIT") + + +async def transaction_1_serializable(): + async with get_async_session() as session: + async with session.begin(): + await set_isolation_level(session, "SERIALIZABLE") + print("[T1] Начало транзакции (SERIALIZABLE)") + + result = await session.execute(select(Account).where(Account.id == 1)) + account = result.scalar_one() + print(f"[T1] Прочитали Alice: ${account.balance}") + + await asyncio.sleep(1) + + account.balance += 100 + print(f"[T1] Обновляем баланс Alice") + + try: + await session.commit() + print("[T1] COMMIT успешен") + except Exception as e: + print(f"[T1] ОШИБКА: {type(e).__name__}") + + +async def transaction_2_serializable(): + await asyncio.sleep(0.5) + + async with get_async_session() as session: + async with session.begin(): + await set_isolation_level(session, "SERIALIZABLE") + print("[T2] Начало транзакции (SERIALIZABLE)") + + result = await session.execute(select(Account).where(Account.id == 1)) + account = result.scalar_one() + print(f"[T2] Прочитали Alice: ${account.balance}") + + account.balance += 50 + print(f"[T2] Обновляем баланс Alice") + + try: + await session.commit() + print("[T2] COMMIT успешен") + except Exception as e: + print(f"[T2] ОШИБКА: {type(e).__name__}") + + +async def demo_phantom_reads(): + print("\n" + "="*70) + print("СЦЕНАРИЙ 1: REPEATABLE READ предотвращает Phantom Reads") + print("="*70 + "\n") + + await init_database() + await asyncio.gather( + transaction_1_repeatable_read(), + transaction_2_insert_account() + ) + + +async def demo_serializable(): + print("\n" + "="*70) + print("СЦЕНАРИЙ 2: SERIALIZABLE обнаруживает конфликты") + print("="*70 + "\n") + + await init_database() + + try: + await asyncio.gather( + transaction_1_serializable(), + transaction_2_serializable() + ) + except Exception: + pass + + +async def main(): + await demo_phantom_reads() + await asyncio.sleep(1) + await demo_serializable() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/hw2/hw/transaction_demo/__init__.py b/hw2/hw/transaction_demo/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/hw2/hw/transaction_demo/db_setup.py b/hw2/hw/transaction_demo/db_setup.py new file mode 100644 index 00000000..76c0a7ff --- /dev/null +++ b/hw2/hw/transaction_demo/db_setup.py @@ -0,0 +1,66 @@ +""" +Базовая настройка подключения к БД и модели для демонстрации проблем транзакций +""" +import os +import asyncio +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy import Column, Integer, String, Numeric, text +from sqlalchemy.orm import declarative_base + +# Подключение к PostgreSQL через asyncpg +DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://shop_user:shop_password@localhost:5432/shop_db") + +# Создаём async engine +engine = create_async_engine(DATABASE_URL, echo=False) + +Base = declarative_base() + + +class Account(Base): + """Модель банковского счета для демонстрации""" + __tablename__ = "demo_accounts" + + id = Column(Integer, primary_key=True) + name = Column(String(100), nullable=False) + balance = Column(Numeric(10, 2), nullable=False, default=0) + + def __repr__(self): + return f"" + + +async def init_database(): + """Создание таблиц и начальных данных""" + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await conn.run_sync(Base.metadata.create_all) + + async with AsyncSession(engine) as session: + accounts = [ + Account(id=1, name="Alice", balance=1000.00), + Account(id=2, name="Bob", balance=500.00), + Account(id=3, name="Charlie", balance=750.00), + ] + + session.add_all(accounts) + await session.commit() + + +def get_async_session(isolation_level=None): + """Получить async сессию с заданным уровнем изоляции""" + if isolation_level: + return AsyncSession( + engine, + expire_on_commit=False, + ) + return AsyncSession(engine, expire_on_commit=False) + + +async def set_isolation_level(session: AsyncSession, level: str): + """Установить уровень изоляции для сессии""" + # В PostgreSQL уровни изоляции: + # READ UNCOMMITTED, READ COMMITTED, REPEATABLE READ, SERIALIZABLE + await session.execute(text(f"SET TRANSACTION ISOLATION LEVEL {level}")) + + +if __name__ == "__main__": + asyncio.run(init_database()) diff --git a/hw2/hw/transaction_demo/run_all_demos.py b/hw2/hw/transaction_demo/run_all_demos.py new file mode 100644 index 00000000..abd5ad0a --- /dev/null +++ b/hw2/hw/transaction_demo/run_all_demos.py @@ -0,0 +1,39 @@ +""" +Запускает все демонстрации подряд +""" +import asyncio +import subprocess +import sys +import os + + +async def run_demo(demo_file: str, title: str): + print("\n" + "="*70) + print(title) + print("="*70) + + script_dir = os.path.dirname(os.path.abspath(__file__)) + demo_path = os.path.join(script_dir, demo_file) + + result = subprocess.run([sys.executable, demo_path]) + return result.returncode == 0 + + +async def main(): + demos = [ + ("01_dirty_read.py", "Dirty Read"), + ("02_non_repeatable_read.py", "Non-Repeatable Read"), + ("03_phantom_reads.py", "Phantom Reads"), + ] + + for demo_file, title in demos: + await run_demo(demo_file, title) + if demo_file != demos[-1][0]: + await asyncio.sleep(2) + + return 0 + + +if __name__ == "__main__": + exit_code = asyncio.run(main()) + sys.exit(exit_code)