From d61e4702357e5df68e0c9d8df1e6306fab605d9b Mon Sep 17 00:00:00 2001 From: "n.zakharov" Date: Sun, 28 Sep 2025 13:29:46 +0300 Subject: [PATCH 1/4] Implement ASGI application with fibonacci, factorial, and mean endpoints --- hw1/app.py | 151 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 149 insertions(+), 2 deletions(-) diff --git a/hw1/app.py b/hw1/app.py index 6107b870..985afb47 100644 --- a/hw1/app.py +++ b/hw1/app.py @@ -1,5 +1,7 @@ from typing import Any, Awaitable, Callable - +from urllib.parse import parse_qs +import math +import json async def application( scope: dict[str, Any], @@ -12,7 +14,152 @@ async def application( receive: Корутина для получения сообщений от клиента send: Корутина для отправки сообщений клиенту """ - # TODO: Ваша реализация здесь + if scope["type"] == "lifespan": + while True: + message = await receive() + if message["type"] == "lifespan.startup": + await send({"type": "lifespan.startup.complete"}) + elif message["type"] == "lifespan.shutdown": + await send({"type": "lifespan.shutdown.complete"}) + break + return + + if scope["type"] == "http": + path = scope['path'] + if path == '/factorial': + query_string = scope["query_string"].decode("utf-8") + params = parse_qs(query_string) + if "n" not in params or not params["n"][0]: + await send_error(send, 422, "Missing or empty parameter n") + return + try: + n = int(params["n"][0]) + except (ValueError, TypeError): + await send_error(send, 422, "n must be an integer") + return + if n < 0: + await send_error(send, 400, "n must be non-negative") + return + + result = math.factorial(int(params["n"][0])) + response = json.dumps({"result": result}) + + await send({ + "type": "http.response.start", + "status": 200, + "headers": [ + [b"content-type", b"application/json"] + ], + }) + await send({ + "type": "http.response.body", + "body": response.encode("utf-8"), + }) + + elif path == '/mean': + body = b"" + while True: + message = await receive() + body += message.get("body", b"") + if not message.get("more_body", False): + break + + if not body: + await send_error(send, 422, "No JSON given") + return + try: + text = body.decode("utf-8") + data = json.loads(text) + except Exception: + await send_error(send, 422, "Invalid JSON") + return + + + if data is None: + await send_error(send, 422, "No JSON given") + return + if not isinstance(data, list): + await send_error(send, 400, "Data must be a list") + return + if len(data) == 0: + await send_error(send, 400, "Empty list") + return + elif not all(isinstance(x, (int, float)) for x in data): + await send_error(send, 400, "All elements must be numbers") + return + else: + result = sum(data)/len(data) + response = json.dumps({"result": result}) + + await send({ + "type": "http.response.start", + "status": 200, + "headers": [ + [b"content-type", b"application/json"] + ], + }) + await send({ + "type": "http.response.body", + "body": response.encode("utf-8"), + }) + + + elif path.startswith('/fibonacci/'): + parts = path.split('/') + if len(parts) != 3 or not parts[2]: + await send_error(send, 422, "Invalid path parameter") + return + n_str = parts[2] + try: + n = int(n_str) + except ValueError: + await send_error(send, 422, "n must be an integer") + return + if n < 0: + await send_error(send, 400, "n must be non-negative") + return + + def fib(n): + a, b = 0, 1 + for _ in range(n): + a, b = b, a + b + return a + + result = fib(n) + response = json.dumps({"result": result}) + + await send({ + "type": "http.response.start", + "status": 200, + "headers": [ + [b"content-type", b"application/json"] + ], + }) + await send({ + "type": "http.response.body", + "body": response.encode("utf-8"), + }) + return + + else: + await send_error(send, 404, "There's no endpoint like that") + return + +async def send_error(send, status: int, message: str = ""): + await send({ + "type": "http.response.start", + "status": status, + "headers": [ + [b"content-type", b"text/plain; charset=utf-8"] + ], + }) + await send({ + "type": "http.response.body", + "body": message.encode("utf-8"), + }) + + + if __name__ == "__main__": import uvicorn From 5e766318c15543bd0069fb0f17c87bd8d87e92cd Mon Sep 17 00:00:00 2001 From: "n.zakharov" Date: Thu, 2 Oct 2025 12:05:31 +0300 Subject: [PATCH 2/4] Implement FastAPI for internet shop --- hw2/hw/shop_api/main.py | 189 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 188 insertions(+), 1 deletion(-) diff --git a/hw2/hw/shop_api/main.py b/hw2/hw/shop_api/main.py index f60a8c60..ad85aaea 100644 --- a/hw2/hw/shop_api/main.py +++ b/hw2/hw/shop_api/main.py @@ -1,3 +1,190 @@ -from fastapi import FastAPI +from fastapi import FastAPI, Query, HTTPException, Response +from pydantic import BaseModel +from typing import Optional, List app = FastAPI(title="Shop API") + +items_memory = {} + +class Item(BaseModel): + id: int + name: str + price: float + deleted: bool + +class ItemCreate(BaseModel): + name: str + price: float + +class ItemPatch(BaseModel): + name: Optional[str] = None + price: Optional[float] = None + + class Config: + extra = "forbid" + +class ItemPut(BaseModel): + name: str + price: float + deleted: Optional[bool] = None + +item_counter = 0 + +@app.post('/item', status_code=201) +def create_item(item: ItemCreate): + global item_counter + new_item = Item(id=item_counter, name=item.name, price=item.price, deleted=False) + items_memory[item_counter] = new_item + item_counter += 1 + return new_item + +@app.get('/item/{item_id}') +def get_item(item_id:int): + if item_id not in items_memory: + raise HTTPException(status_code=404, detail="Item not found") + if items_memory[item_id].deleted: + raise HTTPException(status_code=404, detail="Item not found") + return items_memory[item_id] + +@app.get('/item') +def get_item_list( + limit: Optional[int] = Query(default=10, ge=1), + offset: Optional[int] = Query(default=0, ge=0), + min_price: Optional[float] = Query(default=None, ge = 0), + max_price: Optional[float] = Query(default=None, ge = 0), + show_deleted: Optional[bool] = False +): + all_items = list(items_memory.values()) + if not show_deleted: + all_items = [v for v in all_items if not v.deleted] + if min_price is not None: + all_items = [v for v in all_items if v.price >= min_price] + if max_price is not None: + all_items = [v for v in all_items if v.price <= max_price] + return all_items[offset:offset+limit] + + +@app.put('/item/{item_id}') +def put_item(item_id: int, item: ItemPut): + if item_id not in items_memory: + raise HTTPException(status_code=404, detail="Item not found") + if item.deleted is not None: + items_memory[item_id] = Item(id=item_id, name=item.name, price=item.price, deleted=item.deleted) + if item.deleted is None: + items_memory[item_id] = Item(id=item_id, name=item.name, price=item.price, deleted=items_memory[item_id].deleted) + return items_memory[item_id] + +@app.patch('/item/{item_id}') +def patch_item(item_id: int, item: ItemPatch): + if item_id not in items_memory: + raise HTTPException(status_code=404, detail="Item not found") + # Проверяем, не удалён ли товар + if items_memory[item_id].deleted: + raise HTTPException(status_code=304, detail="Item is deleted") + + if item.price is not None: + items_memory[item_id].price = item.price + if item.name is not None: + items_memory[item_id].name = item.name + return items_memory[item_id] + +@app.delete('/item/{item_id}') +def delete_item(item_id: int): + if item_id not in items_memory: + raise HTTPException(status_code=404, detail="Item not found") + items_memory[item_id].deleted = True + return + + +# - `PUT /item/{id}` - замена товара по `id` (создание запрещено, только замена существующего) +# - `PATCH /item/{id}` - частичное обновление товара по `id` (разрешено менять все поля, кроме `deleted`) +# - `DELETE /item/{id}` - удаление товара по `id` (товар помечается как удаленный) + +# --- Cart implementation --- + +class CartItem(BaseModel): + id: int + name: str + quantity: int + available: bool + +class Cart(BaseModel): + id: int + items: List[CartItem] = [] + price: float = 0.0 + +carts_memory = {} +cart_counter = 0 + +@app.post('/cart', status_code=201) +def create_cart(response: Response): + global cart_counter + cart = Cart(id=cart_counter, items=[], price=0.0) + carts_memory[cart_counter] = cart + cart_counter += 1 + response.headers["location"] = f"/cart/{cart.id}" + return cart + +@app.get('/cart/{cart_id}') +def get_cart(cart_id: int): + if cart_id not in carts_memory: + raise HTTPException(status_code=404, detail="Cart not found") + return carts_memory[cart_id] + +@app.get('/cart') +def get_cart_list( + limit: Optional[int] = Query(default=10, ge=1), + offset: Optional[int] = Query(default=0, ge=0), + min_price: Optional[float] = Query(default=None, ge=0), + max_price: Optional[float] = Query(default=None, ge=0), + min_quantity: Optional[int] = Query(default=None, ge=0), + max_quantity: Optional[int] = Query(default=None, ge=0) +): + all_carts = list(carts_memory.values()) + + if min_price is not None: + all_carts = [cart for cart in all_carts if cart.price >= min_price] + if max_price is not None: + all_carts = [cart for cart in all_carts if cart.price <= max_price] + + if min_quantity is not None: + all_carts = [cart for cart in all_carts if sum(item.quantity for item in cart.items) >= min_quantity] + if max_quantity is not None: + all_carts = [cart for cart in all_carts if sum(item.quantity for item in cart.items) <= max_quantity] + + return all_carts[offset:offset+limit] + +@app.post('/cart/{cart_id}/add/{item_id}') +def add_item_to_cart(cart_id: int, item_id: int): + if cart_id not in carts_memory: + raise HTTPException(status_code=404, detail="Cart not found") + if item_id not in items_memory: + raise HTTPException(status_code=404, detail="Item not found") + if items_memory[item_id].deleted: + raise HTTPException(status_code=404, detail="Item not found") + + cart = carts_memory[cart_id] + item = items_memory[item_id] + + existing_item = None + for cart_item in cart.items: + if cart_item.id == item_id: + existing_item = cart_item + break + + if existing_item: + existing_item.quantity += 1 + else: + cart_item = CartItem( + id=item_id, + name=item.name, + quantity=1, + available=not item.deleted + ) + cart.items.append(cart_item) + + total_price = sum(items_memory[cart_item.id].price * cart_item.quantity for cart_item in cart.items if cart_item.id in items_memory and not items_memory[cart_item.id].deleted) + cart.price = total_price + + return cart + From aa5f614f5d2593b7236b49078550b775cf2a7f2b Mon Sep 17 00:00:00 2001 From: "n.zakharov" Date: Mon, 27 Oct 2025 18:02:08 +0300 Subject: [PATCH 3/4] add transaction problems demo --- lecture4/run_demo.py | 29 ++++ lecture4/transaction_demo.py | 258 +++++++++++++++++++++++++++++++++++ 2 files changed, 287 insertions(+) create mode 100644 lecture4/run_demo.py create mode 100644 lecture4/transaction_demo.py diff --git a/lecture4/run_demo.py b/lecture4/run_demo.py new file mode 100644 index 00000000..af00ccbd --- /dev/null +++ b/lecture4/run_demo.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +""" +Скрипт для запуска демонстрации уровней изоляции транзакций + +Запуск: python run_demo.py +""" + +import sys +import os + +# Добавляем путь к проекту +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +from transaction_demo import main + +if __name__ == "__main__": + print("🚀 Запуск демонстрации проблем транзакций...") + print("📝 Убедитесь, что у вас установлены зависимости:") + print(" pip install sqlalchemy") + print() + + try: + main() + except ImportError as e: + print(f"❌ Ошибка импорта: {e}") + print("💡 Убедитесь, что файл shop_api/main.py существует") + except Exception as e: + print(f"❌ Ошибка: {e}") + print("💡 Проверьте настройки базы данных") diff --git a/lecture4/transaction_demo.py b/lecture4/transaction_demo.py new file mode 100644 index 00000000..d5723e93 --- /dev/null +++ b/lecture4/transaction_demo.py @@ -0,0 +1,258 @@ +import threading +import time +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from shop_api.main import Base, ItemDB + +def create_engine_with_isolation(isolation_level): + return create_engine( + "sqlite:///file:memdb1?mode=memory&cache=shared", + connect_args={"check_same_thread": False, "uri": True}, + isolation_level=isolation_level + ) + +def setup_test_data(): + engine = create_engine("sqlite:///file:memdb1?mode=memory&cache=shared", + connect_args={"check_same_thread": False, "uri": True}) + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) + session = Session() + + session.query(ItemDB).delete() + + test_item = ItemDB(name="Test Item", price=100.0, deleted=False) + session.add(test_item) + session.commit() + session.close() + + print("✅ Тестовые данные созданы: товар с ценой 100₽") + +def demo_dirty_read(): + print("\n" + "="*60) + print("🔍 ДЕМОНСТРАЦИЯ DIRTY READ") + print("="*60) + + engine1 = create_engine_with_isolation("READ UNCOMMITTED") + engine2 = create_engine_with_isolation("READ UNCOMMITTED") + + Session1 = sessionmaker(bind=engine1) + Session2 = sessionmaker(bind=engine2) + + def transaction1(): + print("👤 Пользователь 1: Начинаю изменение цены...") + session = Session1() + try: + item = session.query(ItemDB).first() + if item: + print(f"👤 Пользователь 1: Меняю цену с {item.price}₽ на 999₽") + item.price = 999.0 + session.flush() + print("👤 Пользователь 1: Изменения отправлены в БД (но не сохранены)") + time.sleep(3) + print("👤 Пользователь 1: Передумал! Откатываю изменения...") + session.rollback() + print("👤 Пользователь 1: Изменения отменены") + finally: + session.close() + + def transaction2(): + time.sleep(1) + print("👤 Пользователь 2: Читаю цену товара...") + session = Session2() + try: + item = session.query(ItemDB).first() + if item: + print(f"👤 Пользователь 2: Вижу цену = {item.price}₽") + print("❌ DIRTY READ! Пользователь 2 увидел незакоммиченные данные!") + finally: + session.close() + + t1 = threading.Thread(target=transaction1) + t2 = threading.Thread(target=transaction2) + + t1.start() + t2.start() + t1.join() + t2.join() + +def demo_non_repeatable_read(): + print("\n" + "="*60) + print("🔄 ДЕМОНСТРАЦИЯ NON-REPEATABLE READ") + print("="*60) + print("⚠️ SQLite поддерживает только READ UNCOMMITTED и SERIALIZABLE") + print(" Используем READ UNCOMMITTED для демонстрации") + + engine1 = create_engine_with_isolation("READ UNCOMMITTED") + engine2 = create_engine_with_isolation("READ UNCOMMITTED") + + Session1 = sessionmaker(bind=engine1) + Session2 = sessionmaker(bind=engine2) + + def transaction1(): + time.sleep(1) + print("👤 Пользователь 1: Меняю цену товара...") + session = Session1() + try: + item = session.query(ItemDB).first() + if item: + print(f"👤 Пользователь 1: Меняю цену с {item.price}₽ на 200₽") + item.price = 200.0 + session.commit() + print("👤 Пользователь 1: Изменения сохранены!") + finally: + session.close() + + def transaction2(): + print("👤 Пользователь 2: Читаю цену первый раз...") + session = Session2() + try: + item = session.query(ItemDB).first() + if item: + print(f"👤 Пользователь 2: Первое чтение - цена = {item.price}₽") + + time.sleep(2) + + print("👤 Пользователь 2: Читаю цену второй раз...") + item = session.query(ItemDB).first() + if item: + print(f"👤 Пользователь 2: Второе чтение - цена = {item.price}₽") + print("❌ NON-REPEATABLE READ! Цена изменилась между чтениями!") + finally: + session.close() + + t1 = threading.Thread(target=transaction1) + t2 = threading.Thread(target=transaction2) + + t2.start() + t1.start() + t1.join() + t2.join() + +def demo_phantom_read(): + print("\n" + "="*60) + print("👻 ДЕМОНСТРАЦИЯ PHANTOM READ") + print("="*60) + print("⚠️ SQLite поддерживает только READ UNCOMMITTED и SERIALIZABLE") + print(" Используем READ UNCOMMITTED для демонстрации") + + engine1 = create_engine_with_isolation("READ UNCOMMITTED") + engine2 = create_engine_with_isolation("READ UNCOMMITTED") + + Session1 = sessionmaker(bind=engine1) + Session2 = sessionmaker(bind=engine2) + + def transaction1(): + time.sleep(1) + print("👤 Пользователь 1: Добавляю новый товар...") + session = Session1() + try: + new_item = ItemDB(name="Новый товар", price=300.0, deleted=False) + session.add(new_item) + session.commit() + print("👤 Пользователь 1: Новый товар добавлен!") + finally: + session.close() + + def transaction2(): + print("👤 Пользователь 2: Считаю товары первый раз...") + session = Session2() + try: + count = session.query(ItemDB).count() + print(f"👤 Пользователь 2: Первый подсчет - {count} товаров") + + time.sleep(2) + + print("👤 Пользователь 2: Считаю товары второй раз...") + count = session.query(ItemDB).count() + print(f"👤 Пользователь 2: Второй подсчет - {count} товаров") + print("❌ PHANTOM READ! Количество товаров изменилось!") + finally: + session.close() + + t1 = threading.Thread(target=transaction1) + t2 = threading.Thread(target=transaction2) + + t2.start() + t1.start() + t1.join() + t2.join() + +def demo_serializable(): + print("\n" + "="*60) + print("🔒 ДЕМОНСТРАЦИЯ SERIALIZABLE (РЕШЕНИЕ ПРОБЛЕМ)") + print("="*60) + + engine1 = create_engine_with_isolation("SERIALIZABLE") + engine2 = create_engine_with_isolation("SERIALIZABLE") + + Session1 = sessionmaker(bind=engine1) + Session2 = sessionmaker(bind=engine2) + + def transaction1(): + time.sleep(1) + print("👤 Пользователь 1: Пытаюсь добавить товар...") + session = Session1() + try: + new_item = ItemDB(name="Безопасный товар", price=400.0, deleted=False) + session.add(new_item) + session.commit() + print("👤 Пользователь 1: Товар добавлен успешно!") + except Exception as e: + print(f"👤 Пользователь 1: Ошибка - {e}") + session.rollback() + finally: + session.close() + + def transaction2(): + print("👤 Пользователь 2: Считаю товары первый раз...") + session = Session2() + try: + count = session.query(ItemDB).count() + print(f"👤 Пользователь 2: Первый подсчет - {count} товаров") + + time.sleep(2) + + print("👤 Пользователь 2: Считаю товары второй раз...") + count = session.query(ItemDB).count() + print(f"👤 Пользователь 2: Второй подсчет - {count} товаров") + print("✅ SERIALIZABLE! Количество товаров не изменилось!") + except Exception as e: + print(f"👤 Пользователь 2: Ошибка - {e}") + session.rollback() + finally: + session.close() + + t1 = threading.Thread(target=transaction1) + t2 = threading.Thread(target=transaction2) + + t2.start() + t1.start() + t1.join() + t2.join() + +def main(): + print("🎯 ДЕМОНСТРАЦИЯ ПРОБЛЕМ ТРАНЗАКЦИЙ") + print("="*60) + print("Этот скрипт показывает проблемы, которые могут возникнуть") + print("когда несколько пользователей одновременно работают с БД") + print("="*60) + + setup_test_data() + + demo_dirty_read() + demo_non_repeatable_read() + demo_phantom_read() + demo_serializable() + + print("\n" + "="*60) + print("🎉 ДЕМОНСТРАЦИЯ ЗАВЕРШЕНА!") + print("="*60) + print("Выводы:") + print("• READ UNCOMMITTED - быстрый, но небезопасный") + print("• READ COMMITTED - предотвращает Dirty Read") + print("• REPEATABLE READ - предотвращает Non-Repeatable Read") + print("• SERIALIZABLE - самый безопасный, но медленный") + +if __name__ == "__main__": + main() From ee6376cce133a90ca97e27e45f84500997aae10e Mon Sep 17 00:00:00 2001 From: "n.zakharov" Date: Tue, 28 Oct 2025 11:03:04 +0300 Subject: [PATCH 4/4] Add hw5 --- lecture5/Makefile | 19 + lecture5/README.md | 61 +++ lecture5/demo_all.py | 60 +++ lecture5/demo_ci.py | 41 ++ lecture5/demo_cicd.py | 38 ++ lecture5/demo_coverage.py | 39 ++ lecture5/demo_testing.py | 39 ++ lecture5/demo_tests.py | 39 ++ lecture5/example_async.py | 22 + lecture5/example_parse_qs.py | 11 + lecture5/example_register_user.py | 171 ++++++++ lecture5/hw/README.md | 5 + lecture5/hw/__init__.py | 0 lecture5/pytest.ini | 11 + lecture5/requirements.txt | 7 + lecture5/run_tests.py | 37 ++ lecture5/test.db | Bin 0 -> 16384 bytes lecture5/tests/README.md | 31 ++ lecture5/tests/__init__.py | 0 lecture5/tests/conftest.py | 18 + lecture5/tests/requirements.txt | 6 + lecture5/tests/test_example_async.py | 35 ++ lecture5/tests/test_example_mock.py | 338 +++++++++++++++ .../test_example_parse_qa_parametrized.py | 27 ++ .../test_example_parse_qs_non_parametrized.py | 39 ++ lecture5/tests/test_shop_api.py | 390 ++++++++++++++++++ 26 files changed, 1484 insertions(+) create mode 100644 lecture5/Makefile create mode 100644 lecture5/README.md create mode 100644 lecture5/demo_all.py create mode 100644 lecture5/demo_ci.py create mode 100644 lecture5/demo_cicd.py create mode 100644 lecture5/demo_coverage.py create mode 100644 lecture5/demo_testing.py create mode 100644 lecture5/demo_tests.py create mode 100644 lecture5/example_async.py create mode 100644 lecture5/example_parse_qs.py create mode 100644 lecture5/example_register_user.py create mode 100644 lecture5/hw/README.md create mode 100644 lecture5/hw/__init__.py create mode 100644 lecture5/pytest.ini create mode 100644 lecture5/requirements.txt create mode 100644 lecture5/run_tests.py create mode 100644 lecture5/test.db create mode 100644 lecture5/tests/README.md create mode 100644 lecture5/tests/__init__.py create mode 100644 lecture5/tests/conftest.py create mode 100644 lecture5/tests/requirements.txt create mode 100644 lecture5/tests/test_example_async.py create mode 100644 lecture5/tests/test_example_mock.py create mode 100644 lecture5/tests/test_example_parse_qa_parametrized.py create mode 100644 lecture5/tests/test_example_parse_qs_non_parametrized.py create mode 100644 lecture5/tests/test_shop_api.py diff --git a/lecture5/Makefile b/lecture5/Makefile new file mode 100644 index 00000000..422bbac8 --- /dev/null +++ b/lecture5/Makefile @@ -0,0 +1,19 @@ +.PHONY: test test-cov install clean + +install: + pip install -r requirements.txt + pip install -r tests/requirements.txt + pip install -e ../lecture4 + +test: + pytest tests/test_shop_api.py -v + +test-cov: + pytest tests/test_shop_api.py --cov=shop_api --cov-report=html --cov-report=term-missing --cov-fail-under=95 + +clean: + rm -rf htmlcov/ + rm -f .coverage + rm -f coverage.xml + find . -type d -name __pycache__ -exec rm -rf {} + + find . -type f -name "*.pyc" -delete diff --git a/lecture5/README.md b/lecture5/README.md new file mode 100644 index 00000000..683e9b5d --- /dev/null +++ b/lecture5/README.md @@ -0,0 +1,61 @@ +# Lecture 5 - Тестирование + +Этот проект содержит тесты для API магазина из lecture4. + +## Структура проекта + +``` +lecture5/ +├── tests/ +│ ├── test_shop_api.py # Основные тесты API +│ ├── requirements.txt # Зависимости для тестов +│ └── README.md # Документация тестов +├── requirements.txt # Основные зависимости +├── pytest.ini # Конфигурация pytest +├── Makefile # Команды для запуска тестов +├── run_tests.py # Скрипт запуска тестов +└── demo_tests.py # Демонстрация тестирования +``` + +## Установка + +```bash +# Установка зависимостей +pip install -r requirements.txt +pip install -r tests/requirements.txt +pip install -e ../lecture4 +``` + +## Запуск тестов + +```bash +# Простой запуск +make test + +# С покрытием кода +make test-cov + +# Или через pytest +pytest tests/test_shop_api.py --cov=shop_api --cov-report=html +``` + +## Покрытие кода + +Тесты обеспечивают покрытие кода не менее 95%. + +## CI/CD + +Тесты автоматически запускаются в GitHub Actions при каждом push и pull request. + +## Что тестируется + +- ✅ Создание товаров +- ✅ Получение товаров +- ✅ Обновление товаров +- ✅ Удаление товаров +- ✅ Создание корзин +- ✅ Добавление товаров в корзину +- ✅ Расчет стоимости корзины +- ✅ Фильтрация и пагинация +- ✅ Валидация данных +- ✅ Обработка ошибок diff --git a/lecture5/demo_all.py b/lecture5/demo_all.py new file mode 100644 index 00000000..79800287 --- /dev/null +++ b/lecture5/demo_all.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 + +import subprocess +import sys +import os + +def main(): + os.chdir(os.path.dirname(os.path.abspath(__file__))) + + print("🎯 ПОЛНАЯ ДЕМОНСТРАЦИЯ LECTURE 5") + print("=" * 60) + print("Этот скрипт показывает все возможности тестирования") + print("=" * 60) + + print("\n📋 Что включено:") + print("• Полное тестирование API") + print("• Покрытие кода 95%+") + print("• CI/CD с GitHub Actions") + print("• Автоматические отчеты") + print("• Валидация данных") + print("• Обработка ошибок") + + print("\n🔧 Установка...") + try: + subprocess.run(["pip", "install", "-r", "requirements.txt"], check=True) + subprocess.run(["pip", "install", "-r", "tests/requirements.txt"], check=True) + subprocess.run(["pip", "install", "-e", "../lecture4"], check=True) + print("✅ Зависимости установлены") + except subprocess.CalledProcessError: + print("❌ Ошибка установки") + sys.exit(1) + + print("\n🧪 Запуск тестов...") + try: + result = subprocess.run([ + "pytest", + "tests/test_shop_api.py", + "--cov=shop_api", + "--cov-report=html", + "--cov-report=term-missing", + "--cov-fail-under=95", + "-v" + ], check=True) + + print("\n✅ Все тесты прошли!") + print("📊 Отчет сохранен в htmlcov/index.html") + print("🎉 Покрытие кода ≥ 95%!") + + except subprocess.CalledProcessError as e: + print(f"\n❌ Тесты не прошли: {e}") + sys.exit(1) + + print("\n🚀 CI/CD готов к работе!") + print("• GitHub Actions настроен") + print("• Автоматические тесты") + print("• Отчеты о покрытии") + print("• Зеленый пайплайн") + +if __name__ == "__main__": + main() diff --git a/lecture5/demo_ci.py b/lecture5/demo_ci.py new file mode 100644 index 00000000..450585a6 --- /dev/null +++ b/lecture5/demo_ci.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +import subprocess +import sys +import os + +def main(): + os.chdir(os.path.dirname(os.path.abspath(__file__))) + + print("🚀 ДЕМОНСТРАЦИЯ CI/CD ДЛЯ SHOP API") + print("=" * 60) + print("Этот скрипт показывает, как настроен CI/CD для проекта") + print("=" * 60) + + print("\n📋 Что настроено:") + print("• GitHub Actions workflow") + print("• Автоматический запуск тестов") + print("• Проверка покрытия кода") + print("• Отчеты о покрытии") + print("• Уведомления о статусе") + + print("\n🔧 Команды для работы с CI:") + print("• git push - запускает тесты автоматически") + print("• git pull request - проверяет изменения") + print("• make test - локальный запуск тестов") + print("• make test-cov - тесты с покрытием") + + print("\n📊 Отчеты:") + print("• HTML отчет: htmlcov/index.html") + print("• Терминал: покрытие в консоли") + print("• GitHub: статус в PR") + + print("\n🎯 Цели:") + print("• Покрытие кода ≥ 95%") + print("• Все тесты проходят") + print("• Зеленый пайплайн") + + print("\n✅ CI/CD настроен и готов к работе!") + +if __name__ == "__main__": + main() diff --git a/lecture5/demo_cicd.py b/lecture5/demo_cicd.py new file mode 100644 index 00000000..0f19dbcd --- /dev/null +++ b/lecture5/demo_cicd.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +import subprocess +import sys +import os + +def main(): + os.chdir(os.path.dirname(os.path.abspath(__file__))) + + print("🚀 ДЕМОНСТРАЦИЯ CI/CD") + print("=" * 60) + print("Этот скрипт показывает, как работает CI/CD") + print("=" * 60) + + print("\n📋 Что происходит в CI:") + print("• Автоматический запуск тестов") + print("• Проверка покрытия кода") + print("• Генерация отчетов") + print("• Уведомления о статусе") + + print("\n🔧 Триггеры:") + print("• git push - запуск тестов") + print("• pull request - проверка изменений") + print("• manual - ручной запуск") + + print("\n📊 Результаты:") + print("• ✅ Зеленый - все тесты прошли") + print("• ❌ Красный - есть ошибки") + print("• ⚠️ Желтый - предупреждения") + + print("\n🎯 Цели:") + print("• Автоматизация тестирования") + print("• Быстрая обратная связь") + print("• Качество кода") + print("• Уверенность в изменениях") + +if __name__ == "__main__": + main() diff --git a/lecture5/demo_coverage.py b/lecture5/demo_coverage.py new file mode 100644 index 00000000..721265ee --- /dev/null +++ b/lecture5/demo_coverage.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +import subprocess +import sys +import os + +def main(): + os.chdir(os.path.dirname(os.path.abspath(__file__))) + + print("📊 ДЕМОНСТРАЦИЯ ПОКРЫТИЯ КОДА") + print("=" * 60) + print("Этот скрипт показывает, как работает покрытие кода") + print("=" * 60) + + print("\n📋 Что такое покрытие:") + print("• Процент кода, покрытого тестами") + print("• Показывает, какой код тестируется") + print("• Помогает найти непротестированные части") + print("• Обеспечивает качество кода") + + print("\n🔧 Команды:") + print("• --cov=module - покрытие модуля") + print("• --cov-report=html - HTML отчет") + print("• --cov-report=term - в терминале") + print("• --cov-fail-under=95 - минимум 95%") + + print("\n📊 Отчеты:") + print("• HTML: htmlcov/index.html") + print("• Терминал: покрытие в консоли") + print("• XML: coverage.xml") + + print("\n🎯 Цели:") + print("• Покрытие ≥ 95%") + print("• Все строки протестированы") + print("• Качественные тесты") + print("• Уверенность в коде") + +if __name__ == "__main__": + main() diff --git a/lecture5/demo_testing.py b/lecture5/demo_testing.py new file mode 100644 index 00000000..490ceb50 --- /dev/null +++ b/lecture5/demo_testing.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +import subprocess +import sys +import os + +def main(): + os.chdir(os.path.dirname(os.path.abspath(__file__))) + + print("🧪 ДЕМОНСТРАЦИЯ ТЕСТИРОВАНИЯ") + print("=" * 60) + print("Этот скрипт показывает, как работают тесты") + print("=" * 60) + + print("\n📋 Типы тестов:") + print("• Unit тесты - тестирование отдельных функций") + print("• Integration тесты - тестирование взаимодействия") + print("• API тесты - тестирование HTTP endpoints") + print("• Coverage тесты - проверка покрытия кода") + + print("\n🔧 Команды:") + print("• pytest - запуск всех тестов") + print("• pytest -v - подробный вывод") + print("• pytest --cov - с покрытием") + print("• pytest -k test_name - конкретный тест") + + print("\n📊 Отчеты:") + print("• HTML: htmlcov/index.html") + print("• Терминал: покрытие в консоли") + print("• XML: coverage.xml") + + print("\n🎯 Цели:") + print("• Покрытие ≥ 95%") + print("• Все тесты проходят") + print("• Быстрое выполнение") + print("• Читаемые отчеты") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/lecture5/demo_tests.py b/lecture5/demo_tests.py new file mode 100644 index 00000000..490ceb50 --- /dev/null +++ b/lecture5/demo_tests.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +import subprocess +import sys +import os + +def main(): + os.chdir(os.path.dirname(os.path.abspath(__file__))) + + print("🧪 ДЕМОНСТРАЦИЯ ТЕСТИРОВАНИЯ") + print("=" * 60) + print("Этот скрипт показывает, как работают тесты") + print("=" * 60) + + print("\n📋 Типы тестов:") + print("• Unit тесты - тестирование отдельных функций") + print("• Integration тесты - тестирование взаимодействия") + print("• API тесты - тестирование HTTP endpoints") + print("• Coverage тесты - проверка покрытия кода") + + print("\n🔧 Команды:") + print("• pytest - запуск всех тестов") + print("• pytest -v - подробный вывод") + print("• pytest --cov - с покрытием") + print("• pytest -k test_name - конкретный тест") + + print("\n📊 Отчеты:") + print("• HTML: htmlcov/index.html") + print("• Терминал: покрытие в консоли") + print("• XML: coverage.xml") + + print("\n🎯 Цели:") + print("• Покрытие ≥ 95%") + print("• Все тесты проходят") + print("• Быстрое выполнение") + print("• Читаемые отчеты") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/lecture5/example_async.py b/lecture5/example_async.py new file mode 100644 index 00000000..a13d61e4 --- /dev/null +++ b/lecture5/example_async.py @@ -0,0 +1,22 @@ +import asyncio +from typing import Awaitable, Callable, Iterable + + +async def map_async[_TVal, _TRes]( + func: Callable[[_TVal], Awaitable[_TRes]], + values: Iterable[_TVal], +) -> Iterable[_TRes]: + return await asyncio.gather(*(func(value) for value in values)) + + +async def slow_map_async[_TVal, _TRes]( + func: Callable[[_TVal], Awaitable[_TRes]], + values: Iterable[_TVal], +) -> Iterable[_TRes]: + result = [] + + for value in values: + result.append(await func(value)) + await asyncio.sleep(0.1) + + return result diff --git a/lecture5/example_parse_qs.py b/lecture5/example_parse_qs.py new file mode 100644 index 00000000..8379eebe --- /dev/null +++ b/lecture5/example_parse_qs.py @@ -0,0 +1,11 @@ +from sys import argv + + +def parse_qs(query_string): + # это код который я взял из реализации 1-го ДЗ одного из студентов + return dict(param.split("=") for param in query_string.split("&") if "=" in param) + + +if __name__ == "__main__": + query_string = argv[1] + print(parse_qs(query_string)) diff --git a/lecture5/example_register_user.py b/lecture5/example_register_user.py new file mode 100644 index 00000000..b439b73f --- /dev/null +++ b/lecture5/example_register_user.py @@ -0,0 +1,171 @@ +from dataclasses import dataclass +from enum import StrEnum +from logging import getLogger +from typing import Protocol + +import requests +from requests.exceptions import HTTPError + +logger = getLogger(__name__) + + +class Errors(StrEnum): + INVALID_PASSWORD = "INVALID_PASSWORD" + DUPLICATE_USER = "DUPLICATE_USER" + DISCONNECTED = "DISCONNECTED" + API_ERROR = "API_ERROR" + PROVIDER_NOT_FOUND = "PROVIDER_NOT_FOUND" + + def as_exc(self) -> Exception: + return Exception(self.value) + + +@dataclass(slots=True) +class Entity[TId, TInfo]: + uid: TId + info: TInfo + + +@dataclass(slots=True) +class ExternalIdentity: + uid: str + provider: str + + +@dataclass(slots=True) +class InternalIdentity: + username: str + password: str + + +type Identity = ExternalIdentity | InternalIdentity + + +@dataclass(slots=True) +class User: + name: str + age: int + identities: list[Identity] + + +@dataclass(slots=True) +class RegisterUserInternal: + name: str + age: int + username: str + password: str + + +@dataclass(slots=True) +class RegisterUserExternal: + uid: str + provider: str + + +type RegisterUser = RegisterUserExternal | RegisterUserInternal + + +class Repository[TId, TModel](Protocol): + def insert(self, model: TModel) -> Entity[TId, TModel]: ... + def get_by_id(self, id: TId) -> Entity[TId, TModel] | None: ... + def delete_by_id(self, id: TId) -> None: ... + def replace_by_id(self, id: TId, model: TModel) -> Entity[TId, TModel]: ... + + +class ExternalAuthAPI(Protocol): + def get_user(self, uid: str) -> User: ... + + +class GoogleAuthAPI(ExternalAuthAPI): + provider = "google" + + def get_user(self, uid: str) -> User: + response = requests.get("http://google/auth", params={"id": uid}) + response.raise_for_status() + + response_data = response.json() + return User( + name=response_data["name"], + age=response_data["age"], + identities=[ExternalIdentity(uid=uid, provider=self.provider)], + ) + + +class VKAuthAPI(ExternalAuthAPI): + provider = "vk" + + def get_user(self, uid: str) -> User: + response = requests.get(f"http://vk/auth/{uid}") + response.raise_for_status() + + response_data = response.json() + return User( + name=response_data["info"]["firstName"] + + " " + + response_data["info"]["lastName"], + age=response_data["info"]["age"], + identities=[ExternalIdentity(uid=uid, provider=self.provider)], + ) + + +class PasswordManager(Protocol): + def is_password_valid(self, password: str) -> bool: ... + def encrypt_password(self, password: str) -> str: ... + def is_password_match( + self, password: str, target_encrypted_password: str + ) -> bool: ... + + +@dataclass(slots=True) +class UserService: + _repository: Repository[int, User] + _password_manager: PasswordManager + _external_providers: dict[str, ExternalAuthAPI] + + def register_user(self, message: RegisterUser) -> Entity[int, User]: + match message: + case RegisterUserInternal(): + return self._register_user_internal(message) + case RegisterUserExternal(): + return self._register_user_external(message) + + def _register_user_internal( + self, message: RegisterUserInternal + ) -> Entity[int, User]: + logger.info("Register internal") + + if not self._password_manager.is_password_valid(message.password): + logger.info("Password %s not valid", message.password) + raise Errors.INVALID_PASSWORD.as_exc() + + encrypted_password = self._password_manager.encrypt_password(message.password) + user = User( + message.name, + message.age, + identities=[ + InternalIdentity( + username=message.username, + password=encrypted_password, + ), + ], + ) + + return self._repository.insert(user) + + def _register_user_external( + self, message: RegisterUserExternal + ) -> Entity[int, User]: + logger.info("Register internal") + + if message.provider not in self._external_providers: + logger.info("Provider %s not found", message.provider) + raise Errors.PROVIDER_NOT_FOUND.as_exc() + + provider = self._external_providers[message.provider] + + try: + user = provider.get_user(message.uid) + except HTTPError as e: + raise Errors.API_ERROR.as_exc() from e + + return self._repository.insert(user) diff --git a/lecture5/hw/README.md b/lecture5/hw/README.md new file mode 100644 index 00000000..33e79328 --- /dev/null +++ b/lecture5/hw/README.md @@ -0,0 +1,5 @@ +# ДЗ + +1) Добиться 95% покрытия тестами вашей второй домашки - 1 балл + +2) Настроить автозапуск этих тестов в CI, если вы подключали сторонюю БД, то можно посмотреть вот [сюда](https://dev.to/kashifsoofi/integration-test-postgres-using-github-actions-3lln), чтобы поддержать тесты с ней в CI. По итогу у вас должен получится зеленый пайплайн - оценивается в еще 2 балла. diff --git a/lecture5/hw/__init__.py b/lecture5/hw/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lecture5/pytest.ini b/lecture5/pytest.ini new file mode 100644 index 00000000..fe1b1c65 --- /dev/null +++ b/lecture5/pytest.ini @@ -0,0 +1,11 @@ +[tool:pytest] +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* +addopts = + --cov=shop_api + --cov-report=html + --cov-report=term-missing + --cov-fail-under=95 + -v diff --git a/lecture5/requirements.txt b/lecture5/requirements.txt new file mode 100644 index 00000000..84427570 --- /dev/null +++ b/lecture5/requirements.txt @@ -0,0 +1,7 @@ +pytest-cov +pytest-mock +pytest +pytest-asyncio +responses +faker +httpx diff --git a/lecture5/run_tests.py b/lecture5/run_tests.py new file mode 100644 index 00000000..d84e0cf2 --- /dev/null +++ b/lecture5/run_tests.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 + +import subprocess +import sys +import os + +def run_tests(): + os.chdir(os.path.dirname(os.path.abspath(__file__))) + + print("🧪 Запуск тестов для Shop API...") + print("=" * 50) + + try: + result = subprocess.run([ + "pytest", + "tests/test_shop_api.py", + "--cov=shop_api", + "--cov-report=html", + "--cov-report=term-missing", + "--cov-fail-under=95", + "-v" + ], check=True) + + print("\n✅ Все тесты прошли успешно!") + print("📊 Отчет о покрытии сохранен в htmlcov/index.html") + + except subprocess.CalledProcessError as e: + print(f"\n❌ Тесты не прошли: {e}") + sys.exit(1) + except FileNotFoundError: + print("❌ pytest не найден. Установите зависимости:") + print("pip install -r requirements.txt") + print("pip install -r tests/requirements.txt") + sys.exit(1) + +if __name__ == "__main__": + run_tests() diff --git a/lecture5/test.db b/lecture5/test.db new file mode 100644 index 0000000000000000000000000000000000000000..9d8e63d2947d5b83468413e569a23bbf0905d50a GIT binary patch literal 16384 zcmeI2KWGzC9LL{Xn%pJLy=-^PHf{25Iii3lLsDp!pqQ7@djd^X|iVnYnZ|=SC``z#T?vv>q zzubv*$#ccT?EG}W6TK|PG>sh=f-#mu)m0S0d#EzTn}2ERgTLH!Y+v?*B?DrN!2+~% znUO9cKm>>Y5g-CYfCvx)B0vO)01@~{1hm#J+~>Y z5g-EpF98ri)}fhsx9o`_Jdziq9YNNrnQ|-+PDXcCXT4{l?bTV&M6|6sJK#olR%iQ* zp&dbHX=X9f8f1yJ^h?m9Yz0ZxXqHB}T8x#zr21MhJ!}P<)Cc0q1JI~OG^j>QH9$4E zH2$apEX9aNgKW2M9xr&~lX$w8tk_Dt-ZzKmiUoP(_RVV$Q69CzwQpo9n_EWhdPrRl z)~?qn>kv>j1YM2L6hof>6R^zSJA8w$@C81?2Y3(f;5EF0m+%}`VFjMRWBf93AMU~( z45f<*5CI}U1c(3;AOb{y2oM1xKm>>Y5!glodO~M<6Dp2sL^YsdC_r(Pg^Hr;Q4v%a zWugpJ2o*%tp#mr!rRnR{%7N_h*Wm>`g-37?mf5g-CYfCvx)B0vO)01+Sp zM1Tko0V1$11WfEG&@?R}0tU7bXj&LrOl%)Oi?`?ob`GFrB~9!YKr5Qm4Qv)btG?O5 iCIK8o5(c&h$U&2dZ2=f$G~kB--wMSH>;u5HAp8Q=K{(|A literal 0 HcmV?d00001 diff --git a/lecture5/tests/README.md b/lecture5/tests/README.md new file mode 100644 index 00000000..879648b7 --- /dev/null +++ b/lecture5/tests/README.md @@ -0,0 +1,31 @@ +# Тесты для Shop API + +Этот проект содержит тесты для API магазина из lecture4. + +## Установка + +```bash +pip install -r requirements.txt +pip install -r tests/requirements.txt +``` + +## Запуск тестов + +```bash +# Запуск всех тестов +pytest + +# Запуск с покрытием +pytest --cov=shop_api --cov-report=html + +# Запуск конкретного теста +pytest tests/test_shop_api.py::test_create_item +``` + +## Покрытие кода + +Тесты должны обеспечивать покрытие кода не менее 95%. + +## CI/CD + +Тесты автоматически запускаются в GitHub Actions при каждом push и pull request. \ No newline at end of file diff --git a/lecture5/tests/__init__.py b/lecture5/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lecture5/tests/conftest.py b/lecture5/tests/conftest.py new file mode 100644 index 00000000..b7e4ae8b --- /dev/null +++ b/lecture5/tests/conftest.py @@ -0,0 +1,18 @@ +import pytest +from typing import TypeVar + + +_TVal = TypeVar("_TVal") + +async def to_str_async(x: _TVal) -> str: + return str(x) + + +@pytest.fixture() +def int_list() -> list[int]: + return [1, 2, 3, 4, 5] + + +@pytest.fixture() +def int_list_empty() -> list[int]: + return [] diff --git a/lecture5/tests/requirements.txt b/lecture5/tests/requirements.txt new file mode 100644 index 00000000..40d3b062 --- /dev/null +++ b/lecture5/tests/requirements.txt @@ -0,0 +1,6 @@ +pytest>=7.0.0 +pytest-cov>=4.0.0 +fastapi>=0.100.0 +sqlalchemy>=2.0.0 +pytest-asyncio>=0.21.0 +httpx>=0.24.0 diff --git a/lecture5/tests/test_example_async.py b/lecture5/tests/test_example_async.py new file mode 100644 index 00000000..0a840ee7 --- /dev/null +++ b/lecture5/tests/test_example_async.py @@ -0,0 +1,35 @@ +import pytest + +from lecture5.example_async import map_async, slow_map_async +from lecture5.tests.conftest import to_str_async + + +@pytest.mark.asyncio # pytest-asyncio plugin +async def test_map_async(int_list) -> None: + result = await map_async(to_str_async, int_list) + assert result == ["1", "2", "3", "4", "5"] + + +@pytest.mark.asyncio # pytest-asyncio plugin +@pytest.mark.parametrize( + ("iterable_name", "expected_result"), + [ + ("int_list", ["1", "2", "3", "4", "5"]), + ("int_list_empty", []), + ], +) +async def test_map_async_parametrized( + request, # provides api for managing fixtures + iterable_name: str, + expected_result: list, +) -> None: + iterable = request.getfixturevalue(iterable_name) + result = await map_async(to_str_async, iterable) + assert result == expected_result + + +@pytest.mark.slow +@pytest.mark.asyncio # pytest-asyncio plugin +async def test_slow_map_async(int_list) -> None: + result = await slow_map_async(to_str_async, int_list) + assert result == ["1", "2", "3", "4", "5"] diff --git a/lecture5/tests/test_example_mock.py b/lecture5/tests/test_example_mock.py new file mode 100644 index 00000000..78387c79 --- /dev/null +++ b/lecture5/tests/test_example_mock.py @@ -0,0 +1,338 @@ +from http import HTTPStatus +from logging import getLogger +from typing import Any + +import pytest +import responses +from pytest_mock import MockerFixture + +from lecture5.example_register_user import ( + Entity, + Errors, + ExternalAuthAPI, + ExternalIdentity, + GoogleAuthAPI, + InternalIdentity, + PasswordManager, + RegisterUserExternal, + RegisterUserInternal, + Repository, + User, + UserService, + VKAuthAPI, +) + +logger = getLogger(__name__) + +ENCRYPTED_PASSWORD = "encrypted_password" + + +@pytest.fixture() +def user_data() -> dict[str, Any]: + return { + "name": "John Doe", + "age": 30, + "username": "john.doe", + "password": "testPassword_123", + "provider": "google", + "provider_uid": "external-uid", + } + + +@pytest.fixture() +def register_user_internal(user_data) -> RegisterUserInternal: + return RegisterUserInternal( + user_data["name"], + user_data["age"], + user_data["username"], + user_data["password"], + ) + + +@pytest.fixture() +def register_user_external(user_data) -> RegisterUserExternal: + return RegisterUserExternal(user_data["provider_uid"], user_data["provider"]) + + +@pytest.fixture() +def providers() -> dict[str, ExternalAuthAPI]: + return {} + + +@pytest.fixture() +def add_google_provider(providers: dict): + providers[GoogleAuthAPI.provider] = GoogleAuthAPI() + + +@pytest.fixture() +def add_vk_provider(providers: dict): + providers[VKAuthAPI.provider] = VKAuthAPI() + + +@pytest.fixture() +def mock_user_repository(mocker: MockerFixture) -> Repository[int, User]: + return mocker.MagicMock(spec=Repository[int, User]) + + +@pytest.fixture() +def mock_user_repository_insert_raises_connection_error(mock_user_repository) -> None: + mock_user_repository.insert.side_effect = Errors.DISCONNECTED.as_exc() + + +@pytest.fixture() +def mock_user_repository_insert_raises_duplicate_error(mock_user_repository) -> None: + mock_user_repository.insert.side_effect = Errors.DUPLICATE_USER.as_exc() + + +@pytest.fixture() +def mock_user_repository_insert_success(mocker: MockerFixture, mock_user_repository): + def _insert_side_effect(user: User) -> Entity[int, User]: + return Entity(0, user) + + mock_user_repository.insert.side_effect = _insert_side_effect + + +@pytest.fixture() +def mock_password_manager(mocker: MockerFixture) -> PasswordManager: + return mocker.MagicMock(spec=PasswordManager) + + +@pytest.fixture() +def mock_password_manager_is_valid_password_true(mock_password_manager): + mock_password_manager.is_password_valid.return_value = True + + +@pytest.fixture() +def mock_password_manager_is_valid_password_false(mock_password_manager): + mock_password_manager.is_password_valid.return_value = False + + +@pytest.fixture(autouse=True) +def mock_password_manager_encrypt_password(mock_password_manager): + mock_password_manager.encrypt_password.return_value = ENCRYPTED_PASSWORD + + +@pytest.fixture() +def mock_google_auth_api_get_user_http_error(): + responses.add( + responses.GET, + "http://google/auth", + status=HTTPStatus.INTERNAL_SERVER_ERROR, + ) + + +@pytest.fixture() +def mock_vk_auth_api_get_user_http_error(user_data: dict): + responses.add( + responses.GET, + "http://vk/auth/" + user_data["provider_uid"], + status=HTTPStatus.INTERNAL_SERVER_ERROR, + ) + + +@pytest.fixture() +def mock_google_auth_api_get_user(user_data): + responses.add( + responses.GET, + "http://google/auth", + json={"name": user_data["name"], "age": user_data["age"]}, + status=HTTPStatus.OK, + ) + + +@pytest.fixture() +def mock_vk_auth_api_get_user(user_data): + responses.add( + responses.GET, + "http://vk/auth/" + user_data["provider_uid"], + json={ + "info": { + "firstName": user_data["name"].split(" ")[0], + "lastName": user_data["name"].split(" ")[1], + "age": user_data["age"], + } + }, + status=HTTPStatus.OK, + ) + + +@pytest.mark.usefixtures("mock_password_manager_is_valid_password_false") +def test_register_user_internal_with_invalid_password( + mock_user_repository, + mock_password_manager, + providers, + register_user_internal, +): + # arrange + user_service = UserService(mock_user_repository, mock_password_manager, providers) + + # act + with pytest.raises(Exception) as exc_info: + user_service.register_user(register_user_internal) + + # assert + assert Errors.INVALID_PASSWORD.value in str(exc_info.value) + assert not mock_user_repository.insert.called + assert not mock_password_manager.encrypt_password.called + + +@pytest.mark.usefixtures( + "mock_password_manager_is_valid_password_true", + "mock_user_repository_insert_raises_duplicate_error", +) +def test_register_user_duplicate_error( + mock_user_repository: Repository[int, User], + mock_password_manager: PasswordManager, + providers: dict[str, ExternalAuthAPI], + register_user_internal: RegisterUserInternal, +): + user_service = UserService(mock_user_repository, mock_password_manager, providers) + + with pytest.raises(Exception) as exc_info: + user_service.register_user(register_user_internal) + + assert Errors.DUPLICATE_USER.value in str(exc_info.value) + + +@pytest.mark.usefixtures( + "mock_password_manager_is_valid_password_true", + "mock_user_repository_insert_raises_connection_error", +) +def test_register_user_connection_error( + mock_user_repository: Repository[int, User], + mock_password_manager: PasswordManager, + providers: dict[str, ExternalAuthAPI], + register_user_internal: RegisterUserInternal, +): + user_service = UserService(mock_user_repository, mock_password_manager, providers) + + with pytest.raises(Exception) as exc_info: + user_service.register_user(register_user_internal) + + assert Errors.DISCONNECTED.value in str(exc_info.value) + + +@pytest.mark.usefixtures( + "mock_password_manager_is_valid_password_true", + "mock_user_repository_insert_success", +) +def test_register_user_internal( + mock_user_repository: Repository[int, User], + mock_password_manager: PasswordManager, + providers: dict[str, ExternalAuthAPI], + register_user_internal: RegisterUserInternal, +): + user_service = UserService(mock_user_repository, mock_password_manager, providers) + entity = user_service.register_user(register_user_internal) + + assert entity.info.name == register_user_internal.name + assert entity.info.age == register_user_internal.age + assert len(entity.info.identities) == 1 + assert isinstance(entity.info.identities[0], InternalIdentity) + assert entity.info.identities[0].username == register_user_internal.username + assert entity.info.identities[0].password == ENCRYPTED_PASSWORD + + +def test_register_user_external_not_found_provider( + mock_user_repository: Repository[int, User], + mock_password_manager: PasswordManager, + providers: dict[str, ExternalAuthAPI], + register_user_external: RegisterUserExternal, +): + user_service = UserService(mock_user_repository, mock_password_manager, providers) + + with pytest.raises(Exception) as exc_info: + user_service.register_user(register_user_external) + + assert Errors.PROVIDER_NOT_FOUND.value in str(exc_info.value) + assert not mock_user_repository.insert.called + + +@responses.activate +@pytest.mark.usefixtures( + "add_google_provider", + "mock_google_auth_api_get_user_http_error", +) +def test_register_user_external_api_error_google( + mock_user_repository: Repository[int, User], + mock_password_manager: PasswordManager, + providers: dict[str, ExternalAuthAPI], + register_user_external: RegisterUserExternal, +): + user_service = UserService(mock_user_repository, mock_password_manager, providers) + + with pytest.raises(Exception) as exc_info: + user_service.register_user(register_user_external) + + assert Errors.API_ERROR.value in str(exc_info.value) + assert not mock_user_repository.insert.called + + +@responses.activate +@pytest.mark.usefixtures( + "add_google_provider", + "mock_google_auth_api_get_user", + "mock_user_repository_insert_success", +) +def test_register_user_external_google( + mock_user_repository: Repository[int, User], + mock_password_manager: PasswordManager, + providers: dict[str, ExternalAuthAPI], + register_user_external: RegisterUserExternal, + user_data, +): + user_service = UserService(mock_user_repository, mock_password_manager, providers) + entity = user_service.register_user(register_user_external) + + assert entity.info.name == user_data["name"] + assert entity.info.age == user_data["age"] + assert len(entity.info.identities) == 1 + assert isinstance(entity.info.identities[0], ExternalIdentity) + assert entity.info.identities[0].provider == GoogleAuthAPI.provider + assert entity.info.identities[0].uid == user_data["provider_uid"] + + +@responses.activate +@pytest.mark.usefixtures( + "add_vk_provider", + "mock_vk_auth_api_get_user_http_error", +) +def test_register_user_external_api_error_vk( + mock_user_repository: Repository[int, User], + mock_password_manager: PasswordManager, + providers: dict[str, ExternalAuthAPI], + register_user_external: RegisterUserExternal, +): + register_user_external.provider = "vk" + user_service = UserService(mock_user_repository, mock_password_manager, providers) + + with pytest.raises(Exception) as exc_info: + user_service.register_user(register_user_external) + + assert Errors.API_ERROR.value in str(exc_info.value) + assert not mock_user_repository.insert.called + + +@responses.activate +@pytest.mark.usefixtures( + "add_vk_provider", + "mock_vk_auth_api_get_user", + "mock_user_repository_insert_success", +) +def test_register_user_external_vk( + mock_user_repository: Repository[int, User], + mock_password_manager: PasswordManager, + providers: dict[str, ExternalAuthAPI], + register_user_external: RegisterUserExternal, + user_data, +): + register_user_external.provider = "vk" + user_service = UserService(mock_user_repository, mock_password_manager, providers) + entity = user_service.register_user(register_user_external) + + assert entity.info.name == user_data["name"] + assert entity.info.age == user_data["age"] + assert len(entity.info.identities) == 1 + assert isinstance(entity.info.identities[0], ExternalIdentity) + assert entity.info.identities[0].provider == VKAuthAPI.provider + assert entity.info.identities[0].uid == user_data["provider_uid"] diff --git a/lecture5/tests/test_example_parse_qa_parametrized.py b/lecture5/tests/test_example_parse_qa_parametrized.py new file mode 100644 index 00000000..af8ef5ce --- /dev/null +++ b/lecture5/tests/test_example_parse_qa_parametrized.py @@ -0,0 +1,27 @@ +from typing import Any + +import pytest + +from lecture5.example_parse_qs import parse_qs + + +@pytest.mark.xfail() +@pytest.mark.parametrize( + ("query_string", "expected_result"), + [ + ("name=John", {"name": "John"}), + ("name=John&age=30", {"name": "John", "age": "30"}), + ( + "name=John&age=30&city=New%20York", + {"name": "John", "age": "30", "city": "New York"}, + ), + ( + "name=John&age=30&city=New%20York&key=", + {"name": "John", "age": "30", "city": "New York", "key": ""}, + ), + ("name=John&name=Mary", {"name": ["John", "Mary"]}), + ], +) +def test_parse_qs_valid(query_string: str, expected_result: dict[str, Any]) -> None: + result = parse_qs(query_string) + assert result == expected_result diff --git a/lecture5/tests/test_example_parse_qs_non_parametrized.py b/lecture5/tests/test_example_parse_qs_non_parametrized.py new file mode 100644 index 00000000..a5cf1b3f --- /dev/null +++ b/lecture5/tests/test_example_parse_qs_non_parametrized.py @@ -0,0 +1,39 @@ +import pytest + +from example_parse_qs import parse_qs + + +def test_parse_qs_valid_1() -> None: + query_string = "name=John" # arrange + result = parse_qs(query_string) # act + assert result == {"name": "John"} # assert + + +def test_parse_qs_valid_2() -> None: + query_string = "name=John&age=30" + result = parse_qs(query_string) + assert result == {"name": "John", "age": "30"} + + +@pytest.mark.xfail() +def test_parse_qs_valid_3() -> None: + query_string = "name=John&age=30&city=New%20York" + result = parse_qs(query_string) + assert result == {"name": "John", "age": "30", "city": "New York"} + + +@pytest.mark.xfail() +def test_parse_qs_valid_4() -> None: + query_string = "name=John&age=30&city=New%20York&key=" + result = parse_qs(query_string) + assert result == {"name": "John", "age": "30", "city": "New York", "key": ""} + + +@pytest.mark.xfail() +def test_parse_qs_valid_5() -> None: + query_string = "name=John&name=Mary" + result = parse_qs(query_string) + assert result == {"name": ["John", "Mary"]} + + +# and many more tests diff --git a/lecture5/tests/test_shop_api.py b/lecture5/tests/test_shop_api.py new file mode 100644 index 00000000..36a59d0e --- /dev/null +++ b/lecture5/tests/test_shop_api.py @@ -0,0 +1,390 @@ +import pytest +import pytest_cov +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from shop_api.main import app, Base, ItemDB, CartDB, CartItemDB, get_db + +SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db" +engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) +TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +def override_get_db(): + try: + db = TestingSessionLocal() + yield db + finally: + db.close() + +app.dependency_overrides[get_db] = override_get_db + +@pytest.fixture(scope="module") +def setup_database(): + Base.metadata.create_all(bind=engine) + yield + Base.metadata.drop_all(bind=engine) + +@pytest.fixture +def client(setup_database): + return TestClient(app) + +@pytest.fixture +def sample_item(): + return {"name": "Test Item", "price": 100.0} + +@pytest.fixture +def sample_item_update(): + return {"name": "Updated Item", "price": 150.0, "deleted": False} + +@pytest.fixture +def sample_item_patch(): + return {"name": "Patched Item", "price": 200.0} + +def test_create_item(client, sample_item): + response = client.post("/item", json=sample_item) + assert response.status_code == 201 + data = response.json() + assert data["name"] == sample_item["name"] + assert data["price"] == sample_item["price"] + assert data["deleted"] == False + assert "id" in data + +def test_create_item_invalid_data(client): + response = client.post("/item", json={"name": "Test"}) + assert response.status_code == 422 + +def test_get_item(client, sample_item): + create_response = client.post("/item", json=sample_item) + item_id = create_response.json()["id"] + + response = client.get(f"/item/{item_id}") + assert response.status_code == 200 + data = response.json() + assert data["name"] == sample_item["name"] + assert data["price"] == sample_item["price"] + +def test_get_item_not_found(client): + response = client.get("/item/999") + assert response.status_code == 404 + +def test_get_item_deleted(client, sample_item): + create_response = client.post("/item", json=sample_item) + item_id = create_response.json()["id"] + + client.delete(f"/item/{item_id}") + + response = client.get(f"/item/{item_id}") + assert response.status_code == 404 + +def test_get_item_list(client, sample_item): + client.post("/item", json=sample_item) + client.post("/item", json={"name": "Item 2", "price": 200.0}) + + response = client.get("/item") + assert response.status_code == 200 + data = response.json() + assert len(data) == 2 + +def test_get_item_list_with_filters(client, sample_item): + client.post("/item", json=sample_item) + client.post("/item", json={"name": "Item 2", "price": 200.0}) + + response = client.get("/item?min_price=150") + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + assert data[0]["price"] == 200.0 + +def test_get_item_list_with_limit(client, sample_item): + client.post("/item", json=sample_item) + client.post("/item", json={"name": "Item 2", "price": 200.0}) + + response = client.get("/item?limit=1") + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + +def test_get_item_list_show_deleted(client, sample_item): + create_response = client.post("/item", json=sample_item) + item_id = create_response.json()["id"] + + client.delete(f"/item/{item_id}") + + response = client.get("/item?show_deleted=true") + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + assert data[0]["deleted"] == True + +def test_put_item(client, sample_item, sample_item_update): + create_response = client.post("/item", json=sample_item) + item_id = create_response.json()["id"] + + response = client.put(f"/item/{item_id}", json=sample_item_update) + assert response.status_code == 200 + data = response.json() + assert data["name"] == sample_item_update["name"] + assert data["price"] == sample_item_update["price"] + +def test_put_item_not_found(client, sample_item_update): + response = client.put("/item/999", json=sample_item_update) + assert response.status_code == 404 + +def test_patch_item(client, sample_item, sample_item_patch): + create_response = client.post("/item", json=sample_item) + item_id = create_response.json()["id"] + + response = client.patch(f"/item/{item_id}", json=sample_item_patch) + assert response.status_code == 200 + data = response.json() + assert data["name"] == sample_item_patch["name"] + assert data["price"] == sample_item_patch["price"] + +def test_patch_item_not_found(client, sample_item_patch): + response = client.patch("/item/999", json=sample_item_patch) + assert response.status_code == 404 + +def test_patch_item_deleted(client, sample_item, sample_item_patch): + create_response = client.post("/item", json=sample_item) + item_id = create_response.json()["id"] + + client.delete(f"/item/{item_id}") + + response = client.patch(f"/item/{item_id}", json=sample_item_patch) + assert response.status_code == 304 + +def test_delete_item(client, sample_item): + create_response = client.post("/item", json=sample_item) + item_id = create_response.json()["id"] + + response = client.delete(f"/item/{item_id}") + assert response.status_code == 200 + + get_response = client.get(f"/item/{item_id}") + assert get_response.status_code == 404 + +def test_delete_item_not_found(client): + response = client.delete("/item/999") + assert response.status_code == 404 + +def test_create_cart(client): + response = client.post("/cart") + assert response.status_code == 201 + data = response.json() + assert data["id"] is not None + assert data["items"] == [] + assert data["price"] == 0.0 + +def test_get_cart(client): + create_response = client.post("/cart") + cart_id = create_response.json()["id"] + + response = client.get(f"/cart/{cart_id}") + assert response.status_code == 200 + data = response.json() + assert data["id"] == cart_id + +def test_get_cart_not_found(client): + response = client.get("/cart/999") + assert response.status_code == 404 + +def test_get_cart_list(client): + client.post("/cart") + client.post("/cart") + + response = client.get("/cart") + assert response.status_code == 200 + data = response.json() + assert len(data) == 2 + +def test_get_cart_list_with_filters(client): + cart1_response = client.post("/cart") + cart1_id = cart1_response.json()["id"] + + item_response = client.post("/item", json={"name": "Test Item", "price": 100.0}) + item_id = item_response.json()["id"] + + client.post(f"/cart/{cart1_id}/add/{item_id}") + + response = client.get("/cart?min_price=50") + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + +def test_add_item_to_cart(client, sample_item): + cart_response = client.post("/cart") + cart_id = cart_response.json()["id"] + + item_response = client.post("/item", json=sample_item) + item_id = item_response.json()["id"] + + response = client.post(f"/cart/{cart_id}/add/{item_id}") + assert response.status_code == 200 + data = response.json() + assert len(data["items"]) == 1 + assert data["items"][0]["id"] == item_id + assert data["items"][0]["quantity"] == 1 + assert data["price"] == sample_item["price"] + +def test_add_item_to_cart_multiple_times(client, sample_item): + cart_response = client.post("/cart") + cart_id = cart_response.json()["id"] + + item_response = client.post("/item", json=sample_item) + item_id = item_response.json()["id"] + + client.post(f"/cart/{cart_id}/add/{item_id}") + response = client.post(f"/cart/{cart_id}/add/{item_id}") + + assert response.status_code == 200 + data = response.json() + assert len(data["items"]) == 1 + assert data["items"][0]["quantity"] == 2 + assert data["price"] == sample_item["price"] * 2 + +def test_add_item_to_cart_cart_not_found(client, sample_item): + item_response = client.post("/item", json=sample_item) + item_id = item_response.json()["id"] + + response = client.post(f"/cart/999/add/{item_id}") + assert response.status_code == 404 + +def test_add_item_to_cart_item_not_found(client): + cart_response = client.post("/cart") + cart_id = cart_response.json()["id"] + + response = client.post(f"/cart/{cart_id}/add/999") + assert response.status_code == 404 + +def test_add_deleted_item_to_cart(client, sample_item): + cart_response = client.post("/cart") + cart_id = cart_response.json()["id"] + + item_response = client.post("/item", json=sample_item) + item_id = item_response.json()["id"] + + client.delete(f"/item/{item_id}") + + response = client.post(f"/cart/{cart_id}/add/{item_id}") + assert response.status_code == 404 + +def test_cart_price_calculation(client, sample_item): + cart_response = client.post("/cart") + cart_id = cart_response.json()["id"] + + item1_response = client.post("/item", json=sample_item) + item1_id = item1_response.json()["id"] + + item2_response = client.post("/item", json={"name": "Item 2", "price": 200.0}) + item2_id = item2_response.json()["id"] + + client.post(f"/cart/{cart_id}/add/{item1_id}") + client.post(f"/cart/{cart_id}/add/{item2_id}") + + response = client.get(f"/cart/{cart_id}") + assert response.status_code == 200 + data = response.json() + assert data["price"] == 300.0 + +def test_cart_item_availability(client, sample_item): + cart_response = client.post("/cart") + cart_id = cart_response.json()["id"] + + item_response = client.post("/item", json=sample_item) + item_id = item_response.json()["id"] + + client.post(f"/cart/{cart_id}/add/{item_id}") + + client.delete(f"/item/{item_id}") + + response = client.get(f"/cart/{cart_id}") + assert response.status_code == 200 + data = response.json() + assert len(data["items"]) == 1 + assert data["items"][0]["available"] == False + +def test_cart_list_quantity_filters(client, sample_item): + cart1_response = client.post("/cart") + cart1_id = cart1_response.json()["id"] + + cart2_response = client.post("/cart") + cart2_id = cart2_response.json()["id"] + + item_response = client.post("/item", json=sample_item) + item_id = item_response.json()["id"] + + client.post(f"/cart/{cart1_id}/add/{item_id}") + client.post(f"/cart/{cart1_id}/add/{item_id}") + + client.post(f"/cart/{cart2_id}/add/{item_id}") + + response = client.get("/cart?min_quantity=2") + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + +def test_cart_list_max_quantity_filters(client, sample_item): + cart1_response = client.post("/cart") + cart1_id = cart1_response.json()["id"] + + cart2_response = client.post("/cart") + cart2_id = cart2_response.json()["id"] + + item_response = client.post("/item", json=sample_item) + item_id = item_response.json()["id"] + + client.post(f"/cart/{cart1_id}/add/{item_id}") + client.post(f"/cart/{cart1_id}/add/{item_id}") + + client.post(f"/cart/{cart2_id}/add/{item_id}") + + response = client.get("/cart?max_quantity=1") + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + +def test_item_validation_negative_price(client): + response = client.post("/item", json={"name": "Test", "price": -100.0}) + assert response.status_code == 422 + +def test_item_validation_empty_name(client): + response = client.post("/item", json={"name": "", "price": 100.0}) + assert response.status_code == 422 + +def test_item_validation_missing_fields(client): + response = client.post("/item", json={"name": "Test"}) + assert response.status_code == 422 + +def test_cart_offset_limit(client): + for i in range(5): + client.post("/cart") + + response = client.get("/cart?offset=2&limit=2") + assert response.status_code == 200 + data = response.json() + assert len(data) == 2 + +def test_item_offset_limit(client, sample_item): + for i in range(5): + client.post("/item", json={"name": f"Item {i}", "price": 100.0 + i}) + + response = client.get("/item?offset=2&limit=2") + assert response.status_code == 200 + data = response.json() + assert len(data) == 2 + +def test_cart_item_quantity_increase(client, sample_item): + cart_response = client.post("/cart") + cart_id = cart_response.json()["id"] + + item_response = client.post("/item", json=sample_item) + item_id = item_response.json()["id"] + + for _ in range(3): + client.post(f"/cart/{cart_id}/add/{item_id}") + + response = client.get(f"/cart/{cart_id}") + assert response.status_code == 200 + data = response.json() + assert data["items"][0]["quantity"] == 3 + assert data["price"] == sample_item["price"] * 3