-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmac_server.py
More file actions
executable file
·295 lines (265 loc) · 10.9 KB
/
mac_server.py
File metadata and controls
executable file
·295 lines (265 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
#!/usr/bin/env python3
"""
Mac 音乐控制服务器
用于接收来自 Apple Watch 的控制命令
"""
from http.server import HTTPServer, BaseHTTPRequestHandler
import subprocess
import sys
import time
import threading
PORT = 12266
# 音量控制相关的全局变量
volume_lock = threading.Lock()
volume_queue = {'target': None, 'last_set_time': 0, 'pending_timer': None}
volume_min_interval = 0.1 # 最小执行间隔(秒)
class MediaControlHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
"""自定义日志格式"""
sys.stdout.write("%s - [%s] %s\n" %
(self.address_string(),
self.log_date_time_string(),
format % args))
def do_POST(self):
"""处理 POST 请求"""
command = self.path.strip('/')
print(f"收到命令: {command}")
# 执行 AppleScript 命令控制媒体播放
success = self.execute_media_command(command)
try:
if success:
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'OK')
else:
self.send_response(400)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'Invalid command')
except BrokenPipeError:
# 客户端已断开连接,忽略此错误(命令已执行成功)
print("客户端连接已断开,但命令已执行")
def do_GET(self):
"""处理 GET 请求 - 用于获取音量和心跳检查"""
if self.path == '/ping':
# 轻量级心跳检查,不执行任何操作
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'pong')
elif self.path == '/volume':
try:
script = '''
output volume of (get volume settings)
'''
result = subprocess.run(['osascript', '-e', script],
capture_output=True,
text=True,
check=True)
volume = result.stdout.strip()
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(volume.encode())
print(f"返回当前音量: {volume}")
except Exception as e:
print(f"获取音量失败: {e}")
self.send_response(500)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'Error')
elif self.path == '/playstate':
try:
script = '''
tell application "Music"
if player state is playing then
return "playing"
else
return "paused"
end if
end tell
'''
result = subprocess.run(['osascript', '-e', script],
capture_output=True,
text=True,
check=True)
state = result.stdout.strip()
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(state.encode())
print(f"返回播放状态: {state}")
except Exception as e:
print(f"获取播放状态失败: {e}")
self.send_response(500)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'Error')
else:
self.send_response(404)
self.end_headers()
def execute_volume_change(self, volume):
"""实际执行音量设置"""
try:
script = f'''
set volume output volume {volume}
'''
subprocess.run(['osascript', '-e', script], check=True)
print(f"✓ 设置音量为 {volume}")
except Exception as e:
print(f"设置音量失败: {e}")
def schedule_volume_execution(self, volume, delay):
"""延迟执行音量设置"""
def delayed_set():
time.sleep(delay)
with volume_lock:
# 检查是否还需要执行(可能有新的请求)
if volume_queue['target'] == volume:
volume_queue['last_set_time'] = time.time()
volume_queue['pending_timer'] = None
self.execute_volume_change(volume)
print(f"延迟执行: 设置音量为 {volume}")
timer = threading.Timer(delay, delayed_set)
timer.daemon = True
timer.start()
return timer
def execute_media_command(self, command):
"""执行媒体控制命令"""
try:
# 处理设置音量命令 - 使用节流机制
if command.startswith('setvolume/'):
try:
volume = int(command.split('/')[1])
volume = max(0, min(100, volume)) # 限制在 0-100
current_time = time.time()
with volume_lock:
# 更新目标音量
volume_queue['target'] = volume
# 取消之前的待执行任务
if volume_queue['pending_timer']:
volume_queue['pending_timer'].cancel()
volume_queue['pending_timer'] = None
# 检查是否可以立即执行
time_since_last = current_time - volume_queue['last_set_time']
if time_since_last < volume_min_interval:
# 太快了,安排延迟执行
delay = volume_min_interval - time_since_last
print(f"节流: 目标音量 {volume},将在 {delay:.2f}s 后执行")
volume_queue['pending_timer'] = self.schedule_volume_execution(volume, delay)
return True
# 可以立即执行
volume_queue['last_set_time'] = current_time
# 立即执行设置音量
self.execute_volume_change(volume)
return True
except (ValueError, IndexError):
print(f"无效的音量值: {command}")
return False
if command == 'playpause':
# 播放/暂停 - 直接控制 Music 应用
script = '''
tell application "Music"
playpause
end tell
'''
elif command == 'next':
# 下一曲 - 直接控制 Music 应用
script = '''
tell application "Music"
next track
end tell
'''
elif command == 'previous':
# 上一曲 - 直接控制 Music 应用
script = '''
tell application "Music"
previous track
end tell
'''
elif command == 'volumeup':
# 音量增加 - 按钮点击,大步长 6.25%
script = '''
set currentVolume to output volume of (get volume settings)
if currentVolume < 100 then
set volume output volume (currentVolume + 6.25)
end if
'''
elif command == 'volumedown':
# 音量减小 - 按钮点击,大步长 6.25%
script = '''
set currentVolume to output volume of (get volume settings)
if currentVolume > 0 then
set volume output volume (currentVolume - 6.25)
end if
'''
elif command == 'volumeup_small':
# 音量增加 - 滚轮旋转,小步长 2%
script = '''
set currentVolume to output volume of (get volume settings)
if currentVolume < 100 then
set volume output volume (currentVolume + 2)
end if
'''
elif command == 'volumedown_small':
# 音量减小 - 滚轮旋转,小步长 2%
script = '''
set currentVolume to output volume of (get volume settings)
if currentVolume > 0 then
set volume output volume (currentVolume - 2)
end if
'''
else:
return False
# 执行 AppleScript
result = subprocess.run(['osascript', '-e', script],
capture_output=True,
text=True,
check=True)
print(f"命令执行成功: {command}")
return True
except subprocess.CalledProcessError as e:
print(f"执行命令失败: {e}")
print(f"错误输出: {e.stderr}")
if "not allowed" in e.stderr or "not permitted" in e.stderr:
print("⚠️ 可能缺少辅助功能权限!")
print("请前往:系统设置 > 隐私与安全性 > 辅助功能")
print("添加 Terminal 或 Python 的权限")
return False
except Exception as e:
print(f"执行命令失败: {e}")
return False
def get_local_ip():
"""获取本机 IP 地址"""
try:
result = subprocess.run(['ifconfig'], capture_output=True, text=True)
lines = result.stdout.split('\n')
for i, line in enumerate(lines):
if 'en0:' in line or 'en1:' in line:
for j in range(i, min(i+10, len(lines))):
if 'inet ' in lines[j] and '127.0.0.1' not in lines[j]:
ip = lines[j].split()[1]
return ip
except:
pass
return "无法获取"
def main():
server_address = ('', PORT)
httpd = HTTPServer(server_address, MediaControlHandler)
local_ip = get_local_ip()
print("=" * 50)
print("Mac 音乐控制服务器已启动")
print("=" * 50)
print(f"本机 IP 地址: {local_ip}")
print(f"服务器端口: {PORT}")
print(f"请在 Apple Watch 设置中输入: {local_ip}")
print("=" * 50)
print("按 Ctrl+C 停止服务器")
print()
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\n服务器已停止")
sys.exit(0)
if __name__ == '__main__':
main()