-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathzing_bot.py
More file actions
106 lines (91 loc) · 3.23 KB
/
zing_bot.py
File metadata and controls
106 lines (91 loc) · 3.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import os
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from dotenv import load_dotenv
import httpx
from typing import Optional
import urllib3
urllib3.disable_warnings()
load_dotenv()
TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN')
DEEPSEEK_API_KEY = os.getenv('DEEPSEEK_API_KEY')
USE_PROXY = os.getenv('USE_PROXY', 'false').lower() == 'true'
PROXY_URL = os.getenv('PROXY_URL')
TIMEOUT = 60.0
def get_proxy_settings() -> Optional[dict]:
if USE_PROXY:
return {
'http://': PROXY_URL,
'https://': PROXY_URL
}
return None
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text(
"我是 zingBot,专门怼人的机器人。\n"
"来啊,互相伤害啊!说点什么,让我教教你做人!"
)
async def get_deepseek_response(message: str) -> str:
transport = httpx.AsyncHTTPTransport(
verify=False,
proxy=PROXY_URL if USE_PROXY else None
)
async with httpx.AsyncClient(
transport=transport,
timeout=httpx.Timeout(TIMEOUT)
) as client:
response = await client.post(
"https://api.deepseek.com/v1/chat/completions",
headers={
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
},
json={
"model": "deepseek-chat",
"messages": [
{
"role": "system",
"content": "你是一个毒舌机器人,喜欢用幽默但尖锐的方式怼人。记住要有趣但不能太过分或带有侮辱性。"
},
{
"role": "user",
"content": message
}
],
"temperature": 0.7
}
)
result = response.json()
return result['choices'][0]['message']['content']
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_message = update.message.text
try:
response = await get_deepseek_response(user_message)
await update.message.reply_text(response)
except Exception as e:
await update.message.reply_text("哎呀,我正在酝酿更毒的话,请稍后再试~")
print(f"Error: {str(e)}")
def main():
proxy_url = PROXY_URL if USE_PROXY else None
application = (
Application.builder()
.token(TELEGRAM_TOKEN)
.proxy_url(proxy_url)
.get_updates_proxy_url(proxy_url)
.build()
)
application.add_handler(CommandHandler("start", start))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
print("Bot is running...")
print(f"Proxy enabled: {USE_PROXY}")
print(f"Proxy URL: {PROXY_URL if USE_PROXY else 'None'}")
application.run_polling(
allowed_updates=Update.ALL_TYPES,
drop_pending_updates=True,
timeout=TIMEOUT,
read_timeout=TIMEOUT,
write_timeout=TIMEOUT,
connect_timeout=TIMEOUT,
pool_timeout=TIMEOUT
)
if __name__ == '__main__':
main()