diff --git a/dashboard/server.py b/dashboard/server.py index 57fd64b..c011e29 100644 --- a/dashboard/server.py +++ b/dashboard/server.py @@ -38,6 +38,7 @@ DIST = BASE / 'dist' # React 构建产物 (npm run build) DATA = BASE.parent / "data" SCRIPTS = BASE.parent / 'scripts' +_ACTIVE_TASK_DATA_DIR = None # 静态资源 MIME 类型 _MIME_TYPES = { @@ -82,21 +83,84 @@ def now_iso(): return datetime.datetime.now(datetime.timezone.utc).isoformat().replace('+00:00', 'Z') -def load_tasks(): - return atomic_json_read(DATA / 'tasks_source.json', []) +def _iter_task_data_dirs(): + """返回可用的任务数据目录候选(优先 workspace,其次本地 data)。""" + dirs = [DATA] + oclaw_home = pathlib.Path.home() / '.openclaw' + for p in sorted(oclaw_home.glob('workspace-*/data')): + if p.is_dir(): + dirs.append(p) + return dirs -def save_tasks(tasks): - atomic_json_write(DATA / 'tasks_source.json', tasks) - # Trigger refresh (异步,不阻塞,避免僵尸进程) +def _task_source_score(task_file: pathlib.Path): + """给任务源打分:优先非 demo 任务,其次任务数,再按文件更新时间。""" + try: + tasks = atomic_json_read(task_file, []) + except Exception: + tasks = [] + if not isinstance(tasks, list): + tasks = [] + non_demo = 0 + for t in tasks: + tid = str((t or {}).get('id', '')) + if tid and not tid.startswith('JJC-DEMO'): + non_demo += 1 + try: + mtime = task_file.stat().st_mtime + except Exception: + mtime = 0 + return (1 if non_demo > 0 else 0, non_demo, len(tasks), mtime) + + +def get_task_data_dir(): + """自动选择当前任务数据目录,并缓存结果以保持一次服务期内稳定。""" + global _ACTIVE_TASK_DATA_DIR + if _ACTIVE_TASK_DATA_DIR and _ACTIVE_TASK_DATA_DIR.is_dir(): + return _ACTIVE_TASK_DATA_DIR + + best_dir = DATA + best_score = (-1, -1, -1, -1) + for d in _iter_task_data_dirs(): + tf = d / 'tasks_source.json' + if not tf.exists(): + continue + score = _task_source_score(tf) + if score > best_score: + best_score = score + best_dir = d + + _ACTIVE_TASK_DATA_DIR = best_dir + log.info(f'任务数据源: {_ACTIVE_TASK_DATA_DIR}') + return _ACTIVE_TASK_DATA_DIR + + +def _refresh_live_data_async(task_data_dir: pathlib.Path): + """触发对应数据目录的 live_status 刷新脚本。""" + script = task_data_dir.parent / 'scripts' / 'refresh_live_data.py' + if not script.exists(): + script = SCRIPTS / 'refresh_live_data.py' + def _refresh(): try: - subprocess.run(['python3', str(SCRIPTS / 'refresh_live_data.py')], timeout=30) + subprocess.run(['python3', str(script)], timeout=30) except Exception as e: log.warning(f'refresh_live_data.py 触发失败: {e}') + threading.Thread(target=_refresh, daemon=True).start() +def load_tasks(): + task_data_dir = get_task_data_dir() + return atomic_json_read(task_data_dir / 'tasks_source.json', []) + + +def save_tasks(tasks): + task_data_dir = get_task_data_dir() + atomic_json_write(task_data_dir / 'tasks_source.json', tasks) + _refresh_live_data_async(task_data_dir) + + def handle_task_action(task_id, action, reason): """Stop/cancel/resume a task from the dashboard.""" tasks = load_tasks() @@ -2124,12 +2188,17 @@ def do_GET(self): if p in ('', '/dashboard', '/dashboard.html'): self.send_file(DIST / 'index.html') elif p == '/healthz': - checks = {'dataDir': DATA.is_dir(), 'tasksReadable': (DATA / 'tasks_source.json').exists()} - checks['dataWritable'] = os.access(str(DATA), os.W_OK) + task_data_dir = get_task_data_dir() + checks = { + 'dataDir': task_data_dir.is_dir(), + 'tasksReadable': (task_data_dir / 'tasks_source.json').exists(), + } + checks['dataWritable'] = os.access(str(task_data_dir), os.W_OK) all_ok = all(checks.values()) self.send_json({'status': 'ok' if all_ok else 'degraded', 'ts': now_iso(), 'checks': checks}) elif p == '/api/live-status': - self.send_json(read_json(DATA / 'live_status.json')) + task_data_dir = get_task_data_dir() + self.send_json(read_json(task_data_dir / 'live_status.json')) elif p == '/api/agent-config': self.send_json(read_json(DATA / 'agent_config.json')) elif p == '/api/model-change-log':