Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions hw1/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,57 @@
from typing import Any, Awaitable, Callable

import json
import math
from urllib.parse import parse_qs
from http import HTTPStatus

async def _read_request_body(reader: Callable[[], Awaitable[dict[str, Any]]]) -> bytes:
buffer = bytearray()
continue_reading = True
while continue_reading:
event = await reader()
if event.get("type") != "http.request":
break
chunk = event.get("body", b"")
if chunk:
buffer.extend(chunk)
continue_reading = bool(event.get("more_body", False))
return bytes(buffer)


async def _respond_json(writer: Callable[[dict[str, Any]], Awaitable[None]], status_code: int, data: dict,) -> None:
body_bytes = json.dumps(data).encode("utf-8")
headers = [
(b"content-type", b"application/json"),
(b"content-length", str(len(body_bytes)).encode("utf-8")),
]
await writer({"type": "http.response.start", "status": status_code, "headers": headers})
await writer({"type": "http.response.body", "body": body_bytes})


def _decode_query_params(raw_query: bytes) -> dict[str, list[str]]:
query_str = raw_query.decode("utf-8")
return parse_qs(query_str, keep_blank_values=True)


def _safe_int_cast(text: str):
if text == "":
return None, "empty"
try:
return int(text), None
except Exception:
return None, "bad"


def _calc_factorial(num: int) -> int:
return math.factorial(num)


def _calc_fibonacci(num: int) -> int:
first, second = 0, 1
for _ in range(num):
first, second = second, first + second
return first

async def application(
scope: dict[str, Any],
Expand All @@ -13,6 +65,84 @@ async def application(
send: Корутина для отправки сообщений клиенту
"""
# TODO: Ваша реализация здесь

if scope.get("type") != "http":
await _respond_json(send, HTTPStatus.NOT_FOUND, {"detail": "Не найдено"})
return

http_method = scope.get("method", "")
url_path = scope.get("path", "")

if http_method != "GET":
await _respond_json(send, HTTPStatus.NOT_FOUND, {"detail": "Не найдено"})
return


if url_path == "/factorial":
params = _decode_query_params(scope.get("query_string", b""))
if "n" not in params:
await _respond_json(send, HTTPStatus.UNPROCESSABLE_ENTITY, {"detail": "Отсутствует n"})
return
n_raw = params["n"][0]
n_value, err = _safe_int_cast(n_raw)
if err is not None:
await _respond_json(send, HTTPStatus.UNPROCESSABLE_ENTITY, {"detail": "Параметр n должен быть целым числом"})
return
if n_value < 0:
await _respond_json(send, HTTPStatus.BAD_REQUEST, {"detail": "n должен быть неотрицательным"})
return

try:
result_val = _calc_factorial(n_value)
except (OverflowError, ValueError) as exc:
await _respond_json(send, HTTPStatus.BAD_REQUEST, {"detail": str(exc)})
return
await _respond_json(send, HTTPStatus.OK, {"result": result_val})
return


if url_path.startswith("/fibonacci/"):
n_str = url_path[len("/fibonacci/") :]
n_value, err = _safe_int_cast(n_str)
if err is not None:
await _respond_json(send, HTTPStatus.UNPROCESSABLE_ENTITY, {"detail": "Параметр пути должен быть целым числом"})
return
if n_value < 0:
await _respond_json(send, HTTPStatus.BAD_REQUEST, {"detail": "n должен быть неотрицательным"})
return
fib_val = _calc_fibonacci(n_value)
await _respond_json(send, HTTPStatus.OK, {"result": fib_val})
return


if url_path == "/mean":
body_content = await _read_request_body(receive)
if not body_content:
await _respond_json(send, HTTPStatus.UNPROCESSABLE_ENTITY, {"detail": "Отсутствует тело запроса в формате JSON"})
return
try:
parsed_data = json.loads(body_content.decode("utf-8"))
except Exception:
await _respond_json(send, HTTPStatus.UNPROCESSABLE_ENTITY, {"detail": "Некорректный JSON"})
return
if not isinstance(parsed_data, list):
await _respond_json(send, HTTPStatus.UNPROCESSABLE_ENTITY, {"detail": "JSON должен быть массивом чисел"})
return
if len(parsed_data) == 0:
await _respond_json(send, HTTPStatus.BAD_REQUEST, {"detail": "Массив не должен быть пустым"})
return
numbers = []
for element in parsed_data:
if isinstance(element, (int, float)):
numbers.append(float(element))
else:
await _respond_json(send, HTTPStatus.UNPROCESSABLE_ENTITY, {"detail": "Элементы массива должны быть числами"})
return
avg_val = sum(numbers) / len(numbers)
await _respond_json(send, HTTPStatus.OK, {"result": avg_val})
return

await _respond_json(send, HTTPStatus.NOT_FOUND, {"detail": "Не найдено"})

if __name__ == "__main__":
import uvicorn
Expand Down
24 changes: 24 additions & 0 deletions hw2/hw/.coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[run]
source = shop_api
omit =
*/tests/*
*/test_*.py
*/__pycache__/*
*/venv/*
*/.venv/*

[report]
precision = 2
show_missing = True
skip_covered = False
exclude_lines =
pragma: no cover
def __repr__
raise AssertionError
raise NotImplementedError
if __name__ == .__main__.:
if TYPE_CHECKING:
@abstractmethod

[html]
directory = coverage_html
17 changes: 17 additions & 0 deletions hw2/hw/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[pytest]
testpaths = .
python_files = test_*.py
python_classes = Test*
python_functions = test_*

addopts =
-v
--strict-markers
--cov=shop_api
--cov-report=term-missing
--cov-report=html
--cov-fail-under=95

markers =
slow: mark test as slow running
integration: integration tests
2 changes: 2 additions & 0 deletions hw2/hw/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ pytest>=7.4.0
pytest-asyncio>=0.21.0
httpx>=0.27.2
Faker>=37.8.0

pytest-cov>=6.0.0
175 changes: 173 additions & 2 deletions hw2/hw/shop_api/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,174 @@
from fastapi import FastAPI
from __future__ import annotations
from fastapi import FastAPI, HTTPException, Query, Response
from pydantic import BaseModel, PositiveInt, NonNegativeInt, Field
from http import HTTPStatus

app = FastAPI(title="Shop API")
app = FastAPI(title="Online Store API")

items: dict[int, dict] = {}
carts: dict[int, dict] = {}
next_item_id = 1
next_cart_id = 1

class ItemCreate(BaseModel):
name: str
price: float = Field(ge=0.0)

class ItemResponse(ItemCreate):
id: int
deleted: bool = False

class ItemPatch(BaseModel):
name: str | None = None
price: float | None = Field(None, ge=0.0)

class Config:
extra = "forbid"

class CartItem(BaseModel):
id: int
name: str
quantity: int
available: bool

class CartResponse(BaseModel):
id: int
items: list[CartItem]
total_price: float

@app.post("/item", status_code=HTTPStatus.CREATED)
def create_item(item: ItemCreate, response: Response):
global next_item_id
item_id = next_item_id
next_item_id += 1

data = item.dict() | {"id": item_id, "deleted": False}
items[item_id] = data

response.headers["location"] = f"/item/{item_id}"
return data

@app.get("/item/{id}")
def get_item(id: int):
item = items.get(id)
if not item or item["deleted"]:
raise HTTPException(HTTPStatus.NOT_FOUND)
return item

@app.get("/item")
def list_items(
offset: NonNegativeInt = 0,
limit: PositiveInt = 10,
min_price: float | None = Query(None, ge=0.0),
max_price: float | None = Query(None, ge=0.0),
show_deleted: bool = False,
):
result = list(items.values())
if not show_deleted:
result = [i for i in result if not i["deleted"]]
if min_price is not None:
result = [i for i in result if i["price"] >= min_price]
if max_price is not None:
result = [i for i in result if i["price"] <= max_price]

return result[offset: offset + limit]

@app.put("/item/{id}")
def put_item(id: int, body: ItemCreate):
if id not in items or items[id]["deleted"]:
raise HTTPException(HTTPStatus.NOT_MODIFIED)
items[id].update(body.dict())
return items[id]

@app.patch("/item/{id}")
def patch_item(id: int, body: ItemPatch):
if id not in items or items[id]["deleted"]:
raise HTTPException(HTTPStatus.NOT_MODIFIED)

if "deleted" in body.model_dump():
raise HTTPException(HTTPStatus.UNPROCESSABLE_ENTITY)

for k, v in body.dict(exclude_unset=True).items():
items[id][k] = v
return items[id]

@app.delete("/item/{id}")
def delete_item(id: int):
if id in items:
items[id]["deleted"] = True
return {"ok": True}

@app.post("/cart", status_code=HTTPStatus.CREATED)
def create_cart(response: Response):
global next_cart_id
cid = next_cart_id
next_cart_id += 1
carts[cid] = {"id": cid, "items": []}
response.headers["location"] = f"/cart/{cid}"
return {"id": cid}

@app.get("/cart/{id}")
def get_cart(id: int):
cart = carts.get(id)
if not cart:
raise HTTPException(HTTPStatus.NOT_FOUND)

result_items = []
total = 0.0
for entry in cart["items"]:
item = items.get(entry["id"])
available = item is not None and not item["deleted"]
price = item["price"] if available else 0
total += price * entry["quantity"]
result_items.append({
"id": entry["id"],
"name": entry["name"],
"quantity": entry["quantity"],
"available": available,
})

return {"id": id, "items": result_items, "total_price": total}

@app.get("/cart")
def list_carts(
offset: NonNegativeInt = 0,
limit: PositiveInt = 10,
min_price: float | None = Query(None, ge=0.0),
max_price: float | None = Query(None, ge=0.0),
min_quantity: int | None = Query(None, ge=0),
max_quantity: int | None = Query(None, ge=0),
):
carts_list = []
for cart in carts.values():
data = get_cart(cart["id"])
total_quantity = sum(i["quantity"] for i in data["items"])
if min_price is not None and data["total_price"] < min_price:
continue
if max_price is not None and data["total_price"] > max_price:
continue
if min_quantity is not None and total_quantity < min_quantity:
continue
if max_quantity is not None and total_quantity > max_quantity:
continue
carts_list.append(data)

return carts_list[offset: offset + limit]

@app.post("/cart/{cart_id}/add/{item_id}")
def add_item_to_cart(cart_id: int, item_id: int):
if cart_id not in carts:
raise HTTPException(HTTPStatus.NOT_FOUND)
if item_id not in items:
raise HTTPException(HTTPStatus.NOT_FOUND)

cart = carts[cart_id]
for entry in cart["items"]:
if entry["id"] == item_id:
entry["quantity"] += 1
break
else:
cart["items"].append(
{"id": item_id, "name": items[item_id]["name"], "quantity": 1}
)

return {"id": cart_id, "items": cart["items"]}
Loading