-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcommand_handler.py
More file actions
1966 lines (1604 loc) · 87.6 KB
/
command_handler.py
File metadata and controls
1966 lines (1604 loc) · 87.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import time
import logging
import os
import re
import asyncio
import importlib.util
from pathlib import Path
from typing import Callable, Dict, List, Optional, Any
from dataclasses import dataclass
from collections import defaultdict
from plugin_manager import BotPlugin
@dataclass
class Command:
"""命令定义"""
names: List[str]
handler: Callable
admin_only: bool = False
description: str = ""
usage: str = ""
cooldown: int = 0
command_key: str = ""
class RateLimiter:
"""命令速率限制器"""
def __init__(self, default_cooldown: int = 3):
self.default_cooldown = default_cooldown
self.last_use = defaultdict(dict)
def can_use(self, user_id: int, command: str, cooldown: int = None) -> tuple:
"""检查用户是否可以使用命令"""
if cooldown is None:
cooldown = self.default_cooldown
now = time.time()
last_time = self.last_use[user_id].get(command, 0)
elapsed = now - last_time
if elapsed >= cooldown:
self.last_use[user_id][command] = now
return True, None
else:
remaining = int(cooldown - elapsed)
return False, remaining
def reset_user(self, user_id: int):
"""重置用户的所有冷却"""
if user_id in self.last_use:
del self.last_use[user_id]
class CommandHandler:
"""命令处理器"""
def __init__(self, config_manager, logger: logging.Logger, qq_server=None):
self.config_manager = config_manager
self.logger = logger
self.qq_server = qq_server
self.commands: Dict[str, Command] = {}
self.rate_limiter = RateLimiter(config_manager.get_command_cooldown())
def register_command(self,
names: List[str],
handler: Callable,
admin_only: bool = False,
description: str = "",
usage: str = "",
cooldown: int = 0,
command_key: str = ""):
"""注册命令"""
command = Command(
names=names,
handler=handler,
admin_only=admin_only,
description=description,
usage=usage,
cooldown=cooldown,
command_key=command_key
)
for name in names:
self.commands[name.lower()] = command
self.logger.debug(f"已注册命令: {', '.join(names)}")
async def handle_command(self,
command_text: str,
user_id: int,
group_id: int,
command_args: str = "",
plugin_manager = None,
**kwargs) -> Optional[str]:
"""处理命令执行"""
command_text = command_text.strip().lower()
self.logger.debug(f"处理命令: '{command_text}', 参数: '{command_args}', 用户: {user_id}")
# 第一步:检查是否是插件命令
if plugin_manager:
for cmd_name, cmd_info in plugin_manager.command_handlers.items():
cmd_names = cmd_info.get('names', [])
# 检查命令是否匹配(不区分大小写)
if command_text in [name.lower() for name in cmd_names]:
self.logger.debug(f"找到插件命令: {cmd_name}")
handler = cmd_info.get('handler')
admin_only = cmd_info.get('admin_only', False)
is_admin = self.config_manager.is_admin(user_id)
# 检查权限
if admin_only and not is_admin:
return "权限不足:此命令仅限管理员使用"
# 执行插件命令
try:
import asyncio
timeout = 60.0 if admin_only else 30.0
# 准备参数,避免重复传递
plugin_kwargs = {
'command_text': command_args,
'user_id': user_id,
'group_id': group_id,
}
# 添加其他参数,但避免覆盖已有的
for key, value in kwargs.items():
if key not in plugin_kwargs:
plugin_kwargs[key] = value
result = await asyncio.wait_for(
handler(**plugin_kwargs),
timeout=timeout
)
return result
except asyncio.TimeoutError:
self.logger.error(f"命令 {cmd_name} 执行超时 ({timeout}秒)")
return f"命令执行超时,请稍后重试"
except Exception as e:
self.logger.error(f"执行插件命令 {cmd_name} 时出错: {e}", exc_info=True)
return f"命令执行失败: {str(e)}"
# 第二步:检查内置命令
command = self.commands.get(command_text)
if not command:
self.logger.debug(f"未找到命令: '{command_text}'")
return None
self.logger.debug(f"找到命令: {command.names[0]}")
# 检查命令是否可用
is_admin = self.config_manager.is_admin(user_id)
if command.admin_only:
# 管理员命令权限检查
if not is_admin and not self.config_manager.is_admin_command_enabled(command.names[0]):
return f"命令 {command.names[0]} 已被禁用"
else:
# 基础命令权限检查
if not is_admin and command.command_key:
if not self.config_manager.is_command_enabled(command.command_key):
return None
# 检查管理员权限
if command.admin_only and not is_admin:
if not self.config_manager.is_admin_command_enabled(command.names[0]):
return "权限不足:此命令仅限管理员使用"
# 检查冷却时间
can_use, remaining = self.rate_limiter.can_use(
user_id,
command.names[0],
command.cooldown if command.cooldown > 0 else None
)
if not can_use:
return f"命令冷却中,请等待 {remaining} 秒"
# 执行命令
try:
import asyncio
timeout = 60.0 if command.admin_only and command.names[0] in ['start', 'stop', 'log', 'reconnect'] else 30.0
result = await asyncio.wait_for(
command.handler(
user_id=user_id,
group_id=group_id,
command_text=command_args,
**kwargs
),
timeout=timeout
)
return result
except asyncio.TimeoutError:
self.logger.error(f"命令 {command.names[0]} 执行超时 ({timeout}秒)")
return f"命令执行超时,请稍后重试"
except Exception as e:
self.logger.error(f"执行命令 {command.names[0]} 时出错: {e}", exc_info=True)
return f"命令执行失败: {str(e)}"
def get_help_message(self, user_id: int, detailed: bool = False) -> str:
"""获取帮助消息"""
is_admin = self.config_manager.is_admin(user_id)
basic_commands = []
admin_commands = []
enabled_admin_commands = [] # 对非管理员开放的管理员命令
seen_commands = set()
for name, command in self.commands.items():
if command.names[0] not in seen_commands:
seen_commands.add(command.names[0])
if command.admin_only:
if is_admin:
admin_commands.append(command)
else:
# 检查这个管理员命令是否对非管理员开放
if self.config_manager.is_admin_command_enabled(command.names[0]):
enabled_admin_commands.append(command)
else:
if is_admin or (command.command_key and self.config_manager.is_command_enabled(command.command_key)):
basic_commands.append(command)
lines = ["MSMP_QQBot 命令帮助", "••••••••••"]
if basic_commands:
lines.append("\n【基础命令】")
for cmd in basic_commands:
aliases = " / ".join(cmd.names[:3])
lines.append(f"• {aliases}")
if cmd.description:
lines.append(f" {cmd.description}")
if cmd.usage and detailed:
lines.append(f" 用法: {cmd.usage}")
# 对非管理员显示开放的管理员命令
if not is_admin and enabled_admin_commands:
lines.append("\n【开放的管理员命令】")
for cmd in enabled_admin_commands:
aliases = " / ".join(cmd.names[:3])
lines.append(f"• {aliases}")
if cmd.description:
lines.append(f" {cmd.description}")
if admin_commands:
lines.append("\n【管理员专属命令】")
for cmd in admin_commands:
aliases = " / ".join(cmd.names[:3])
lines.append(f"• {aliases}")
if cmd.description:
lines.append(f" {cmd.description}")
lines.append("\n【直接命令执行】")
lines.append("• !<命令>")
lines.append(" 使用 ! 前缀直接执行服务器命令")
lines.append(" 示例: !say Hello 或 !give @a diamond")
# ========== 添加自定义指令信息 ==========
if self.config_manager.is_custom_commands_enabled():
try:
custom_rules = self.config_manager.get_custom_command_rules()
if custom_rules:
# 按权限过滤自定义指令
user_custom_commands = []
for rule in custom_rules:
if not rule.get('enabled', True):
continue
admin_only = rule.get('admin_only', False)
if admin_only and not is_admin:
continue
user_custom_commands.append(rule)
if user_custom_commands:
lines.append("\n【自定义指令】")
lines.append(f"已启用 {len(user_custom_commands)} 个自定义指令:")
for rule in user_custom_commands:
rule_name = rule.get('name', 'unknown')
description = rule.get('description', '')
pattern = rule.get('pattern', '')
admin_tag = " [仅管理员]" if rule.get('admin_only', False) else ""
lines.append(f"• {rule_name}{admin_tag}")
if description:
lines.append(f" {description}")
if pattern:
# 简化显示触发模式
pattern_preview = pattern[:40]
if len(pattern) > 40:
pattern_preview += "..."
lines.append(f" 触发: {pattern_preview}")
except Exception as e:
pass
# ========== 添加自定义消息监听器信息 ==========
if self.config_manager.is_custom_listeners_enabled():
try:
listener_rules = self.config_manager.get_custom_listener_rules()
if listener_rules:
enabled_listeners = [r for r in listener_rules if r.get('enabled', True)]
if enabled_listeners:
lines.append("\n【服务器消息监听】")
lines.append(f"已启用 {len(enabled_listeners)} 个监听规则")
# 显示前几个规则的名称
listener_names = [r.get('name', 'unknown') for r in enabled_listeners]
for name in listener_names[:5]: # 最多显示5个
lines.append(f"• {name}")
if len(listener_names) > 5:
lines.append(f"... 还有 {len(listener_names) - 5} 个规则")
if is_admin:
lines.append(f"使用 listeners 查看完整监听规则详情")
except Exception as e:
pass
# 添加使用提示
if not is_admin:
lines.append(f"\n提示: 当前有 {len(enabled_admin_commands)} 个管理员命令对您开放")
else:
lines.append(f"\n您是管理员,可以使用所有命令")
return "\n".join(lines)
def list_commands(self, admin_only: bool = False) -> List[str]:
"""列出所有命令"""
seen = set()
result = []
for name, command in self.commands.items():
if command.names[0] not in seen:
seen.add(command.names[0])
if not admin_only or command.admin_only:
result.append(command.names[0])
return result
class CommandHandlers:
"""命令处理器集合"""
def __init__(self, msmp_client, rcon_client, qq_server, config_manager, logger):
self.qq_server = qq_server
self.rcon_client = rcon_client
self.config_manager = config_manager
self.logger = logger
self._stop_lock = asyncio.Lock()
self._is_stopping = False
self._shutdown_event = asyncio.Event()
self._shutdown_initiated = False
@property
def msmp_client(self):
"""动态获取 msmp_client"""
return self.qq_server.msmp_client if self.qq_server else None
def set_shutdown_mode(self):
"""设置关闭模式,停止所有连接检测"""
self._shutdown_event.set()
self._is_stopping = True
self.logger.info("已进入关闭模式,停止所有连接检测")
async def handle_list(self, **kwargs) -> str:
"""处理list命令"""
try:
client_type, client = await self.qq_server.connection_manager.get_preferred_client()
# 如果没有连接,尝试自动重连一次
if not client:
self.logger.info("检测到连接未就绪,尝试自动重连...")
await self.qq_server.connection_manager.reconnect_all()
# 重连后再次获取客户端
client_type, client = await self.qq_server.connection_manager.get_preferred_client()
if not client:
return "服务器连接未就绪\n自动重连失败,请使用 reconnect 手动重连"
try:
if client_type == 'msmp':
player_info = client.get_player_list_sync()
else:
player_info = client.get_player_list()
except Exception as e:
self.logger.error(f"获取玩家列表失败: {e}")
return f"获取玩家列表失败: {str(e)}"
lines = [f"在线人数: {player_info.current_players}/{player_info.max_players}"]
if player_info.current_players > 0 and player_info.player_names:
player_list = " ".join(player_info.player_names)
lines.append(f"在线玩家:\n{player_list}")
else:
lines.append("\n暂无玩家在线")
lines.append(f"\n[通过 {client_type.upper()} 查询]")
return "\n".join(lines)
except Exception as e:
self.logger.error(f"执行list命令失败: {e}", exc_info=True)
return f"获取玩家列表失败: {e}"
async def handle_tps(self, **kwargs) -> str:
"""处理tps命令"""
try:
if not self.config_manager.is_rcon_enabled():
return "TPS查询需要启用RCON连接"
client_type, client = await self.qq_server.connection_manager.get_client_for_command("tps")
# 如果没有RCON连接,尝试自动重连一次
if not client or client_type != 'rcon':
self.logger.info("检测到RCON连接未就绪,尝试自动重连...")
await self.qq_server.connection_manager.reconnect_rcon()
# 重连后再次获取客户端
client_type, client = await self.qq_server.connection_manager.get_client_for_command("tps")
if not client or client_type != 'rcon':
return "TPS命令需要RCON连接\n自动重连失败,请使用 reconnect_rcon 重连"
tps_command = self.config_manager.get_tps_command()
result = client.execute_command(tps_command)
if result:
# 第一步:清理Minecraft颜色代码 (§[0-9a-fk-or] 或 &[0-9a-fk-or])
cleaned = re.sub(r'[§&][0-9a-fk-orA-FK-OR]', '', result).strip()
self.logger.debug(f"原始TPS返回: {result}")
self.logger.debug(f"清理后的TPS返回: {cleaned}")
# 第二步:尝试使用正则表达式提取TPS值
tps_value = self._extract_tps_value(cleaned)
# 第三步:构建响应消息
message_lines = ["服务器TPS信息:"]
message_lines.append("=" * 20)
if tps_value is not None:
# 评估TPS状态
tps_status = self._evaluate_tps_status(tps_value)
message_lines.append(f"解析的TPS值: {tps_value:.1f} {tps_status}")
else:
message_lines.append(" 无法解析TPS值,请查看原始信息")
# 当无法解析时,记录调试信息
self.logger.warning(
f"TPS值解析失败\n"
f"正则表达式: {self.config_manager.get_tps_regex()}\n"
f"捕获组索引: {self.config_manager.get_tps_group_index()}\n"
f"清理后的文本: {cleaned}"
)
# 第四步:是否显示原始输出
if self.config_manager.is_tps_raw_output_enabled():
message_lines.append("")
message_lines.append("服务器原始TPS信息:")
message_lines.append("-" * 20)
message_lines.append(cleaned)
message_lines.append("=" * 20)
message_lines.append("[通过 RCON 查询]")
return "\n".join(message_lines)
else:
return "TPS命令执行成功,但无返回结果"
except Exception as e:
self.logger.error(f"执行TPS命令失败: {e}", exc_info=True)
return f"获取TPS信息失败: {e}"
def _extract_tps_value(self, text: str) -> Optional[float]:
"""从服务器返回的文本中提取TPS值"""
try:
tps_regex = self.config_manager.get_tps_regex()
tps_group_index = self.config_manager.get_tps_group_index()
# 验证group_index的有效性
if tps_group_index < 1:
self.logger.warning(f"无效的tps_group_index: {tps_group_index}, 使用默认值1")
tps_group_index = 1
# 使用提供的正则表达式进行匹配
pattern = re.compile(tps_regex, re.IGNORECASE)
match = pattern.search(text)
if match:
# 获取指定的捕获组
try:
tps_str = match.group(tps_group_index)
tps_value = float(tps_str)
# 验证TPS值的合理性(0-20之间)
if 0 <= tps_value <= 20:
self.logger.debug(f"成功提取TPS值: {tps_value}")
return tps_value
else:
self.logger.warning(f"TPS值超出合理范围: {tps_value}")
return None
except (IndexError, ValueError) as e:
self.logger.warning(f"提取捕获组失败 (group {tps_group_index}): {e}")
return None
else:
self.logger.warning(f"正则表达式未匹配: {tps_regex}")
self.logger.debug(f"尝试匹配的文本: {text[:200]}")
# 第五步:如果第一次匹配失败,尝试更宽松的正则表达式
# 这可以处理一些不标准的格式
fallback_patterns = [
r'(\d+(?:\.\d+)?)', # 任何数字或浮点数
r'TPS[:\s]+(\d+(?:\.\d+)?)', # TPS: 数字
r'(\d+(?:\.\d+)?)\s*(?:tps|TPS)', # 数字 TPS
]
for fallback_regex in fallback_patterns:
try:
fallback_pattern = re.compile(fallback_regex, re.IGNORECASE)
fallback_match = fallback_pattern.search(text)
if fallback_match:
fallback_tps_str = fallback_match.group(1)
fallback_tps_value = float(fallback_tps_str)
if 0 <= fallback_tps_value <= 20:
self.logger.info(
f"使用备用正则表达式成功提取TPS值: {fallback_tps_value}\n"
f"备用正则: {fallback_regex}"
)
return fallback_tps_value
except Exception as e:
self.logger.debug(f"备用正则表达式匹配失败 ({fallback_regex}): {e}")
continue
return None
except Exception as e:
self.logger.error(f"提取TPS值时出错: {e}")
return None
def _evaluate_tps_status(self, tps_value: float) -> str:
"""评估TPS状态并返回状态标签"""
if tps_value >= 19.5:
return "优秀"
elif tps_value >= 15:
return "良好"
elif tps_value >= 10:
return "一般"
elif tps_value >= 5:
return "较差"
else:
return "很差"
# 清理Minecraft格式代码
@staticmethod
def _clean_minecraft_colors(text: str) -> str:
"""清理Minecraft颜色代码和格式代码
支持以下格式:
- §[0-9a-fk-or] - Minecraft标准颜色代码
- &[0-9a-fk-or] - 另一种常见格式
"""
# 清理 § 格式的颜色代码
text = re.sub(r'§[0-9a-fk-orA-FK-OR]', '', text)
# 清理 & 格式的颜色代码
text = re.sub(r'&[0-9a-fk-orA-FK-OR]', '', text)
# 清理其他常见的ANSI转义序列
text = re.sub(r'\x1b\[[0-9;]*m', '', text)
return text
async def handle_rules(self, **kwargs) -> str:
"""处理rules命令"""
try:
if not self.config_manager.is_msmp_enabled():
return "规则查询需要启用MSMP连接"
client_type, client = await self.qq_server.connection_manager.ensure_connected()
if not client or client_type != 'msmp':
return "MSMP连接未就绪\n请使用 #reconnect_msmp 手动重连"
self.logger.info("查询服务器规则...")
lines = ["服务器规则信息", "=" * 20]
try:
gamerules_result = await self.msmp_client.get_game_rules()
if 'result' in gamerules_result and isinstance(gamerules_result['result'], list):
gamerules_list = gamerules_result['result']
gamerules_dict = {}
for rule in gamerules_list:
if isinstance(rule, dict) and 'key' in rule and 'value' in rule:
gamerules_dict[rule['key']] = rule['value']
important_rules = {
'keepInventory': '死亡不掉落',
'doDaylightCycle': '时间循环',
'doMobSpawning': '生物生成',
'mobGriefing': '生物破坏',
'doFireTick': '火焰蔓延',
'pvp': 'PVP模式',
'commandBlockOutput': '命令方块输出',
'naturalRegeneration': '自然生命恢复',
'doWeatherCycle': '天气循环',
'announceAdvancements': '成就通知',
'showDeathMessages': '显示死亡信息'
}
rules_found = False
for rule_key, rule_name in important_rules.items():
if rule_key in gamerules_dict:
if not rules_found:
lines.append("\n游戏规则:")
rules_found = True
value = gamerules_dict[rule_key]
if isinstance(value, bool):
value_str = "启用" if value else "禁用"
elif isinstance(value, str) and value.lower() in ['true', 'false']:
value_str = "启用" if value.lower() == 'true' else "禁用"
else:
value_str = str(value)
lines.append(f"• {rule_name}: {value_str}")
server_settings = {
'difficulty': '难度',
'view_distance': '视距',
'simulation_distance': '模拟距离',
'max_players': '最大玩家数',
'game_mode': '默认游戏模式',
'spawn_protection_radius': '出生点保护半径',
'player_idle_timeout': '闲置超时时间'
}
settings_found = False
for setting_key, setting_name in server_settings.items():
try:
result = await self.msmp_client.send_request(f"serversettings/{setting_key}")
if 'result' in result:
if not settings_found:
lines.append("\n服务器设置:")
settings_found = True
value = result['result']
if value is not None:
if setting_key == 'difficulty':
if isinstance(value, str):
difficulty_map = {
'peaceful': '和平',
'easy': '简单',
'normal': '普通',
'hard': '困难'
}
value_str = difficulty_map.get(value.lower(), value)
else:
difficulty_map = {0: '和平', 1: '简单', 2: '普通', 3: '困难'}
value_str = difficulty_map.get(value, str(value))
elif setting_key == 'game_mode':
gamemode_map = {
'survival': '生存',
'creative': '创造',
'adventure': '冒险',
'spectator': '旁观'
}
value_str = gamemode_map.get(str(value).lower(), str(value))
elif setting_key in ['view_distance', 'simulation_distance']:
value_str = f"{value} 区块"
elif setting_key == 'spawn_protection_radius':
value_str = f"{value} 方块"
elif setting_key == 'player_idle_timeout':
if value == 0:
value_str = "禁用"
else:
value_str = f"{value} 分'"
else:
value_str = str(value)
lines.append(f"• {setting_name}: {value_str}")
except Exception as e:
self.logger.debug(f"查询设置 {setting_key} 失败: {e}")
continue
if len(lines) == 2:
lines.append("\n未能获取到规则信息")
lines.append("提示: MSMP连接正常但无法获取规则数据")
lines.append("可能原因:")
lines.append("1. MSMP插件版本过旧")
lines.append("2. 服务器权限配置问题")
lines.append("3. 查看服务器日志了解详情")
lines.append("\n" + "=" * 20)
lines.append("[通过 MSMP 查询]")
return "\n".join(lines)
except Exception as e:
self.logger.error(f"获取规则信息失败: {e}", exc_info=True)
return f"获取规则信息失败: {str(e)}\n提示: 请检查MSMP插件版本和配置"
except Exception as e:
self.logger.error(f"执行rules命令失败: {e}", exc_info=True)
return f"查询服务器规则失败: {e}"
async def handle_status(self, **kwargs) -> str:
"""处理status命令"""
try:
qq_status = "已连接" if self.qq_server.is_connected() else "未连接"
msmp_status = "未启用"
msmp_connected = False
if self.config_manager.is_msmp_enabled():
if not self.msmp_client:
msmp_status = "客户端未初始化"
elif not self.msmp_client.is_connected():
msmp_status = "未连接"
else:
try:
status = self.msmp_client.get_server_status_sync()
version = status.get('version', {})
version_name = version.get('name', 'Unknown')
msmp_status = (
f"运行中\n"
f"版本: {version_name}"
)
msmp_connected = True
except Exception as e:
msmp_status = f"连接异常: {e}"
rcon_status = "未启用"
rcon_connected = False
if self.config_manager.is_rcon_enabled():
if not self.rcon_client:
rcon_status = "客户端未初始化"
elif not self.rcon_client.is_connected():
rcon_status = "未连接"
else:
try:
rcon_status = f"运行中"
rcon_connected = True
except Exception as e:
rcon_status = f"连接异常: {e}"
# 检测外部接入状态
external_access = msmp_connected or rcon_connected
# 添加 Minecraft 服务器状态
mc_server_status = "未启动"
server_process_running = False
if self.qq_server and self.qq_server.server_process:
if self.qq_server.server_process.poll() is None:
# 服务器进程正在运行
server_process_running = True
try:
# 尝试获取更详细的状态
client_type, client = await self.qq_server.connection_manager.ensure_connected()
if client:
if client_type == 'msmp':
player_info = self.msmp_client.get_player_list_sync()
mc_server_status = (
f"运行中 (PID: {self.qq_server.server_process.pid})\n"
f"在线: {player_info.current_players}/{player_info.max_players}"
)
elif client_type == 'rcon':
player_info = self.rcon_client.get_player_list()
mc_server_status = (
f"运行中 (PID: {self.qq_server.server_process.pid})\n"
f"在线: {player_info.current_players}/{player_info.max_players}"
)
else:
mc_server_status = f"运行中 (PID: {self.qq_server.server_process.pid})"
else:
mc_server_status = f"运行中 (PID: {self.qq_server.server_process.pid}) - 连接异常"
except Exception as e:
mc_server_status = f"运行中 (PID: {self.qq_server.server_process.pid}) - 状态获取失败"
else:
return_code = self.qq_server.server_process.poll()
mc_server_status = f"已停止 (退出码: {return_code})"
else:
# 服务器进程未运行,但可能有外部接入
if external_access:
mc_server_status = "运行中 (外部接入)"
else:
mc_server_status = "未启动"
# 构建状态信息
status_lines = [
"系统状态总览",
"■■■■■■■■■■■■■■■",
f"QQ机器人: {qq_status}",
f"MC服务器: {mc_server_status}",
f"MSMP连接: {msmp_status}",
f"RCON连接: {rcon_status}"
]
# 添加外部接入提示(当有外部接入但服务器进程未运行时)
if external_access and not server_process_running:
status_lines.append("■■■■■■■■■■■■■■■")
status_lines.append("检测到外部接入: 服务器通过MSMP/RCON远程管理")
return "\n".join(status_lines)
except Exception as e:
self.logger.error(f"执行status命令失败: {e}", exc_info=True)
return f"获取状态失败: {e}"
def _format_uptime(self, seconds: float) -> str:
"""格式化运行时间"""
days = int(seconds // 86400)
hours = int((seconds % 86400) // 3600)
minutes = int((seconds % 3600) // 60)
seconds = int(seconds % 60)
if days > 0:
return f"{days}天{hours}时{minutes}分"
elif hours > 0:
return f"{hours}时{minutes}分{seconds}秒"
elif minutes > 0:
return f"{minutes}分{seconds}秒"
else:
return f"{seconds}秒"
async def handle_help(self, user_id: int, **kwargs) -> str:
"""处理help命令"""
if hasattr(self.qq_server, 'command_handler'):
return self.qq_server.command_handler.get_help_message(user_id)
return "帮助系统未初始化"
async def handle_stop(self, user_id: int, group_id: int, websocket, is_private: bool = False, **kwargs) -> str:
"""处理stop命令(管理员) - 支持MSMP和RCON"""
async with self._stop_lock:
if self._is_stopping:
return "服务器已在停止中,请勿重复执行"
self._is_stopping = True
try:
# 先检查服务器是否在运行
if not self.qq_server or not self.qq_server.server_process:
self._is_stopping = False
return "服务器未运行"
if self.qq_server.server_process.poll() is not None:
self._is_stopping = False
return "服务器已经停止"
if is_private:
await self.qq_server.send_private_message(websocket, user_id, "正在停止服务器...")
else:
await self.qq_server.send_group_message(websocket, group_id, "正在停止服务器...")
# ============ 触发服务器停止事件 ============
if hasattr(self.qq_server, 'plugin_manager') and self.qq_server.plugin_manager:
self.logger.info("触发 server_stopping 事件给所有插件")
await self.qq_server.plugin_manager.trigger_event("server_stopping")
# ============ 事件触发结束 ============
# 第一步:立即设置服务器停止标志,停止日志采集
self.qq_server.server_stopping = True
# 第二步:先尝试通过连接发送停止命令(在关闭连接之前)
stop_success = False
try:
# 使用连接管理器获取客户端
client_type, client = await self.qq_server.connection_manager.ensure_connected()
if client:
if client_type == 'msmp':
result = client.execute_command_sync("server/stop")
if 'result' in result:
stop_success = True
self.logger.info("MSMP 停止命令已发送")
elif client_type == 'rcon':
result = client.execute_command("stop")
stop_success = True
self.logger.info("RCON停止命令已发送")
else:
self.logger.warning("无可用连接发送停止命令")
except Exception as e:
self.logger.warning(f"通过连接发送停止命令失败: {e}")
# 第三步:如果无法通过连接停止,尝试通过标准输入发送停止命令
if not stop_success:
try:
if (self.qq_server.server_process and
self.qq_server.server_process.poll() is None and
self.qq_server.server_process.stdin):
stop_command = "stop\n"
self.qq_server.server_process.stdin.write(stop_command.encode('utf-8'))
self.qq_server.server_process.stdin.flush()
self.logger.info("已通过标准输入发送停止命令")
stop_success = True
except Exception as e:
self.logger.warning(f"通过标准输入发送停止命令失败: {e}")
# 第四步:立即彻底关闭所有连接
await self._thorough_shutdown()
self.logger.info("停止命令已发送,等待服务器关闭进程...")
# 等待服务器关闭
max_wait_time = 60
wait_interval = 5
waited_time = 0
while (waited_time < max_wait_time and
self.qq_server.server_process and
self.qq_server.server_process.poll() is None):
await asyncio.sleep(wait_interval)
waited_time += wait_interval
self.logger.info(f"等待服务器关闭... ({waited_time}/{max_wait_time}秒)")
# 检查服务器是否已关闭
server_stopped = True
if self.qq_server.server_process:
return_code = self.qq_server.server_process.poll()
if return_code is None:
server_stopped = False
self.logger.warning(f"服务器进程在{max_wait_time}秒后仍未关闭")
else:
self.logger.info(f"服务器进程已关闭,返回码: {return_code}")
# 给日志采集任务一点时间读取剩余输出
await asyncio.sleep(2)
result_message = "服务器已成功关闭" if server_stopped else "停止命令已发送,但服务器进程仍在运行中。可能需要手动检查或使用 #kill 命令强制停止"
print(result_message)
return result_message
except Exception as e:
self.logger.error(f"执行stop命令失败: {e}", exc_info=True)
# 出错时也要执行关闭
await self._thorough_shutdown()
error_msg = f"停止服务器失败: {e}"
if kwargs.get('from_console', False):
print(error_msg)
return error_msg
finally:
self._is_stopping = False
async def _thorough_shutdown(self):
"""彻底关闭所有连接"""
self.logger.info("执行彻底关闭操作...")
# 第一步:通过连接管理器设置关闭模式
if hasattr(self.qq_server, 'connection_manager'):
await self.qq_server.connection_manager.set_shutdown_mode()
# 第二步:强制关闭MSMP连接
if self.msmp_client:
try:
if hasattr(self.msmp_client, 'close_sync'):
self.msmp_client.close_sync()
elif hasattr(self.msmp_client, 'close'):
await asyncio.wait_for(self.msmp_client.close(), timeout=3.0)
self.logger.info("MSMP连接已强制关闭")
except Exception as e:
self.logger.debug(f"强制关闭MSMP连接时出错: {e}")
# 第三步:强制关闭RCON连接
if self.rcon_client:
try:
if hasattr(self.rcon_client, 'close'):
self.rcon_client.close()
self.logger.info("RCON连接已强制关闭")
except Exception as e:
self.logger.debug(f"强制关闭RCON连接时出错: {e}")
# 第四步:关闭日志文件
if self.qq_server:
try:
self.qq_server._close_log_file()
self.logger.info("服务器日志文件已关闭")
except Exception as e:
self.logger.debug(f"关闭日志文件出错: {e}")
self._shutdown_initiated = True
self.logger.info("彻底关闭操作完成")
async def _immediate_shutdown(self):
"""立即关闭所有连接(用于kill命令)"""