Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions .github/workflows/hw5_tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
name: "HW5 Tests"

on:
pull_request:
branches: [ main ]
paths: [ 'hw4_5/**' ]
push:
branches: [ main ]
paths: [ 'hw4_5/**' ]

jobs:
tests-hw5:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_DB: postgres
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
ports:
- 5432:5432

env:
DATABASE_URL: postgresql+asyncpg://postgres:postgres@localhost:5432/postgres

steps:
- name: Checkout
uses: actions/checkout@v4

- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: pip
cache-dependency-path: hw4_5/requirements.txt

- name: Install deps
run: pip install -r hw4_5/requirements.txt

- name: Run tests (hw5)
working-directory: hw4_5
env:
PYTHONPATH: ${{ github.workspace }}/hw4_5
run: pytest -q --cov=src --cov-report=term-missing

- name: DB alembic migrations
working-directory: hw4_5
env:
PYTHONPATH: ${{ github.workspace }}/hw4_5
run: PYTHONPATH=. alembic upgrade head
198 changes: 197 additions & 1 deletion hw1/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,71 @@
from typing import Any, Awaitable, Callable
import json
import re



def parse_query_string(qs):
"""
Парсит query string в словарь списков: 'n=5&x=3' → {'n': ['5'], 'x': ['3']}
"""
if not qs:
return {}
parsed = {}
pairs = qs.split('&')
for pair in pairs:
if '=' in pair:
key, value = pair.split('=', 1)
key = unquote(key)
value = unquote(value)
if key in parsed:
parsed[key].append(value)
else:
parsed[key] = [value]
return parsed


def unquote(string):
"""
Минимальная реализация URL-декодирования без urllib.
Поддерживает %XX и замену '+' на пробел.
"""
string = string.replace('+', ' ')
res = ""
i = 0
while i < len(string):
if string[i] == '%' and i + 2 < len(string):
try:
hex_val = string[i+1:i+3]
byte_val = bytes.fromhex(hex_val)
char = byte_val.decode('utf-8')
res += char
i += 3
continue
except Exception:
pass
res += string[i]
i += 1
return res


async def send_json(send, status, data):
"""
Отправляет JSON-ответ. Гарантированно совместима с ASGI/h11.
"""
body = json.dumps(data, ensure_ascii=False).encode('utf-8')
headers = [
(b'content-type', b'application/json'),
]
await send({
'type': 'http.response.start',
'status': status,
'headers': headers,
})
await send({
'type': 'http.response.body',
'body': body,
})

async def application(
scope: dict[str, Any],
receive: Callable[[], Awaitable[dict[str, Any]]],
Expand All @@ -14,6 +79,137 @@ async def application(
"""
# TODO: Ваша реализация здесь

if scope["type"] != "http":
await send({
"type": "http.response.start",
"status": 400,
"headers": [(b'content-type', b'text/plain')]
})

await send({
"type": "http.response.body",
"body": b"Only HTTP supported"
})
return

path = scope["path"]
method = scope["method"]

if method != "GET":
await send_json(send, 404, {"error": "Only GET methods allowed"})
return

if path == "/factorial" and method == "GET":
await handle_factorial(scope, receive, send)

elif path.startswith("/fibonacci") and method == "GET":
await handle_fibonacci(scope, receive, send)

elif path == "/mean" and method == "GET":
await handle_mean(scope, receive, send)

else:
await send_json(send, 404, {"error": "Not Found"})


async def handle_factorial(scope, recieve, send):
query_string = scope["query_string"].decode("utf-8")
params = parse_query_string(query_string)

if "n" not in params or len(params["n"]) == 0:
await send_json(send, 422, {"error": "Missing query parameter: n"})
return

n_str = params["n"][0]

try:
n = int(n_str)
except ValueError:
await send_json(send, 422, {"error": "n must be an integer"})
return

if n < 0:
await send_json(send, 400, {"error": "n must be a non-negative integer"})
return

result = 1
for i in range(1, n + 1):
result *= i

await send_json(send, 200, {"result": result})



async def handle_fibonacci(scope, recieve, send):

path = scope["path"]
match = re.match(r"/fibonacci/(-?\d+)", path)

if not match:
await send_json(send, 422, {"error": "Invalid path format. Expected /fibonacci/<integer>"})
return

try:
n = int(match.group(1))
except ValueError:
await send_json(send, 422, {"error": "n must be an integer"})
return

if n < 0:
await send_json(send, 400, {"error": "n must be a non-negative integer"})
return

if n == 0:
fib = 0

elif n == 1:
fib = 1

else:
a, b = 0, 1
for i in range(2, n + 1):
a, b = b, a + b
fib = b

await send_json(send, 200, {"result": fib})




async def handle_mean(scope, recieve, send):
body = b""
more_body = True

while more_body:
message = await recieve()
body += message.get("body", b"")
more_body = message.get("more_body", False)

if not body:
await send_json(send, 422, {"error": "No body received"})
return

try:
data = json.loads(body)
if not isinstance(data, list):
raise ValueError("Expected JSON Array")

numbers = [float(x) for x in data]

except (json.JSONDecodeError, ValueError, TypeError, OverflowError):
await send_json(send, 422, {"error": "All values must be numbers"})
return

if len(numbers) == 0:
await send_json(send, 400, {"error": "No values received"})
return

mean_value = sum(numbers) / len(numbers)
await send_json(send, 200, {"result": mean_value})




if __name__ == "__main__":
import uvicorn
uvicorn.run("app:application", host="0.0.0.0", port=8000, reload=True)
uvicorn.run("app:application", host="0.0.0.0", port=8000, reload=True)
23 changes: 23 additions & 0 deletions hw2/hw/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
FROM python:3.12 AS base

ARG PYTHONFAULTHANDLER=1 \
PYTHONUNBUFFERED=1 \
PYTHONHASHSEED=random \
PIP_NO_CACHE_DIR=on \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=500

RUN apt-get update && apt-get install -y gcc
RUN python -m pip install --upgrade pip

WORKDIR app
COPY . .

ENV VIRTUAL_ENV=app/.venv \
PATH=app/.venv/bin:$PATH

RUN pip install -r requirements.txt

FROM base as local

CMD ["uvicorn", "shop_api.main:app", "--port", "8080", "--host", "0.0.0.0"]
68 changes: 68 additions & 0 deletions hw2/hw/client_websockets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import asyncio
import websockets


DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8000


async def chat_client(room_name: str, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT):
uri = f"ws://{host}:{port}/chat/{room_name}"
print(f"Подключение к комнате '{room_name}' по адресу {uri}...")

try:
async with websockets.connect(uri) as websocket:

welcome = await websocket.recv()
print(f"{welcome}\n")
print("Чат запущен! Введите сообщение и нажмите Enter.")
print("Чтобы выйти — введите 'quit' или нажмите Ctrl+C.\n")

async def receive_messages():
try:
while True:
message = await websocket.recv()
print(f"\r{message}")
print("Вы: ", end="", flush=True)
except websockets.exceptions.ConnectionClosed:
print("\nСоединение закрыто сервером.")
return

async def send_messages():
loop = asyncio.get_event_loop()
while True:

msg = await loop.run_in_executor(None, input, "Вы: ")
if msg.strip().lower() == 'quit':
break
if msg.strip():
await websocket.send(msg.strip())

await asyncio.gather(receive_messages(), send_messages())

except KeyboardInterrupt:
print("\nВыход по запросу пользователя.")
except Exception as e:
print(f"\nОшибка подключения: {e}")
print("Убедитесь, что сервер запущен: uvicorn main:app")


def main():
import argparse

parser = argparse.ArgumentParser(description="WebSocket чат-клиент")
parser.add_argument("room", help="Имя комнаты для подключения")
parser.add_argument("--host", default=DEFAULT_HOST, help="Хост сервера (по умолчанию: localhost)")
parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="Порт сервера (по умолчанию: 8000)")

args = parser.parse_args()

try:
asyncio.run(chat_client(args.room, args.host, args.port))
except KeyboardInterrupt:

print("\nДо встречи!")


if __name__ == "__main__":
main()
Loading