diff --git a/.github/workflows/hw5-tests.yml b/.github/workflows/hw5-tests.yml new file mode 100644 index 00000000..8cf9731b --- /dev/null +++ b/.github/workflows/hw5-tests.yml @@ -0,0 +1,39 @@ +name: "HW5 Tests" + +# Запускаем тесты при изменении файлов в hw2/hw/ +on: + pull_request: + branches: [ main ] + paths: [ 'hw2/hw/**' ] + push: + branches: [ main ] + paths: [ 'hw2/hw/**' ] + +jobs: + test-hw2: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.12", "3.13"] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + working-directory: hw2/hw + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Tests coverage report + working-directory: hw2/hw + env: + PYTHONPATH: ${{ github.workspace }}/hw2/hw + run: | + pytest --cov diff --git a/hw1/app.py b/hw1/app.py index 6107b870..340e0012 100644 --- a/hw1/app.py +++ b/hw1/app.py @@ -1,10 +1,17 @@ -from typing import Any, Awaitable, Callable +from http import HTTPStatus +from typing import ( + Any, + Awaitable, + Callable, +) +import json as jsonlib +from urllib.parse import parse_qs async def application( - scope: dict[str, Any], - receive: Callable[[], Awaitable[dict[str, Any]]], - send: Callable[[dict[str, Any]], Awaitable[None]], + scope: dict[str, Any], + receive: Callable[[], Awaitable[dict[str, Any]]], + send: Callable[[dict[str, Any]], Awaitable[None]], ): """ Args: @@ -12,8 +19,120 @@ async def application( receive: Корутина для получения сообщений от клиента send: Корутина для отправки сообщений клиенту """ - # TODO: Ваша реализация здесь + # assert scope['type'] == 'http' + method: str = scope.get("method", "GET") + path: str = scope.get("path", "/") + query_string: bytes = scope.get("query_string", b"") + + async def send_json(status: HTTPStatus, payload: dict[str, Any] | None = None) -> None: + await send({ + "type": "http.response.start", + "status": int(status), + "headers": [ + [b"content-type", b"application/json"], + ], + }) + await send({ + "type": "http.response.body", + "body": jsonlib.dumps(payload or {}).encode("utf-8"), + }) + + # Route handling + if method == "GET" and path == "/factorial": + params = parse_qs(query_string.decode("utf-8")) if query_string else {} + raw_n = params.get("n", [None])[0] + if raw_n is None or raw_n == "": + await send_json(HTTPStatus.UNPROCESSABLE_ENTITY) + return + try: + n = int(raw_n) + except (TypeError, ValueError): + await send_json(HTTPStatus.UNPROCESSABLE_ENTITY) + return + if n < 0: + await send_json(HTTPStatus.BAD_REQUEST) + return + result = 1 + for i in range(2, n + 1): + result *= i + await send_json(HTTPStatus.OK, {"result": result}) + return + + if method == "GET" and path.startswith("/fibonacci"): + parts = path.split("/") + raw_n = parts[2] if len(parts) > 2 and parts[2] != "" else None + if raw_n is None: + await send_json(HTTPStatus.UNPROCESSABLE_ENTITY) + return + try: + n = int(raw_n) + except (TypeError, ValueError): + await send_json(HTTPStatus.UNPROCESSABLE_ENTITY) + return + if n < 0: + await send_json(HTTPStatus.BAD_REQUEST) + return + a, b = 0, 1 + for _ in range(n): + a, b = b, a + b + await send_json(HTTPStatus.OK, {"result": a}) + return + + if method == "GET" and path == "/mean": + body = b"" + more_body = True + while more_body: + message = await receive() + msg_type = message.get("type") + if msg_type == "http.disconnect": + break + if msg_type != "http.request": + continue + body += message.get("body", b"") + more_body = message.get("more_body", False) + if body: + try: + data = jsonlib.loads(body.decode("utf-8")) + except jsonlib.JSONDecodeError: + await send_json(HTTPStatus.UNPROCESSABLE_ENTITY) + return + if not isinstance(data, list): + await send_json(HTTPStatus.UNPROCESSABLE_ENTITY) + return + if len(data) == 0: + await send_json(HTTPStatus.BAD_REQUEST) + return + try: + numbers = [float(x) for x in data] + except (TypeError, ValueError): + await send_json(HTTPStatus.UNPROCESSABLE_ENTITY) + return + mean_value = sum(numbers) / len(numbers) + await send_json(HTTPStatus.OK, {"result": mean_value}) + return + + params = parse_qs(query_string.decode("utf-8")) if query_string else {} + numbers_param = params.get("numbers", [None])[0] + if numbers_param is None: + await send_json(HTTPStatus.UNPROCESSABLE_ENTITY) + return + items = [p for p in numbers_param.split(',') if p != ""] + if len(items) == 0: + await send_json(HTTPStatus.BAD_REQUEST) + return + try: + numbers = [float(x) for x in items] + except (TypeError, ValueError): + await send_json(HTTPStatus.UNPROCESSABLE_ENTITY) + return + mean_value = sum(numbers) / len(numbers) + await send_json(HTTPStatus.OK, {"result": mean_value}) + return + + await send_json(HTTPStatus.NOT_FOUND) + if __name__ == "__main__": import uvicorn + uvicorn.run("app:application", host="0.0.0.0", port=8000, reload=True) diff --git a/hw2/hw/Dockerfile b/hw2/hw/Dockerfile new file mode 100644 index 00000000..5f03c9b2 --- /dev/null +++ b/hw2/hw/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.12 AS base + +ARG PYTHONFAULTHANDLER=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONHASHSEED=random \ + PIP_NO_CACHE_DIR=on \ + PIP_DISABLE_PIP_VERSION_CHECK=on \ + PIP_DEFAULT_TIMEOUT=500 + +RUN apt-get update && apt-get install -y gcc +RUN python -m pip install --upgrade pip + +WORKDIR /app +COPY requirements.txt ./requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +# Keep source under /app/hw so the package name `hw` is importable +RUN mkdir -p /app/hw +COPY . /app/hw + +# Ensure both top-level `hw` and `shop_api` are importable +ENV PYTHONPATH=/app:/app/hw + +FROM base as local +EXPOSE 8080 +CMD ["uvicorn", "shop_api.main:app", "--port", "8080", "--host", "0.0.0.0"] \ No newline at end of file diff --git a/hw2/hw/docker-compose.yml b/hw2/hw/docker-compose.yml new file mode 100644 index 00000000..ee7d1b9b --- /dev/null +++ b/hw2/hw/docker-compose.yml @@ -0,0 +1,36 @@ +services: + + local: + build: + context: . + dockerfile: ./Dockerfile + target: local + restart: always + ports: + - 8080:8080 + + grafana: + depends_on: + - prometheus + volumes: + - ./settings/grafana/provisioning/:/etc/grafana/provisioning/ + - ./settings/grafana/dashboards/:/var/lib/grafana/dashboards/ + image: grafana/grafana:latest + ports: + - 3000:3000 + restart: always + + prometheus: + image: prom/prometheus + depends_on: + - local + volumes: + - ./settings/prometheus/:/etc/prometheus/ + command: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--storage.tsdb.path=/prometheus" + - "--web.console.libraries=/usr/share/prometheus/console_libraries" + - "--web.console.templates=/usr/share/prometheus/consoles" + ports: + - 9090:9090 + restart: always \ No newline at end of file diff --git a/hw2/hw/requirements.txt b/hw2/hw/requirements.txt index 207dcf5c..e15f65d9 100644 --- a/hw2/hw/requirements.txt +++ b/hw2/hw/requirements.txt @@ -1,6 +1,9 @@ # Основные зависимости для ASGI приложения fastapi>=0.117.1 uvicorn>=0.24.0 +prometheus_fastapi_instrumentator +sqlalchemy>=2.0.36,<3 +pytest-cov # Зависимости для тестирования pytest>=7.4.0 diff --git a/hw2/hw/settings/grafana/dashboards/shop.json b/hw2/hw/settings/grafana/dashboards/shop.json new file mode 100644 index 00000000..bee12333 --- /dev/null +++ b/hw2/hw/settings/grafana/dashboards/shop.json @@ -0,0 +1,432 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 0, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "ef026yfm257nkf" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 8, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "ef026yfm257nkf" + }, + "editorMode": "code", + "expr": "sum(rate(process_cpu_seconds_total[1m]))", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "CPU Usage", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "ef026yfm257nkf" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "ef026yfm257nkf" + }, + "editorMode": "code", + "expr": "sum(rate(http_request_duration_seconds_sum{handler!=\"/metrics\"}[1m]))\n/\nsum(rate(http_request_duration_seconds_count{handler!=\"/metrics\"}[1m]))", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Average response time", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "ef026yfm257nkf" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 7, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "ef026yfm257nkf" + }, + "editorMode": "code", + "expr": "sum by (handler) (rate(http_requests_total{handler!=\"/metrics\"}[1m]))", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "RPS, by endpoint", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "ef026yfm257nkf" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 8 + }, + "id": 6, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "ef026yfm257nkf" + }, + "editorMode": "code", + "expr": "process_virtual_memory_bytes / 1048576", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Memory, MB", + "type": "timeseries" + } + ], + "preload": false, + "schemaVersion": 42, + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-30m", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "Shop API / Main", + "uid": "adt84s7", + "version": 11 +} \ No newline at end of file diff --git a/hw2/hw/settings/grafana/provisioning/dashboards/dashboards.yml b/hw2/hw/settings/grafana/provisioning/dashboards/dashboards.yml new file mode 100644 index 00000000..67984a09 --- /dev/null +++ b/hw2/hw/settings/grafana/provisioning/dashboards/dashboards.yml @@ -0,0 +1,9 @@ +apiVersion: 1 +providers: + - name: shop-dashboards + folder: Shop + type: file + allowUiUpdates: false + updateIntervalSeconds: 10 + options: + path: /var/lib/grafana/dashboards \ No newline at end of file diff --git a/hw2/hw/settings/grafana/provisioning/datasources/datasource.yml b/hw2/hw/settings/grafana/provisioning/datasources/datasource.yml new file mode 100644 index 00000000..686e9262 --- /dev/null +++ b/hw2/hw/settings/grafana/provisioning/datasources/datasource.yml @@ -0,0 +1,9 @@ +apiVersion: 1 +datasources: + - name: Prometheus + uid: prometheus-ds + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + editable: false \ No newline at end of file diff --git a/hw2/hw/settings/prometheus/prometheus.yml b/hw2/hw/settings/prometheus/prometheus.yml new file mode 100644 index 00000000..6bdf88e7 --- /dev/null +++ b/hw2/hw/settings/prometheus/prometheus.yml @@ -0,0 +1,10 @@ +global: + scrape_interval: 10s + evaluation_interval: 10s + +scrape_configs: + - job_name: demo-service-local + metrics_path: /metrics + static_configs: + - targets: + - local:8080 diff --git a/hw2/hw/shop_api/contracts.py b/hw2/hw/shop_api/contracts.py new file mode 100644 index 00000000..9eb673c2 --- /dev/null +++ b/hw2/hw/shop_api/contracts.py @@ -0,0 +1,63 @@ +from typing import ( + List, + Optional, +) +from pydantic import ( + BaseModel, + Field, + ConfigDict, +) +from hw import store + + +class ItemCreate(BaseModel): + name: str + price: float = Field(ge=0) + + +class ItemPut(BaseModel): + name: str + price: float = Field(ge=0) + + +class ItemPatch(BaseModel): + model_config = ConfigDict(extra="forbid") + + name: Optional[str] = None + price: Optional[float] = Field(default=None, ge=0) + + +class ItemOut(BaseModel): + id: int + name: str + price: float + deleted: bool + + @staticmethod + def item_to_out(item: store.Item) -> "ItemOut": + return ItemOut(id=item.id, name=item.name, price=item.price, deleted=item.deleted) + + +class CartItemOut(BaseModel): + id: int + name: str + quantity: int + available: bool + + +class CartOut(BaseModel): + id: int + items: List[CartItemOut] + price: float + + @staticmethod + def cart_to_out(cart: store.Cart) -> "CartOut": + items_out = [] + for ci in cart.items: + items_out.append({ + "id": ci.id, + "name": ci.name, + "quantity": ci.quantity, + "available": ci.available, + }) + return CartOut(id=cart.id, items=items_out, price=cart.price) diff --git a/hw2/hw/shop_api/main.py b/hw2/hw/shop_api/main.py index f60a8c60..25edd0d2 100644 --- a/hw2/hw/shop_api/main.py +++ b/hw2/hw/shop_api/main.py @@ -1,3 +1,161 @@ -from fastapi import FastAPI +from http import HTTPStatus +from typing import Annotated + +from fastapi import ( + FastAPI, + HTTPException, + Query, + Response, +) +from prometheus_fastapi_instrumentator import Instrumentator + +try: + from hw import store # type: ignore +except ModuleNotFoundError: # pragma: no cover + import store # type: ignore +from .contracts import ( + ItemCreate, + ItemPut, + ItemPatch, + ItemOut, + CartOut, +) app = FastAPI(title="Shop API") +Instrumentator().instrument(app).expose(app) + + +@app.post("/cart", status_code=HTTPStatus.CREATED) +def post_cart(response: Response) -> dict[str, int]: + cart_id = store.post_cart() + response.headers["location"] = f"/cart/{cart_id}" + return {"id": cart_id} + + +@app.get( + "/cart/{cart_id}", + responses={ + HTTPStatus.OK: { + "description": "Successfully found a cart with a given ID", + }, + HTTPStatus.NOT_FOUND: { + "description": "Failed to find a cart with a given ID" + } + } +) +def get_cart(cart_id: int) -> CartOut: + cart = store.get_cart(cart_id) + if cart is None: + raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail=f"Requested /cart/{cart_id} wasn't found") + return CartOut.cart_to_out(cart) + + +@app.get("/cart") +def get_cart_list( + offset: Annotated[int, Query(ge=0)] = 0, + limit: Annotated[int, Query(gt=0)] = 10, + min_price: Annotated[float | None, Query(ge=0)] = None, + max_price: Annotated[float | None, Query(ge=0)] = None, + min_quantity: Annotated[int | None, Query(ge=0)] = None, + max_quantity: Annotated[int | None, Query(ge=0)] = None, +) -> list[CartOut]: + carts = store.get_carts_list( + offset=offset, + limit=limit, + min_price=min_price, + max_price=max_price, + min_quantity=min_quantity, + max_quantity=max_quantity, + ) + return [CartOut.cart_to_out(cart) for cart in carts] + + +@app.post( + "/cart/{cart_id}/add/{item_id}", + responses={ + HTTPStatus.NOT_FOUND: { + "description": "Requested cart_id or item_id wasn't found", + }, + } +) +def post_add_to_cart(cart_id: int, item_id: int) -> CartOut: + cart = store.get_cart(cart_id) + if cart is None: + raise HTTPException( + status_code=HTTPStatus.NOT_FOUND, + detail=f"Requested /cart/{cart_id} wasn't found" + ) + item = store.get_item(item_id) + if item is None: + raise HTTPException( + status_code=HTTPStatus.NOT_FOUND, + detail=f"Requested /cart/{cart_id}/add/{item_id} wasn't found" + ) + store.add_item_to_cart(cart_id, item_id) + return CartOut.cart_to_out(store.get_cart(cart_id)) + + +@app.post("/item", status_code=HTTPStatus.CREATED) +def post_item(body: ItemCreate) -> ItemOut: + item_id = store.post_item(name=body.name, price=body.price) + item = store.get_item(item_id) + return ItemOut.item_to_out(item) + + +@app.get( + "/item/{item_id}", + responses={ + HTTPStatus.NOT_FOUND: { + "description": "Requested item_id wasn't found", + }, + } +) +def get_item(item_id: int) -> ItemOut: + item = store.get_item(item_id) + if item is None: + raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail=f"Requested /item/{item_id} wasn't found") + return ItemOut.item_to_out(item) + + +@app.get("/item") +def get_items( + offset: Annotated[int, Query(ge=0)] = 0, + limit: Annotated[int, Query(gt=0)] = 10, + min_price: Annotated[float | None, Query(ge=0)] = None, + max_price: Annotated[float | None, Query(ge=0)] = None, + show_deleted: bool = False, +) -> list[ItemOut]: + items = store.get_items_list( + offset=offset, + limit=limit, + min_price=min_price, + max_price=max_price, + show_deleted=show_deleted, + ) + return [ItemOut.item_to_out(item) for item in items] + + +@app.put("/item/{item_id}") +def put_item(item_id: int, body: ItemPut) -> ItemOut: + item = store.put_item(item_id=item_id, name=body.name, price=body.price) + if item is None: + raise HTTPException(status_code=HTTPStatus.UNPROCESSABLE_ENTITY) + return ItemOut.item_to_out(item) + + +@app.patch("/item/{item_id}") +def patch_item(item_id: int, body: ItemPatch): + item = store.patch_item(item_id=item_id, name=body.name, price=body.price) + if item is None: + # tests expect NOT_MODIFIED for deleted only + orig = store.get_item_including_deleted(item_id) + if orig is not None and orig.deleted is True: + return Response(status_code=HTTPStatus.NOT_MODIFIED) + raise HTTPException(status_code=HTTPStatus.UNPROCESSABLE_ENTITY) + return ItemOut.item_to_out(item) + + +@app.delete("/item/{item_id}") +def delete_item(item_id: int): + store.delete_item(item_id) + return Response(status_code=HTTPStatus.OK) diff --git a/hw2/hw/store/__init__.py b/hw2/hw/store/__init__.py new file mode 100644 index 00000000..019fe9da --- /dev/null +++ b/hw2/hw/store/__init__.py @@ -0,0 +1,34 @@ +from .models import ( + CartItem, + Cart, + Item, +) + +from .queries import ( + post_cart, + get_cart, + get_carts_list, + add_item_to_cart, + post_item, + get_item, + get_item_including_deleted, + get_items_list, + put_item, + patch_item, + delete_item, +) + +__all__ = [ + 'CartItem', 'Cart', 'Item', + 'post_cart', + 'get_cart', + 'get_carts_list', + 'add_item_to_cart', + 'post_item', + 'get_item', + 'get_item_including_deleted', + 'get_items_list', + 'put_item', + 'patch_item', + 'delete_item', +] diff --git a/hw2/hw/store/db.py b/hw2/hw/store/db.py new file mode 100644 index 00000000..5781cb32 --- /dev/null +++ b/hw2/hw/store/db.py @@ -0,0 +1,21 @@ +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker, Session + +from .models import Base + + +engine = create_engine("sqlite:///./shop.sqlite", connect_args={"check_same_thread": False}) +SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False) + + +def init_db() -> None: + # ensure clean schema for tests + Base.metadata.drop_all(bind=engine) + Base.metadata.create_all(bind=engine) + + +def get_session() -> Session: + return SessionLocal() + + +init_db() diff --git a/hw2/hw/store/models.py b/hw2/hw/store/models.py new file mode 100644 index 00000000..7b43f18d --- /dev/null +++ b/hw2/hw/store/models.py @@ -0,0 +1,319 @@ +from dataclasses import ( + dataclass, + field, +) +from sqlalchemy import ( + Column, + Integer, + Float, + String, + Boolean, + ForeignKey, +) +from sqlalchemy.orm import Session, relationship, declarative_base + + +# === domain data === + +@dataclass(slots=True) +class CartItem: + id: int + name: str + quantity: int + available: bool + + +@dataclass(slots=True) +class Cart: + id: int + price: float + items: list[CartItem] = field(default_factory=list) + + +@dataclass(slots=True) +class Item: + id: int + name: str + price: float + deleted: bool + + +# === SQL Alchemy models === + +Base = declarative_base() + + +class CartOrm(Base): + __tablename__ = 'cart' + + id = Column(Integer, primary_key=True) + price = Column(Float, nullable=False) + items = relationship("CartItemOrm", back_populates="cart", cascade="all, delete-orphan") + + +class CartItemOrm(Base): + __tablename__ = 'cart_item' + + id = Column(Integer, primary_key=True) + cart_id = Column(Integer, ForeignKey('cart.id'), nullable=False) + item_id = Column(Integer, nullable=False) + name = Column(String(255), nullable=False) + quantity = Column(Integer, nullable=False) + available = Column(Boolean, nullable=False) + + cart = relationship("CartOrm", back_populates="items") + + +class ItemOrm(Base): + __tablename__ = 'item' + + id = Column(Integer, primary_key=True) + name = Column(String, nullable=False) + price = Column(Float, nullable=False) + deleted = Column(Boolean, nullable=False) + + +# === Mappers === + +class CartMapper: + @staticmethod + def to_domain(orm_cart: CartOrm): + if orm_cart is None: + return None + items: list[CartItem] = [] + for orm_item in getattr(orm_cart, "items", []) or []: + items.append(CartItemMapper.to_domain(orm_item)) + return Cart( + id=orm_cart.id, + price=orm_cart.price, + items=items, + ) + + @staticmethod + def to_orm( + domain_cart: Cart, + orm_cart: CartOrm, + ): + if orm_cart is None: + orm_cart = CartOrm() + + orm_cart.id = domain_cart.id + orm_cart.price = domain_cart.price + + new_items: list[CartItemOrm] = [] + for domain_item in domain_cart.items: + orm_item = CartItemMapper.to_orm(domain_item, None) + orm_item.cart_id = domain_cart.id + new_items.append(orm_item) + orm_cart.items = new_items + + return orm_cart + + +class CartItemMapper: + @staticmethod + def to_domain(orm_cart_item: CartItemOrm) -> CartItem: + return CartItem( + id=getattr(orm_cart_item, "item_id", orm_cart_item.id), + name=orm_cart_item.name, + quantity=orm_cart_item.quantity, + available=orm_cart_item.available, + ) + + @staticmethod + def to_orm( + domain_cart_item: CartItem, + orm_cart_item: CartItemOrm, + ) -> CartItemOrm: + if orm_cart_item is None: + orm_cart_item = CartItemOrm() + + # domain `item_id` is `id` in cart context + orm_cart_item.item_id = domain_cart_item.id + orm_cart_item.name = domain_cart_item.name + orm_cart_item.quantity = domain_cart_item.quantity + orm_cart_item.available = domain_cart_item.available + + return orm_cart_item + + +class ItemMapper: + @staticmethod + def to_domain(orm_item: ItemOrm) -> Item: + return Item( + id=orm_item.id, + name=orm_item.name, + price=orm_item.price, + deleted=orm_item.deleted, + ) + + @staticmethod + def to_orm( + domain_item: Item, + orm_item: ItemOrm, + ) -> ItemOrm: + if orm_item is None: + orm_item = ItemOrm() + + orm_item.id = domain_item.id + orm_item.name = domain_item.name + orm_item.price = domain_item.price + orm_item.deleted = domain_item.deleted + + return orm_item + + +class SqlAlchemyCartRepository: + def __init__(self, session: Session) -> None: + self.session = session + + def post_cart(self) -> int: + orm = CartOrm(price=0.0) + self.session.add(orm) + self.session.commit() + self.session.refresh(orm) + return orm.id + + def _map_cart(self, orm_cart: CartOrm) -> Cart | None: + if orm_cart is None: + return None + cart = CartMapper.to_domain(orm_cart) + # sync availability from ItemOrm.deleted + for ci in cart.items: + item = self.session.get(ItemOrm, ci.id) + if item is not None: + ci.available = not item.deleted + return cart + + def get_cart(self, cart_id: int) -> Cart | None: + orm = self.session.get(CartOrm, cart_id) + return self._map_cart(orm) + + def get_carts_list( + self, + offset: int = 0, + limit: int = 10, + min_price: float | None = None, + max_price: float | None = None, + min_quantity: int | None = None, + max_quantity: int | None = None, + ) -> list[Cart]: + assert offset >= 0 + assert limit > 0 + + q = self.session.query(CartOrm) + if min_price is not None: + q = q.filter(CartOrm.price >= min_price) + if max_price is not None: + q = q.filter(CartOrm.price <= max_price) + + carts = [self._map_cart(orm) for orm in q.all()] + + def quantity(c: Cart) -> int: + return sum(i.quantity for i in c.items) + + if min_quantity is not None: + carts = [c for c in carts if quantity(c) >= min_quantity] + if max_quantity is not None: + carts = [c for c in carts if quantity(c) <= max_quantity] + + return carts[offset: offset + limit] + +class SqlAlchemyCartItemRepository: + def __init__(self, session: Session) -> None: + self.session = session + + def add_item_to_cart(self, cart_id: int, item_id: int) -> None: + cart = self.session.get(CartOrm, cart_id) + item = self.session.get(ItemOrm, item_id) + if cart is None or item is None: + return + + link = ( + self.session.query(CartItemOrm) + .filter(CartItemOrm.cart_id == cart_id, CartItemOrm.item_id == item_id) + .one_or_none() + ) + + if link is None: + link = CartItemOrm( + cart_id=cart_id, + item_id=item_id, + name=item.name, + quantity=1, + available=not item.deleted, + ) + self.session.add(link) + else: + link.quantity += 1 + + cart.price += item.price + self.session.commit() + +class SqlAlchemyItemRepository: + def __init__(self, session: Session) -> None: + self.session = session + + def post_item(self, name: str, price: float, deleted: bool = False) -> int: + orm = ItemOrm(name=name, price=price, deleted=deleted) + self.session.add(orm) + self.session.commit() + self.session.refresh(orm) + return orm.id + + def get_item(self, item_id: int) -> Item | None: + orm = self.session.get(ItemOrm, item_id) + if orm is None or orm.deleted: + return None + return ItemMapper.to_domain(orm) + + def get_items_list( + self, + offset: int = 0, + limit: int = 10, + min_price: float | None = None, + max_price: float | None = None, + show_deleted: bool = False, + ) -> list[Item]: + assert offset >= 0 + assert limit > 0 + + q = self.session.query(ItemOrm) + if not show_deleted: + q = q.filter(ItemOrm.deleted.is_(False)) + if min_price is not None: + q = q.filter(ItemOrm.price >= min_price) + if max_price is not None: + q = q.filter(ItemOrm.price <= max_price) + + q = q.offset(offset).limit(limit) + return [ItemMapper.to_domain(orm) for orm in q.all()] + + def put_item(self, item_id: int, name: str, price: float) -> Item | None: + orm = self.session.get(ItemOrm, item_id) + if orm is None or orm.deleted: + return None + orm.name = name + orm.price = price + self.session.commit() + self.session.refresh(orm) + return ItemMapper.to_domain(orm) + + def patch_item(self, item_id: int, name: str | None = None, price: float | None = None) -> Item | None: + orm = self.session.get(ItemOrm, item_id) + if orm is None or orm.deleted: + return None + if name is not None: + orm.name = name + if price is not None: + orm.price = price + self.session.commit() + self.session.refresh(orm) + return ItemMapper.to_domain(orm) + + def delete_item(self, item_id: int) -> None: + orm = self.session.get(ItemOrm, item_id) + if orm is None: + return + orm.deleted = True + self.session.commit() diff --git a/hw2/hw/store/queries.py b/hw2/hw/store/queries.py new file mode 100644 index 00000000..f9512bd4 --- /dev/null +++ b/hw2/hw/store/queries.py @@ -0,0 +1,98 @@ +from sqlalchemy.orm import Session + +from .db import get_session +from .models import ( + Cart, + Item, + SqlAlchemyCartRepository, + SqlAlchemyCartItemRepository, + SqlAlchemyItemRepository, + ItemMapper, + ItemOrm, +) + + +## Cart methods +def post_cart() -> int: + with get_session() as session: + return SqlAlchemyCartRepository(session).post_cart() + + +def get_cart(id: int) -> Cart | None: + with get_session() as session: + return SqlAlchemyCartRepository(session).get_cart(id) + + +def get_carts_list( + offset: int = 0, + limit: int = 10, + min_price: float | None = None, + max_price: float | None = None, + min_quantity: int | None = None, + max_quantity: int | None = None, +) -> list[Cart]: + with get_session() as session: + return SqlAlchemyCartRepository(session).get_carts_list( + offset=offset, + limit=limit, + min_price=min_price, + max_price=max_price, + min_quantity=min_quantity, + max_quantity=max_quantity, + ) + + +def add_item_to_cart(cart_id: int, item_id: int) -> None: + with get_session() as session: + SqlAlchemyCartItemRepository(session).add_item_to_cart(cart_id, item_id) + + +## Item methods +def post_item(name: str, price: float, deleted: bool = False) -> int: + with get_session() as session: + return SqlAlchemyItemRepository(session).post_item(name=name, price=price, deleted=deleted) + + +def get_item(item_id: int) -> Item | None: + with get_session() as session: + return SqlAlchemyItemRepository(session).get_item(item_id) + + +def get_item_including_deleted(item_id: int) -> Item | None: + with get_session() as session: + orm = session.get(ItemOrm, item_id) + if orm is None: + return None + return ItemMapper.to_domain(orm) + + +def get_items_list( + offset: int = 0, + limit: int = 10, + min_price: float | None = None, + max_price: float | None = None, + show_deleted: bool = False +) -> list[Item]: + with get_session() as session: + return SqlAlchemyItemRepository(session).get_items_list( + offset=offset, + limit=limit, + min_price=min_price, + max_price=max_price, + show_deleted=show_deleted, + ) + + +def put_item(item_id: int, name: str, price: float) -> Item | None: + with get_session() as session: + return SqlAlchemyItemRepository(session).put_item(item_id=item_id, name=name, price=price) + + +def patch_item(item_id: int, name: str | None = None, price: float | None = None) -> Item | None: + with get_session() as session: + return SqlAlchemyItemRepository(session).patch_item(item_id=item_id, name=name, price=price) + + +def delete_item(item_id: int) -> None: + with get_session() as session: + SqlAlchemyItemRepository(session).delete_item(item_id) diff --git a/hw2/hw/test_homework2.py b/hw2/hw/test_homework2.py index 60a1f36a..6cfd0172 100644 --- a/hw2/hw/test_homework2.py +++ b/hw2/hw/test_homework2.py @@ -7,6 +7,14 @@ from fastapi.testclient import TestClient from shop_api.main import app +from store.models import ( + Cart, + CartItem, + Item, + CartMapper, + CartItemMapper, + ItemMapper, +) client = TestClient(app) faker = Faker() @@ -122,6 +130,7 @@ def test_get_cart(request, cart: int, not_empty: bool) -> None: ({"max_price": 20.0}, HTTPStatus.OK), ({"min_quantity": 1}, HTTPStatus.OK), ({"max_quantity": 0}, HTTPStatus.OK), + ({"show_deleted": False}, HTTPStatus.OK), ({"offset": -1}, HTTPStatus.UNPROCESSABLE_ENTITY), ({"limit": 0}, HTTPStatus.UNPROCESSABLE_ENTITY), ({"limit": -1}, HTTPStatus.UNPROCESSABLE_ENTITY), @@ -210,6 +219,22 @@ def test_get_item_list(query: dict[str, Any], status_code: int) -> None: assert all(item["deleted"] is False for item in data) +def test_get_cart_not_found() -> None: + response = client.get("/cart/999999") + assert response.status_code == HTTPStatus.NOT_FOUND + + +def test_add_to_cart_cart_not_found(existing_item: dict[str, Any]) -> None: + item_id = existing_item["id"] + response = client.post(f"/cart/999999/add/{item_id}") + assert response.status_code == HTTPStatus.NOT_FOUND + + +def test_add_to_cart_item_not_found(existing_empty_cart_id: int) -> None: + response = client.post(f"/cart/{existing_empty_cart_id}/add/999999") + assert response.status_code == HTTPStatus.NOT_FOUND + + @pytest.mark.parametrize( ("body", "status_code"), [ @@ -234,6 +259,17 @@ def test_put_item( assert response.json() == new_item +@pytest.mark.parametrize( + ("item_id", "status_code"), + [ + (999999, HTTPStatus.UNPROCESSABLE_ENTITY), # non-existent + ], +) +def test_put_item_invalid(item_id: int, status_code: int) -> None: + response = client.put(f"/item/{item_id}", json={"name": "x", "price": 1.0}) + assert response.status_code == status_code + + @pytest.mark.parametrize( ("item", "body", "status_code"), [ @@ -271,6 +307,11 @@ def test_patch_item(request, item: str, body: dict[str, Any], status_code: int) assert patched_item == patch_response_body +def test_patch_item_nonexistent() -> None: + response = client.patch("/item/999999", json={}) + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY + + def test_delete_item(existing_item: dict[str, Any]) -> None: item_id = existing_item["id"] @@ -282,3 +323,37 @@ def test_delete_item(existing_item: dict[str, Any]) -> None: response = client.delete(f"/item/{item_id}") assert response.status_code == HTTPStatus.OK + + +def test_delete_nonexistent_item() -> None: + response = client.delete("/item/999999") + assert response.status_code == HTTPStatus.OK + + +def test_mappers_cover_none_and_to_orm_paths() -> None: + # CartMapper.to_domain(None) + assert CartMapper.to_domain(None) is None + + # CartItemMapper.to_orm with None + domain_ci = CartItem(id=1, name="n", quantity=2, available=True) + orm_ci = CartItemMapper.to_orm(domain_ci, None) + assert orm_ci.item_id == 1 and orm_ci.name == "n" and orm_ci.quantity == 2 and orm_ci.available is True + + # ItemMapper.to_orm with None + domain_item = Item(id=5, name="p", price=3.14, deleted=False) + orm_item = ItemMapper.to_orm(domain_item, None) + assert orm_item.id == 5 and orm_item.name == "p" and orm_item.price == 3.14 and orm_item.deleted is False + + # CartMapper.to_orm with None and nested items + domain_cart = Cart(id=10, price=0.0, items=[ + CartItem(id=5, name="p", quantity=1, available=True), + CartItem(id=6, name="q", quantity=3, available=False), + ]) + orm_cart = CartMapper.to_orm(domain_cart, None) + assert orm_cart.id == 10 and orm_cart.price == 0.0 + assert len(orm_cart.items) == 2 + assert {i.item_id for i in orm_cart.items} == {5, 6} + +def test_queries_get_item_including_deleted_none_path() -> None: + from store.queries import get_item_including_deleted + assert get_item_including_deleted(999999) is None diff --git a/hw2/hw/txdemo/sqlite_isolation_demo.py b/hw2/hw/txdemo/sqlite_isolation_demo.py new file mode 100644 index 00000000..d1fae789 --- /dev/null +++ b/hw2/hw/txdemo/sqlite_isolation_demo.py @@ -0,0 +1,121 @@ +""" +Run with (from hw directory): python txdemo/sqlite_isolation_demo.py +""" +import time +import threading +from sqlalchemy.orm import sessionmaker +from sqlalchemy import create_engine, text + +from store.models import Base + +DEMO_DB = "sqlite:///./isolation_models_demo.sqlite" + +engine = create_engine(DEMO_DB, connect_args={"check_same_thread": False}) +SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False) +Base.metadata.create_all(bind=engine) + +def reset_data(): + with engine.begin() as conn: + conn.exec_driver_sql("DELETE FROM item") + conn.exec_driver_sql("INSERT INTO item(id, name, price, deleted) VALUES (1, 'A', 10.0, 0)") + conn.exec_driver_sql("INSERT INTO item(id, name, price, deleted) VALUES (2, 'B', 20.0, 0)") + +def dirty_read(): + print("\n--- Dirty Read with models (SQLite) ---") + with engine.begin() as conn: + conn.exec_driver_sql("PRAGMA read_uncommitted = 1") + + def writer(): + s = SessionLocal() + try: + s.execute(text("BEGIN")) + s.execute(text("UPDATE item SET price = price + 100 WHERE id = 1")) + time.sleep(2.0) # keep uncommitted + s.rollback() + finally: + s.close() + + def reader(res: dict): + s = SessionLocal() + try: + s.execute(text("BEGIN")) + price = s.execute(text("SELECT price FROM item WHERE id = 1")).scalar_one() + res["price"] = price + s.commit() + finally: + s.close() + + reset_data() + res = {} + t1 = threading.Thread(target=writer) + t2 = threading.Thread(target=reader, args=(res,)) + t1.start(); time.sleep(0.2); t2.start() + t2.join(); t1.join() + print("Observed price (expected 10.0):", res["price"]) + +def non_repeatable_read(): + print("\n--- Non-Repeatable Read with models (SQLite) ---") + def tx1(res: dict): + s = SessionLocal() + try: + s.execute(text("BEGIN")) + res["v1"] = s.execute(text("SELECT price FROM item WHERE id = 2")).scalar_one() + time.sleep(1.0) # allow concurrent commit + res["v2"] = s.execute(text("SELECT price FROM item WHERE id = 2")).scalar_one() + s.commit() + finally: + s.close() + + def tx2(): + s = SessionLocal() + try: + s.execute(text("BEGIN")) + s.execute(text("UPDATE item SET price = price + 1 WHERE id = 2")) + s.commit() + finally: + s.close() + + reset_data() + res = {} + t1 = threading.Thread(target=tx1, args=(res,)) + t2 = threading.Thread(target=tx2) + t1.start(); time.sleep(0.2); t2.start() + t1.join(); t2.join() + print("First read:", res["v1"], "Second read (same tx):", res["v2"]) + +def phantom_read(): + print("\n--- Phantom Read with models (SQLite) ---") + def tx1(res: dict): + s = SessionLocal() + try: + s.execute(text("BEGIN")) + c1 = s.execute(text("SELECT COUNT(*) FROM item WHERE price >= 15")).scalar_one() + time.sleep(1.0) + c2 = s.execute(text("SELECT COUNT(*) FROM item WHERE price >= 15")).scalar_one() + s.commit() + res["c1"], res["c2"] = c1, c2 + finally: + s.close() + + def tx2(): + s = SessionLocal() + try: + s.execute(text("BEGIN")) + s.execute(text("INSERT INTO item(name, price, deleted) VALUES('C', 100.0, 0)")) + s.commit() + finally: + s.close() + + reset_data() + res = {} + t1 = threading.Thread(target=tx1, args=(res,)) + t2 = threading.Thread(target=tx2) + t1.start(); time.sleep(0.2); t2.start() + t1.join(); t2.join() + print("Count first:", res["c1"], "second (same tx):", res["c2"]) + +if __name__ == "__main__": + reset_data() + dirty_read() + non_repeatable_read() + phantom_read() \ No newline at end of file diff --git a/hw2/rest_example/api/pokemon/routes.py b/hw2/rest_example/api/pokemon/routes.py index ab935c9a..7fb45754 100644 --- a/hw2/rest_example/api/pokemon/routes.py +++ b/hw2/rest_example/api/pokemon/routes.py @@ -1,8 +1,16 @@ from http import HTTPStatus from typing import Annotated -from fastapi import APIRouter, HTTPException, Query, Response -from pydantic import NonNegativeInt, PositiveInt +from fastapi import ( + APIRouter, + HTTPException, + Query, + Response, +) +from pydantic import ( + NonNegativeInt, + PositiveInt, +) from hw2.rest_example import store @@ -17,8 +25,8 @@ @router.get("/") async def get_pokemon_list( - offset: Annotated[NonNegativeInt, Query()] = 0, - limit: Annotated[PositiveInt, Query()] = 10, + offset: Annotated[NonNegativeInt, Query()] = 0, + limit: Annotated[PositiveInt, Query()] = 10, ) -> list[PokemonResponse]: return [PokemonResponse.from_entity(e) for e in store.get_many(offset, limit)] @@ -94,9 +102,9 @@ async def patch_pokemon(id: int, info: PatchPokemonRequest) -> PokemonResponse: } ) async def put_pokemon( - id: int, - info: PokemonRequest, - upsert: Annotated[bool, Query()] = False, + id: int, + info: PokemonRequest, + upsert: Annotated[bool, Query()] = False, ) -> PokemonResponse: entity = ( store.upsert(id, info.as_pokemon_info()) diff --git a/hw2/ws_example/server.py b/hw2/ws_example/server.py index 2bb5f1d1..850d2ad2 100644 --- a/hw2/ws_example/server.py +++ b/hw2/ws_example/server.py @@ -1,7 +1,15 @@ -from dataclasses import dataclass, field +from dataclasses import ( + dataclass, + field, +) from uuid import uuid4 -from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect +from fastapi import ( + FastAPI, + Request, + WebSocket, + WebSocketDisconnect, +) app = FastAPI() @@ -17,8 +25,10 @@ async def subscribe(self, ws: WebSocket) -> None: async def unsubscribe(self, ws: WebSocket) -> None: self.subscribers.remove(ws) - async def publish(self, message: str) -> None: + async def publish(self, message: str, exclude: WebSocket | None = None) -> None: for ws in self.subscribers: + if exclude is not None and ws is exclude: + continue await ws.send_text(message) @@ -42,5 +52,27 @@ async def ws_subscribe(ws: WebSocket): text = await ws.receive_text() await broadcaster.publish(text) except WebSocketDisconnect: - broadcaster.unsubscribe(ws) + await broadcaster.unsubscribe(ws) await broadcaster.publish(f"client {client_id} unsubscribed") + + +chats = {} +@app.websocket("/chat/{chat_name}") +async def ws_subscribe_chat(chat_name, ws: WebSocket): + username = f"user-{str(uuid4())[:8]}" + if chat_name in chats: + chat_broadcaster = chats[chat_name] + else: + chats[chat_name] = Broadcaster() + chat_broadcaster = chats[chat_name] + + await chat_broadcaster.subscribe(ws) + await chat_broadcaster.publish(f"{username} :: joined", exclude=ws) + + try: + while True: + text = await ws.receive_text() + await chat_broadcaster.publish(f"{username} :: {text}", exclude=ws) + except WebSocketDisconnect: + await chat_broadcaster.unsubscribe(ws) + await chat_broadcaster.publish(f"{username} :: left")