diff --git a/dashboard/server.py b/dashboard/server.py index 57fd64b..77fd87b 100644 --- a/dashboard/server.py +++ b/dashboard/server.py @@ -15,6 +15,13 @@ from http.server import BaseHTTPRequestHandler, HTTPServer from urllib.parse import urlparse from urllib.request import Request, urlopen +import shutil + +if hasattr(sys.stdout, 'reconfigure'): + sys.stdout.reconfigure(encoding='utf-8', errors='replace') +if hasattr(sys.stderr, 'reconfigure'): + sys.stderr.reconfigure(encoding='utf-8', errors='replace') +os.environ.setdefault('PYTHONIOENCODING', 'utf-8') # 引入文件锁工具,确保与其他脚本并发安全 scripts_dir = str(pathlib.Path(__file__).parent.parent / 'scripts') @@ -60,7 +67,7 @@ def read_json(path, default=None): try: - return json.loads(path.read_text()) + return json.loads(path.read_text(encoding='utf-8-sig')) except Exception: return default if default is not None else {} @@ -669,14 +676,24 @@ def handle_review_action(task_id, action, comment=''): ] +def _find_openclaw_cmd(): + """跨平台定位 openclaw CLI,可用于 subprocess 调用。""" + candidates = [ + shutil.which('openclaw'), + shutil.which('openclaw.cmd'), + shutil.which('openclaw.ps1'), + r'C:\nvm4w\nodejs\openclaw.cmd', + r'C:\nvm4w\nodejs\openclaw', + ] + for c in candidates: + if c and pathlib.Path(c).exists(): + return c + return 'openclaw' + + def _check_gateway_alive(): - """检测 Gateway 进程是否在运行。""" - try: - result = subprocess.run(['pgrep', '-f', 'openclaw-gateway'], - capture_output=True, text=True, timeout=5) - return result.returncode == 0 - except Exception: - return False + """跨平台检查 Gateway 是否在线:以本地 HTTP probe 为准。""" + return _check_gateway_probe() def _check_gateway_probe(): @@ -833,7 +850,7 @@ def wake_agent(agent_id, message=''): def do_wake(): try: - cmd = ['openclaw', 'agent', '--agent', runtime_id, '-m', msg, '--timeout', '120'] + cmd = [_find_openclaw_cmd(), 'agent', '--agent', runtime_id, '-m', msg, '--timeout', '120'] log.info(f'🔔 唤醒 {agent_id}...') # 带重试(最多2次) for attempt in range(1, 3): @@ -1258,6 +1275,17 @@ def _collect_message_text(msg): return ''.join(parts) + + +def _clean_garbled_text(text): + """尽量清洗常见乱码与不可显示字符,避免前端出现大面积锟斤拷。""" + if not isinstance(text, str): + return '' + t = text.replace('\ufeff', '').replace('\x00', '') + if '锟斤拷' in t: + t = t.replace('锟斤拷', '�') + t = ''.join(ch for ch in t if ch >= ' ' or ch in '\n\t') + return t.strip() def _parse_activity_entry(item): """将 session jsonl 的 message 统一解析成看板活动条目。""" msg = item.get('message') or {} @@ -1270,9 +1298,9 @@ def _parse_activity_entry(item): tool_calls = [] for c in msg.get('content', []) or []: if c.get('type') == 'text' and c.get('text') and not text: - text = str(c.get('text', '')).strip() + text = _clean_garbled_text(str(c.get('text', '')).strip()) elif c.get('type') == 'thinking' and c.get('thinking') and not thinking: - thinking = str(c.get('thinking', '')).strip()[:200] + thinking = _clean_garbled_text(str(c.get('thinking', '')).strip())[:200] elif c.get('type') == 'tool_use': tool_calls.append({ 'name': c.get('name', ''), @@ -1297,13 +1325,13 @@ def _parse_activity_entry(item): output = '' for c in msg.get('content', []) or []: if c.get('type') == 'text' and c.get('text'): - output = str(c.get('text', '')).strip()[:200] + output = _clean_garbled_text(str(c.get('text', '')).strip())[:200] break if not output: for key in ('output', 'stdout', 'stderr', 'message'): val = details.get(key) if isinstance(val, str) and val.strip(): - output = val.strip()[:200] + output = _clean_garbled_text(val.strip())[:200] break entry = { @@ -1322,7 +1350,7 @@ def _parse_activity_entry(item): text = '' for c in msg.get('content', []) or []: if c.get('type') == 'text' and c.get('text'): - text = str(c.get('text', '')).strip() + text = _clean_garbled_text(str(c.get('text', '')).strip()) break if not text: return None @@ -1350,7 +1378,7 @@ def get_agent_activity(agent_id, limit=30, task_id=None): for session_file in files_to_scan: try: - lines = session_file.read_text(errors='ignore').splitlines() + lines = session_file.read_text(encoding='utf-8', errors='replace').splitlines() except Exception: continue @@ -1416,7 +1444,7 @@ def get_agent_activity_by_keywords(agent_id, keywords, limit=20): target_file = None for sf in jsonl_files[:5]: try: - content = sf.read_text(errors='ignore') + content = sf.read_text(encoding='utf-8', errors='replace') except Exception: continue hits = sum(1 for kw in keywords if kw.lower() in content.lower()) @@ -1956,7 +1984,7 @@ def _do_dispatch(): 'lastDispatchTrigger': trigger, })) return - cmd = ['openclaw', 'agent', '--agent', agent_id, '-m', msg, + cmd = [_find_openclaw_cmd(), 'agent', '--agent', agent_id, '-m', msg, '--deliver', '--channel', 'feishu', '--timeout', '300'] max_retries = 2 err = '' @@ -2487,3 +2515,4 @@ def main(): if __name__ == '__main__': main() + diff --git a/scripts/file_lock.py b/scripts/file_lock.py index 42005bb..5ce13f4 100644 --- a/scripts/file_lock.py +++ b/scripts/file_lock.py @@ -1,44 +1,76 @@ """ -文件锁工具 — 防止多进程并发读写 JSON 文件导致数据丢失。 +文件锁工具:防止多个进程并发读写 JSON 文件导致数据丢失。 -用法: - from file_lock import atomic_json_update, atomic_json_read - - # 原子读取 - data = atomic_json_read(path, default=[]) - - # 原子更新(读 → 修改 → 写回,全程持锁) - def modifier(tasks): - tasks.append(new_task) - return tasks - atomic_json_update(path, modifier, default=[]) +跨平台说明: +- POSIX(Linux/macOS)使用 fcntl.flock +- Windows 使用 msvcrt.locking +- 若锁实现异常,仍保留原子写(tmp file + replace)能力 """ -import fcntl import json import os import pathlib import tempfile +from contextlib import contextmanager from typing import Any, Callable +try: + import fcntl # type: ignore +except ImportError: + fcntl = None + +try: + import msvcrt # type: ignore +except ImportError: + msvcrt = None + + +LOCK_EX = 1 +LOCK_SH = 2 +LOCK_UN = 8 + def _lock_path(path: pathlib.Path) -> pathlib.Path: return path.parent / (path.name + '.lock') -def atomic_json_read(path: pathlib.Path, default: Any = None) -> Any: - """持锁读取 JSON 文件。""" - lock_file = _lock_path(path) +@contextmanager +def _locked_fd(lock_file: pathlib.Path, mode: int): + """Open a lock file and hold a best-effort cross-platform lock.""" lock_file.parent.mkdir(parents=True, exist_ok=True) fd = os.open(str(lock_file), os.O_CREAT | os.O_RDWR) try: - fcntl.flock(fd, fcntl.LOCK_SH) + if fcntl is not None: + fcntl.flock(fd, fcntl.LOCK_EX if mode == LOCK_EX else fcntl.LOCK_SH) + elif msvcrt is not None: + # Lock the first byte; ensure file has at least one byte. + os.lseek(fd, 0, os.SEEK_SET) + try: + os.write(fd, b'0') + except OSError: + pass + os.lseek(fd, 0, os.SEEK_SET) + lock_mode = msvcrt.LK_LOCK if mode == LOCK_EX else msvcrt.LK_RLCK + msvcrt.locking(fd, lock_mode, 1) + yield fd + finally: try: - return json.loads(path.read_text()) if path.exists() else default + if fcntl is not None: + fcntl.flock(fd, fcntl.LOCK_UN) + elif msvcrt is not None: + os.lseek(fd, 0, os.SEEK_SET) + msvcrt.locking(fd, msvcrt.LK_UNLCK, 1) + finally: + os.close(fd) + + +def atomic_json_read(path: pathlib.Path, default: Any = None) -> Any: + """原子读取 JSON 文件。""" + lock_file = _lock_path(path) + with _locked_fd(lock_file, LOCK_SH): + try: + return json.loads(path.read_text(encoding='utf-8')) if path.exists() else default except Exception: return default - finally: - fcntl.flock(fd, fcntl.LOCK_UN) - os.close(fd) def atomic_json_update( @@ -46,59 +78,38 @@ def atomic_json_update( modifier: Callable[[Any], Any], default: Any = None, ) -> Any: - """ - 原子地读取 → 修改 → 写回 JSON 文件。 - modifier(data) 应返回修改后的数据。 - 使用临时文件 + rename 保证写入原子性。 - """ + """原子读-改-写 JSON 文件。""" lock_file = _lock_path(path) - lock_file.parent.mkdir(parents=True, exist_ok=True) - fd = os.open(str(lock_file), os.O_CREAT | os.O_RDWR) - try: - fcntl.flock(fd, fcntl.LOCK_EX) - # Read + path.parent.mkdir(parents=True, exist_ok=True) + with _locked_fd(lock_file, LOCK_EX): try: - data = json.loads(path.read_text()) if path.exists() else default + data = json.loads(path.read_text(encoding='utf-8')) if path.exists() else default except Exception: data = default - # Modify result = modifier(data) - # Atomic write via temp file + rename - tmp_fd, tmp_path = tempfile.mkstemp( - dir=str(path.parent), suffix='.tmp', prefix=path.stem + '_' - ) + tmp_fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix='.tmp', prefix=path.stem + '_') try: - with os.fdopen(tmp_fd, 'w') as f: + with os.fdopen(tmp_fd, 'w', encoding='utf-8') as f: json.dump(result, f, ensure_ascii=False, indent=2) os.replace(tmp_path, str(path)) except Exception: - os.unlink(tmp_path) + if os.path.exists(tmp_path): + os.unlink(tmp_path) raise return result - finally: - fcntl.flock(fd, fcntl.LOCK_UN) - os.close(fd) def atomic_json_write(path: pathlib.Path, data: Any) -> None: - """原子写入 JSON 文件(持排他锁 + tmpfile rename)。 - 直接写入,不读取现有内容(避免 atomic_json_update 的多余读开销)。 - """ + """原子写入 JSON 文件。""" lock_file = _lock_path(path) - lock_file.parent.mkdir(parents=True, exist_ok=True) - fd = os.open(str(lock_file), os.O_CREAT | os.O_RDWR) - try: - fcntl.flock(fd, fcntl.LOCK_EX) - tmp_fd, tmp_path = tempfile.mkstemp( - dir=str(path.parent), suffix='.tmp', prefix=path.stem + '_' - ) + path.parent.mkdir(parents=True, exist_ok=True) + with _locked_fd(lock_file, LOCK_EX): + tmp_fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix='.tmp', prefix=path.stem + '_') try: - with os.fdopen(tmp_fd, 'w') as f: + with os.fdopen(tmp_fd, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) os.replace(tmp_path, str(path)) except Exception: - os.unlink(tmp_path) + if os.path.exists(tmp_path): + os.unlink(tmp_path) raise - finally: - fcntl.flock(fd, fcntl.LOCK_UN) - os.close(fd) diff --git a/scripts/kanban_update.py b/scripts/kanban_update.py index 0566cac..65179f5 100644 --- a/scripts/kanban_update.py +++ b/scripts/kanban_update.py @@ -24,7 +24,31 @@ """ import json, pathlib, datetime, sys, subprocess, logging, os, re -_BASE = pathlib.Path(__file__).resolve().parent.parent +if hasattr(sys.stdout, 'reconfigure'): + sys.stdout.reconfigure(encoding='utf-8', errors='replace') +if hasattr(sys.stderr, 'reconfigure'): + sys.stderr.reconfigure(encoding='utf-8', errors='replace') +os.environ.setdefault('PYTHONIOENCODING', 'utf-8') + +def _resolve_repo_base(): + env = os.environ.get('EDICT_REPO_DIR', '').strip() + candidates = [] + if env: + candidates.append(pathlib.Path(env)) + candidates.extend([ + pathlib.Path.home() / 'edict', + pathlib.Path(__file__).resolve().parent.parent, + ]) + for c in candidates: + try: + if (c / 'dashboard' / 'server.py').exists() and (c / 'scripts' / 'refresh_live_data.py').exists(): + return c + except Exception: + pass + return pathlib.Path(__file__).resolve().parent.parent + + +_BASE = _resolve_repo_base() TASKS_FILE = _BASE / 'data' / 'tasks_source.json' REFRESH_SCRIPT = _BASE / 'scripts' / 'refresh_live_data.py' diff --git a/scripts/refresh_live_data.py b/scripts/refresh_live_data.py index bd96490..1029928 100644 --- a/scripts/refresh_live_data.py +++ b/scripts/refresh_live_data.py @@ -1,5 +1,8 @@ #!/usr/bin/env python3 -import json, pathlib, datetime, logging +import json +import pathlib +import datetime +import logging from file_lock import atomic_json_write, atomic_json_read from utils import read_json @@ -18,11 +21,29 @@ def output_meta(path): return {"exists": True, "lastModified": ts} +def clean_text(value): + if not isinstance(value, str): + return value + t = value.replace('\ufeff', '').replace('\x00', '') + # 常见残留乱码 + t = t.replace('锟斤拷', '�').replace('宸叉帴鏃', '已接旨') + t = ''.join(ch for ch in t if ch >= ' ' or ch in '\n\t') + return t.strip() + + +def clean_obj(obj): + if isinstance(obj, dict): + return {k: clean_obj(v) for k, v in obj.items()} + if isinstance(obj, list): + return [clean_obj(v) for v in obj] + if isinstance(obj, str): + return clean_text(obj) + return obj + + def main(): - # 使用 officials_stats.json(与 sync_officials_stats.py 统一) officials_data = read_json(DATA / 'officials_stats.json', {}) officials = officials_data.get('officials', []) if isinstance(officials_data, dict) else officials_data - # 任务源优先:tasks_source.json(可对接外部系统同步写入) tasks = atomic_json_read(DATA / 'tasks_source.json', []) if not tasks: tasks = read_json(DATA / 'tasks.json', []) @@ -40,7 +61,6 @@ def main(): t['org'] = t.get('org') or org_map.get(t.get('official', ''), '') t['outputMeta'] = output_meta(t.get('output', '')) - # 心跳时效检测:对 Doing/Assigned 状态的任务标注活跃度 if t.get('state') in ('Doing', 'Assigned', 'Review'): updated_raw = t.get('updatedAt') or t.get('sourceMeta', {}).get('updatedAt') age_sec = None @@ -54,28 +74,29 @@ def main(): except Exception: pass if age_sec is None: - t['heartbeat'] = {'status': 'unknown', 'label': '⚪ 未知', 'ageSec': None} + t['heartbeat'] = {'status': 'unknown', 'label': '? 未知', 'ageSec': None} elif age_sec < 180: t['heartbeat'] = {'status': 'active', 'label': f'🟢 活跃 {int(age_sec//60)}分钟前', 'ageSec': int(age_sec)} elif age_sec < 600: - t['heartbeat'] = {'status': 'warn', 'label': f'🟡 可能停滞 {int(age_sec//60)}分钟前', 'ageSec': int(age_sec)} + t['heartbeat'] = {'status': 'warn', 'label': f'🟡 停滞 {int(age_sec//60)}分钟前', 'ageSec': int(age_sec)} else: - t['heartbeat'] = {'status': 'stalled', 'label': f'🔴 已停滞 {int(age_sec//60)}分钟', 'ageSec': int(age_sec)} + t['heartbeat'] = {'status': 'stalled', 'label': f'🔴 停滞 {int(age_sec//60)}分钟', 'ageSec': int(age_sec)} else: t['heartbeat'] = None today_str = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d') - def _is_today_done(t): - if t.get('state') != 'Done': + + def _is_today_done(task): + if task.get('state') != 'Done': return False - ua = t.get('updatedAt', '') + ua = task.get('updatedAt', '') if isinstance(ua, str) and ua[:10] == today_str: return True - # fallback: outputMeta lastModified - lm = t.get('outputMeta', {}).get('lastModified', '') + lm = task.get('outputMeta', {}).get('lastModified', '') if isinstance(lm, str) and lm[:10] == today_str: return True return False + today_done = sum(1 for t in tasks if _is_today_done(t)) total_done = sum(1 for t in tasks if t.get('state') == 'Done') in_progress = sum(1 for t in tasks if t.get('state') in ['Doing', 'Review', 'Next', 'Blocked']) @@ -90,15 +111,15 @@ def _is_today_done(t): 'official': t.get('official'), 'task': t.get('title'), 'out': t.get('output'), - 'qa': '通过' if t.get('outputMeta', {}).get('exists') else '待补成果' + 'qa': '通过' if t.get('outputMeta', {}).get('exists') else '未生成成果' }) payload = { 'generatedAt': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'taskSource': 'tasks_source.json' if (DATA / 'tasks_source.json').exists() else 'tasks.json', 'officials': officials, - 'tasks': tasks, - 'history': history, + 'tasks': clean_obj(tasks), + 'history': clean_obj(history), 'metrics': { 'officialCount': len(officials), 'todayDone': today_done, @@ -106,7 +127,7 @@ def _is_today_done(t): 'inProgress': in_progress, 'blocked': blocked }, - 'syncStatus': sync_status, + 'syncStatus': clean_obj(sync_status), 'health': { 'syncOk': bool(sync_status.get('ok', False)), 'syncLatencyMs': sync_status.get('durationMs'), diff --git a/scripts/run_loop_windows.py b/scripts/run_loop_windows.py new file mode 100644 index 0000000..e27713f --- /dev/null +++ b/scripts/run_loop_windows.py @@ -0,0 +1,9 @@ +import time, pathlib, subprocess, sys +base = pathlib.Path(r"C:\Users\0x00\edict") +while True: + for name in ["sync_from_openclaw_runtime.py","sync_agent_config.py","apply_model_changes.py","sync_officials_stats.py","refresh_live_data.py"]: + try: + subprocess.run([sys.executable, str(base / 'scripts' / name)], cwd=str(base), timeout=30, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + except Exception: + pass + time.sleep(15) diff --git a/scripts/sync_from_openclaw_runtime.py b/scripts/sync_from_openclaw_runtime.py index 8b3de21..679bada 100644 --- a/scripts/sync_from_openclaw_runtime.py +++ b/scripts/sync_from_openclaw_runtime.py @@ -17,6 +17,41 @@ SESSIONS_ROOT = pathlib.Path.home() / '.openclaw' / 'agents' +def looks_mojibake(value): + if not isinstance(value, str) or not value: + return False + bad_tokens = ['鎴', '鍒', '浠', '锟', '宸', '馃', '�'] + return any(tok in value for tok in bad_tokens) + + +def try_unmojibake(value): + if not isinstance(value, str): + return value + candidates = [value] + try: + candidates.append(value.encode('latin1', errors='ignore').decode('utf-8', errors='ignore')) + except Exception: + pass + try: + candidates.append(value.encode('gbk', errors='ignore').decode('utf-8', errors='ignore')) + except Exception: + pass + for c in candidates: + if c and not looks_mojibake(c): + return c + return value + + +def clean_text(value): + if not isinstance(value, str): + return value + v = value.replace('[[reply_to_current]]', '').replace('\ufeff', '').replace('\x00', '') + v = try_unmojibake(v) + v = v.replace('宸叉帴鏃', '已接旨').replace('锟斤拷', '�') + v = ''.join(ch for ch in v if ch >= ' ' or ch in '\n\t') + return v.strip() + + def write_status(**kwargs): atomic_json_write(SYNC_STATUS, kwargs) @@ -64,7 +99,7 @@ def load_activity(session_file, limit=12): return [] rows = [] try: - lines = p.read_text(errors='ignore').splitlines() + lines = p.read_text(encoding='utf-8', errors='replace').splitlines() except Exception: return [] @@ -83,12 +118,14 @@ def load_activity(session_file, limit=12): msg = item.get('message') or {} role = msg.get('role') ts = item.get('timestamp') or '' + if role == 'assistant' and msg.get('errorMessage') == 'terminated': + continue if role == 'toolResult': tool = msg.get('toolName', '-') details = msg.get('details') or {} # If tool output is short, show it - content = msg.get('content', [{'text': ''}])[0].get('text', '') + content = clean_text(msg.get('content', [{'text': ''}])[0].get('text', '')) if len(content) < 50: text = f"Tool '{tool}' returned: {content}" else: @@ -99,11 +136,11 @@ def load_activity(session_file, limit=12): text = '' for c in msg.get('content', []): if c.get('type') == 'text' and c.get('text'): - raw_text = c.get('text').strip() + raw_text = clean_text(c.get('text', '').strip()) # Clean up common prefixes - clean_text = raw_text.replace('[[reply_to_current]]', '').strip() - if clean_text: - text = clean_text + cleaned = raw_text.replace('[[reply_to_current]]', '').strip() + if cleaned: + text = cleaned break if text: # Prioritize showing the "thought" - usually the first few sentences @@ -117,7 +154,7 @@ def load_activity(session_file, limit=12): text = '' for c in msg.get('content', []): if c.get('type') == 'text': - text = c.get('text', '')[:100] + text = clean_text(c.get('text', '')[:100]) if text: rows.append({'at': ts, 'kind': 'user', 'text': f"User: {text}..."}) @@ -151,14 +188,14 @@ def build_task(agent_id, session_key, row, now_ms): # Look for next assistant message (which is actually previous in time) for next_act in acts[1:]: if next_act['kind'] == 'assistant': - latest_act = f"正在执行: {next_act['text'][:80]}" + latest_act = clean_text(f"正在执行: {next_act['text'][:80]}") break else: latest_act = first_act['text'][:60] elif first_act['kind'] == 'assistant': - latest_act = f"思考中: {first_act['text'][:80]}" + latest_act = clean_text(f"思考中: {first_act['text'][:80]}") else: - latest_act = acts[0]['text'][:60] + latest_act = clean_text(acts[0]['text'][:60]) title_label = (row.get('origin') or {}).get('label') or session_key # 清洗会话标题:agent:xxx:cron:uuid → 定时任务, agent:xxx:subagent:uuid → 子任务 @@ -224,7 +261,7 @@ def main(): scan_files += 1 try: - raw = json.loads(sessions_file.read_text()) + raw = json.loads(sessions_file.read_text(encoding='utf-8-sig')) except Exception: continue @@ -240,7 +277,7 @@ def main(): mc_tasks_file = DATA / 'mission_control_tasks.json' if mc_tasks_file.exists(): try: - mc_tasks = json.loads(mc_tasks_file.read_text()) + mc_tasks = json.loads(mc_tasks_file.read_text(encoding='utf-8-sig')) if isinstance(mc_tasks, list): tasks.extend(mc_tasks) except Exception: @@ -250,7 +287,7 @@ def main(): manual_tasks_file = DATA / 'manual_parallel_tasks.json' if manual_tasks_file.exists(): try: - manual_tasks = json.loads(manual_tasks_file.read_text()) + manual_tasks = json.loads(manual_tasks_file.read_text(encoding='utf-8-sig')) if isinstance(manual_tasks, list): tasks.extend(manual_tasks) except Exception: @@ -295,7 +332,7 @@ def main(): # 除非它是 Blocked (报错),或者是今天新建的 state = t.get('state') # state_from_session: < 2min = Doing, < 60min = Review, else = Next - if state not in ('Doing', 'Blocked'): + if state not in ('Doing', 'Review', 'Blocked'): # 如果不是正在进行或报错,就隐藏掉 # 特例: 如果是 mission control (mc-) 的心跳,可能也没必要显示,除非 Doing continue @@ -310,7 +347,7 @@ def main(): existing_tasks_file = DATA / 'tasks_source.json' if existing_tasks_file.exists(): try: - existing = json.loads(existing_tasks_file.read_text()) + existing = json.loads(existing_tasks_file.read_text(encoding='utf-8-sig')) jjc_existing = [t for t in existing if str(t.get('id', '')).startswith('JJC')] # 去掉 tasks 里已有的 JJC(以防重复),再把旨意放到最前面 diff --git a/scripts/utils.py b/scripts/utils.py index 4155c5d..938250e 100644 --- a/scripts/utils.py +++ b/scripts/utils.py @@ -1,37 +1,39 @@ #!/usr/bin/env python3 """ -三省六部 · 公共工具函数 -避免 read_json / now_iso 等基础函数在多个脚本中重复定义 +三省六部 · 通用工具函数 +包括 read_json / now_iso 等基础函数,避免各脚本重复实现。 """ -import json, pathlib, datetime +import json +import pathlib +import datetime def read_json(path, default=None): - """安全读取 JSON 文件,失败返回 default""" + """安全读取 JSON 文件;失败返回 default。兼容 UTF-8 BOM。""" try: - return json.loads(pathlib.Path(path).read_text()) + return json.loads(pathlib.Path(path).read_text(encoding='utf-8-sig')) except Exception: return default if default is not None else {} def now_iso(): - """返回 UTC ISO 8601 时间字符串(末尾 Z)""" + """返回 UTC ISO 8601 时间字符串,末尾 Z。""" return datetime.datetime.now(datetime.timezone.utc).isoformat().replace('+00:00', 'Z') def today_str(fmt='%Y%m%d'): - """返回今天日期字符串,默认 YYYYMMDD""" + """返回今天日期字符串,默认 YYYYMMDD。""" return datetime.date.today().strftime(fmt) def safe_name(s: str) -> bool: - """检查名称是否只含安全字符(字母、数字、下划线、连字符、中文)""" + """校验名称是否只包含安全字符:字母、数字、下划线、横杠、中文。""" import re return bool(re.match(r'^[a-zA-Z0-9_\-\u4e00-\u9fff]+$', s)) def validate_url(url: str, allowed_schemes=('https',), allowed_domains=None) -> bool: - """校验 URL 合法性,防 SSRF""" + """校验 URL 合法性,避免 SSRF。""" from urllib.parse import urlparse try: parsed = urlparse(url) @@ -41,14 +43,13 @@ def validate_url(url: str, allowed_schemes=('https',), allowed_domains=None) -> return False if not parsed.hostname: return False - # 禁止内网地址 import ipaddress try: ip = ipaddress.ip_address(parsed.hostname) if ip.is_private or ip.is_loopback or ip.is_reserved: return False except ValueError: - pass # hostname 不是 IP,放行 + pass return True except Exception: return False