Skip to content
Open

Hm4 #195

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 157 additions & 2 deletions hw1/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import Any, Awaitable, Callable
from urllib.parse import parse_qs
import json


async def application(
Expand All @@ -12,8 +14,161 @@ async def application(
receive: Корутина для получения сообщений от клиента
send: Корутина для отправки сообщений клиенту
"""
# TODO: Ваша реализация здесь
async def j(status: int, data: dict[str, Any]):
body = json.dumps(data, ensure_ascii=False).encode("utf-8")
hdrs = [
(b"content-type", b"application/json; charset=utf-8"),
(b"content-length", str(len(body)).encode("ascii")),
]
await send({"type": "http.response.start", "status": status, "headers": hdrs})
await send({"type": "http.response.body", "body": body})

async def grab_body() -> bytes:
buf = b""
more = True
while more:
m = await receive()
if m.get("type") != "http.request":
continue
buf += m.get("body", b"")
more = m.get("more_body", False)
return buf

if scope["type"] == "lifespan":
while True:
msg = await receive()
if msg["type"] == "lifespan.startup":
print("boot")
await send({"type": "lifespan.startup.complete"})
elif msg["type"] == "lifespan.shutdown":
print("bye")
await send({"type": "lifespan.shutdown.complete"})
return
return

if scope["type"] != "http":
return

verb = scope.get("method", "GET").upper()
pth = scope.get("path", "")
qs = parse_qs(scope.get("query_string", b"").decode("latin-1"))

known = (pth == "/factorial") or (pth == "/mean") or pth.startswith("/fibonacci")
if verb != "GET":
if known:
print("405")
await j(405, {"detail": "method not allowed. use GET."})
else:
print("404")
await j(404, {"detail": "not found."})
return

if pth == "/factorial":
print("hit /factorial")
raw_n_list = qs.get("n") or qs.get("N")

if not raw_n_list or raw_n_list[0] == "":
print("bad n")
await j(422, {"detail": "query param 'n' is required and must be integer."})
return

try:
n = int(raw_n_list[0])
except Exception:
print("not int")
await j(422, {"detail": "'n' must be an integer."})
return

if n < 0:
print("neg n")
await j(400, {"detail": "factorial is for non-negative integers only."})
return

# inplace kringe but sorry
out = 1
k = 2
while k <= n:
out *= k
k += 1

print("ok")
await j(200, {"result": out})
return

if pth.startswith("/fibonacci"):
print("hit /fibonacci")
parts = pth.split("/", 2)
raw_n = parts[2] if len(parts) > 2 and parts[1] == "fibonacci" else ""

try:
n = int(raw_n)
except Exception:
print("bad path n")
await j(422, {"detail": "path param must be an integer."})
return

if n < 0:
print("neg n")
await j(400, {"detail": "fibonacci is for non-negative integers only."})
return

a, b = 0, 1
i = 0
while i < n:
a, b = b, a + b
i += 1

print("ok")
await j(200, {"result": a})
return

if pth == "/mean":
print("hit /mean")
raw = await grab_body()

if not raw:
print("no json")
await j(422, {"detail": "json body is required."})
return

try:
data = json.loads(raw.decode("utf-8"))
except json.JSONDecodeError:
print("bad json")
await j(422, {"detail": "body must be valid json."})
return

if isinstance(data, list):
arr = data
elif isinstance(data, dict) and ("numbers" in data):
arr = data["numbers"]
else:
print("missing numbers")
await j(422, {"detail": "expected array or object with numbers"})
return

try:
nums = [float(x) for x in arr]
except Exception:
print("not floats")
await j(422, {"detail": "numbers must be an array of numbers"})
return

if not nums:
print("empty")
await j(400, {"detail": "numbers list is empty"})
return

mean_val = sum(nums) / len(nums)
print("ok")
await j(200, {"result": mean_val})
return

print("404")
await j(404, {"detail": "not found."})


if __name__ == "__main__":
import uvicorn
uvicorn.run("app:application", host="0.0.0.0", port=8000, reload=True)
print("test app")
uvicorn.run("app:application", host="0.0.0.0", port=8000, reload=True)
12 changes: 12 additions & 0 deletions hw1/env.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name: itmo_python_backend_hm1
channels:
- conda-forge
dependencies:
- python=3.10
- uvicorn>=0.24.0
- pytest>=7.4.0
- pytest-asyncio>=0.21.0
- httpx>=0.27.2
- pip
- pip:
- async-asgi-testclient>=1.4.11
201 changes: 200 additions & 1 deletion hw2/hw/shop_api/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,202 @@
from fastapi import FastAPI
from fastapi import FastAPI, HTTPException, Query, Body, WebSocket

from typing import Dict, Any, List, Optional
from fastapi.responses import JSONResponse
from fastapi import WebSocketDisconnect

from uuid import uuid4

app = FastAPI(title="Shop API")

class S:
def __init__(self):
self.items: Dict[int, Dict[str, Any]] = {}
self.carts: Dict[int, Dict[int, int]] = {}
self.iid = 1
self.cid = 1

def mk_item(self, name: str, price: float) -> Dict[str, Any]:
i = self.iid
self.iid += 1
self.items[i] = {"id": i, "name": name, "price": float(price), "deleted": False}
return self.items[i]

def one_item(self, i: int) -> Dict[str, Any]:
x = self.items.get(i)
if not x or x["deleted"]:
raise HTTPException(status_code=404)
return x

def set_item(self, i: int, name: str, price: float) -> Dict[str, Any]:
if i not in self.items or self.items[i]["deleted"]:
raise HTTPException(status_code=404)
self.items[i]["name"] = name
self.items[i]["price"] = float(price)
return self.items[i]

def patch_item(self, i: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
if i not in self.items:
raise HTTPException(status_code=404)
if self.items[i]["deleted"]:
return None
if set(data.keys()) - {"name", "price"}:
raise HTTPException(status_code=422)
if "name" in data:
self.items[i]["name"] = data["name"]
if "price" in data:
self.items[i]["price"] = float(data["price"])
return self.items[i]

def del_item(self, i: int) -> None:
if i in self.items:
self.items[i]["deleted"] = True

def mk_cart(self) -> int:
c = self.cid
self.cid += 1
self.carts[c] = {}
return c

def cart_view(self, c: int) -> Dict[str, Any]:
if c not in self.carts:
raise HTTPException(status_code=404)
arr = []
total = 0.0
for iid, q in self.carts[c].items():
it = self.items.get(iid)
ok = bool(it and not it["deleted"])
nm = it["name"] if it else "unknown"
arr.append({"id": iid, "name": nm, "quantity": q, "available": ok})
if ok:
total += float(it["price"]) * q
return {"id": c, "items": arr, "price": float(total)}

def add(self, c: int, i: int) -> None:
if c not in self.carts:
raise HTTPException(status_code=404)
if i not in self.items:
raise HTTPException(status_code=404)
d = self.carts[c]
d[i] = d.get(i, 0) + 1

st = S()

def _slice(x: List[Any], o: int, l: int) -> List[Any]:
return x[o:o+l]

@app.post("/item", status_code=201)
def create_item(body: Dict[str, Any] = Body(...)) -> Dict[str, Any]:
if "name" not in body or "price" not in body:
raise HTTPException(status_code=422)
return st.mk_item(body["name"], body["price"])

@app.get("/item/{item_id}")
def read_item(item_id: int) -> Dict[str, Any]:
return st.one_item(item_id)

@app.get("/item")
def read_items(
offset: int = Query(0, ge=0),
limit: int = Query(10, gt=0),
min_price: Optional[float] = Query(None, ge=0),
max_price: Optional[float] = Query(None, ge=0),
show_deleted: bool = Query(False),
) -> List[Dict[str, Any]]:
xs = list(st.items.values())
if not show_deleted:
xs = [x for x in xs if not x["deleted"]]
if min_price is not None:
xs = [x for x in xs if float(x["price"]) >= float(min_price)]
if max_price is not None:
xs = [x for x in xs if float(x["price"]) <= float(max_price)]
xs.sort(key=lambda x: x["id"])
return _slice(xs, offset, limit)

@app.put("/item/{item_id}")
def replace_item(item_id: int, body: Dict[str, Any] = Body(...)) -> Dict[str, Any]:
if set(body.keys()) - {"name", "price"}:
raise HTTPException(status_code=422)
if "name" not in body or "price" not in body:
raise HTTPException(status_code=422)
return st.set_item(item_id, body["name"], body["price"])

@app.patch("/item/{item_id}")
def update_item(item_id: int, body: Dict[str, Any] = Body(...)):
y = st.patch_item(item_id, body)
if y is None:
return JSONResponse(status_code=304, content=None)
return y

@app.delete("/item/{item_id}")
def remove_item(item_id: int):
st.del_item(item_id)
return {"ok": True}

@app.post("/cart", status_code=201)
def create_cart():
cid = st.mk_cart()
return JSONResponse(status_code=201, content={"id": cid}, headers={"location": f"/cart/{cid}"})

@app.get("/cart/{cart_id}")
def read_cart(cart_id: int):
return st.cart_view(cart_id)

@app.get("/cart")
def read_carts(
skip: int = Query(0, ge=0, alias="offset"),
take: int = Query(10, gt=0, alias="limit"),
pmin: Optional[float] = Query(None, ge=0, alias="min_price"),
pmax: Optional[float] = Query(None, ge=0, alias="max_price"),
qmin: Optional[int] = Query(None, ge=0, alias="min_quantity"),
qmax: Optional[int] = Query(None, ge=0, alias="max_quantity"),
) -> List[Dict[str, Any]]:
bag = []
for k in sorted(st.carts):
v = st.cart_view(k)
tot = 0
for it in v["items"]:
tot += it["quantity"]
if pmin is not None and v["price"] < float(pmin):
continue
if pmax is not None and v["price"] > float(pmax):
continue
if qmin is not None and tot < int(qmin):
continue
if qmax is not None and tot > int(qmax):
continue
bag.append(v)
if skip >= len(bag):
return []
return bag[skip:skip + take]

@app.post("/cart/{cart_id}/add/{item_id}")
def add_item(cart_id: int, item_id: int):
st.add(cart_id, item_id)
return {"ok": True}

_rooms: Dict[str, List[WebSocket]] = {}
_names: Dict[WebSocket, str] = {}

@app.websocket("/chat/{room}")
async def ws_chat(ws: WebSocket, room: str):
await ws.accept()
u = "user-" + uuid4().hex[:6]
_names[ws] = u
_rooms.setdefault(room, []).append(ws)
try:
while True:
t = await ws.receive_text()
for w in list(_rooms.get(room, [])):
if w is not ws:
try:
await w.send_text(f"{u} :: {t}")
except RuntimeError:
pass
except WebSocketDisconnect:
pass
finally:
if ws in _rooms.get(room, []):
_rooms[room].remove(ws)
_names.pop(ws, None)
if _rooms.get(room) == []:
_rooms.pop(room, None)
Loading