From 8a856add5fa42e929d9b18fe5fd9c934b8c1226f Mon Sep 17 00:00:00 2001 From: "caozouying.czy" Date: Tue, 18 Nov 2025 17:21:11 +0800 Subject: [PATCH 1/6] op for skills --- flowllm/extensions/skills/__init__.py | 5 + flowllm/extensions/skills/load_skill_op.py | 63 ++++++ .../skills/read_reference_file_op.py | 62 ++++++ flowllm/extensions/skills/requirements.txt | 1 + .../extensions/skills/run_shell_command_op.py | 109 ++++++++++ flowllm/extensions/skills/task_complete_op.py | 19 ++ flowllm/extensions/skills/use_skill_op.py | 205 ++++++++++++++++++ .../extensions/skills/use_skill_prompt.yaml | 24 ++ 8 files changed, 488 insertions(+) create mode 100644 flowllm/extensions/skills/__init__.py create mode 100644 flowllm/extensions/skills/load_skill_op.py create mode 100644 flowllm/extensions/skills/read_reference_file_op.py create mode 100644 flowllm/extensions/skills/requirements.txt create mode 100644 flowllm/extensions/skills/run_shell_command_op.py create mode 100644 flowllm/extensions/skills/task_complete_op.py create mode 100644 flowllm/extensions/skills/use_skill_op.py create mode 100644 flowllm/extensions/skills/use_skill_prompt.yaml diff --git a/flowllm/extensions/skills/__init__.py b/flowllm/extensions/skills/__init__.py new file mode 100644 index 0000000..14387bc --- /dev/null +++ b/flowllm/extensions/skills/__init__.py @@ -0,0 +1,5 @@ +from .load_skill_op import LoadSkillOp +from .read_reference_file_op import ReadReferenceFileOp +from .run_shell_command_op import RunShellCommandOp +from .task_complete_op import TaskCompleteOp +from .use_skill_op import UseSkillOp \ No newline at end of file diff --git a/flowllm/extensions/skills/load_skill_op.py b/flowllm/extensions/skills/load_skill_op.py new file mode 100644 index 0000000..cee4140 --- /dev/null +++ b/flowllm/extensions/skills/load_skill_op.py @@ -0,0 +1,63 @@ +import re + +from pathlib import Path +from loguru import logger +from flowllm.context.service_context import C +from flowllm.op.base_async_tool_op 
import BaseAsyncToolOp +from flowllm.schema.tool_call import ToolCall + + +@C.register_op(register_app="FlowLLM") +class LoadSkillOp(BaseAsyncToolOp): + + def build_tool_call(self) -> ToolCall: + return ToolCall(**{ + "name": "load_skill", + "description": "Load one skill's instructions from the SKILL.md.", + "input_schema": { + "skill_name": { + "type": "string", + "description": "skill name", + "required": True + }, + "path": { + "type": "string", + "description": "skills path", + "required": True + } + } + }) + + async def async_execute(self): + + if self.input_dict.get("skill_name"): + skill_name = self.input_dict.get("skill_name") + else: + raise RuntimeError("skill_name is required") + + if self.input_dict.get("path"): + skills_path = Path(self.input_dict.get("path")) + else: + raise RuntimeError("SKILL.md path is required") + + logger.info(f"🔧 Tool called: load_skill(skill_name='{skill_name}', path='{skills_path}')") + + skill_path = skills_path / skill_name / "SKILL.md" + if not skill_path.exists(): + available = [d.name for d in skills_path.iterdir() # both files and folders at the top leve + if d.is_dir() and (d / "SKILL.md").exists()] # each Skill is stored in one folder + logger.exception(f"❌ Skill '{skill_name}' not found") + self.set_result(f"Skill '{skill_name}' not found. 
Available: {', '.join(available)}") + return + + content = skill_path.read_text(encoding="utf-8") + frontmatter_match = re.match( + r"^---\s*\n(.*?)\n---\s*\n(.*)$", content, re.DOTALL + ) + + logger.info(f"✅ Loaded skill: {skill_name}") + if not frontmatter_match: + self.set_result(content) + else: + self.set_result(frontmatter_match.group(2)) + diff --git a/flowllm/extensions/skills/read_reference_file_op.py b/flowllm/extensions/skills/read_reference_file_op.py new file mode 100644 index 0000000..792516c --- /dev/null +++ b/flowllm/extensions/skills/read_reference_file_op.py @@ -0,0 +1,62 @@ +from loguru import logger +from pathlib import Path +from flowllm.context.service_context import C +from flowllm.op.base_async_tool_op import BaseAsyncToolOp +from flowllm.schema.tool_call import ToolCall + + +@C.register_op(register_app="FlowLLM") +class ReadReferenceFileOp(BaseAsyncToolOp): + + def build_tool_call(self) -> ToolCall: + return ToolCall(**{ + "name": "read_reference_file", + "description": "Read a reference file from a skill (e.g., forms.md, reference.md, ooxml.md)", + "input_schema": { + "skill_name": { + "type": "string", + "description": "skill name", + "required": False + }, + "file_name": { + "type": "string", + "description": "reference file name", + "required": False + }, + "path": { + "type": "string", + "description": "skills path", + "required": True + } + } + }) + + async def async_execute(self): + + if self.input_dict.get("skill_name"): + skill_name = self.input_dict.get("skill_name") + else: + raise RuntimeError("skill_name is required") + + if self.input_dict.get("file_name"): + file_name = self.input_dict.get("file_name") + else: + raise RuntimeError("reference_file_name is required") + + if self.input_dict.get("path"): + skills_path = Path(self.input_dict.get("path")) + else: + raise RuntimeError("skills path is required") + + logger.info(f"🔧 Tool called: read_reference_file(skill_name='{skill_name}', file_name='{file_name}', 
path='{skills_path}')
logger.info(f"Detected language: {language} in the script: {script_path}") + + # Detect and install dependencies + dependencies = self._detect_dependencies(script_path, language) + self._install_dependencies(dependencies, script_path, language) + else: + logger.info("No script detected in the command. Skipping language detection and dependency installation.") + + logger.info(f"Executing command:\n {command}") + + try: + output = subprocess.run( + command, + shell=True, + check=True, + timeout=timeout, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ).stdout.decode() + except subprocess.CalledProcessError as error: + logger.exception(f"❌ Error during command execution: {error}") + self.set_result(error.stdout.decode()) + return + + logger.info("✅ Successfully run the shell commands") + self.set_result(output.strip()) + + def _extract_script_path(self, command): + # This regex looks for common script file extensions + match = re.search(r'((?:/[\w.-]+)+\.(?:py|js|jsx|sh|bash))', command) + return match.group(1) if match else None + + def _detect_language(self, script_path): + _, ext = os.path.splitext(script_path) + ext = ext.lower() + + if ext in ['.py']: + return 'Python' + elif ext in ['.js', '.jsx']: + return 'JavaScript' + elif ext in ['.sh', '.bash']: + return 'Shell' + return 'Unknown' + + def _detect_dependencies(self, script_path, language): + if language == 'Python': + return self._detect_python_dependencies(script_path) + # Add more language-specific dependency detection methods here + return [] + + def _detect_python_dependencies(self, script_path): + # Use pipreqs to generate requirements + requirements_path = 'requirements.txt' + subprocess.run(['pipreqs', os.path.dirname(script_path), '--savepath', requirements_path], check=True) + with open(requirements_path, 'r') as file: + dependencies = file.read().splitlines() + os.remove(requirements_path) + return dependencies + + def _install_dependencies(self, dependencies, script_path, language): + if 
language == 'Python': + for dep in dependencies: + subprocess.run(['pip', 'install', dep], check=True, cwd=os.path.dirname(script_path)) + # Add more language-specific installation methods here + + \ No newline at end of file diff --git a/flowllm/extensions/skills/task_complete_op.py b/flowllm/extensions/skills/task_complete_op.py new file mode 100644 index 0000000..4ff658f --- /dev/null +++ b/flowllm/extensions/skills/task_complete_op.py @@ -0,0 +1,19 @@ +import asyncio + +from flowllm.context.service_context import C +from flowllm.op.base_async_tool_op import BaseAsyncToolOp +from flowllm.schema.tool_call import ToolCall + + +@C.register_op(register_app="FlowLLM") +class TaskCompleteOp(BaseAsyncToolOp): + + def build_tool_call(self) -> ToolCall: + return ToolCall(**{ + "name": "task_complete", + "description": "Call this tool to indicate that the task is complete.", + }) + + async def async_execute(self): + self.set_result(f"The task is complete.") + diff --git a/flowllm/extensions/skills/use_skill_op.py b/flowllm/extensions/skills/use_skill_op.py new file mode 100644 index 0000000..faca0bf --- /dev/null +++ b/flowllm/extensions/skills/use_skill_op.py @@ -0,0 +1,205 @@ +import asyncio +import json +import re +from pathlib import Path +from typing import List, Dict, Any + +from loguru import logger + +from flowllm.context import C, FlowContext +from flowllm.enumeration.chunk_enum import ChunkEnum +from flowllm.enumeration.role import Role +from flowllm.op.base_async_tool_op import BaseAsyncToolOp +from flowllm.schema.message import Message +from flowllm.schema.tool_call import ToolCall + + +@C.register_op(register_app="FlowLLM") +class UseSkillOp(BaseAsyncToolOp): + file_path: str = __file__ + + def __init__(self, + llm: str = "qwen3_max_instruct", + # llm: str = "qwen3_235b_instruct", + # llm: str = "qwen3_80b_instruct", + max_iterations: int = 20, + **kwargs): + super().__init__(llm=llm, **kwargs) + self.max_iterations: int = max_iterations + + def 
build_tool_call(self) -> ToolCall: + return ToolCall(**{ + "description": "Automatically uses pre-built Skills relevant to the query when needed.", + "input_schema": { + "query": { + "type": "string", + "description": "query", + "required": True + }, + "path": { + "type": "string", + "description": "skills path", + "required": True + } + } + }) + + async def get_skill_metadata(self, content: str, path: str) -> dict[str, Any] | None: + frontmatter_match = re.match( + r"^---\s*\n(.*?)\n---\s*\n(.*)$", content, re.DOTALL + ) + + if not frontmatter_match: + logger.warning(f"No YAML frontmatter found in skill from {path}") + return None + + frontmatter_text = frontmatter_match.group(1) + name_match = re.search(r"^name:\s*(.+)$", frontmatter_text, re.MULTILINE) + desc_match = re.search(r"^description:\s*(.+)$", frontmatter_text, re.MULTILINE) + + if not name_match or not desc_match: + logger.warning(f"Missing name or description in skill from {path}") + return None + + name = name_match.group(1).strip().strip("\"'") + description = desc_match.group(1).strip().strip("\"'") + + return { + "name": name, + "description": description, + } + + async def list_skills(self, path: Path) -> list[dict[str, Any]]: + """ + Return the metadata for each Skill: its name and description. + Agent loads this metadata at startup to know which Skills are available. + + This is the first level of progressive disclosure, + where Agent identifies the available Skills without loading their entire instructions. 
+ """ + skill_files = list(path.rglob("SKILL.md")) + + skill_metadatas = "" + for skill_file in skill_files: + content = skill_file.read_text(encoding="utf-8") + metadata = await self.get_skill_metadata(content, str(skill_file)) + skill_metadatas += f"- {metadata["name"]}: {metadata["description"]}\n" + + return skill_metadatas + + async def async_execute(self): + + if self.input_dict.get("query"): + query = self.input_dict.get("query") + else: + raise RuntimeError("query is required") + + if self.input_dict.get("path"): + skill_path = Path(self.input_dict.get("path")) + else: + raise RuntimeError("query is required") + + logger.info(f"UseSkillOp processing query: {query} with access to skills in {skill_path}") + + tool_dict: Dict[str, BaseAsyncToolOp] = {} + for op in self.ops: + assert isinstance(op, BaseAsyncToolOp) + assert op.tool_call.name not in tool_dict, f"Duplicate tool name={op.tool_call.name}" + tool_dict[op.tool_call.name] = op + logger.info(f"add tool call={op.tool_call.simple_input_dump()}") + + # load the skill metadatas at startup and include them in the system prompt + skill_metadatas = await self.list_skills(skill_path) + system_prompt = self.prompt_format("system_prompt", + skills_path=skill_path, + skill_metadatas=skill_metadatas) + # logger.info(system_prompt) + messages = [ + Message(role=Role.SYSTEM, content=system_prompt), + Message(role=Role.USER, content=query), + ] + + results = [] + for i in range(self.max_iterations): + assistant_message = await self.llm.achat(messages=messages, + tools=[x.tool_call for x in tool_dict.values()]) + messages.append(assistant_message) + + assistant_content = f"[{self.name}.{i}]" + if assistant_message.content: + assistant_content += f" content={assistant_message.content}" + if assistant_message.reasoning_content: + assistant_content += f" reasoning={assistant_message.reasoning_content}" + if assistant_message.tool_calls: + tool_call_str = " | ".join([json.dumps(t.simple_output_dump(), ensure_ascii=False) 
\ + for t in assistant_message.tool_calls]) + assistant_content += f" tool_calls={tool_call_str}" + assistant_content += "\n\n" + logger.info(assistant_content) + await self.context.add_stream_chunk_and_type(assistant_content, ChunkEnum.THINK) + + if not assistant_message.tool_calls: + break + + ops: List[BaseAsyncToolOp] = [] + for j, tool in enumerate(assistant_message.tool_calls): + op = tool_dict[tool.name].copy() + op.tool_call.id = tool.id + ops.append(op) + logger.info(f"{self.name} submit op{j}={op.name} argument={tool.argument_dict}") + self.submit_async_task(op.async_call, **tool.argument_dict, stream_queue=self.context.stream_queue) + + await self.join_async_task() + + done: bool = False + for op in ops: + messages.append(Message(role=Role.TOOL, + content=op.output, + tool_call_id=op.tool_call.id)) + tool_content = f"[{self.name}.{i}.{op.name}] {op.output[:200]}...\n\n" + logger.info(tool_content) + await self.context.add_stream_chunk_and_type(tool_content, ChunkEnum.TOOL) + + if op.tool_call.name == "task_complete": + done = True + + if done: + break + + +async def main(): + from flowllm.app import FlowLLMApp + from flowllm.op.skills import LoadSkillOp, ReadReferenceFileOp, RunShellCommandOp, TaskCompleteOp + + async with FlowLLMApp(load_default_config=True): + # Help me merge two PDF files (minimal-document.pdf and pdflatex-outline) and save to merged.pdf + context = FlowContext(query="Fill Sample-Fillable-PDF.pdf with: name='Alice Johnson', select first choice from dropdown, check options 1 and 3, dependent name='Bob Johnson', age='12'. 
Save as filled-sample.pdf", + path="./skills", + stream_queue=asyncio.Queue()) + + op = UseSkillOp() \ + << LoadSkillOp() << ReadReferenceFileOp() << RunShellCommandOp() << TaskCompleteOp() + + await op.async_call(context=context) + + # async def async_call(): + # await op.async_call(context=context) + # await context.add_stream_done() + + # task = asyncio.create_task(async_call()) + + # while True: + # stream_chunk = await context.stream_queue.get() + # if stream_chunk.done: + # print("\nend") + # await task + # break + + # else: + # print(stream_chunk.chunk, end="") + + # await task + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/flowllm/extensions/skills/use_skill_prompt.yaml b/flowllm/extensions/skills/use_skill_prompt.yaml new file mode 100644 index 0000000..c1be486 --- /dev/null +++ b/flowllm/extensions/skills/use_skill_prompt.yaml @@ -0,0 +1,24 @@ +system_prompt: | + You are a helpful AI assistant with access to specialized skills. + + When you encounter tasks involving specific domains or file formats, use the "load_skill" tool to gain expert knowledge. + Skills path: {skills_path} + Available skills (each line is "- : "): + {skill_metadatas} + + Workflow: + 1. Identify if the task needs specialized knowledge. + 2. If specialized knowledge is needed, identify the most relevant skill from the available skills list. + 3. Use "load_skill" tool to get detailed instructions for the chosen skill. This will load the content of SKILL.md into your context. + 4. If the skill mentions reference files (e.g., forms.md), use "read_reference_file" tool to access their contents only when explicitly required for the task. + 5. If the skill includes executable scripts (e.g., fill_form.py), use "run_shell_command" tool with the appropriate shell commands to run them when necessary. Remember that only the script's output will be added to your context, not the script's code itself. + 6. Follow the instructions from the loaded skill + 7. 
Use available tools as needed + 8. After completing the task, call the "task_complete" tool to indicate that you are done with your task + + Important: + - Only load skills and additional resources when they are directly relevant to the current task + - Skill scripts are located in: skills//scripts/ + - When running skill scripts, use the full path from current directory when creating the shell commands + - Example: skills/pdf/scripts/check_fillable_fields.py + - If a task requires multiple skills, load and apply them sequentially as needed From 450f997a3f4bd9bee19f06ccc10cb91d7ee24130 Mon Sep 17 00:00:00 2001 From: "jinli.yl" Date: Wed, 19 Nov 2025 11:53:31 +0800 Subject: [PATCH 2/6] feat(skills): implement skill agent operations and integrate into flowllm --- flowllm/config/default.yaml | 20 ++ flowllm/core/schema/message.py | 13 +- flowllm/extensions/__init__.py | 4 +- flowllm/extensions/skills/__init__.py | 18 +- .../skills/load_skill_metadata_op.py | 104 +++++++++ flowllm/extensions/skills/load_skill_op.py | 101 +++++---- .../skills/read_reference_file_op.py | 104 +++++---- flowllm/extensions/skills/requirements.txt | 1 - .../extensions/skills/run_shell_command_op.py | 193 ++++++++++++----- flowllm/extensions/skills/skill_agent_op.py | 108 +++++++++ ...ll_prompt.yaml => skill_agent_prompt.yaml} | 12 +- flowllm/extensions/skills/task_complete_op.py | 19 -- flowllm/extensions/skills/use_skill_op.py | 205 ------------------ flowllm/extensions/utils/dt_utils.py | 50 +++-- flowllm/extensions/utils/edit_utils.py | 77 ++++++- pyproject.toml | 6 +- tests_op/test_skill_agent_op.py | 0 17 files changed, 621 insertions(+), 414 deletions(-) create mode 100644 flowllm/extensions/skills/load_skill_metadata_op.py delete mode 100644 flowllm/extensions/skills/requirements.txt create mode 100644 flowllm/extensions/skills/skill_agent_op.py rename flowllm/extensions/skills/{use_skill_prompt.yaml => skill_agent_prompt.yaml} (82%) delete mode 100644 
flowllm/extensions/skills/task_complete_op.py delete mode 100644 flowllm/extensions/skills/use_skill_op.py create mode 100644 tests_op/test_skill_agent_op.py diff --git a/flowllm/config/default.yaml b/flowllm/config/default.yaml index 8aaecc9..49b3feb 100644 --- a/flowllm/config/default.yaml +++ b/flowllm/config/default.yaml @@ -40,6 +40,26 @@ flow: description: "user query" required: true + skill_agent_flow: + flow_content: | + skill_op = SkillAgentOp() + skill_op.ops.load_skill_metadata = LoadSkillMetadataOp() + skill_op.ops.load_skill_op = LoadSkillOp() + skill_op.ops.read_reference_file_op = ReadReferenceFileOp() + skill_op.ops.run_shell_command_op = RunShellCommandOp() + skill_op + description: "Automatically uses pre-built Skills relevant to the query when needed." + input_schema: + query: + type: string + description: "query" + required: true + skill_dir: + type: string + description: "skill dir" + required: true + + llm: default: backend: openai_compatible diff --git a/flowllm/core/schema/message.py b/flowllm/core/schema/message.py index 4b33914..ce49001 100644 --- a/flowllm/core/schema/message.py +++ b/flowllm/core/schema/message.py @@ -20,15 +20,12 @@ class Message(BaseModel): time_created: str = Field(default_factory=lambda: datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) metadata: dict = Field(default_factory=dict) - def simple_dump(self, add_reason_content: bool = True) -> dict: + def simple_dump(self, add_reasoning: bool = False) -> dict: """Convert Message to a simple dictionary format for API serialization.""" - result: dict - if self.content: - result = {"role": self.role.value, "content": self.content} - elif add_reason_content and self.reasoning_content: - result = {"role": self.role.value, "content": self.reasoning_content} - else: - result = {"role": self.role.value, "content": ""} + result: dict = {"role": self.role.value, "content": self.content} + + if add_reasoning: + result["reasoning_content"] = self.reasoning_content if 
self.tool_calls: result["tool_calls"] = [x.simple_output_dump() for x in self.tool_calls] diff --git a/flowllm/extensions/__init__.py b/flowllm/extensions/__init__.py index 7a25874..dc3f073 100644 --- a/flowllm/extensions/__init__.py +++ b/flowllm/extensions/__init__.py @@ -6,12 +6,14 @@ - file_tool: File-related operations including editing and searching files - data: Data-related operations including downloading stock data - utils: Utility functions for date/time operations and other helpers +- skills: Skill-based operations for managing and executing specialized skills """ -from . import data, file_tool, utils +from . import data, file_tool, skills, utils __all__ = [ "data", "file_tool", + "skills", "utils", ] diff --git a/flowllm/extensions/skills/__init__.py b/flowllm/extensions/skills/__init__.py index 14387bc..7095e70 100644 --- a/flowllm/extensions/skills/__init__.py +++ b/flowllm/extensions/skills/__init__.py @@ -1,5 +1,19 @@ +"""Skills extension module for flowllm. + +This module provides operations for managing and executing skills, +including loading skill metadata, reading reference files, and running shell commands. +""" + +from .load_skill_metadata_op import LoadSkillMetadataOp from .load_skill_op import LoadSkillOp from .read_reference_file_op import ReadReferenceFileOp from .run_shell_command_op import RunShellCommandOp -from .task_complete_op import TaskCompleteOp -from .use_skill_op import UseSkillOp \ No newline at end of file +from .skill_agent_op import SkillAgentOp + +__all__ = [ + "LoadSkillMetadataOp", + "LoadSkillOp", + "ReadReferenceFileOp", + "RunShellCommandOp", + "SkillAgentOp", +] diff --git a/flowllm/extensions/skills/load_skill_metadata_op.py b/flowllm/extensions/skills/load_skill_metadata_op.py new file mode 100644 index 0000000..ef4d912 --- /dev/null +++ b/flowllm/extensions/skills/load_skill_metadata_op.py @@ -0,0 +1,104 @@ +"""Operation for loading skill metadata. 
+ +This module provides the LoadSkillMetadataOp class which scans the skills +directory and extracts metadata (name and description) from all SKILL.md files. +""" + +import json +from pathlib import Path + +from loguru import logger + +from ...core.context import C +from ...core.op import BaseAsyncToolOp +from ...core.schema import ToolCall + + +@C.register_op() +class LoadSkillMetadataOp(BaseAsyncToolOp): + """Operation for loading metadata from all available skills. + + This tool scans the skills directory for SKILL.md files and extracts + their metadata (name and description) from YAML frontmatter. + Returns a JSON string containing a list of skill metadata. + """ + + def build_tool_call(self) -> ToolCall: + """Build the tool call definition for load_skill_metadata. + + Returns: + A ToolCall object defining the load_skill_metadata tool. + """ + tool_params = { + "name": "load_skill_metadata", + "description": "Load metadata (name and description) for all available skills from the skills directory.", + "input_schema": {}, + } + return ToolCall(**tool_params) + + @staticmethod + async def parse_skill_metadata(content: str, path: str) -> dict[str, str] | None: + """Extract skill metadata (name and description) from SKILL.md content. + + Parses YAML frontmatter from SKILL.md files to extract the skill name + and description. The frontmatter should be in the format: + --- + name: skill_name + description: skill description + --- + + Args: + content: The content of the SKILL.md file. + path: The file path (used for logging purposes). + + Returns: + A dictionary with 'name' and 'description' keys, or None if + parsing fails or required fields are missing. 
+ """ + parts = content.split("---") + if len(parts) < 3: + logger.warning(f"No YAML frontmatter found in skill from {path}") + return None + + frontmatter_text = parts[1].strip() + name = None + description = None + + for line in frontmatter_text.split("\n"): + line = line.strip() + if line.startswith("name:"): + name = line.split(":", 1)[1].strip().strip("\"'") + elif line.startswith("description:"): + description = line.split(":", 1)[1].strip().strip("\"'") + + if not name or not description: + logger.warning(f"Missing name or description in skill from {path}") + return None + + return { + "name": name, + "description": description, + } + + async def async_execute(self): + """Execute the load skill metadata operation. + + Scans the skills directory recursively for all SKILL.md files, + extracts their metadata, and returns a JSON string containing + a list of all skill metadata entries. + """ + skill_dir = Path(self.context.skill_dir) + logger.info(f"🔧 Tool called: load_skill_metadata(path={skill_dir})") + skill_files = list(skill_dir.rglob("SKILL.md")) + + skill_metadata_list = [] + for skill_file in skill_files: + content = skill_file.read_text(encoding="utf-8") + metadata = await self.parse_skill_metadata(content, str(skill_file)) + if metadata: + skill_metadata_list.append(metadata) + + logger.info(f"✅ Loaded {len(skill_metadata_list)} skill metadata entries") + + # Return as JSON string for easy parsing + self.set_output(json.dumps(skill_metadata_list, ensure_ascii=False, indent=2)) diff --git a/flowllm/extensions/skills/load_skill_op.py b/flowllm/extensions/skills/load_skill_op.py index cee4140..1f97d79 100644 --- a/flowllm/extensions/skills/load_skill_op.py +++ b/flowllm/extensions/skills/load_skill_op.py @@ -1,63 +1,72 @@ -import re +"""Operation for loading a specific skill. + +This module provides the LoadSkillOp class which loads the full content +of a SKILL.md file from a specified skill directory. 
+""" from pathlib import Path + from loguru import logger -from flowllm.context.service_context import C -from flowllm.op.base_async_tool_op import BaseAsyncToolOp -from flowllm.schema.tool_call import ToolCall +from ...core.context import C +from ...core.op import BaseAsyncToolOp +from ...core.schema import ToolCall -@C.register_op(register_app="FlowLLM") + +@C.register_op() class LoadSkillOp(BaseAsyncToolOp): + """Operation for loading a specific skill's instructions. + + This tool loads the complete content of a SKILL.md file from a + specified skill directory, including its YAML frontmatter and + instructions. + """ def build_tool_call(self) -> ToolCall: - return ToolCall(**{ - "name": "load_skill", - "description": "Load one skill's instructions from the SKILL.md.", - "input_schema": { - "skill_name": { - "type": "string", - "description": "skill name", - "required": True + """Build the tool call definition for load_skill. + + Returns: + A ToolCall object defining the load_skill tool. + """ + return ToolCall( + **{ + "name": "load_skill", + "description": "Load one skill's instructions from the SKILL.md.", + "input_schema": { + "skill_name": { + "type": "string", + "description": "skill name", + "required": True, + }, }, - "path": { - "type": "string", - "description": "skills path", - "required": True - } - } - }) + }, + ) async def async_execute(self): - - if self.input_dict.get("skill_name"): - skill_name = self.input_dict.get("skill_name") - else: - raise RuntimeError("skill_name is required") - - if self.input_dict.get("path"): - skills_path = Path(self.input_dict.get("path")) - else: - raise RuntimeError("SKILL.md path is required") - - logger.info(f"🔧 Tool called: load_skill(skill_name='{skill_name}', path='{skills_path}')") - - skill_path = skills_path / skill_name / "SKILL.md" + """Execute the load skill operation. + + Loads the SKILL.md file from the specified skill directory. 
+ If the skill is not found, sets an error message with available + skills in the output. + + The file path is constructed as: {skill_dir}/{skill_name}/SKILL.md + """ + skill_name = self.input_dict["skill_name"] + skill_dir = Path(self.context.skill_dir) + logger.info(f"🔧 Tool called: load_skill(skill_name='{skill_name}')") + skill_path = skill_dir / skill_name / "SKILL.md" + if not skill_path.exists(): - available = [d.name for d in skills_path.iterdir() # both files and folders at the top leve - if d.is_dir() and (d / "SKILL.md").exists()] # each Skill is stored in one folder + available = [d.name for d in skill_dir / skill_name.iterdir() if d.is_dir() and (d / "SKILL.md").exists()] logger.exception(f"❌ Skill '{skill_name}' not found") - self.set_result(f"Skill '{skill_name}' not found. Available: {', '.join(available)}") + self.set_output(f"Skill '{skill_name}' not found. Available: {', '.join(available)}") return content = skill_path.read_text(encoding="utf-8") - frontmatter_match = re.match( - r"^---\s*\n(.*?)\n---\s*\n(.*)$", content, re.DOTALL - ) - - logger.info(f"✅ Loaded skill: {skill_name}") - if not frontmatter_match: - self.set_result(content) - else: - self.set_result(frontmatter_match.group(2)) + parts = content.split("---") + if len(parts) < 3: + logger.warning(f"No YAML frontmatter found in skill from {skill_path}") + return + logger.info(f"✅ Loaded skill: {skill_name}") + self.set_output(content) diff --git a/flowllm/extensions/skills/read_reference_file_op.py b/flowllm/extensions/skills/read_reference_file_op.py index 792516c..12f6048 100644 --- a/flowllm/extensions/skills/read_reference_file_op.py +++ b/flowllm/extensions/skills/read_reference_file_op.py @@ -1,62 +1,70 @@ -from loguru import logger +"""Operation for reading reference files from skills. + +This module provides the ReadReferenceFileOp class which allows reading +reference files (e.g., forms.md, reference.md) from skill directories. 
+""" + from pathlib import Path -from flowllm.context.service_context import C -from flowllm.op.base_async_tool_op import BaseAsyncToolOp -from flowllm.schema.tool_call import ToolCall + +from loguru import logger + +from ...core.context import C +from ...core.op import BaseAsyncToolOp +from ...core.schema import ToolCall -@C.register_op(register_app="FlowLLM") +@C.register_op() class ReadReferenceFileOp(BaseAsyncToolOp): + """Operation for reading reference files from a skill directory. + + This tool allows reading reference files like forms.md, reference.md, + or ooxml.md from a specific skill's directory. + """ def build_tool_call(self) -> ToolCall: - return ToolCall(**{ - "name": "read_reference_file", - "description": "Read a reference file from a skill (e.g., forms.md, reference.md, ooxml.md)", - "input_schema": { - "skill_name": { - "type": "string", - "description": "skill name", - "required": False - }, - "file_name": { - "type": "string", - "description": "reference file name", - "required": False + """Build the tool call definition for read_reference_file. + + Returns: + A ToolCall object defining the read_reference_file tool. 
+ """ + return ToolCall( + **{ + "name": "read_reference_file", + "description": "Read a reference file from a skill (e.g., forms.md, reference.md, ooxml.md)", + "input_schema": { + "skill_name": { + "type": "string", + "description": "skill name", + "required": True, + }, + "file_name": { + "type": "string", + "description": "reference file name or file path", + "required": True, + }, }, - "path": { - "type": "string", - "description": "skills path", - "required": True - } - } - }) + }, + ) async def async_execute(self): - - if self.input_dict.get("skill_name"): - skill_name = self.input_dict.get("skill_name") - else: - raise RuntimeError("skill_name is required") - - if self.input_dict.get("file_name"): - file_name = self.input_dict.get("file_name") - else: - raise RuntimeError("reference_file_name is required") - - if self.input_dict.get("path"): - skills_path = Path(self.input_dict.get("path")) - else: - raise RuntimeError("skills path is required") - - logger.info(f"🔧 Tool called: read_reference_file(skill_name='{skill_name}', file_name='{file_name}', path='{skills_path})") - - file_path = skills_path / skill_name / file_name + """Execute the read reference file operation. + + Reads a reference file from the specified skill directory. + If the file is not found, sets an error message in the output. 
+ + The file path is constructed as: {skill_dir}/{skill_name}/{file_name} + """ + skill_name = self.input_dict["skill_name"] + file_name = self.input_dict["file_name"] + skill_dir = Path(self.context.skill_dir) + + logger.info(f"🔧 Tool called: read_reference_file(skill_name='{skill_name}', file_name='{file_name}')") + + file_path = skill_dir / skill_name / file_name if not file_path.exists(): logger.exception(f"❌ File not found: {file_path}") - self.set_result(f"File '{file_name}' not found in skill '{skill_name}'") + self.set_output(f"File '{file_name}' not found in skill '{skill_name}'") return logger.info(f"✅ Read file: {skill_name}/{file_name}") - self.set_result(file_path.read_text(encoding="utf-8")) - - + self.set_output(file_path.read_text(encoding="utf-8")) diff --git a/flowllm/extensions/skills/requirements.txt b/flowllm/extensions/skills/requirements.txt deleted file mode 100644 index 8c2e012..0000000 --- a/flowllm/extensions/skills/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -pipreqs \ No newline at end of file diff --git a/flowllm/extensions/skills/run_shell_command_op.py b/flowllm/extensions/skills/run_shell_command_op.py index 161f291..054d00a 100644 --- a/flowllm/extensions/skills/run_shell_command_op.py +++ b/flowllm/extensions/skills/run_shell_command_op.py @@ -1,56 +1,87 @@ -import re +"""Operation for running shell commands. + +This module provides the RunShellCommandOp class which executes shell +commands in a subprocess, with automatic dependency detection and +installation for script files. 
+""" + import os +import re import subprocess from loguru import logger -from flowllm.context.service_context import C -from flowllm.op.base_async_tool_op import BaseAsyncToolOp -from flowllm.schema.tool_call import ToolCall +from ...core.context import C +from ...core.op import BaseAsyncToolOp +from ...core.schema import ToolCall -@C.register_op(register_app="FlowLLM") + +@C.register_op() class RunShellCommandOp(BaseAsyncToolOp): + """Operation for running shell commands in a subprocess. + + This tool executes shell commands and can automatically detect and + install dependencies for script files (Python, JavaScript, Shell). + It supports timeout configuration and error handling. + """ def build_tool_call(self) -> ToolCall: - return ToolCall(**{ - "name": "run_shell_command", - "description": "run shell command (e.g., 'echo \'hello world\'') in a subprocess.", - "input_schema": { - "command": { - "type": "string", - "description": "shell command", - "required": True + """Build the tool call definition for run_shell_command. + + Returns: + A ToolCall object defining the run_shell_command tool. + """ + return ToolCall( + **{ + "name": "run_shell_command", + "description": "run shell command (e.g., 'echo 'hello world'') in a subprocess.", + "input_schema": { + "command": { + "type": "string", + "description": "shell command", + "required": True, + }, + "timeout": { + "type": "int", + "description": "timeout for the subprocess", + "required": False, + }, }, - "timeout": { - "type": "int", - "description": "timeout for the subprocess", - "required": False - } - } - }) - + }, + ) + async def async_execute(self): - if self.input_dict.get("command"): - command = self.input_dict.get("command") - else: - raise RuntimeError("one shell command is required") - timeout = self.input_dict.get("timeout", 60) - + """Execute the shell command operation. + + Runs a shell command in a subprocess. 
If the command contains + a script file path, automatically detects the language and + installs any required dependencies before execution. + + Args: + command: The shell command to execute. + timeout: Optional timeout in seconds for the subprocess. + + The output (stdout) is captured and set as the operation output. + If an error occurs, the error output is captured and returned. + """ + command: str = self.input_dict["command"] + timeout: int | None = self.input_dict.get("timeout", None) + # Extract script path from command script_path = self._extract_script_path(command) if script_path: # Detect language language = self._detect_language(script_path) logger.info(f"Detected language: {language} in the script: {script_path}") - + # Detect and install dependencies dependencies = self._detect_dependencies(script_path, language) self._install_dependencies(dependencies, script_path, language) else: logger.info("No script detected in the command. Skipping language detection and dependency installation.") - + logger.info(f"Executing command:\n {command}") - + try: output = subprocess.run( command, @@ -60,50 +91,102 @@ async def async_execute(self): stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ).stdout.decode() + except subprocess.CalledProcessError as error: logger.exception(f"❌ Error during command execution: {error}") - self.set_result(error.stdout.decode()) + self.set_output(error.stdout.decode()) return - logger.info("✅ Successfully run the shell commands") - self.set_result(output.strip()) + logger.info(f"✅ Successfully run the shell commands \n{output}") + self.set_output(output) + + @staticmethod + def _extract_script_path(command: str): + """Extract script file path from a shell command. + + Uses regex to find file paths with common script extensions + (.py, .js, .jsx, .sh, .bash) in the command string. + + Args: + command: The shell command string. - def _extract_script_path(self, command): + Returns: + The extracted script path if found, None otherwise. 
+ """ # This regex looks for common script file extensions - match = re.search(r'((?:/[\w.-]+)+\.(?:py|js|jsx|sh|bash))', command) + match = re.search(r"((?:/[\w.-]+)+\.(?:py|js|jsx|sh|bash))", command) return match.group(1) if match else None - def _detect_language(self, script_path): + @staticmethod + def _detect_language(script_path): + """Detect the programming language from a script file extension. + + Args: + script_path: Path to the script file. + + Returns: + The detected language name ("Python", "JavaScript", "Shell", or "Unknown"). + """ _, ext = os.path.splitext(script_path) ext = ext.lower() - - if ext in ['.py']: - return 'Python' - elif ext in ['.js', '.jsx']: - return 'JavaScript' - elif ext in ['.sh', '.bash']: - return 'Shell' - return 'Unknown' + + if ext in [".py"]: + return "Python" + elif ext in [".js", ".jsx"]: + return "JavaScript" + elif ext in [".sh", ".bash"]: + return "Shell" + return "Unknown" def _detect_dependencies(self, script_path, language): - if language == 'Python': + """Detect dependencies required by a script. + + Args: + script_path: Path to the script file. + language: The programming language of the script. + + Returns: + A list of dependency strings, empty if none found or language not supported. + """ + if language == "Python": return self._detect_python_dependencies(script_path) # Add more language-specific dependency detection methods here return [] - def _detect_python_dependencies(self, script_path): + @staticmethod + def _detect_python_dependencies(script_path): + """Detect Python dependencies using pipreqs. + + Uses pipreqs to analyze the script directory and generate + a requirements.txt file, then reads and returns the dependencies. + + Args: + script_path: Path to the Python script file. + + Returns: + A list of dependency strings from the generated requirements.txt. 
+ """ # Use pipreqs to generate requirements - requirements_path = 'requirements.txt' - subprocess.run(['pipreqs', os.path.dirname(script_path), '--savepath', requirements_path], check=True) - with open(requirements_path, 'r') as file: + requirements_path = "requirements.txt" + subprocess.run(["pipreqs", os.path.dirname(script_path), "--savepath", requirements_path], check=True) + with open(requirements_path, "r", encoding="utf-8") as file: dependencies = file.read().splitlines() os.remove(requirements_path) return dependencies - - def _install_dependencies(self, dependencies, script_path, language): - if language == 'Python': + + @staticmethod + def _install_dependencies(dependencies, script_path, language): + """Install dependencies for a script. + + Installs the detected dependencies using the appropriate package + manager for the given language. + + Args: + dependencies: List of dependency strings to install. + script_path: Path to the script file (used for working directory). + language: The programming language of the script. + """ + if language == "Python": for dep in dependencies: - subprocess.run(['pip', 'install', dep], check=True, cwd=os.path.dirname(script_path)) + subprocess.run(["pip", "install", dep], check=True, cwd=os.path.dirname(script_path)) # Add more language-specific installation methods here - - \ No newline at end of file diff --git a/flowllm/extensions/skills/skill_agent_op.py b/flowllm/extensions/skills/skill_agent_op.py new file mode 100644 index 0000000..bb5bf7b --- /dev/null +++ b/flowllm/extensions/skills/skill_agent_op.py @@ -0,0 +1,108 @@ +"""Skill agent operation for orchestrating skill-based task execution. + +This module provides the SkillAgentOp class which coordinates the execution +of skills by managing tool calls and LLM interactions. 
+""" + +from typing import List + +from loguru import logger + +from ...core.context import C +from ...core.enumeration import Role +from ...core.op import BaseAsyncToolOp, BaseAsyncOp +from ...core.schema import Message + + +@C.register_op() +class SkillAgentOp(BaseAsyncOp): + """An agent operation that orchestrates skill-based task execution. + + This operation manages a conversation loop with an LLM, allowing it to + use various skill-related tools (load_skill, read_reference_file, + run_shell_command) to complete tasks. It iterates up to max_iterations + times, allowing the LLM to make multiple tool calls as needed. + + Attributes: + max_iterations: Maximum number of iterations for the agent loop. + """ + + file_path: str = __file__ + + def __init__(self, llm: str = "qwen3_max_instruct", max_iterations: int = 10, **kwargs): + """Initialize the SkillAgentOp. + + Args: + llm: The LLM model identifier to use for chat interactions. + max_iterations: Maximum number of agent loop iterations. + **kwargs: Additional arguments passed to the base class. + """ + super().__init__(llm=llm, **kwargs) + self.max_iterations: int = max_iterations + + async def async_execute(self): + """Execute the skill agent operation. + + This method orchestrates the skill-based task execution by: + 1. Loading skill metadata + 2. Setting up the system prompt with available skills + 3. Running an iterative loop where the LLM can use tools + 4. Collecting tool outputs and continuing the conversation + 5. Setting the final response in the context + + The operation stops when the LLM no longer makes tool calls or + max_iterations is reached. 
+ """ + query: str = self.context.query + skill_dir: str = self.context.skill_dir + logger.info(f"UseSkillOp processing query: {query} with access to skills in {skill_dir}") + + load_skill_metadata_op: BaseAsyncToolOp = self.ops.load_skill_metadata_op + load_skill_op: BaseAsyncToolOp = self.ops.load_skill_op + read_reference_file_op: BaseAsyncToolOp = self.ops.read_reference_file_op + run_shell_command_op: BaseAsyncToolOp = self.ops.run_shell_command_op + + await load_skill_metadata_op.async_call() + system_prompt = self.prompt_format( + "system_prompt", + skill_dir=skill_dir, + skill_metadata=load_skill_metadata_op.output, + ) + + messages = [ + Message(role=Role.SYSTEM, content=system_prompt), + Message(role=Role.USER, content=query), + ] + + tool_op_dict: dict = { + op.tool_call.name: op for op in [load_skill_op, read_reference_file_op, run_shell_command_op] + } + + for i in range(self.max_iterations): + assistant_message = await self.llm.achat( + messages=messages, + tools=[x.tool_call for x in tool_op_dict.values()], + ) + messages.append(assistant_message) + logger.info(assistant_message.model_dump_json()) + + if not assistant_message.tool_calls: + break + + ops: List[BaseAsyncToolOp] = [] + for j, tool in enumerate(assistant_message.tool_calls): + op = tool_op_dict[tool.name].copy() + op.tool_call.id = tool.id + ops.append(op) + logger.info(f"{self.name} submit op{j}={op.name} argument={tool.argument_dict}") + self.submit_async_task(op.async_call, **tool.argument_dict) + + await self.join_async_task() + + for op in ops: + messages.append(Message(role=Role.TOOL, content=op.output, tool_call_id=op.tool_call.id)) + tool_content = f"[{self.name}.{i}.{op.name}] {op.output[:200]}...\n\n" + logger.info(tool_content) + + self.context.response.answer = messages[-1].content + self.context.response.metadata["messages"] = [x.simple_dump() for x in messages] diff --git a/flowllm/extensions/skills/use_skill_prompt.yaml b/flowllm/extensions/skills/skill_agent_prompt.yaml 
similarity index 82% rename from flowllm/extensions/skills/use_skill_prompt.yaml rename to flowllm/extensions/skills/skill_agent_prompt.yaml index c1be486..cf46708 100644 --- a/flowllm/extensions/skills/use_skill_prompt.yaml +++ b/flowllm/extensions/skills/skill_agent_prompt.yaml @@ -1,10 +1,10 @@ system_prompt: | You are a helpful AI assistant with access to specialized skills. - When you encounter tasks involving specific domains or file formats, use the "load_skill" tool to gain expert knowledge. - Skills path: {skills_path} - Available skills (each line is "- : "): - {skill_metadatas} + When you encounter tasks involving specific domains or file formats, use the "load_skill" tool to gain expert knowledge. + Skills Dir: {skills_dir} + Available skills: + {skill_metadata} Workflow: 1. Identify if the task needs specialized knowledge. @@ -18,7 +18,7 @@ system_prompt: | Important: - Only load skills and additional resources when they are directly relevant to the current task - - Skill scripts are located in: skills//scripts/ + - Skill scripts are located in: {skills_dir}//scripts/ - When running skill scripts, use the full path from current directory when creating the shell commands - - Example: skills/pdf/scripts/check_fillable_fields.py + - Example: python {skills_dir}//scripts/check_fillable_fields.py - If a task requires multiple skills, load and apply them sequentially as needed diff --git a/flowllm/extensions/skills/task_complete_op.py b/flowllm/extensions/skills/task_complete_op.py deleted file mode 100644 index 4ff658f..0000000 --- a/flowllm/extensions/skills/task_complete_op.py +++ /dev/null @@ -1,19 +0,0 @@ -import asyncio - -from flowllm.context.service_context import C -from flowllm.op.base_async_tool_op import BaseAsyncToolOp -from flowllm.schema.tool_call import ToolCall - - -@C.register_op(register_app="FlowLLM") -class TaskCompleteOp(BaseAsyncToolOp): - - def build_tool_call(self) -> ToolCall: - return ToolCall(**{ - "name": "task_complete", - 
"description": "Call this tool to indicate that the task is complete.", - }) - - async def async_execute(self): - self.set_result(f"The task is complete.") - diff --git a/flowllm/extensions/skills/use_skill_op.py b/flowllm/extensions/skills/use_skill_op.py deleted file mode 100644 index faca0bf..0000000 --- a/flowllm/extensions/skills/use_skill_op.py +++ /dev/null @@ -1,205 +0,0 @@ -import asyncio -import json -import re -from pathlib import Path -from typing import List, Dict, Any - -from loguru import logger - -from flowllm.context import C, FlowContext -from flowllm.enumeration.chunk_enum import ChunkEnum -from flowllm.enumeration.role import Role -from flowllm.op.base_async_tool_op import BaseAsyncToolOp -from flowllm.schema.message import Message -from flowllm.schema.tool_call import ToolCall - - -@C.register_op(register_app="FlowLLM") -class UseSkillOp(BaseAsyncToolOp): - file_path: str = __file__ - - def __init__(self, - llm: str = "qwen3_max_instruct", - # llm: str = "qwen3_235b_instruct", - # llm: str = "qwen3_80b_instruct", - max_iterations: int = 20, - **kwargs): - super().__init__(llm=llm, **kwargs) - self.max_iterations: int = max_iterations - - def build_tool_call(self) -> ToolCall: - return ToolCall(**{ - "description": "Automatically uses pre-built Skills relevant to the query when needed.", - "input_schema": { - "query": { - "type": "string", - "description": "query", - "required": True - }, - "path": { - "type": "string", - "description": "skills path", - "required": True - } - } - }) - - async def get_skill_metadata(self, content: str, path: str) -> dict[str, Any] | None: - frontmatter_match = re.match( - r"^---\s*\n(.*?)\n---\s*\n(.*)$", content, re.DOTALL - ) - - if not frontmatter_match: - logger.warning(f"No YAML frontmatter found in skill from {path}") - return None - - frontmatter_text = frontmatter_match.group(1) - name_match = re.search(r"^name:\s*(.+)$", frontmatter_text, re.MULTILINE) - desc_match = re.search(r"^description:\s*(.+)$", 
frontmatter_text, re.MULTILINE) - - if not name_match or not desc_match: - logger.warning(f"Missing name or description in skill from {path}") - return None - - name = name_match.group(1).strip().strip("\"'") - description = desc_match.group(1).strip().strip("\"'") - - return { - "name": name, - "description": description, - } - - async def list_skills(self, path: Path) -> list[dict[str, Any]]: - """ - Return the metadata for each Skill: its name and description. - Agent loads this metadata at startup to know which Skills are available. - - This is the first level of progressive disclosure, - where Agent identifies the available Skills without loading their entire instructions. - """ - skill_files = list(path.rglob("SKILL.md")) - - skill_metadatas = "" - for skill_file in skill_files: - content = skill_file.read_text(encoding="utf-8") - metadata = await self.get_skill_metadata(content, str(skill_file)) - skill_metadatas += f"- {metadata["name"]}: {metadata["description"]}\n" - - return skill_metadatas - - async def async_execute(self): - - if self.input_dict.get("query"): - query = self.input_dict.get("query") - else: - raise RuntimeError("query is required") - - if self.input_dict.get("path"): - skill_path = Path(self.input_dict.get("path")) - else: - raise RuntimeError("query is required") - - logger.info(f"UseSkillOp processing query: {query} with access to skills in {skill_path}") - - tool_dict: Dict[str, BaseAsyncToolOp] = {} - for op in self.ops: - assert isinstance(op, BaseAsyncToolOp) - assert op.tool_call.name not in tool_dict, f"Duplicate tool name={op.tool_call.name}" - tool_dict[op.tool_call.name] = op - logger.info(f"add tool call={op.tool_call.simple_input_dump()}") - - # load the skill metadatas at startup and include them in the system prompt - skill_metadatas = await self.list_skills(skill_path) - system_prompt = self.prompt_format("system_prompt", - skills_path=skill_path, - skill_metadatas=skill_metadatas) - # logger.info(system_prompt) - 
messages = [ - Message(role=Role.SYSTEM, content=system_prompt), - Message(role=Role.USER, content=query), - ] - - results = [] - for i in range(self.max_iterations): - assistant_message = await self.llm.achat(messages=messages, - tools=[x.tool_call for x in tool_dict.values()]) - messages.append(assistant_message) - - assistant_content = f"[{self.name}.{i}]" - if assistant_message.content: - assistant_content += f" content={assistant_message.content}" - if assistant_message.reasoning_content: - assistant_content += f" reasoning={assistant_message.reasoning_content}" - if assistant_message.tool_calls: - tool_call_str = " | ".join([json.dumps(t.simple_output_dump(), ensure_ascii=False) \ - for t in assistant_message.tool_calls]) - assistant_content += f" tool_calls={tool_call_str}" - assistant_content += "\n\n" - logger.info(assistant_content) - await self.context.add_stream_chunk_and_type(assistant_content, ChunkEnum.THINK) - - if not assistant_message.tool_calls: - break - - ops: List[BaseAsyncToolOp] = [] - for j, tool in enumerate(assistant_message.tool_calls): - op = tool_dict[tool.name].copy() - op.tool_call.id = tool.id - ops.append(op) - logger.info(f"{self.name} submit op{j}={op.name} argument={tool.argument_dict}") - self.submit_async_task(op.async_call, **tool.argument_dict, stream_queue=self.context.stream_queue) - - await self.join_async_task() - - done: bool = False - for op in ops: - messages.append(Message(role=Role.TOOL, - content=op.output, - tool_call_id=op.tool_call.id)) - tool_content = f"[{self.name}.{i}.{op.name}] {op.output[:200]}...\n\n" - logger.info(tool_content) - await self.context.add_stream_chunk_and_type(tool_content, ChunkEnum.TOOL) - - if op.tool_call.name == "task_complete": - done = True - - if done: - break - - -async def main(): - from flowllm.app import FlowLLMApp - from flowllm.op.skills import LoadSkillOp, ReadReferenceFileOp, RunShellCommandOp, TaskCompleteOp - - async with FlowLLMApp(load_default_config=True): - # Help me 
merge two PDF files (minimal-document.pdf and pdflatex-outline) and save to merged.pdf - context = FlowContext(query="Fill Sample-Fillable-PDF.pdf with: name='Alice Johnson', select first choice from dropdown, check options 1 and 3, dependent name='Bob Johnson', age='12'. Save as filled-sample.pdf", - path="./skills", - stream_queue=asyncio.Queue()) - - op = UseSkillOp() \ - << LoadSkillOp() << ReadReferenceFileOp() << RunShellCommandOp() << TaskCompleteOp() - - await op.async_call(context=context) - - # async def async_call(): - # await op.async_call(context=context) - # await context.add_stream_done() - - # task = asyncio.create_task(async_call()) - - # while True: - # stream_chunk = await context.stream_queue.get() - # if stream_chunk.done: - # print("\nend") - # await task - # break - - # else: - # print(stream_chunk.chunk, end="") - - # await task - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/flowllm/extensions/utils/dt_utils.py b/flowllm/extensions/utils/dt_utils.py index 7029011..0b3f5f5 100644 --- a/flowllm/extensions/utils/dt_utils.py +++ b/flowllm/extensions/utils/dt_utils.py @@ -1,6 +1,7 @@ -"""日期时间工具函数模块。 +"""Date and time utility functions module. -提供用于处理日期范围、查找特定日期等功能的工具函数。 +This module provides utility functions for working with date ranges, finding specific dates, +and performing binary search operations on date lists. """ from datetime import datetime, timedelta @@ -8,18 +9,19 @@ def get_monday_fridays(start_str: str, end_str: str) -> List[List[str]]: - """获取指定日期范围内所有周一到周五的日期范围列表。 + """Get all Monday-to-Friday date ranges within the specified date range. - 从开始日期开始,找到第一个周五,然后每隔7天获取一个周一到周五的日期范围, - 直到结束日期。 + Starting from the start date, finds the first Friday, then retrieves a Monday-to-Friday + date range every 7 days until the end date. 
Args: - start_str: 开始日期字符串,格式为 "%Y%m%d"(例如:"20240101") - end_str: 结束日期字符串,格式为 "%Y%m%d"(例如:"20241231") + start_str: Start date string in "%Y%m%d" format (e.g., "20240101") + end_str: End date string in "%Y%m%d" format (e.g., "20241231") Returns: - 包含周一到周五日期范围的列表,每个元素是一个包含两个日期字符串的列表: - [周一日期, 周五日期]。如果开始日期大于结束日期或范围内没有周五,返回空列表。 + A list of Monday-to-Friday date ranges, where each element is a list containing + two date strings: [Monday date, Friday date]. Returns an empty list if the start + date is greater than the end date or if there are no Fridays in the range. """ start = datetime.strptime(str(start_str), "%Y%m%d") end = datetime.strptime(str(end_str), "%Y%m%d") @@ -45,14 +47,15 @@ def get_monday_fridays(start_str: str, end_str: str) -> List[List[str]]: return result -def next_friday_or_same(date_str): - """获取给定日期之后的下一个周五,如果当天就是周五则返回当天。 +def next_friday_or_same(date_str: str) -> str: + """Get the next Friday after the given date, or return the same date if it's already Friday. Args: - date_str: 日期字符串,格式为 "%Y%m%d"(例如:"20240115") + date_str: Date string in "%Y%m%d" format (e.g., "20240115") Returns: - 下一个周五或当天的日期字符串,格式为 "%Y%m%d" + The next Friday or the same date if it's already Friday, as a date string + in "%Y%m%d" format. """ dt = datetime.strptime(date_str, "%Y%m%d") days_ahead = (4 - dt.weekday()) % 7 @@ -60,10 +63,23 @@ def next_friday_or_same(date_str): return next_fri.strftime("%Y%m%d") -def find_dt_less_index(dt: str | int, dt_list: List[str | int]): - """ - Use binary search to find the index of the date that is closest to and less than dt. - Time complexity: O(log n) +def find_dt_less_index(dt: str | int, dt_list: List[str | int]) -> Optional[int]: + """Find the index of the date that is closest to and less than or equal to dt using binary search. + + This function performs a binary search to efficiently find the index of the largest date + in the sorted list that is less than or equal to the target date. 
+ + Args: + dt: Target date as a string or integer + dt_list: Sorted list of dates in ascending order (strings or integers) + + Returns: + Index of the date that is closest to and less than or equal to dt, or None if + dt is less than all dates in the list. Returns the last index if dt is greater + than or equal to all dates in the list. + + Note: + Time complexity: O(log n) """ if not dt_list: return None diff --git a/flowllm/extensions/utils/edit_utils.py b/flowllm/extensions/utils/edit_utils.py index fd17b2b..6f3eadf 100644 --- a/flowllm/extensions/utils/edit_utils.py +++ b/flowllm/extensions/utils/edit_utils.py @@ -8,12 +8,33 @@ def escape_regex(s: str) -> str: - """Escape special regex characters.""" + """Escape special regex characters in a string. + + This function escapes all special regex characters in the input string so that + it can be used as a literal string in a regular expression pattern. + + Args: + s: The string containing characters that may need escaping + + Returns: + A string with all special regex characters escaped + """ return re.escape(s) def restore_trailing_newline(original: str, modified: str) -> str: - """Restore trailing newline to match original.""" + """Restore trailing newline to match the original string. + + This function ensures that the modified string has the same trailing newline + behavior as the original string, preserving the file's newline format. + + Args: + original: The original string before modification + modified: The modified string that may have lost or gained a trailing newline + + Returns: + The modified string with trailing newline adjusted to match the original + """ had_newline = original.endswith("\n") if had_newline and not modified.endswith("\n"): return modified + "\n" @@ -27,7 +48,22 @@ def calculate_exact_replacement( old_string: str, new_string: str, ) -> tuple[str, int] | None: - """Try exact string replacement.""" + """Perform exact string replacement in content. 
+ + This function attempts to replace the old_string with new_string using exact + matching. It normalizes line endings (converts \\r\\n to \\n) before matching + and preserves the original trailing newline behavior. + + Args: + content: The original content string to modify + old_string: The exact string to find and replace + new_string: The replacement string + + Returns: + A tuple containing (modified_content, occurrence_count) if the old_string + is found, or None if no match is found. The occurrence_count indicates + how many times the old_string appears in the content. + """ normalized_content = content normalized_old = old_string.replace("\r\n", "\n") normalized_new = new_string.replace("\r\n", "\n") @@ -45,7 +81,23 @@ def calculate_flexible_replacement( old_string: str, new_string: str, ) -> tuple[str, int] | None: - """Try flexible replacement ignoring indentation.""" + """Perform flexible string replacement that ignores indentation differences. + + This function matches and replaces text by comparing line content while ignoring + leading whitespace (indentation). It preserves the indentation of the first matched + line when applying the replacement, making it useful for code editing where + indentation may vary. + + Args: + content: The original content string to modify + old_string: The string pattern to find (indentation is ignored during matching) + new_string: The replacement string (will be indented to match the original) + + Returns: + A tuple containing (modified_content, occurrence_count) if matches are found, + or None if no match is found. The occurrence_count indicates how many times + the pattern was found and replaced. 
+ """ normalized_content = content normalized_old = old_string.replace("\r\n", "\n") normalized_new = new_string.replace("\r\n", "\n") @@ -85,7 +137,22 @@ def calculate_regex_replacement( old_string: str, new_string: str, ) -> tuple[str, int] | None: - """Try regex-based flexible replacement.""" + """Perform regex-based flexible replacement with whitespace tolerance. + + This function converts the old_string into a regex pattern by escaping special + characters and allowing flexible whitespace between tokens. It's useful for + matching code patterns where whitespace may vary. The replacement preserves + the indentation of the matched block. + + Args: + content: The original content string to modify + old_string: The string pattern to find (converted to regex with flexible whitespace) + new_string: The replacement string (will be indented to match the original) + + Returns: + A tuple containing (modified_content, 1) if a match is found, or None if + no match is found. Only the first match is replaced. 
+ """ normalized_old = old_string.replace("\r\n", "\n") normalized_new = new_string.replace("\r\n", "\n") diff --git a/pyproject.toml b/pyproject.toml index af68ca5..66feb6a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,8 +78,12 @@ token = [ "tiktoken>=0.5.0", ] +skill = [ + "pipreqs", +] + full = [ - "flowllm[dev,reme,fin,token]", + "flowllm[dev,reme,fin,token,skill]", ] [tool.setuptools.packages.find] diff --git a/tests_op/test_skill_agent_op.py b/tests_op/test_skill_agent_op.py new file mode 100644 index 0000000..e69de29 From d1627585c6d0a735e87a3cfc7030ef24b13346fe Mon Sep 17 00:00:00 2001 From: "jinli.yl" Date: Wed, 19 Nov 2025 14:52:17 +0800 Subject: [PATCH 3/6] refactor(skills): rename skill ops and update metadata output format --- flowllm/config/default.yaml | 8 ++-- .../skills/load_skill_metadata_op.py | 4 +- flowllm/extensions/skills/load_skill_op.py | 2 +- .../extensions/skills/run_shell_command_op.py | 2 +- flowllm/extensions/skills/skill_agent_op.py | 24 +++++------ .../extensions/skills/skill_agent_prompt.yaml | 8 ++-- tests/test_skill_agent_flow.py | 41 +++++++++++++++++++ 7 files changed, 65 insertions(+), 24 deletions(-) create mode 100644 tests/test_skill_agent_flow.py diff --git a/flowllm/config/default.yaml b/flowllm/config/default.yaml index 49b3feb..d2ac260 100644 --- a/flowllm/config/default.yaml +++ b/flowllm/config/default.yaml @@ -43,10 +43,10 @@ flow: skill_agent_flow: flow_content: | skill_op = SkillAgentOp() - skill_op.ops.load_skill_metadata = LoadSkillMetadataOp() - skill_op.ops.load_skill_op = LoadSkillOp() - skill_op.ops.read_reference_file_op = ReadReferenceFileOp() - skill_op.ops.run_shell_command_op = RunShellCommandOp() + skill_op.ops.load_metadata = LoadSkillMetadataOp() + skill_op.ops.load_skill = LoadSkillOp() + skill_op.ops.read_reference = ReadReferenceFileOp() + skill_op.ops.run_shell = RunShellCommandOp() skill_op description: "Automatically uses pre-built Skills relevant to the query when needed." 
input_schema: diff --git a/flowllm/extensions/skills/load_skill_metadata_op.py b/flowllm/extensions/skills/load_skill_metadata_op.py index ef4d912..2eedd14 100644 --- a/flowllm/extensions/skills/load_skill_metadata_op.py +++ b/flowllm/extensions/skills/load_skill_metadata_op.py @@ -4,7 +4,6 @@ directory and extracts metadata (name and description) from all SKILL.md files. """ -import json from pathlib import Path from loguru import logger @@ -101,4 +100,5 @@ async def async_execute(self): logger.info(f"✅ Loaded {len(skill_metadata_list)} skill metadata entries") # Return as JSON string for easy parsing - self.set_output(json.dumps(skill_metadata_list, ensure_ascii=False, indent=2)) + skill_metadata_list = [f"- {x['name']}: {x['description']}" for x in skill_metadata_list] + self.set_output("\n".join(skill_metadata_list)) diff --git a/flowllm/extensions/skills/load_skill_op.py b/flowllm/extensions/skills/load_skill_op.py index 1f97d79..1517e57 100644 --- a/flowllm/extensions/skills/load_skill_op.py +++ b/flowllm/extensions/skills/load_skill_op.py @@ -57,7 +57,7 @@ async def async_execute(self): skill_path = skill_dir / skill_name / "SKILL.md" if not skill_path.exists(): - available = [d.name for d in skill_dir / skill_name.iterdir() if d.is_dir() and (d / "SKILL.md").exists()] + available = [d.name for d in (skill_dir / skill_name).iterdir() if d.is_dir() and (d / "SKILL.md").exists()] logger.exception(f"❌ Skill '{skill_name}' not found") self.set_output(f"Skill '{skill_name}' not found. 
Available: {', '.join(available)}") return diff --git a/flowllm/extensions/skills/run_shell_command_op.py b/flowllm/extensions/skills/run_shell_command_op.py index 054d00a..37c3246 100644 --- a/flowllm/extensions/skills/run_shell_command_op.py +++ b/flowllm/extensions/skills/run_shell_command_op.py @@ -42,7 +42,7 @@ def build_tool_call(self) -> ToolCall: "required": True, }, "timeout": { - "type": "int", + "type": "integer", "description": "timeout for the subprocess", "required": False, }, diff --git a/flowllm/extensions/skills/skill_agent_op.py b/flowllm/extensions/skills/skill_agent_op.py index bb5bf7b..cf109d4 100644 --- a/flowllm/extensions/skills/skill_agent_op.py +++ b/flowllm/extensions/skills/skill_agent_op.py @@ -55,18 +55,20 @@ async def async_execute(self): """ query: str = self.context.query skill_dir: str = self.context.skill_dir - logger.info(f"UseSkillOp processing query: {query} with access to skills in {skill_dir}") + logger.info(f"SkillAgentOp processing query: {query} with access to skills in {skill_dir}") - load_skill_metadata_op: BaseAsyncToolOp = self.ops.load_skill_metadata_op - load_skill_op: BaseAsyncToolOp = self.ops.load_skill_op - read_reference_file_op: BaseAsyncToolOp = self.ops.read_reference_file_op - run_shell_command_op: BaseAsyncToolOp = self.ops.run_shell_command_op + load_metadata_op: BaseAsyncToolOp = self.ops.load_metadata + load_skill_op: BaseAsyncToolOp = self.ops.load_skill + read_reference_op: BaseAsyncToolOp = self.ops.read_reference + run_shell_op: BaseAsyncToolOp = self.ops.run_shell - await load_skill_metadata_op.async_call() + await load_metadata_op.async_call(skill_dir=skill_dir) + skill_metadata = load_metadata_op.output + logger.info(f"SkillAgentOp loaded skill metadata: {skill_metadata}") system_prompt = self.prompt_format( "system_prompt", skill_dir=skill_dir, - skill_metadata=load_skill_metadata_op.output, + skill_metadata=skill_metadata, ) messages = [ @@ -74,9 +76,7 @@ async def async_execute(self): 
Message(role=Role.USER, content=query), ] - tool_op_dict: dict = { - op.tool_call.name: op for op in [load_skill_op, read_reference_file_op, run_shell_command_op] - } + tool_op_dict: dict = {op.tool_call.name: op for op in [load_skill_op, read_reference_op, run_shell_op]} for i in range(self.max_iterations): assistant_message = await self.llm.achat( @@ -91,11 +91,11 @@ async def async_execute(self): ops: List[BaseAsyncToolOp] = [] for j, tool in enumerate(assistant_message.tool_calls): - op = tool_op_dict[tool.name].copy() + op: BaseAsyncToolOp = tool_op_dict[tool.name].copy() op.tool_call.id = tool.id ops.append(op) logger.info(f"{self.name} submit op{j}={op.name} argument={tool.argument_dict}") - self.submit_async_task(op.async_call, **tool.argument_dict) + self.submit_async_task(op.async_call, skill_dir=skill_dir, **tool.argument_dict) await self.join_async_task() diff --git a/flowllm/extensions/skills/skill_agent_prompt.yaml b/flowllm/extensions/skills/skill_agent_prompt.yaml index cf46708..8efa3f3 100644 --- a/flowllm/extensions/skills/skill_agent_prompt.yaml +++ b/flowllm/extensions/skills/skill_agent_prompt.yaml @@ -2,8 +2,8 @@ system_prompt: | You are a helpful AI assistant with access to specialized skills. When you encounter tasks involving specific domains or file formats, use the "load_skill" tool to gain expert knowledge. 
- Skills Dir: {skills_dir} - Available skills: + Skill Dir: {skill_dir} + Available skills (each line is "- : ") {skill_metadata} Workflow: @@ -18,7 +18,7 @@ system_prompt: | Important: - Only load skills and additional resources when they are directly relevant to the current task - - Skill scripts are located in: {skills_dir}//scripts/ + - Skill scripts are located in: {skill_dir}//scripts/ - When running skill scripts, use the full path from current directory when creating the shell commands - - Example: python {skills_dir}//scripts/check_fillable_fields.py + - Example: {skill_dir}//scripts/check_fillable_fields.py - If a task requires multiple skills, load and apply them sequentially as needed diff --git a/tests/test_skill_agent_flow.py b/tests/test_skill_agent_flow.py new file mode 100644 index 0000000..b4ba066 --- /dev/null +++ b/tests/test_skill_agent_flow.py @@ -0,0 +1,41 @@ +"""Test module for skill_agent_flow functionality. + +This module contains a test demonstrating the usage of HttpClient +for executing skill_agent_flow with a PDF filling task. +""" + +import asyncio + +from flowllm.core.utils import HttpClient + + +async def main(): + """Test function for skill_agent_flow. + + This function demonstrates how to use HttpClient to execute + skill_agent_flow with a PDF filling task. + curl -X POST http://localhost:8002/skill_agent_flow \ + -H "Content-Type: application/json" \ + -d '{ + "query": "xxxx", + "skill_dir": "../skills" + }' + """ + async with HttpClient("http://0.0.0.0:8002") as client: + query = ( + "Fill Sample-Fillable-PDF.pdf with: name='Alice Johnson', select first choice from dropdown, " + "check options 1 and 3, dependent name='Bob Johnson', age='12'. 
Save as filled-sample.pdf" + ) + skill_dir = "../skills" + print("=" * 50) + print("Testing skill_agent_flow endpoint...") + response = await client.execute_flow( + "skill_agent_flow", + query=query, + skill_dir=skill_dir, + ) + print(f"Result: {response.answer}") + + +if __name__ == "__main__": + asyncio.run(main()) From cf63adbc7927bc9a0c646a3d93b69e22de6e23dd Mon Sep 17 00:00:00 2001 From: "jinli.yl" Date: Wed, 19 Nov 2025 16:06:07 +0800 Subject: [PATCH 4/6] feat(skills): refactor skill metadata loading and update prompts --- flowllm/core/op/base_async_tool_op.py | 2 +- .../skills/load_skill_metadata_op.py | 18 +++++++------ flowllm/extensions/skills/skill_agent_op.py | 8 +++--- .../extensions/skills/skill_agent_prompt.yaml | 26 +++++++++++++++++++ 4 files changed, 42 insertions(+), 12 deletions(-) diff --git a/flowllm/core/op/base_async_tool_op.py b/flowllm/core/op/base_async_tool_op.py index 09895f2..e42481b 100644 --- a/flowllm/core/op/base_async_tool_op.py +++ b/flowllm/core/op/base_async_tool_op.py @@ -137,7 +137,7 @@ def output_keys(self) -> str | List[str]: return output_keys @property - def output(self) -> str: + def output(self): """Convenience accessor for the primary output value. 
Raises: diff --git a/flowllm/extensions/skills/load_skill_metadata_op.py b/flowllm/extensions/skills/load_skill_metadata_op.py index 2eedd14..e57598f 100644 --- a/flowllm/extensions/skills/load_skill_metadata_op.py +++ b/flowllm/extensions/skills/load_skill_metadata_op.py @@ -90,15 +90,17 @@ async def async_execute(self): logger.info(f"🔧 Tool called: load_skill_metadata(path={skill_dir})") skill_files = list(skill_dir.rglob("SKILL.md")) - skill_metadata_list = [] + skill_metadata_dict = {} for skill_file in skill_files: content = skill_file.read_text(encoding="utf-8") metadata = await self.parse_skill_metadata(content, str(skill_file)) if metadata: - skill_metadata_list.append(metadata) - - logger.info(f"✅ Loaded {len(skill_metadata_list)} skill metadata entries") - - # Return as JSON string for easy parsing - skill_metadata_list = [f"- {x['name']}: {x['description']}" for x in skill_metadata_list] - self.set_output("\n".join(skill_metadata_list)) + skill_dir = skill_file.parent.as_posix() + skill_metadata_dict[metadata["name"]] = { + "description": metadata["description"], + "skill_dir": skill_dir, + } + logger.info(f"✅ Loaded skill {metadata['name']} metadata skill_dir={skill_dir}") + + logger.info(f"✅ Loaded {len(skill_metadata_dict)} skill metadata entries") + self.set_output(skill_metadata_dict) diff --git a/flowllm/extensions/skills/skill_agent_op.py b/flowllm/extensions/skills/skill_agent_op.py index cf109d4..ba9db86 100644 --- a/flowllm/extensions/skills/skill_agent_op.py +++ b/flowllm/extensions/skills/skill_agent_op.py @@ -63,12 +63,14 @@ async def async_execute(self): run_shell_op: BaseAsyncToolOp = self.ops.run_shell await load_metadata_op.async_call(skill_dir=skill_dir) - skill_metadata = load_metadata_op.output - logger.info(f"SkillAgentOp loaded skill metadata: {skill_metadata}") + skill_metadata_dict: dict = load_metadata_op.output + + skill_metadata_list = [f"- {k}: {v['description']}" for k, v in skill_metadata_dict.items()] + 
logger.info(f"SkillAgentOp loaded skill metadata: {skill_metadata_dict}") system_prompt = self.prompt_format( "system_prompt", skill_dir=skill_dir, - skill_metadata=skill_metadata, + skill_metadata="\n".join(skill_metadata_list), ) messages = [ diff --git a/flowllm/extensions/skills/skill_agent_prompt.yaml b/flowllm/extensions/skills/skill_agent_prompt.yaml index 8efa3f3..db26c97 100644 --- a/flowllm/extensions/skills/skill_agent_prompt.yaml +++ b/flowllm/extensions/skills/skill_agent_prompt.yaml @@ -22,3 +22,29 @@ system_prompt: | - When running skill scripts, use the full path from current directory when creating the shell commands - Example: {skill_dir}//scripts/check_fillable_fields.py - If a task requires multiple skills, load and apply them sequentially as needed + + +system_prompt_zh: | + 你是一个具备专业技能访问权限的智能助手。 + + 当你遇到涉及特定领域或文件格式的任务时,请使用“load_skill”工具来获取专家级知识。 + 技能目录:{skill_dir} + 可用技能(每行格式为“- <技能名称>: <技能描述>”) + {skill_metadata} + + 工作流程: + 1. 判断当前任务是否需要专业知识。 + 2. 如果需要专业知识,请从可用技能列表中选择最相关的技能。 + 3. 使用“load_skill”工具加载所选技能的详细说明。这会将该技能目录下的 SKILL.md 文件内容载入你的上下文。 + 4. 如果技能说明中提到了参考文件(例如 forms.md),仅在任务明确需要时,才使用“read_reference_file”工具读取其内容。 + 5. 如果技能包含可执行脚本(例如 fill_form.py),在必要时使用“run_shell_command”工具运行相应的 shell 命令。请注意,只有脚本的输出结果会被添加到你的上下文中,而非脚本本身的代码。 + 6. 遵循已加载技能中的说明进行操作。 + 7. 根据需要使用其他可用工具。 + 8. 
任务完成后,调用“task_complete”工具,表明你已完成当前任务。 + + 重要提示: + - 仅在当前任务直接相关时,才加载技能和额外资源。 + - 技能脚本位于:{skill_dir}/<技能名称>/scripts/ + - 运行技能脚本时,请在 shell 命令中使用从当前目录出发的完整路径。 + - 示例:{skill_dir}/<技能名称>/scripts/check_fillable_fields.py + - 如果一个任务需要多个技能,请按需依次加载并应用它们。 \ No newline at end of file From 8e60ab8a840d4bd1925809c8d4f20603bea453c5 Mon Sep 17 00:00:00 2001 From: "jinli.yl" Date: Thu, 20 Nov 2025 11:23:57 +0800 Subject: [PATCH 5/6] fix(core): change default value of raise_exception to False --- docs/zh/todo.md | 2 ++ flowllm/core/op/base_op.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/zh/todo.md b/docs/zh/todo.md index 269c2b6..ff59bd8 100644 --- a/docs/zh/todo.md +++ b/docs/zh/todo.md @@ -16,3 +16,5 @@ - [ ] github action - [ ] contribution guide - [ ] ah feature & backtest +- [ ] Look at the Bollinger Bands range with 95% standard deviation. + diff --git a/flowllm/core/op/base_op.py b/flowllm/core/op/base_op.py index 71bacc6..de8fa09 100644 --- a/flowllm/core/op/base_op.py +++ b/flowllm/core/op/base_op.py @@ -105,7 +105,7 @@ def __init__( name: str = "", async_mode: bool = False, max_retries: int = 1, - raise_exception: bool = True, + raise_exception: bool = False, enable_multithread: bool = True, language: str = "", prompt_path: str = "", From be4cbd89878b23aa7d94ab50d4ea84f5b8aeb18b Mon Sep 17 00:00:00 2001 From: "jinli.yl" Date: Thu, 20 Nov 2025 15:28:41 +0800 Subject: [PATCH 6/6] feat(agent): introduce ReactAgentOp and ReactSearchOp for tool-augmented reasoning --- flowllm/core/context/base_context.py | 8 ++ flowllm/core/service/base_service.py | 9 +- flowllm/gallery/__init__.py | 4 +- flowllm/gallery/agent/__init__.py | 9 ++ flowllm/gallery/agent/react_agent_op.py | 125 ++++++++++++++++++ flowllm/gallery/agent/react_agent_prompt.yaml | 5 + flowllm/gallery/agent/react_search_op.py | 23 ++++ flowllm/gallery/react_search_op.py | 115 ---------------- flowllm/gallery/react_search_prompt.yaml | 19 --- flowllm/gallery/think_tool_op.py | 37 ++++++ 
flowllm/gallery/think_tool_prompt.yaml | 32 +++++ old/op/think_op.py | 57 -------- tests_op/test_react_search_op.py | 4 +- 13 files changed, 249 insertions(+), 198 deletions(-) create mode 100644 flowllm/gallery/agent/__init__.py create mode 100644 flowllm/gallery/agent/react_agent_op.py create mode 100644 flowllm/gallery/agent/react_agent_prompt.yaml create mode 100644 flowllm/gallery/agent/react_search_op.py delete mode 100644 flowllm/gallery/react_search_op.py delete mode 100644 flowllm/gallery/react_search_prompt.yaml create mode 100644 flowllm/gallery/think_tool_op.py create mode 100644 flowllm/gallery/think_tool_prompt.yaml delete mode 100644 old/op/think_op.py diff --git a/flowllm/core/context/base_context.py b/flowllm/core/context/base_context.py index 9893073..058db7c 100644 --- a/flowllm/core/context/base_context.py +++ b/flowllm/core/context/base_context.py @@ -127,6 +127,14 @@ def keys(self): """ return self._data.keys() + def values(self): + """Get all values in the context. + + Returns: + A view object of all values in the context. + """ + return self._data.values() + def update(self, kwargs: dict): """Update context with new key-value pairs. diff --git a/flowllm/core/service/base_service.py b/flowllm/core/service/base_service.py index 6286f85..f71747a 100644 --- a/flowllm/core/service/base_service.py +++ b/flowllm/core/service/base_service.py @@ -56,19 +56,22 @@ def run(self): prints the logo (optional) and suppresses deprecation warnings. Concrete services should call super().run() before their own startup logic. 
""" + flow_names = [] for _, flow in C.flow_dict.items(): assert isinstance(flow, BaseFlow) if flow.stream: if self.integrate_stream_flow(flow): - logger.info(f"integrate {flow.name}[stream]") + flow_names.append(flow.name) elif isinstance(flow, BaseToolFlow): if self.integrate_tool_flow(flow): - logger.info(f"integrate {flow.name}") + flow_names.append(flow.name) else: if self.integrate_flow(flow): - logger.info(f"integrate {flow.name}") + flow_names.append(flow.name) + + logger.info(f"integrate {','.join(flow_names)}") import warnings diff --git a/flowllm/gallery/__init__.py b/flowllm/gallery/__init__.py index bc2fe62..392b34a 100644 --- a/flowllm/gallery/__init__.py +++ b/flowllm/gallery/__init__.py @@ -13,24 +13,24 @@ - TokenCountOp: Token counting operation for calculating token usage in messages """ +from . import agent from .chat_op import ChatOp from .code_analyse_op import CodeAnalyseOp from .dashscope_search_op import DashscopeSearchOp from .execute_code_op import ExecuteCodeOp from .gen_system_prompt_op import GenSystemPromptOp from .mock_search_op import MockSearchOp -from .react_search_op import ReactSearchOp from .stream_chat_op import StreamChatOp from .token_count_op import TokenCountOp __all__ = [ + "agent", "ChatOp", "CodeAnalyseOp", "DashscopeSearchOp", "ExecuteCodeOp", "GenSystemPromptOp", "MockSearchOp", - "ReactSearchOp", "StreamChatOp", "TokenCountOp", ] diff --git a/flowllm/gallery/agent/__init__.py b/flowllm/gallery/agent/__init__.py new file mode 100644 index 0000000..7f9bab3 --- /dev/null +++ b/flowllm/gallery/agent/__init__.py @@ -0,0 +1,9 @@ +"""Convenience exports for agent operators.""" + +from .react_agent_op import ReactAgentOp +from .react_search_op import ReactSearchOp + +__all__ = [ + "ReactAgentOp", + "ReactSearchOp", +] diff --git a/flowllm/gallery/agent/react_agent_op.py b/flowllm/gallery/agent/react_agent_op.py new file mode 100644 index 0000000..b54f0cd --- /dev/null +++ b/flowllm/gallery/agent/react_agent_op.py @@ -0,0 
+1,125 @@ +"""Reactive agent operator that orchestrates tool-augmented LLM reasoning.""" + +import datetime +import time +from typing import List, Dict + +from loguru import logger + +from ...core.context import C, BaseContext +from ...core.enumeration import Role +from ...core.op import BaseAsyncToolOp +from ...core.schema import Message, ToolCall + + +@C.register_op() +class ReactAgentOp(BaseAsyncToolOp): + """React-style agent capable of iterative tool invocation.""" + + file_path: str = __file__ + + def __init__( + self, + llm: str = "qwen3_30b_instruct", + max_steps: int = 5, + tool_call_interval: float = 1.0, + add_think_tool: bool = False, + **kwargs, + ): + """Initialize the agent runtime configuration.""" + super().__init__(llm=llm, **kwargs) + self.max_steps: int = max_steps + self.tool_call_interval: float = tool_call_interval + self.add_think_tool: bool = add_think_tool + + def build_tool_call(self) -> ToolCall: + """Expose metadata describing how to invoke the agent.""" + return ToolCall( + **{ + "description": "A React agent that answers user queries.", + "input_schema": { + "query": { + "type": "string", + "description": "query", + "required": True, + }, + }, + }, + ) + + def _build_tool_op_dict(self) -> dict: + """Collect available tool operators from the execution context.""" + assert isinstance(self.ops, BaseContext), "self.ops must be BaseContext" + tool_op_dict: Dict[str, BaseAsyncToolOp] = { + op.tool_call.name: op for op in self.ops.values() if isinstance(op, BaseAsyncToolOp) + } + for op in tool_op_dict.values(): + op.language = self.language + return tool_op_dict + + async def async_execute(self): + """Main execution loop that alternates LLM calls and tool invocations.""" + from ..think_tool_op import ThinkToolOp + + think_op = ThinkToolOp(language=self.language) + + query: str = self.input_dict["query"] + + tool_op_dict = self._build_tool_op_dict() + + if self.add_think_tool: + tool_op_dict["think_tool"] = think_op + + now_time = 
datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + messages = [ + Message(role=Role.SYSTEM, content=self.prompt_format(prompt_name="system_prompt", time=now_time)), + Message(role=Role.USER, content=query), + ] + logger.info(f"round0.system={messages[0].model_dump_json()}") + logger.info(f"round0.user={messages[1].model_dump_json()}") + + for i in range(self.max_steps): + assistant_message: Message = await self.llm.achat( + messages=messages, + tools=[op.tool_call for op in tool_op_dict.values()], + ) + messages.append(assistant_message) + logger.info(f"round{i + 1}.assistant={assistant_message.model_dump_json()}") + + if not assistant_message.tool_calls: + break + + op_list: List[BaseAsyncToolOp] = [] + has_think_tool_flag: bool = False + for j, tool_call in enumerate(assistant_message.tool_calls): + if tool_call.name == think_op.tool_call.name: + has_think_tool_flag = True + + if tool_call.name not in tool_op_dict: + logger.exception(f"unknown tool_call.name={tool_call.name}") + continue + + logger.info(f"round{i + 1}.{j} submit tool_calls={tool_call.name} argument={tool_call.argument_dict}") + + op_copy: BaseAsyncToolOp = tool_op_dict[tool_call.name].copy() + op_copy.tool_call.id = tool_call.id + op_list.append(op_copy) + self.submit_async_task(op_copy.async_call, **tool_call.argument_dict) + time.sleep(self.tool_call_interval) + + await self.join_async_task() + + for j, op in enumerate(op_list): + logger.info(f"round{i + 1}.{j} join tool_result={op.output}") + tool_result = str(op.output) + tool_message = Message(role=Role.TOOL, content=tool_result, tool_call_id=op.tool_call.id) + messages.append(tool_message) + + if self.add_think_tool: + if not has_think_tool_flag: + tool_op_dict["think_tool"] = think_op + else: + tool_op_dict.pop("think_tool") + + self.set_output(messages[-1].content) + self.context.response.messages = messages diff --git a/flowllm/gallery/agent/react_agent_prompt.yaml b/flowllm/gallery/agent/react_agent_prompt.yaml new file mode 100644 
index 0000000..84e65a6 --- /dev/null +++ b/flowllm/gallery/agent/react_agent_prompt.yaml @@ -0,0 +1,5 @@ +system_prompt: | + You are a helpful assistant. The current time is {time}. + +system_prompt_zh: | + 你是一个有用的助手。当前时间是 {time}。 diff --git a/flowllm/gallery/agent/react_search_op.py b/flowllm/gallery/agent/react_search_op.py new file mode 100644 index 0000000..7a4ae96 --- /dev/null +++ b/flowllm/gallery/agent/react_search_op.py @@ -0,0 +1,23 @@ +"""ReactAgentOp specialization that ensures search capability is available.""" + +from typing import Dict + +from .react_agent_op import ReactAgentOp +from ...core.context import C +from ...core.op import BaseAsyncToolOp + + +@C.register_op() +class ReactSearchOp(ReactAgentOp): + """Agent that guarantees a search tool fallback when none are configured.""" + + def _build_tool_op_dict(self) -> dict: + """Extend parent tools with a default search operator when needed.""" + tool_op_dict: Dict[str, BaseAsyncToolOp] = super()._build_tool_op_dict() + if not tool_op_dict: + from ..dashscope_search_op import DashscopeSearchOp + + search_op = DashscopeSearchOp() + tool_op_dict[search_op.tool_call.name] = search_op + + return tool_op_dict diff --git a/flowllm/gallery/react_search_op.py b/flowllm/gallery/react_search_op.py deleted file mode 100644 index 85da530..0000000 --- a/flowllm/gallery/react_search_op.py +++ /dev/null @@ -1,115 +0,0 @@ -"""ReAct (Reasoning and Acting) search operation module. - -This module implements a ReAct agent that answers user queries by reasoning about -the problem and taking actions (such as searching) in an iterative manner. The agent -can use search tools to gather information and provide comprehensive answers. 
-""" - -import datetime -import time -from typing import List - -from loguru import logger - -from ..core.context import C -from ..core.enumeration import Role -from ..core.op import BaseAsyncToolOp -from ..core.schema import Message -from ..core.schema import ToolCall - - -@C.register_op() -class ReactSearchOp(BaseAsyncToolOp): - """A ReAct (Reasoning and Acting) agent for answering queries using search tools. - - This operation implements a ReAct agent that iteratively reasons about user queries - and takes actions (like searching) to gather information. The agent continues until - it has enough information to provide a final answer or reaches the maximum number - of steps. - - Attributes: - llm: The language model to use for reasoning (default: "qwen3_30b_instruct"). - max_steps: Maximum number of reasoning-action iterations (default: 5). - """ - - file_path: str = __file__ - - def __init__(self, llm: str = "qwen3_30b_thinking", max_steps: int = 5, **kwargs): - super().__init__(llm=llm, **kwargs) - self.max_steps: int = max_steps - - def build_tool_call(self) -> ToolCall: - return ToolCall( - **{ - "description": "A React agent that answers user queries.", - "input_schema": { - "query": { - "type": "string", - "description": "query", - "required": True, - }, - }, - }, - ) - - async def async_execute(self): - query: str = self.input_dict["query"] - if "search" in self.ops: - search_op = self.ops.search - else: - from .dashscope_search_op import DashscopeSearchOp - - search_op = DashscopeSearchOp() - - assert isinstance(search_op, BaseAsyncToolOp) - # NOTE search_op.tool_call.name √ search_op.name × - tool_dict = {search_op.tool_call.name: search_op} - - now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - user_prompt = self.prompt_format( - prompt_name="role_prompt", - time=now_time, - tools=",".join(tool_dict.keys()), - query=query, - ) - messages: List[Message] = [Message(role=Role.USER, content=user_prompt)] - logger.info(f"step.0 
user_prompt={user_prompt}") - - for i in range(self.max_steps): - assistant_message: Message = await self.llm.achat( - messages, - tools=[op.tool_call for op in tool_dict.values()], - ) - messages.append(assistant_message) - logger.info( - f"assistant.round{i}.reasoning_content={assistant_message.reasoning_content}\n" - f"content={assistant_message.content}\n" - f"tool.size={len(assistant_message.tool_calls)}", - ) - - if not assistant_message.tool_calls: - break - - op_list: List[BaseAsyncToolOp] = [] - for j, tool_call in enumerate(assistant_message.tool_calls): - logger.info(f"submit step={i} tool_calls.name={tool_call.name} argument_dict={tool_call.argument_dict}") - - if tool_call.name not in tool_dict: - logger.exception(f"step={i} no tool_call.name={tool_call.name}") - continue - - op_copy = tool_dict[tool_call.name].copy() - op_list.append(op_copy) - self.submit_async_task(op_copy.async_call, **tool_call.argument_dict) - time.sleep(1) - - await self.join_async_task() - - for j, op in enumerate(op_list): - logger.info(f"submit step.index={i}.{j} tool_result={op.output}") - tool_result = str(op.output) - tool_message = Message(role=Role.TOOL, content=tool_result, tool_call_id=op.tool_call.id) - messages.append(tool_message) - - self.set_output(messages[-1].content) - self.context.response.messages = messages diff --git a/flowllm/gallery/react_search_prompt.yaml b/flowllm/gallery/react_search_prompt.yaml deleted file mode 100644 index 4e6cfc2..0000000 --- a/flowllm/gallery/react_search_prompt.yaml +++ /dev/null @@ -1,19 +0,0 @@ -role_prompt: | - You are a helpful assistant. - The current time is {time}. - Please proactively choose the most suitable tool or combination of tools based on the user's question, including {tools} etc. - Please first think about how to break down the problem into subtasks, what tools and parameters should be used for each subtask, and finally provide the tool call name and parameters. 
- Try calling the same tool multiple times with different parameters to obtain information from various perspectives. - Please determine the response language based on the language of the user's question. - - {query} - -role_prompt_zh: | - 你是一个有用的助手。 - 当前时间是 {time}。 - 请根据用户的问题,主动选择最合适的工具或工具组合,包括 {tools} 等。 - 请先思考如何将问题分解为子任务,每个子任务应使用哪些工具和参数,最后提供工具调用名称和参数。 - 尝试多次使用相同的工具,但使用不同的参数,从多个角度获取信息。 - 请根据用户问题的语言来确定回复的语言。 - - {query} diff --git a/flowllm/gallery/think_tool_op.py b/flowllm/gallery/think_tool_op.py new file mode 100644 index 0000000..b933e03 --- /dev/null +++ b/flowllm/gallery/think_tool_op.py @@ -0,0 +1,37 @@ +"""Lightweight tool op used to elicit intermediate thinking from the LLM.""" + +from ..core.context import C +from ..core.op import BaseAsyncToolOp +from ..core.schema import ToolCall + + +@C.register_op() +class ThinkToolOp(BaseAsyncToolOp): + """Utility operation that prompts the model for explicit reflection text.""" + + file_path = __file__ + + def __init__(self, add_output_reflection: bool = False, **kwargs): + super().__init__(**kwargs) + self.add_output_reflection: bool = add_output_reflection + + def build_tool_call(self) -> ToolCall: + return ToolCall( + **{ + "name": "think_tool", + "description": self.get_prompt("tool_desc"), + "input_schema": { + "reflection": { + "type": "string", + "description": self.get_prompt("reflection"), + "required": True, + }, + }, + }, + ) + + async def async_execute(self): + if self.add_output_reflection: + self.set_output(self.input_dict["reflection"]) + else: + self.set_output(self.get_prompt("reflection_output")) diff --git a/flowllm/gallery/think_tool_prompt.yaml b/flowllm/gallery/think_tool_prompt.yaml new file mode 100644 index 0000000..cd9f17e --- /dev/null +++ b/flowllm/gallery/think_tool_prompt.yaml @@ -0,0 +1,32 @@ +tool_desc: | + Before calling any external tool, invoke this tool to articulate a brief plan. + The output must cover: + 1. 
Whether the current context is enough to answer the user directly, plus reasoning. + 2. If not, what information or validation is missing. + 3. A strategy to close the gap: which tool to call next, why, and key parameters or query terms. + Keep the reasoning tightly scoped to the current turn, avoid unrelated background, + and do not execute tools from here—only produce clear, actionable thoughts. + +reflection: | + 1) Can I answer now? Why? + 2) What is missing? + 3) Which tool + params next? + +tool_desc_zh: | + 每次准备调用任何外部工具之前,都必须先调用本工具进行简短思考。 + 输出需覆盖以下要点: + 1. 评估当前上下文是否足以直接回答用户问题,并解释理由。 + 2. 若不能回答,明确缺失的信息或验证步骤。 + 3. 针对缺口设计下一步策略:列出计划使用的工具、调用目的、关键参数或查询关键词。 + 思考要紧扣当前轮对话内容,避免复述无关背景,不要直接执行工具,只输出清晰推理。 + +reflection_zh: | + 1) 能直接回答吗?为什么? + 2) 缺什么信息? + 3) 下一步用哪个工具+参数? + +reflection_output: | + Reflection has been recorded. + +reflection_output_zh: | + 已经记录反思 \ No newline at end of file diff --git a/old/op/think_op.py b/old/op/think_op.py deleted file mode 100644 index dfa3ab3..0000000 --- a/old/op/think_op.py +++ /dev/null @@ -1,57 +0,0 @@ -import asyncio - -from flowllm.context.flow_context import FlowContext -from flowllm.context.service_context import C -from flowllm.op.base_async_tool_op import BaseAsyncToolOp -from flowllm.schema.tool_call import ToolCall - - -@C.register_op(register_app="FlowLLM") -class ThinkToolOp(BaseAsyncToolOp): - - def build_tool_call(self) -> ToolCall: - return ToolCall( - **{ - "name": "think_tool", - "description": """ -Tool for strategic reflection on research progress and decision-making. - -Use this tool after each search to analyze results and plan next steps systematically. -This creates a deliberate pause in the research workflow for quality decision-making. - -When to use: -- After receiving search results: What key information did I find? -- Before deciding next steps: Do I have enough to answer comprehensively? -- When assessing research gaps: What specific information am I still missing? 
-- Before concluding research: Can I provide a complete answer now? - -Reflection should address: -1. Analysis of current findings - What concrete information have I gathered? -2. Gap assessment - What crucial information is still missing? -3. Quality evaluation - Do I have sufficient evidence/examples for a good answer? -4. Strategic decision - Should I continue searching or provide my answer? - """.strip(), - "input_schema": { - "reflection": { - "type": "str", - "description": "Your detailed reflection on research progress, findings, gaps, and next steps.", - "required": True, - }, - }, - }, - ) - - async def async_execute(self): - reflection: str = self.input_dict["reflection"] - self.set_result(f"Reflection recorded: {reflection}") - - -async def main(): - op = ThinkToolOp() - context = FlowContext(reflection="haha") - await op.async_call(context) - print(f"Result: {op.output}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/tests_op/test_react_search_op.py b/tests_op/test_react_search_op.py index b3740dd..6504c2d 100644 --- a/tests_op/test_react_search_op.py +++ b/tests_op/test_react_search_op.py @@ -6,14 +6,14 @@ import asyncio -from flowllm.gallery.react_search_op import ReactSearchOp +from flowllm.gallery.agent import ReactSearchOp from flowllm.main import FlowLLMApp async def async_main(): """Test function for ReactSearchOp.""" async with FlowLLMApp(): - op = ReactSearchOp() + op = ReactSearchOp(add_think_tool=True, language="zh") await op.async_call(query="小米股价为什么一直跌?现在还值得买吗?") print(op.output)