From 38b4b14cbef3638ad7db5b83cea45313d5086194 Mon Sep 17 00:00:00 2001 From: Galyautdinov Askar Date: Tue, 23 Sep 2025 12:44:34 +0300 Subject: [PATCH 1/3] test hw1 --- hw1/app.py | 123 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 115 insertions(+), 8 deletions(-) diff --git a/hw1/app.py b/hw1/app.py index 6107b870..712603b6 100644 --- a/hw1/app.py +++ b/hw1/app.py @@ -1,18 +1,125 @@ from typing import Any, Awaitable, Callable - +import json +import math +from urllib.parse import parse_qs async def application( scope: dict[str, Any], receive: Callable[[], Awaitable[dict[str, Any]]], send: Callable[[dict[str, Any]], Awaitable[None]], ): - """ - Args: - scope: Словарь с информацией о запросе - receive: Корутина для получения сообщений от клиента - send: Корутина для отправки сообщений клиенту - """ - # TODO: Ваша реализация здесь + + + async def send_json(status: int, payload: dict[str, Any]) -> None: + await send( + { + "type": "http.response.start", + "status": status, + "headers": [(b"content-type", b"application/json; charset=utf-8")], + } + ) + await send({"type": "http.response.body", "body": json.dumps(payload).encode()}) + + path: str = scope.get("path", "") + method: str = scope.get("method", "GET") + + # Разрешаем только GET на известных ручках + if method != "GET": + await send_json(404, {"detail": "Not Found"}) + return + + #/factorial?n=INT + if path == "/factorial": + # Валидация query + qs = parse_qs(scope.get("query_string", b"").decode()) + if "n" not in qs or len(qs["n"]) == 0 or qs["n"][0].strip() == "": + await send_json(422, {"detail": "Query parameter 'n' is required"}) + return + try: + n = int(qs["n"][0]) + except Exception: + await send_json(422, {"detail": "Parameter 'n' must be integer"}) + return + if n < 0: + await send_json(400, {"detail": "n must be >= 0"}) + return + + # Вычисление факториала + result = 1 + for i in range(2, n + 1): + result *= i + await send_json(200, {"result": result}) + return + + #/fibonacci/ + if path.startswith("/fibonacci"): + parts = path.split("/") + if len(parts) < 3 or parts[2] == "": + await send_json(422, {"detail": "Path parameter 'n' is required"}) + return + try: + n = int(parts[2]) + except Exception: + await send_json(422, {"detail": "Path parameter 'n' must be integer"}) + return + if n < 0: + await send_json(400, {"detail": "n must be >= 0"}) + return + + # Вычисление числа Фибоначчи + if n == 0: + fib = 0 + else: + a, b = 0, 1 + for _ in range(1, n): + a, b = b, a + b + fib = b + await send_json(200, {"result": fib}) + return + + # /mean (GET с JSON-массивом чисел в теле) + if path == "/mean": + # Считываем тело (может приходить частями) + body = b"" + more = True + while more: + message = await receive() + if message.get("type") == "http.request": + body += message.get("body", b"") + more = message.get("more_body", False) + else: + more = False + + if not body: + await send_json(422, {"detail": "JSON body is required"}) + return + try: + data = json.loads(body.decode() or "null") + except Exception: + await send_json(422, {"detail": "Body must be valid JSON"}) + return + if not isinstance(data, list): + await send_json(422, {"detail": "Body must be a JSON array"}) + return + if len(data) == 0: + await send_json(400, {"detail": "Array must be non-empty"}) + return + # Валидация элементов + nums: list[float] = [] + for x in data: + if isinstance(x, (int, float)): + nums.append(float(x)) + else: + await send_json(422, {"detail": "Array must contain only numbers"}) + return + + await send_json(200, {"result": sum(nums) / len(nums)}) + return + + # Неизвестные пути + await send_json(404, {"detail": "Not Found"}) + + if __name__ == "__main__": import uvicorn From ca63ac857207f7737f47547cfd1a416cf7eb2842 Mon Sep 17 00:00:00 2001 From: Galyautdinov Askar Date: Wed, 24 Sep 2025 21:04:25 +0300 Subject: [PATCH 2/3] Implement ASGI application with fibonacci, factorial, and mean endpoints --- hw1/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hw1/app.py b/hw1/app.py index 712603b6..1a2ef60e 100644 --- a/hw1/app.py +++ b/hw1/app.py @@ -23,7 +23,7 @@ async def send_json(status: int, payload: dict[str, Any]) -> None: path: str = scope.get("path", "") method: str = scope.get("method", "GET") - # Разрешаем только GET на известных ручках + #Разрешаем только GET на известных ручках if method != "GET": await send_json(404, {"detail": "Not Found"}) return From 29ab994a2d9d4ee90b3f18f16357eec6d0e39b5a Mon Sep 17 00:00:00 2001 From: Galyautdinov Askar Date: Sun, 5 Oct 2025 18:26:46 +0300 Subject: [PATCH 3/3] hw2 --- .gitignore | 3 +- hw2/hw/shop_api/main.py | 346 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 347 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 852216e6..480f08f0 100644 --- a/.gitignore +++ b/.gitignore @@ -129,6 +129,7 @@ dmypy.json *.swp *.swo *~ - +.claude/ # macOS .DS_Store + diff --git a/hw2/hw/shop_api/main.py b/hw2/hw/shop_api/main.py index f60a8c60..863fa3d7 100644 --- a/hw2/hw/shop_api/main.py +++ b/hw2/hw/shop_api/main.py @@ -1,3 +1,347 @@ -from fastapi import FastAPI +from typing import Annotated +from http import HTTPStatus +import random +import string + +from fastapi import FastAPI, HTTPException, Response, Query, WebSocket, WebSocketDisconnect +from pydantic import BaseModel, Field, ConfigDict app = FastAPI(title="Shop API") + + +# Models +class ItemCreateRequest(BaseModel): + name: str + price: float = Field(ge=0) + + +class ItemUpdateRequest(BaseModel): + name: str + price: float = Field(ge=0) + + +class ItemPatchRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + name: str | None = None + price: float | None = Field(default=None, ge=0) + + +class Item(BaseModel): + id: int + name: str + price: float + deleted: bool = False + + +class CartItem(BaseModel): + id: int + name: str + quantity: int + available: bool + + +class Cart(BaseModel): + id: int + items: list[CartItem] + price: float + + +class CartCreateResponse(BaseModel): + id: int + + +# Storage +class Storage: + def __init__(self): + self.items: dict[int, Item] = {} + self.carts: dict[int, dict] = {} + self.item_id_counter = 0 + self.cart_id_counter = 0 + + def create_item(self, name: str, price: float) -> Item: + self.item_id_counter += 1 + item = Item(id=self.item_id_counter, name=name, price=price, deleted=False) + self.items[item.id] = item + return item + + def get_item(self, item_id: int) -> Item | None: + return self.items.get(item_id) + + def get_items( + self, + offset: int = 0, + limit: int = 10, + min_price: float | None = None, + max_price: float | None = None, + show_deleted: bool = False, + ) -> list[Item]: + filtered = [] + for item in self.items.values(): + if not show_deleted and item.deleted: + continue + if min_price is not None and item.price < min_price: + continue + if max_price is not None and item.price > max_price: + continue + filtered.append(item) + + return filtered[offset:offset + limit] + + def update_item(self, item_id: int, name: str, price: float) -> Item | None: + if item_id not in self.items: + return None + self.items[item_id].name = name + self.items[item_id].price = price + return self.items[item_id] + + def patch_item(self, item_id: int, name: str | None = None, price: float | None = None) -> Item | None: + if item_id not in self.items: + return None + item = self.items[item_id] + if item.deleted: + return None + if name is not None: + item.name = name + if price is not None: + item.price = price + return item + + def delete_item(self, item_id: int) -> bool: + if item_id in self.items: + self.items[item_id].deleted = True + return True + return False + + def create_cart(self) -> int: + self.cart_id_counter += 1 + cart_id = self.cart_id_counter + self.carts[cart_id] = {} + return cart_id + + def get_cart(self, cart_id: int) -> Cart | None: + if cart_id not in self.carts: + return None + + cart_items = [] + total_price = 0.0 + + for item_id, quantity in self.carts[cart_id].items(): + item = self.items.get(item_id) + if item: + available = not item.deleted + cart_items.append( + CartItem( + id=item.id, + name=item.name, + quantity=quantity, + available=available, + ) + ) + if available: + total_price += item.price * quantity + + return Cart(id=cart_id, items=cart_items, price=total_price) + + def get_carts( + self, + offset: int = 0, + limit: int = 10, + min_price: float | None = None, + max_price: float | None = None, + min_quantity: int | None = None, + max_quantity: int | None = None, + ) -> list[Cart]: + filtered = [] + + for cart_id in self.carts.keys(): + cart = self.get_cart(cart_id) + if cart is None: + continue + + if min_price is not None and cart.price < min_price: + continue + if max_price is not None and cart.price > max_price: + continue + + total_quantity = sum(item.quantity for item in cart.items) + if min_quantity is not None and total_quantity < min_quantity: + continue + if max_quantity is not None and total_quantity > max_quantity: + continue + + filtered.append(cart) + + return filtered[offset:offset + limit] + + def add_item_to_cart(self, cart_id: int, item_id: int) -> bool: + if cart_id not in self.carts: + return False + if item_id not in self.items: + return False + + if item_id in self.carts[cart_id]: + self.carts[cart_id][item_id] += 1 + else: + self.carts[cart_id][item_id] = 1 + + return True + + +storage = Storage() + + +# Chat management +class ChatManager: + def __init__(self): + # Dictionary: chat_name -> list of (websocket, username) tuples + self.active_connections: dict[str, list[tuple[WebSocket, str]]] = {} + + def generate_username(self) -> str: + """Generate a random username""" + return ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + + async def connect(self, websocket: WebSocket, chat_name: str) -> str: + """Connect a new user to a chat room""" + await websocket.accept() + username = self.generate_username() + + if chat_name not in self.active_connections: + self.active_connections[chat_name] = [] + + self.active_connections[chat_name].append((websocket, username)) + return username + + def disconnect(self, websocket: WebSocket, chat_name: str): + """Disconnect a user from a chat room""" + if chat_name in self.active_connections: + self.active_connections[chat_name] = [ + (ws, user) for ws, user in self.active_connections[chat_name] + if ws != websocket + ] + # Clean up empty chat rooms + if not self.active_connections[chat_name]: + del self.active_connections[chat_name] + + async def broadcast(self, message: str, chat_name: str, sender_username: str): + """Broadcast a message to all users in a chat room""" + if chat_name not in self.active_connections: + return + + formatted_message = f"{sender_username} :: {message}" + + # Send to all connections in this chat + for websocket, username in self.active_connections[chat_name]: + try: + await websocket.send_text(formatted_message) + except: + # If sending fails, we'll handle it in the disconnect + pass + + +chat_manager = ChatManager() + + +# Item endpoints +@app.post("/item", status_code=HTTPStatus.CREATED) +def create_item(item: ItemCreateRequest) -> Item: + return storage.create_item(item.name, item.price) + + +@app.get("/item/{id}") +def get_item(id: int) -> Item: + item = storage.get_item(id) + if item is None or item.deleted: + raise HTTPException(status_code=HTTPStatus.NOT_FOUND) + return item + + +@app.get("/item") +def get_items( + offset: Annotated[int, Query(ge=0)] = 0, + limit: Annotated[int, Query(gt=0)] = 10, + min_price: Annotated[float | None, Query(ge=0)] = None, + max_price: Annotated[float | None, Query(ge=0)] = None, + show_deleted: bool = False, +) -> list[Item]: + return storage.get_items(offset, limit, min_price, max_price, show_deleted) + + +@app.put("/item/{id}") +def update_item(id: int, item: ItemUpdateRequest) -> Item: + updated = storage.update_item(id, item.name, item.price) + if updated is None: + raise HTTPException(status_code=HTTPStatus.NOT_FOUND) + return updated + + +@app.patch("/item/{id}") +def patch_item(id: int, item: ItemPatchRequest) -> Item: + patched = storage.patch_item(id, item.name, item.price) + if patched is None: + stored_item = storage.get_item(id) + if stored_item and stored_item.deleted: + raise HTTPException(status_code=HTTPStatus.NOT_MODIFIED) + raise HTTPException(status_code=HTTPStatus.NOT_FOUND) + return patched + + +@app.delete("/item/{id}") +def delete_item(id: int) -> Response: + storage.delete_item(id) + return Response(status_code=HTTPStatus.OK) + + +# Cart endpoints +@app.post("/cart", status_code=HTTPStatus.CREATED) +def create_cart(response: Response) -> CartCreateResponse: + cart_id = storage.create_cart() + response.headers["location"] = f"/cart/{cart_id}" + return CartCreateResponse(id=cart_id) + + +@app.get("/cart/{id}") +def get_cart(id: int) -> Cart: + cart = storage.get_cart(id) + if cart is None: + raise HTTPException(status_code=HTTPStatus.NOT_FOUND) + return cart + + +@app.get("/cart") +def get_carts( + offset: Annotated[int, Query(ge=0)] = 0, + limit: Annotated[int, Query(gt=0)] = 10, + min_price: Annotated[float | None, Query(ge=0)] = None, + max_price: Annotated[float | None, Query(ge=0)] = None, + min_quantity: Annotated[int | None, Query(ge=0)] = None, + max_quantity: Annotated[int | None, Query(ge=0)] = None, +) -> list[Cart]: + return storage.get_carts(offset, limit, min_price, max_price, min_quantity, max_quantity) + + +@app.post("/cart/{cart_id}/add/{item_id}") +def add_item_to_cart(cart_id: int, item_id: int) -> Cart: + success = storage.add_item_to_cart(cart_id, item_id) + if not success: + raise HTTPException(status_code=HTTPStatus.NOT_FOUND) + + cart = storage.get_cart(cart_id) + if cart is None: + raise HTTPException(status_code=HTTPStatus.NOT_FOUND) + + return cart + + +# WebSocket chat endpoint +@app.websocket("/chat/{chat_name}") +async def websocket_chat(websocket: WebSocket, chat_name: str): + username = await chat_manager.connect(websocket, chat_name) + try: + while True: + # Receive message from client + message = await websocket.receive_text() + # Broadcast to all users in the chat + await chat_manager.broadcast(message, chat_name, username) + except WebSocketDisconnect: + chat_manager.disconnect(websocket, chat_name)