forked from z1youRA/WeMeet
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.py
More file actions
298 lines (252 loc) · 10.2 KB
/
api.py
File metadata and controls
298 lines (252 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
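# To run this FastAPI server with uvicorn on port 55722 (replace `main` with this
# file's module name, e.g. `api` if the file is saved as api.py):
#   uvicorn main:app --host 0.0.0.0 --port 55722
# This is the server-side code (FastAPI).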
from fastapi import FastAPI
from datetime import date
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from fastapi import Request
import json
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from bson.json_util import dumps
from datetime import datetime
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List, Dict
import json
# Define the structure of the events
class RoomEvent(BaseModel):
    """Schema for a room membership event.

    NOTE(review): this model is not instantiated in the code shown here; the
    WebSocket handler reads the same keys from the raw JSON dict instead.
    """

    type: str = "room"  # message discriminator; defaults to "room"
    eventType: str  # the WebSocket handler checks this key for "join" / "leave"
    userId: str
    name: str
    pinCode: str  # room identifier
class ChatMessage(BaseModel):
    """Schema for a chat message sent to the members of a room.

    RoomManager.broadcast builds an instance of this model and sends its
    JSON serialization to every connection in the room.
    """

    type: str = "chat"  # message discriminator; defaults to "chat"
    pinCode: str  # room identifier
    userId: str
    name: str
    message: str
    timestamp: int  # the server fills in milliseconds since the epoch when the client sends none
class LocationUpdate(BaseModel):
    """Schema for a user location update sent to the members of a room.

    RoomManager.send_locations builds an instance of this model and sends its
    JSON serialization to every connection in the room.
    """

    type: str = "location"  # message discriminator; defaults to "location"
    pinCode: str  # room identifier
    userId: str
    username: str
    latitude: float
    longitude: float
    timestamp: int  # the server fills in milliseconds since the epoch when the client sends none
# ASGI application object; the WebSocket route below is registered on it.
app = FastAPI()
# MongoDB connection string.
# NOTE(review): empty in this file (presumably redacted before commit) — it must
# be filled in before the server can reach a database.
uri = ""
client = MongoClient(uri, server_api=ServerApi('1'))
# Handle to the "ChatApp" database; the handlers below use its `rooms`,
# `messages` and `locations` collections.
db = client.ChatApp
# Room manager class
class RoomManager:
    """Track the WebSocket connections that belong to each room.

    Rooms are keyed by pin code. A room entry is created when its first
    connection registers and removed when its last connection leaves.
    """

    def __init__(self):
        # Maps room id (pinCode) to the list of WebSockets connected to it.
        self.rooms: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, pin_code: str):
        """Accept *websocket* and register it under room *pin_code*."""
        await websocket.accept()
        self.rooms.setdefault(pin_code, []).append(websocket)

    def disconnect(self, websocket: WebSocket, pin_code: str):
        """Unregister *websocket* from room *pin_code*.

        Unknown rooms and sockets that are not registered are ignored, so the
        method is safe to call more than once for the same connection. The
        room entry is deleted once its connection list is empty.
        """
        connections = self.rooms.get(pin_code)
        if connections is None:
            return
        try:
            connections.remove(websocket)
        except ValueError:
            # Socket was never registered here (or was already removed).
            pass
        if not connections:
            del self.rooms[pin_code]

    async def _send_to_room(self, pin_code: str, json_message: str, label: str):
        """Send *json_message* to every connection currently in *pin_code*.

        A failure on one connection is reported and does not stop delivery to
        the others. *label* names the payload kind in the failure report.
        """
        # Iterate over a snapshot: each `await` may let another task connect
        # or disconnect, which would mutate the live list during iteration.
        for connection in list(self.rooms.get(pin_code, ())):
            try:
                await connection.send_text(json_message)
                print(json_message)
            except Exception as e:
                print(f"Failed to send {label} to a client: {e}")

    async def broadcast(self, pin_code: str, user_id: str, name: str, message: str, timestamp: int):
        """Send a chat message to every user in the room.

        The payload is a ChatMessage serialized to JSON. Nothing happens when
        the room has no registered connections.
        """
        if pin_code not in self.rooms:
            return
        chat_message = ChatMessage(
            pinCode=pin_code,
            userId=user_id,
            name=name,
            message=message,
            timestamp=timestamp,
        )
        await self._send_to_room(pin_code, chat_message.json(), "message")

    async def send_locations(self, pin_code: str, user_id: str, username: str, latitude: float, longitude: float, timestamp: int):
        """Send a location update to every user in the room.

        The payload is a LocationUpdate serialized to JSON. Nothing happens
        when the room has no registered connections.
        """
        if pin_code not in self.rooms:
            return
        location_message = LocationUpdate(
            pinCode=pin_code,
            userId=user_id,
            username=username,
            latitude=latitude,
            longitude=longitude,
            timestamp=timestamp,
        )
        json_message = location_message.json()
        print(json_message)
        await self._send_to_room(pin_code, json_message, "location")
# Single module-level RoomManager shared by send_messages and the WebSocket endpoint.
manager = RoomManager()
async def send_messages(pin_code: str):
    """Replay a room's stored chat history, oldest message first.

    Reads the documents in ``db.messages`` whose ``room_id`` equals
    ``str(pin_code)``, sorted by ascending ``timestamp``, and passes each one
    to ``manager.broadcast``. Any exception is caught and printed; the replay
    stops at the first failure.

    NOTE(review): ``manager.broadcast`` delivers to every connection in the
    room, so the history reaches all current members, not only a newcomer.
    """
    try:
        # Fetch only the fields needed to rebuild a chat message.
        history = db.messages.find(
            {"room_id": str(pin_code)},
            {"_id": 0, "userId": 1, "name": 1, "message": 1, "timestamp": 1},
        ).sort("timestamp", 1)
        for record in history:
            await manager.broadcast(
                pin_code,
                str(record["userId"]),
                record["name"],
                record["message"],
                int(record["timestamp"]),
            )
    except Exception as e:
        print(f"Failed to send messages: {e}")
def _event_timestamp(event: dict) -> int:
    """Return the event's ``timestamp``, or the current time in ms if absent or falsy."""
    timestamp = event.get("timestamp")
    if not timestamp:
        timestamp = datetime.now().timestamp() * 1000
    return int(timestamp)


async def _handle_join(event: dict, pin_code: str) -> None:
    """Record a user joining: bump or create the room's counter and replay history."""
    user_id = event.get("userId")
    username = event.get("name")
    # Rooms are stored with a string room_id, so look them up the same way.
    room_id = str(event.get("pinCode"))
    query = {"room_id": room_id}
    room = db.rooms.find_one(query)
    if room:
        print("已存在该聊天室")
        # user_num is stored as a string, so it is re-written rather than $inc'd.
        user_num = int(room.get("user_num", 0)) + 1
        db.rooms.update_one(query, {"$set": {"user_num": str(user_num)}})
        # Load the chat history.
        # NOTE(review): send_messages broadcasts to the whole room, so existing
        # members receive the history again on every join.
        await send_messages(room_id)
    else:
        db.rooms.insert_one({"room_id": room_id, "user_num": "1"})
        print(f"新建聊天室{room_id}")
    print(f"User {username} ({str(user_id)}) joined room {pin_code}")


async def _handle_chat(event: dict, pin_code: str) -> None:
    """Persist a chat message and broadcast it to the connection's room."""
    print("receive one message")
    room_id = str(event.get("pinCode"))
    user_id = str(event.get("userId"))
    name = str(event.get("name"))
    text = str(event.get("message"))
    print(f"{room_id} got one message {text}")
    timestamp = _event_timestamp(event)
    db.messages.insert_one({
        "room_id": room_id,
        "userId": user_id,
        "name": name,
        "message": text,
        "timestamp": timestamp,
    })
    # The message is stored under the event's pinCode but delivered to the
    # room this socket connected to (the URL's pin_code).
    await manager.broadcast(str(pin_code), user_id, name, text, timestamp)


async def _handle_location(event: dict, pin_code: str) -> None:
    """Persist a location update and send it to the connection's room."""
    room_id = str(event.get("pinCode"))
    user_id = str(event.get("userId"))
    username = str(event.get("username"))
    latitude = float(event.get("latitude"))
    longitude = float(event.get("longitude"))
    timestamp = _event_timestamp(event)
    db.locations.insert_one({
        "room_id": room_id,
        "userId": user_id,
        "username": username,
        "latitude": latitude,
        "longitude": longitude,
        "timestamp": timestamp,
    })
    await manager.send_locations(str(pin_code), user_id, username, latitude, longitude, timestamp)


def _handle_leave(event: dict) -> None:
    """Record a user leaving: decrement the room's counter, deleting the room at zero."""
    query = {"room_id": str(event.get("pinCode"))}
    room = db.rooms.find_one(query)
    if not room:
        return
    user_num = int(room.get("user_num", 0)) - 1
    if user_num > 0:
        db.rooms.update_one(query, {"$set": {"user_num": str(user_num)}})
    else:
        # Nobody is left in the room after this user leaves: delete it.
        db.rooms.delete_one(query)


@app.websocket("/ws/{pin_code}")
async def websocket_endpoint(websocket: WebSocket, pin_code: str):
    """Serve one client connection to room *pin_code*.

    Each received text frame is parsed as a JSON object and dispatched by
    content. The checks are independent, so one frame can trigger several:

    * ``eventType == "join"``  -> update the room counter and replay history
    * truthy ``message``       -> store and broadcast a chat message
    * ``latitude``/``longitude`` present -> store and broadcast a location
    * ``eventType == "leave"`` -> update or delete the room counter

    Frames that are not valid JSON objects are reported and skipped. The
    socket is always unregistered from the room when the handler exits,
    whether through a client disconnect or an unexpected error.

    NOTE(review): the pymongo calls made here are synchronous and run on the
    event loop while they execute.
    """
    await manager.connect(websocket, pin_code)
    print(f"User connected to room {pin_code}")
    try:
        while True:
            # Receive a message from the client.
            data = await websocket.receive_text()
            try:
                event = json.loads(data)
            except json.JSONDecodeError as e:
                print(f"Ignoring malformed message in room {pin_code}: {e}")
                continue
            if not isinstance(event, dict):
                print(f"Ignoring non-object message in room {pin_code}")
                continue

            event_type = event.get("eventType")
            # joinRoom
            if event_type == "join":
                await _handle_join(event, pin_code)
            # sendMessage
            if event.get("message"):
                await _handle_chat(event, pin_code)
            # sendLocationToServer: compare against None so that a coordinate
            # of exactly 0.0 is not mistaken for a missing value.
            if event.get("latitude") is not None and event.get("longitude") is not None:
                await _handle_location(event, pin_code)
            # leaveRoom
            if event_type == "leave":
                _handle_leave(event)
    except WebSocketDisconnect:
        print(f"User disconnected from room {pin_code}")
    finally:
        # Runs on disconnect and on any other error, so a failed handler
        # never leaves a dead socket registered in the room.
        manager.disconnect(websocket, pin_code)