This repository was archived by the owner on Mar 4, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathserver.py
More file actions
77 lines (57 loc) · 2.13 KB
/
server.py
File metadata and controls
77 lines (57 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import asyncio
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel, confloat
from util.geo_utli import aget_current_location
from services.state_store import set_live_location, get_live_location, get_mood, set_mood
app = FastAPI()
class LocationValidation(BaseModel):
latitude: float
longitude: float
timestamp: float
class MoodValidation(BaseModel):
Affection: confloat(ge=-1.0, le=1.0) = 0
Amused: confloat(ge=-1.0, le=1.0) = 0
Inspired: confloat(ge=-1.0, le=1.0) = 0
Frustrated: confloat(ge=-1.0, le=1.0) = 0
Concerned: confloat(ge=-1.0, le=1.0) = 0
Curious: confloat(ge=-1.0, le=1.0) = 0
# Receive and store live location.
@app.post("/api/location")
async def receive_location(data: LocationValidation):
await set_live_location(data.latitude, data.longitude, data.timestamp)
return {"status": "success", "message": "Location received"}
# Get the current human-readable location.
@app.get("/api/location")
async def location():
coordinates = await get_live_location()
data = await aget_current_location(coordinates["latitude"], coordinates["longitude"])
return data
# Get raw latitude/longitude location data.
@app.get("/api/locations")
async def locations():
return await get_live_location()
# Return the current mood state.
@app.get("/api/mood")
async def mood_get():
return await get_mood()
@app.post("/api/mood")
async def mood_post(data: MoodValidation):
await set_mood(data.dict())
return {"status": "success", "message": "Mood updated"}
# Stream mood values every few seconds.
@app.websocket("/ws/mood")
async def mood_websocket(ws: WebSocket):
await ws.accept()
try:
while True:
await asyncio.sleep(2.5)
mood_values = await get_mood()
await ws.send_json(mood_values)
except WebSocketDisconnect:
print("Client disconnected")
async def run_server():
config = uvicorn.Config(app=app, host="0.0.0.0", port=8080, loop="asyncio")
server = uvicorn.Server(config)
await server.serve()
# uvicorn.run("main:app", host="0.0.0.0", port=8080, reload=True)