Skip to content
63 changes: 46 additions & 17 deletions dashboard/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse
from urllib.request import Request, urlopen
import shutil

# Force UTF-8 on stdout/stderr so non-ASCII log output (this dashboard logs
# Chinese text and emoji) does not raise UnicodeEncodeError on consoles with
# a legacy codepage; 'replace' degrades gracefully instead of crashing.
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stderr, 'reconfigure'):
    sys.stderr.reconfigure(encoding='utf-8', errors='replace')
# Hint child processes to use UTF-8 too; setdefault respects an existing value.
os.environ.setdefault('PYTHONIOENCODING', 'utf-8')

# 引入文件锁工具,确保与其他脚本并发安全
scripts_dir = str(pathlib.Path(__file__).parent.parent / 'scripts')
Expand Down Expand Up @@ -60,7 +67,7 @@

def read_json(path, default=None):
    """Read and parse a JSON file, returning a fallback on any failure.

    Uses the 'utf-8-sig' codec so a leading BOM (common in files written on
    Windows) is stripped instead of breaking json parsing.

    Parameters:
        path: pathlib.Path-like object exposing read_text().
        default: value returned on missing/unreadable/invalid file;
            when None, an empty dict is returned instead.
    """
    try:
        return json.loads(path.read_text(encoding='utf-8-sig'))
    except Exception:
        return default if default is not None else {}

Expand Down Expand Up @@ -669,14 +676,24 @@ def handle_review_action(task_id, action, comment=''):
]


def _find_openclaw_cmd():
"""跨平台定位 openclaw CLI,可用于 subprocess 调用。"""
candidates = [
shutil.which('openclaw'),
shutil.which('openclaw.cmd'),
shutil.which('openclaw.ps1'),
r'C:\nvm4w\nodejs\openclaw.cmd',
r'C:\nvm4w\nodejs\openclaw',
]
for c in candidates:
if c and pathlib.Path(c).exists():
return c
return 'openclaw'


def _check_gateway_alive():
    """Cross-platform Gateway liveness check: delegate to the local HTTP probe.

    The former implementation scanned processes with pgrep, which only works
    on POSIX; probing the Gateway's HTTP endpoint works everywhere and also
    confirms the process is actually serving, not merely running.
    """
    return _check_gateway_probe()


def _check_gateway_probe():
Expand Down Expand Up @@ -833,7 +850,7 @@ def wake_agent(agent_id, message=''):

def do_wake():
try:
cmd = ['openclaw', 'agent', '--agent', runtime_id, '-m', msg, '--timeout', '120']
cmd = [_find_openclaw_cmd(), 'agent', '--agent', runtime_id, '-m', msg, '--timeout', '120']
log.info(f'🔔 唤醒 {agent_id}...')
# 带重试(最多2次)
for attempt in range(1, 3):
Expand Down Expand Up @@ -1258,6 +1275,17 @@ def _collect_message_text(msg):
return ''.join(parts)




def _clean_garbled_text(text):
"""尽量清洗常见乱码与不可显示字符,避免前端出现大面积锟斤拷。"""
if not isinstance(text, str):
return ''
t = text.replace('\ufeff', '').replace('\x00', '')
if '锟斤拷' in t:
t = t.replace('锟斤拷', '�')
t = ''.join(ch for ch in t if ch >= ' ' or ch in '\n\t')
return t.strip()
def _parse_activity_entry(item):
"""将 session jsonl 的 message 统一解析成看板活动条目。"""
msg = item.get('message') or {}
Expand All @@ -1270,9 +1298,9 @@ def _parse_activity_entry(item):
tool_calls = []
for c in msg.get('content', []) or []:
if c.get('type') == 'text' and c.get('text') and not text:
text = str(c.get('text', '')).strip()
text = _clean_garbled_text(str(c.get('text', '')).strip())
elif c.get('type') == 'thinking' and c.get('thinking') and not thinking:
thinking = str(c.get('thinking', '')).strip()[:200]
thinking = _clean_garbled_text(str(c.get('thinking', '')).strip())[:200]
elif c.get('type') == 'tool_use':
tool_calls.append({
'name': c.get('name', ''),
Expand All @@ -1297,13 +1325,13 @@ def _parse_activity_entry(item):
output = ''
for c in msg.get('content', []) or []:
if c.get('type') == 'text' and c.get('text'):
output = str(c.get('text', '')).strip()[:200]
output = _clean_garbled_text(str(c.get('text', '')).strip())[:200]
break
if not output:
for key in ('output', 'stdout', 'stderr', 'message'):
val = details.get(key)
if isinstance(val, str) and val.strip():
output = val.strip()[:200]
output = _clean_garbled_text(val.strip())[:200]
break

entry = {
Expand All @@ -1322,7 +1350,7 @@ def _parse_activity_entry(item):
text = ''
for c in msg.get('content', []) or []:
if c.get('type') == 'text' and c.get('text'):
text = str(c.get('text', '')).strip()
text = _clean_garbled_text(str(c.get('text', '')).strip())
break
if not text:
return None
Expand Down Expand Up @@ -1350,7 +1378,7 @@ def get_agent_activity(agent_id, limit=30, task_id=None):

for session_file in files_to_scan:
try:
lines = session_file.read_text(errors='ignore').splitlines()
lines = session_file.read_text(encoding='utf-8', errors='replace').splitlines()
except Exception:
continue

Expand Down Expand Up @@ -1416,7 +1444,7 @@ def get_agent_activity_by_keywords(agent_id, keywords, limit=20):
target_file = None
for sf in jsonl_files[:5]:
try:
content = sf.read_text(errors='ignore')
content = sf.read_text(encoding='utf-8', errors='replace')
except Exception:
continue
hits = sum(1 for kw in keywords if kw.lower() in content.lower())
Expand Down Expand Up @@ -1956,7 +1984,7 @@ def _do_dispatch():
'lastDispatchTrigger': trigger,
}))
return
cmd = ['openclaw', 'agent', '--agent', agent_id, '-m', msg,
cmd = [_find_openclaw_cmd(), 'agent', '--agent', agent_id, '-m', msg,
'--deliver', '--channel', 'feishu', '--timeout', '300']
max_retries = 2
err = ''
Expand Down Expand Up @@ -2487,3 +2515,4 @@ def main():

if __name__ == '__main__':
main()

125 changes: 68 additions & 57 deletions scripts/file_lock.py
Original file line number Diff line number Diff line change
@@ -1,104 +1,115 @@
"""
文件锁工具 — 防止多进程并发读写 JSON 文件导致数据丢失。
文件锁工具:防止多个进程并发读写 JSON 文件导致数据丢失。

用法:
from file_lock import atomic_json_update, atomic_json_read

# 原子读取
data = atomic_json_read(path, default=[])

# 原子更新(读 → 修改 → 写回,全程持锁)
def modifier(tasks):
tasks.append(new_task)
return tasks
atomic_json_update(path, modifier, default=[])
跨平台说明:
- POSIX(Linux/macOS)使用 fcntl.flock
- Windows 使用 msvcrt.locking
- 若锁实现异常,仍保留原子写(tmp file + replace)能力
"""
import fcntl
import json
import os
import pathlib
import tempfile
from contextlib import contextmanager
from typing import Any, Callable

try:
import fcntl # type: ignore
except ImportError:
fcntl = None

try:
import msvcrt # type: ignore
except ImportError:
msvcrt = None


LOCK_EX = 1
LOCK_SH = 2
LOCK_UN = 8


def _lock_path(path: pathlib.Path) -> pathlib.Path:
return path.parent / (path.name + '.lock')


@contextmanager
def _locked_fd(lock_file: pathlib.Path, mode: int):
    """Open a lock file and hold a best-effort cross-platform advisory lock.

    Uses fcntl.flock on POSIX and msvcrt.locking on Windows. If neither
    module is available, the file is still opened and the lock silently
    becomes a no-op, so callers keep working.

    Parameters:
        lock_file: sidecar lock-file path (parent dirs created if missing).
        mode: LOCK_EX for exclusive, anything else for shared.

    Yields:
        The open file descriptor of the lock file (closed on exit).
    """
    lock_file.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(str(lock_file), os.O_CREAT | os.O_RDWR)
    try:
        if fcntl is not None:
            fcntl.flock(fd, fcntl.LOCK_EX if mode == LOCK_EX else fcntl.LOCK_SH)
        elif msvcrt is not None:
            # msvcrt.locking locks a byte range; ensure at least one byte exists.
            os.lseek(fd, 0, os.SEEK_SET)
            try:
                os.write(fd, b'0')
            except OSError:
                pass
            os.lseek(fd, 0, os.SEEK_SET)
            lock_mode = msvcrt.LK_LOCK if mode == LOCK_EX else msvcrt.LK_RLCK
            msvcrt.locking(fd, lock_mode, 1)
        yield fd
    finally:
        try:
            if fcntl is not None:
                fcntl.flock(fd, fcntl.LOCK_UN)
            elif msvcrt is not None:
                os.lseek(fd, 0, os.SEEK_SET)
                msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
        finally:
            os.close(fd)


def atomic_json_read(path: pathlib.Path, default: Any = None) -> Any:
    """Read a JSON file while holding a shared lock.

    Returns *default* when the file is missing, unreadable, or contains
    invalid JSON — this is deliberate best-effort behavior; it never raises.
    """
    lock_file = _lock_path(path)
    with _locked_fd(lock_file, LOCK_SH):
        try:
            return json.loads(path.read_text(encoding='utf-8')) if path.exists() else default
        except Exception:
            return default


def atomic_json_update(
    path: pathlib.Path,
    modifier: Callable[[Any], Any],
    default: Any = None,
) -> Any:
    """Atomically read-modify-write a JSON file.

    Holds an exclusive lock across the whole read -> modifier(data) -> write
    cycle, and performs the write through a temp file + os.replace so a
    crash mid-write never leaves a half-written file behind.

    Parameters:
        path: JSON file to update (parent dirs created if missing).
        modifier: callable taking the current data and returning the new data.
        default: data handed to modifier when the file is missing or unreadable.

    Returns:
        The value returned by modifier (i.e. what was written).
    """
    lock_file = _lock_path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with _locked_fd(lock_file, LOCK_EX):
        try:
            data = json.loads(path.read_text(encoding='utf-8')) if path.exists() else default
        except Exception:
            data = default
        result = modifier(data)
        # Temp file in the SAME directory so os.replace stays on one
        # filesystem and is atomic on both POSIX and Windows.
        tmp_fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix='.tmp', prefix=path.stem + '_')
        try:
            with os.fdopen(tmp_fd, 'w', encoding='utf-8') as f:
                json.dump(result, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, str(path))
        except Exception:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            raise
        return result


def atomic_json_write(path: pathlib.Path, data: Any) -> None:
    """Atomically write *data* as JSON to *path* under an exclusive lock.

    Unlike atomic_json_update, this skips the read step entirely (no wasted
    read when the caller already has the full content). The write goes
    through a temp file + os.replace for crash safety.
    """
    lock_file = _lock_path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with _locked_fd(lock_file, LOCK_EX):
        # Temp file in the same directory keeps os.replace atomic.
        tmp_fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix='.tmp', prefix=path.stem + '_')
        try:
            with os.fdopen(tmp_fd, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, str(path))
        except Exception:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            raise
26 changes: 25 additions & 1 deletion scripts/kanban_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,31 @@
"""
import json, pathlib, datetime, sys, subprocess, logging, os, re

_BASE = pathlib.Path(__file__).resolve().parent.parent
# Force UTF-8 on stdout/stderr so non-ASCII output does not raise
# UnicodeEncodeError on consoles with a legacy codepage; 'replace'
# degrades gracefully instead of crashing.
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stderr, 'reconfigure'):
    sys.stderr.reconfigure(encoding='utf-8', errors='replace')
# Hint child processes to use UTF-8 too; setdefault respects an existing value.
os.environ.setdefault('PYTHONIOENCODING', 'utf-8')

def _resolve_repo_base():
env = os.environ.get('EDICT_REPO_DIR', '').strip()
candidates = []
if env:
candidates.append(pathlib.Path(env))
candidates.extend([
pathlib.Path.home() / 'edict',
pathlib.Path(__file__).resolve().parent.parent,
])
for c in candidates:
try:
if (c / 'dashboard' / 'server.py').exists() and (c / 'scripts' / 'refresh_live_data.py').exists():
return c
except Exception:
pass
return pathlib.Path(__file__).resolve().parent.parent


# Repo root, resolved once at import time (env override > ~/edict > this
# file's grandparent — see _resolve_repo_base).
_BASE = _resolve_repo_base()
# Kanban task source JSON under the repo's data/ directory.
TASKS_FILE = _BASE / 'data' / 'tasks_source.json'
# Companion script used to refresh live data; presumably invoked via
# subprocess after task updates — confirm against the callers below.
REFRESH_SCRIPT = _BASE / 'scripts' / 'refresh_live_data.py'

Expand Down
Loading