diff --git a/api/apps/sdk/powerrag_proxy.py b/api/apps/sdk/powerrag_proxy.py index 3bafdf867..065071c34 100644 --- a/api/apps/sdk/powerrag_proxy.py +++ b/api/apps/sdk/powerrag_proxy.py @@ -21,8 +21,11 @@ 这样 SDK 可以通过主 RAGFlow 服务访问 PowerRAG 功能,无需直接连接到 PowerRAG server """ -import os +import asyncio import logging +import os +from io import BytesIO + import httpx from quart import request, jsonify from api.utils.api_utils import token_required, get_error_data_result @@ -93,10 +96,15 @@ async def _forward_request(method: str, endpoint: str, tenant_id: str = None): # 因为会丢失文件名。需要构造 httpx 期望的格式 files = {} for field_name, file_storage in files_dict.items(): - # httpx 期望格式: (filename, content, content_type) + # 在线程中读取文件内容(避免阻塞事件循环) + # httpx 期望文件对象或元组格式 + # 使用 BytesIO 将 bytes 包装成文件对象 + file_content = await asyncio.to_thread(file_storage.read) + # httpx 期望格式: (filename, file_object, content_type) 或 (filename, file_object) + file_obj = BytesIO(file_content) files[field_name] = ( file_storage.filename, - file_storage.read(), + file_obj, file_storage.content_type or 'application/octet-stream' ) except Exception: @@ -592,3 +600,37 @@ async def parse_to_md_upload_proxy(tenant_id): """ return await _forward_request("POST", "/parse_to_md/upload", tenant_id) + +@manager.route("/powerrag/split/file", methods=["POST"]) # noqa: F821 +@token_required +async def split_file_proxy(tenant_id): + """ + 代理 split/file API 请求到 PowerRAG server + + 支持所有ParserType方法对文件进行切片(使用文件路径或URL) + + --- + tags: + - PowerRAG Proxy + security: + - ApiKeyAuth: [] + """ + return await _forward_request("POST", "/split/file", tenant_id) + + +@manager.route("/powerrag/split/file/upload", methods=["POST"]) # noqa: F821 +@token_required +async def split_file_upload_proxy(tenant_id): + """ + 代理 split/file/upload API 请求到 PowerRAG server + + 上传文件并切片,支持所有ParserType方法 + + --- + tags: + - PowerRAG Proxy + security: + - ApiKeyAuth: [] + """ + return await _forward_request("POST", "/split/file/upload", tenant_id) + diff --git 
a/powerrag/sdk/README.md b/powerrag/sdk/README.md index f59868396..825a134f5 100644 --- a/powerrag/sdk/README.md +++ b/powerrag/sdk/README.md @@ -290,14 +290,122 @@ status = client.extraction.get_struct_extract_status(task['task_id']) ### 文本切片 -无需上传文档即可对文本进行切片: +无需上传文档即可对文本进行切片。 + +**注意**: `split_text` 方法仅支持以下三种解析器: +- `title`: 基于标题的切片 +- `regex`: 基于正则表达式的切片 +- `smart`: 智能切片 + +对于其他解析器(如 `naive`, `book`, `qa` 等),请使用 `split_file` 或 `split_file_upload` 方法。 ```python +# 使用 title 解析器 result = client.chunk.split_text( text="# Title\n\nContent...", parser_id="title", config={"chunk_token_num": 512} ) + +# 使用 regex 解析器 +result = client.chunk.split_text( + text="Section 1\n\nContent...", + parser_id="regex", + config={ + "chunk_token_num": 256, + "regex_pattern": r"Section \d+" + } +) + +# 使用 smart 解析器 +result = client.chunk.split_text( + text="Long text content...", + parser_id="smart", + config={"chunk_token_num": 512} +) + +print(f"Total chunks: {result['total_chunks']}") +for chunk in result['chunks']: + print(chunk) +``` + +### 文件切片 + +文件切片支持所有 ParserType 方法,提供三种使用方式: + +#### 方式 1: 使用本地文件路径 + +```python +result = client.chunk.split_file( + file_path="/path/to/document.pdf", + parser_id="book", # 支持所有 ParserType + config={ + "chunk_token_num": 512, + "delimiter": "\n。.;;!!??", + "lang": "Chinese", + "from_page": 0, + "to_page": 100000 + } +) +``` + +#### 方式 2: 使用文件 URL + +```python +result = client.chunk.split_file( + file_url="https://example.com/doc.pdf", + parser_id="naive", + config={ + "chunk_token_num": 256, + "max_file_size": 128 * 1024 * 1024, # 128MB + "download_timeout": 300, # 5分钟 + "head_request_timeout": 30 # 30秒 + } +) +``` + +#### 方式 3: 上传文件并切片 + +```python +result = client.chunk.split_file_upload( + file_path="/path/to/document.pdf", + parser_id="book", + config={ + "chunk_token_num": 512, + "delimiter": "\n。.;;!!??", + "lang": "Chinese" + } +) + +print(f"Total chunks: {result['total_chunks']}") +print(f"Filename: {result['filename']}") +for chunk in 
result['chunks']: + print(chunk) +``` + +**支持的 ParserType 方法:** +- 基础方法: `naive`, `title`, `regex`, `smart` +- 专业方法: `qa`, `book`, `laws`, `paper`, `manual`, `presentation` +- 特殊格式: `table`, `resume`, `picture`, `one`, `email` + +**配置参数说明:** +- `chunk_token_num` (int): 目标分块大小(tokens),默认 512 +- `delimiter` (str): 分隔符字符串,默认 `"\n。.;;!!??"` +- `lang` (str): 语言,默认 `"Chinese"` +- `from_page` (int): PDF 起始页码,默认 0 +- `to_page` (int): PDF 结束页码,默认 100000 +- `max_file_size` (int): URL 下载的最大文件大小(字节),仅用于 `file_url` 方式 +- `download_timeout` (int): 下载超时时间(秒),仅用于 `file_url` 方式 +- `head_request_timeout` (int): HEAD 请求超时时间(秒),仅用于 `file_url` 方式 + +**返回值结构:** +```python +{ + "parser_id": "book", + "chunks": ["chunk1", "chunk2", ...], # 字符串列表 + "total_chunks": 10, + "filename": "document.pdf" +} ``` ## 核心模块 @@ -558,15 +666,49 @@ client.chunk.delete(kb_id, doc_id, [chunk_id]) # 删除文档的所有切片 client.chunk.delete(kb_id, doc_id, None) -# 文本切片(无需上传文档) +# 文本切片(仅支持 title, regex, smart) result = client.chunk.split_text( text="# Title\n\nLong text to be chunked...", - parser_id="title", # 解析器ID - config={"chunk_token_num": 512} # 自定义配置 + parser_id="title", # 仅支持: title, regex, smart + config={"chunk_token_num": 512} ) print(f"Total chunks: {result['total_chunks']}") for chunk in result['chunks']: - print(chunk['content']) + print(chunk) + +# 文件切片(支持所有ParserType方法) +# 方式1: 使用本地文件路径 +result = client.chunk.split_file( + file_path="/path/to/document.pdf", + parser_id="book", # 支持所有 ParserType + config={ + "chunk_token_num": 512, + "delimiter": "\n。.;;!!??", + "lang": "Chinese" + } +) + +# 方式2: 使用文件URL +result = client.chunk.split_file( + file_url="https://example.com/doc.pdf", + parser_id="naive", + config={ + "chunk_token_num": 256, + "max_file_size": 128 * 1024 * 1024, # 128MB + "download_timeout": 300 + } +) + +# 方式3: 上传文件并切片 +result = client.chunk.split_file_upload( + file_path="/path/to/document.pdf", + parser_id="book", + config={"chunk_token_num": 512} +) +print(f"Total chunks: 
{result['total_chunks']}") +print(f"Filename: {result['filename']}") +for chunk in result['chunks']: + print(chunk) ``` ### 4. 信息抽取 (Extraction) @@ -894,6 +1036,7 @@ SDK 包含完整的测试套件,覆盖所有功能模块。 # 设置环境变量 export HOST_ADDRESS="http://127.0.0.1:9380" export POWERRAG_API_KEY="your-api-key" +export PYTHONPATH=$(pwd) # 运行测试 pytest powerrag/sdk/tests/ @@ -1203,6 +1346,47 @@ for result in results: # 重新解析或删除 ``` +### Q: 文本切片和文件切片有什么区别?应该使用哪个? + +A: +- **`split_text`**: 仅支持 `title`, `regex`, `smart` 三种解析器,适用于纯文本内容(Markdown格式) +- **`split_file`**: 支持所有 ParserType 方法,适用于文件(通过路径或URL) +- **`split_file_upload`**: 支持所有 ParserType 方法,适用于文件上传 + +**使用建议:** +- 如果只有文本内容且需要使用 `title`/`regex`/`smart`,使用 `split_text` +- 如果有文件且需要使用其他解析器(如 `book`, `qa`, `naive` 等),使用 `split_file` 或 `split_file_upload` +- 如果文件在本地,使用 `split_file(file_path=...)` 或 `split_file_upload` +- 如果文件在远程URL,使用 `split_file(file_url=...)` + +**示例:** +```python +# 文本切片(仅支持 title, regex, smart) +result = client.chunk.split_text( + text="# Title\n\nContent...", + parser_id="title" +) + +# 文件切片(支持所有解析器) +# 本地文件 +result = client.chunk.split_file( + file_path="/path/to/doc.pdf", + parser_id="book" # 可以使用任何解析器 +) + +# 远程文件 +result = client.chunk.split_file( + file_url="https://example.com/doc.pdf", + parser_id="naive" +) + +# 文件上传 +result = client.chunk.split_file_upload( + file_path="/path/to/doc.pdf", + parser_id="qa" +) +``` + ### Q: 如何解析无扩展名的文件? A: 使用 `parse_to_md_binary` 方法并使用 `input_type='auto'`(默认值): diff --git a/powerrag/sdk/modules/chunk_manager.py b/powerrag/sdk/modules/chunk_manager.py index 61acccd1b..45c41b64e 100644 --- a/powerrag/sdk/modules/chunk_manager.py +++ b/powerrag/sdk/modules/chunk_manager.py @@ -14,7 +14,9 @@ # limitations under the License. 
# -from typing import Optional, List, Dict, Any +import json +from pathlib import Path +from typing import Optional, List, Dict, Any, Union from .chunk import ChunkInfo @@ -268,4 +270,168 @@ def split_text( raise Exception(res_json.get("message", "Split text failed")) return res_json.get("data", {}) + + def split_file( + self, + file_path: Optional[str] = None, + file_url: Optional[str] = None, + parser_id: str = "naive", + config: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """ + 文件切片(支持所有ParserType方法) + + 支持三种方式: + 1. 本地文件路径:file_path + 2. 文件URL:file_url + 3. 文件上传:使用 split_file_upload 方法 + + Args: + file_path: 本地文件路径(可选,与file_url二选一) + file_url: 文件URL(可选,与file_path二选一) + parser_id: 解析器ID,支持所有ParserType: + - naive, qa, book, laws, paper, manual, presentation + - table, resume, picture, one, audio, email, tag + - knowledge_graph, title, regex, smart + 默认"naive" + config: 解析配置(可选) + - chunk_token_num: 目标分块大小(tokens),默认512 + - delimiter: 分隔符字符串,默认"\n。.;;!!??" + - lang: 语言,默认"Chinese" + - from_page: 起始页码,默认0 + - to_page: 结束页码,默认100000 + - max_file_size: URL下载的最大文件大小(字节) + - download_timeout: 下载超时时间(秒) + - head_request_timeout: HEAD请求超时时间(秒) + + Returns: + 切片结果,包含chunks列表、total_chunks数量和filename + + Raises: + Exception: API调用失败 + ValueError: file_path和file_url都未提供 + + Example: + ```python + # 使用本地文件路径 + result = client.chunk.split_file( + file_path="/path/to/document.pdf", + parser_id="book", + config={"chunk_token_num": 512} + ) + + # 使用文件URL + result = client.chunk.split_file( + file_url="https://example.com/doc.pdf", + parser_id="naive", + config={"chunk_token_num": 256} + ) + ``` + """ + if not file_path and not file_url: + raise ValueError("Either file_path or file_url must be provided") + + payload = { + "parser_id": parser_id, + } + + if file_path: + payload["file_path"] = file_path + if file_url: + payload["file_url"] = file_url + + if config: + payload["config"] = config + + url = "/powerrag/split/file" + res = self.client.post(url, json=payload) 
+ + # 检查响应状态码 + if res.status_code != 200: + try: + error_json = res.json() + error_msg = error_json.get("message", f"HTTP {res.status_code}") + except Exception: + error_msg = f"HTTP {res.status_code}: {res.text[:200]}" + raise Exception(error_msg) + + res_json = res.json() + + if res_json.get("code") != 0: + raise Exception(res_json.get("message", "Split file failed")) + + return res_json.get("data", {}) + + def split_file_upload( + self, + file_path: Union[str, Path], + parser_id: str = "naive", + config: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """ + 上传文件并切片(支持所有ParserType方法) + + Args: + file_path: 本地文件路径 + parser_id: 解析器ID,支持所有ParserType,默认"naive" + config: 解析配置(可选) + - chunk_token_num: 目标分块大小(tokens),默认512 + - delimiter: 分隔符字符串,默认"\n。.;;!!??" + - lang: 语言,默认"Chinese" + - from_page: 起始页码,默认0 + - to_page: 结束页码,默认100000 + + Returns: + 切片结果,包含chunks列表、total_chunks数量和filename + + Raises: + Exception: API调用失败 + FileNotFoundError: 文件不存在 + + Example: + ```python + result = client.chunk.split_file_upload( + file_path="/path/to/document.pdf", + parser_id="book", + config={"chunk_token_num": 512} + ) + print(f"Total chunks: {result['total_chunks']}") + for chunk in result['chunks']: + print(chunk) + ``` + """ + path = Path(file_path) + if not path.exists(): + raise FileNotFoundError(f"File not found: {file_path}") + + # 准备文件 + with open(path, "rb") as f: + files = [("file", (path.name, f.read()))] + + # 准备表单数据 + form_data = { + "parser_id": parser_id, + } + + if config: + form_data["config"] = json.dumps(config) + + url = "/powerrag/split/file/upload" + res = self.client.post(url, json=None, files=files, data=form_data) + + # 检查响应状态码 + if res.status_code != 200: + try: + error_json = res.json() + error_msg = error_json.get("message", f"HTTP {res.status_code}") + except Exception: + error_msg = f"HTTP {res.status_code}: {res.text[:200]}" + raise Exception(error_msg) + + res_json = res.json() + + if res_json.get("code") != 0: + raise 
Exception(res_json.get("message", "Split file upload failed")) + + return res_json.get("data", {}) diff --git a/powerrag/sdk/tests/test_chunk.py b/powerrag/sdk/tests/test_chunk.py index 940460201..7b7861e90 100644 --- a/powerrag/sdk/tests/test_chunk.py +++ b/powerrag/sdk/tests/test_chunk.py @@ -156,8 +156,128 @@ def test_split_text_with_config(self, client: PowerRAGClient): text = "This is a test document with multiple paragraphs." result = client.chunk.split_text( text=text, - parser_id="naive", + parser_id="regex", config={"chunk_token_num": 128} ) assert "chunks" in result or "total_chunks" in result + + def test_split_text_unsupported_parser(self, client: PowerRAGClient): + """测试不支持的parser_id应该抛出错误""" + text = "Test text" + # 使用一个真正不支持的 parser_id(如 "paper") + with pytest.raises(Exception) as exc_info: + client.chunk.split_text( + text=text, + parser_id="paper", # paper 不支持纯文本切片,需要文件处理 + config={"chunk_token_num": 128} + ) + error_msg = str(exc_info.value).lower() + # PowerRAG: "not supported for text splitting" + "split_file" + # RAGFlow proxy: "unknown chunker" + "supported text chunkers" + assert ( + ("not supported for text splitting" in error_msg and "split_file" in error_msg) + or ("unknown chunker" in error_msg and "paper" in error_msg) + ) + + +class TestChunkSplitFile: + """测试文件切片""" + + def test_split_file_with_file_path(self, client: PowerRAGClient, test_file_path: str): + """测试使用 file_path 参数的文件切片(服务器需能访问该路径)""" + result = client.chunk.split_file( + file_path=test_file_path, + parser_id="naive", + config={"chunk_token_num": 512}, + ) + assert "chunks" in result + assert "total_chunks" in result + assert "filename" in result + assert isinstance(result["chunks"], list) + assert result["total_chunks"] >= 0 + + def test_split_file_with_file_url(self, client: PowerRAGClient): + """测试使用 file_url 参数的文件切片""" + # 使用 httpbin.org(HTTP 避免 SSL 证书问题,返回 HTML) + result = client.chunk.split_file( + file_url="http://httpbin.org/html", + parser_id="naive", + 
config={"chunk_token_num": 512, "filename": "example.html"}, + ) + assert "chunks" in result + assert "total_chunks" in result + assert "filename" in result + assert isinstance(result["chunks"], list) + assert result["total_chunks"] >= 0 + + def test_split_file_missing_both(self, client: PowerRAGClient): + """测试 file_path 和 file_url 都未提供时应抛出错误""" + with pytest.raises(ValueError) as exc_info: + client.chunk.split_file(parser_id="naive") + assert "file_path" in str(exc_info.value).lower() or "file_url" in str(exc_info.value).lower() + assert "must be provided" in str(exc_info.value).lower() + + def test_split_file_invalid_url(self, client: PowerRAGClient): + """测试无效 URL 应抛出错误""" + with pytest.raises(Exception) as exc_info: + client.chunk.split_file( + file_url="http://invalid-domain-does-not-exist-12345.example.com/file.pdf", + parser_id="naive", + config={"download_timeout": 5}, + ) + error_msg = str(exc_info.value).lower() + assert "download" in error_msg or "failed" in error_msg or "connection" in error_msg + + def test_split_file_size_limit_exceeded(self, client: PowerRAGClient): + """测试超过大小限制的 URL 应抛出错误""" + # httpbin.org/bytes/1000000 返回 1MB,max_file_size=1024 应触发限制 + with pytest.raises(Exception) as exc_info: + client.chunk.split_file( + file_url="https://httpbin.org/bytes/1000000", + parser_id="naive", + config={"max_file_size": 1024}, + ) + error_msg = str(exc_info.value).lower() + assert "exceeds" in error_msg or "size" in error_msg or "limit" in error_msg + + def test_split_file_upload(self, client: PowerRAGClient, test_file_path: str): + """测试上传文件并切片""" + result = client.chunk.split_file_upload( + file_path=test_file_path, + parser_id="naive", + config={"chunk_token_num": 512} + ) + assert "chunks" in result + assert "total_chunks" in result + assert "filename" in result + assert isinstance(result["chunks"], list) + assert result["total_chunks"] >= 0 + + def test_split_file_upload_with_different_parsers(self, client: PowerRAGClient, test_file_path: str): 
+ """测试使用不同parser_id的文件切片""" + parsers = ["naive", "book", "title"] + for parser_id in parsers: + try: + result = client.chunk.split_file_upload( + file_path=test_file_path, + parser_id=parser_id, + config={"chunk_token_num": 256} + ) + assert "chunks" in result + assert result["total_chunks"] >= 0 + except Exception as e: + # 某些parser可能不支持特定文件类型,这是正常的 + if "not supported" not in str(e).lower(): + raise + + def test_split_file_upload_nonexistent_file(self, client: PowerRAGClient): + """测试不存在的文件应该抛出错误""" + with pytest.raises((FileNotFoundError, Exception)) as exc_info: + client.chunk.split_file_upload( + file_path="/nonexistent/file.pdf", + parser_id="naive" + ) + # SDK raises FileNotFoundError locally; API may wrap in generic Exception + error_msg = str(exc_info.value).lower() + assert "not found" in error_msg or "no such file" in error_msg diff --git a/powerrag/sdk/tests/test_document.py b/powerrag/sdk/tests/test_document.py index 173b1ddde..69dac99ff 100644 --- a/powerrag/sdk/tests/test_document.py +++ b/powerrag/sdk/tests/test_document.py @@ -14,6 +14,8 @@ # limitations under the License. 
# +import time + import pytest from powerrag.sdk import PowerRAGClient @@ -219,13 +221,31 @@ def test_cancel_parse(self, client: PowerRAGClient, kb_id: str, test_file_path: """测试取消解析""" uploaded_docs = client.document.upload(kb_id, test_file_path) doc_id = uploaded_docs[0]["id"] - + try: client.document.parse_to_chunk(kb_id, [doc_id], wait=False) + + # Poll for parsing to start (RUNNING or SCHEDULE), timeout 10s + for _ in range(100): + doc = client.document.get(kb_id, doc_id) + if doc["run"] in ["RUNNING", "1", "SCHEDULE"]: + break + time.sleep(0.1) + else: + pytest.fail("Parsing did not start within 10s") + client.document.cancel_parse(kb_id, [doc_id]) - - doc = client.document.get(kb_id, doc_id) - assert doc["run"] in ["CANCEL", "UNSTART"] + + # Poll for cancel to propagate, timeout 5s + for _ in range(50): + doc = client.document.get(kb_id, doc_id) + if doc["run"] in ["CANCEL", "UNSTART", "2", "0"]: + break + time.sleep(0.1) + else: + pytest.fail("Cancel did not propagate within 5s") + + assert doc["run"] in ["CANCEL", "UNSTART", "2", "0"] finally: client.document.delete(kb_id, [doc_id]) @@ -954,12 +974,12 @@ def test_parse_from_invalid_url(self, client: PowerRAGClient): } ) - # 应该返回 400 错误 - assert response.status_code == 400 + # 无效 URL(连接失败)返回 502 (Bad Gateway) + assert response.status_code == 502 result = response.json() - assert result["code"] == 400 + assert result["code"] == 502 assert "Failed to download" in result["message"] - + def test_parse_cannot_provide_both_file_and_url(self, client: PowerRAGClient, tmp_path): """测试不能同时提供 file 和 file_url""" import requests diff --git a/powerrag/server/routes/powerrag_routes.py b/powerrag/server/routes/powerrag_routes.py index b7d7f3221..c6acc82cb 100644 --- a/powerrag/server/routes/powerrag_routes.py +++ b/powerrag/server/routes/powerrag_routes.py @@ -1187,11 +1187,13 @@ async def parse_to_md_upload(tenant_id): "message": f"Request timeout while downloading file from URL. Please try again or increase timeout." 
}), 408 except requests.exceptions.ConnectionError as e: + # ConnectionError: DNS failure, invalid URL, network unreachable, connection refused. + # 502 (Bad Gateway) indicates we could not reach the upstream URL. logger.error(f"Connection error downloading file from URL: {file_url}. Error: {e}") return jsonify({ - "code": 503, - "message": f"Failed to connect to file URL. Please check the URL and try again." - }), 503 + "code": 502, + "message": f"Failed to download file from URL: {str(e)}" + }), 502 except requests.exceptions.HTTPError as e: logger.error(f"HTTP error downloading file from URL: {file_url}. Error: {e}") return jsonify({ @@ -1199,11 +1201,12 @@ async def parse_to_md_upload(tenant_id): "message": f"HTTP error while downloading file: {str(e)}" }), 502 except requests.exceptions.RequestException as e: + # Catch-all for other request errors (e.g. TooManyRedirects) logger.error(f"Request error downloading file from URL: {file_url}. Error: {e}") return jsonify({ - "code": 400, + "code": 502, "message": f"Failed to download file from URL: {str(e)}" - }), 400 + }), 502 except Exception as e: logger.error(f"Unexpected error downloading file from URL: {file_url}. Error: {e}", exc_info=True) return jsonify({ @@ -1402,6 +1405,213 @@ async def split_text(tenant_id): }), 500 +@powerrag_bp.route("/split/file", methods=["POST"]) +@apikey_required +async def split_file(tenant_id): + """ + Split file into chunks using rag/app chunking methods + + Supports all ParserType methods: naive, qa, book, laws, paper, manual, + presentation, table, resume, picture, one, audio, email, tag, knowledge_graph, + title, regex, smart. 
+ + Request JSON: + { + "file_path": "/path/to/document.pdf", # or use file_url + "file_url": "https://example.com/doc.pdf", # optional + "parser_id": "naive", + "config": { + "chunk_token_num": 512, + "delimiter": "\n。.;;!!??", + "lang": "Chinese", + "from_page": 0, + "to_page": 100000 + } + } + + Response: + { + "code": 0, + "data": { + "parser_id": "naive", + "chunks": ["chunk1", "chunk2", ...], + "total_chunks": 10, + "filename": "document.pdf" + } + } + """ + try: + data = await request.get_json() + + if not data: + return jsonify({ + "code": 400, + "message": "No JSON data provided" + }), 400 + + file_path = data.get("file_path") + file_url = data.get("file_url") + parser_id = data.get("parser_id", "naive") + config = data.get("config", {}) + + if not file_path and not file_url: + return jsonify({ + "code": 400, + "message": "Either file_path or file_url is required" + }), 400 + + # Handle file URL download + if file_url: + max_file_size = config.get('max_file_size', settings.DOC_MAXIMUM_SIZE) + download_timeout = config.get('download_timeout', DEFAULT_DOWNLOAD_TIMEOUT) + head_timeout = config.get('head_request_timeout', DEFAULT_HEAD_REQUEST_TIMEOUT) + + logger.info(f"Downloading file from URL: {file_url}") + try: + binary, error_msg = download_file_with_validation( + file_url, max_file_size, download_timeout, head_timeout + ) + if error_msg: + return jsonify({ + "code": 400, + "message": f"Failed to download file: {error_msg}" + }), 400 + + # Extract filename from URL or use provided filename + filename = config.get('filename') or file_url.split('/')[-1].split('?')[0] + if not filename: + filename = "downloaded_file" + except Exception as e: + logger.error(f"Error downloading file from URL: {e}", exc_info=True) + return jsonify({ + "code": 500, + "message": f"Failed to download file: {str(e)}" + }), 500 + else: + # Use file path + filename = file_path + binary = None + + service = PowerRAGSplitService() + result = service.split_file( + filename=filename, + 
binary=binary, + parser_id=parser_id, + config=config, + tenant_id=tenant_id, + ) + + return jsonify({ + "code": 0, + "data": result, + "message": "success" + }), 200 + + except Exception as e: + logger.error(f"Split file error: {e}", exc_info=True) + return jsonify({ + "code": 500, + "message": str(e) + }), 500 + + +@powerrag_bp.route("/split/file/upload", methods=["POST"]) +@apikey_required +async def split_file_upload(tenant_id): + """ + Split uploaded file into chunks using rag/app chunking methods + + Supports all ParserType methods: naive, qa, book, laws, paper, manual, + presentation, table, resume, picture, one, audio, email, tag, knowledge_graph, + title, regex, smart. + + Request (multipart/form-data): + - file: File to split (required) + - parser_id: Parser ID (optional, default: "naive") + - config: JSON string of parser config (optional) + + Config parameters: + - chunk_token_num (int): Target chunk size in tokens (default: 512) + - delimiter (str): Delimiter string for splitting (default: "\n。.;;!!??") + - lang (str): Language (default: "Chinese") + - from_page (int): Start page number (default: 0) + - to_page (int): End page number (default: 100000) + + Response: + { + "code": 0, + "data": { + "parser_id": "naive", + "chunks": ["chunk1", "chunk2", ...], + "total_chunks": 10, + "filename": "document.pdf" + } + } + """ + try: + # Check if file is present + files = await request.files + if 'file' not in files: + return jsonify({ + "code": 400, + "message": "No file provided" + }), 400 + + file = files['file'] + if file.filename == '': + return jsonify({ + "code": 400, + "message": "No file selected" + }), 400 + + # Get parameters + form = await request.form + parser_id = form.get('parser_id', 'naive') + + # Parse config from JSON string if provided + import json + config_str = form.get('config', '{}') + try: + config = json.loads(config_str) + except json.JSONDecodeError: + return jsonify({ + "code": 400, + "message": "Invalid JSON in config parameter" 
+ }), 400 + + filename = file.filename + + # Read file binary (file.read() is synchronous in Quart) + binary = file.read() + if not binary: + return jsonify({ + "code": 400, + "message": "File is empty" + }), 400 + + service = PowerRAGSplitService() + result = service.split_file( + filename=filename, + binary=binary, + parser_id=parser_id, + config=config, + tenant_id=tenant_id, + ) + + return jsonify({ + "code": 0, + "data": result, + "message": "success" + }), 200 + + except Exception as e: + logger.error(f"Split file upload error: {e}", exc_info=True) + return jsonify({ + "code": 500, + "message": str(e) + }), 500 + + # ============================================================================ # 信息抽取接口 # ============================================================================ diff --git a/powerrag/server/services/split_service.py b/powerrag/server/services/split_service.py index e4b535938..738111a7c 100644 --- a/powerrag/server/services/split_service.py +++ b/powerrag/server/services/split_service.py @@ -34,6 +34,12 @@ logger = logging.getLogger(__name__) + +def _dummy_callback(prog=None, msg=""): + """No-op callback for parser progress; used when progress reporting is not needed.""" + pass + + # Chunker Factory - mapping parser_id to chunking module CHUNKER_FACTORY = {} @@ -50,12 +56,13 @@ class PowerRAGSplitService: def __init__(self): # 初始化时动态导入chunker,避免循环导入 self._init_chunker_factory() + self._init_file_chunker_factory() def _init_chunker_factory(self): """动态导入chunker模块,避免循环导入""" global CHUNKER_FACTORY if not CHUNKER_FACTORY: - # 直接引用同一模块中定义的函数 + # PowerRAG 专门的 chunker(仅支持文本切分) CHUNKER_FACTORY.update({ ParserType.TITLE.value: title_based_chunking, # PowerRAG Title Chunker ParserType.REGEX.value: regex_based_chunking, # PowerRAG regex Chunker @@ -117,11 +124,6 @@ def split_text(self, text: str, parser_id: str = "title", config: Dict[str, Any] chunker = self._get_chunker(parser_id) logger.info(f"Using chunker: {parser_id} for text splitting") - # 
Prepare callback - def dummy(prog=None, msg=""): - """Dummy callback for progress""" - pass - # Build parser_config based on parser_id if parser_id == ParserType.TITLE.value: # Title parser specific config @@ -158,9 +160,14 @@ def dummy(prog=None, msg=""): # Smart chunking returns a list of chunks directly chunks = chunker(text, parser_config=parser_config) else: - # Use config as-is for other parsers - chunks=[] - raise ValueError(f"Chunker not found for parser_id: {parser_id}") + # Other parser types (naive, qa, book, laws, etc.) are not supported for text splitting + # Use split_file method instead for file-based chunking + raise ValueError( + f"Parser '{parser_id}' is not supported for text splitting. " + f"Supported parsers for text splitting are: {ParserType.TITLE.value}, " + f"{ParserType.REGEX.value}, {ParserType.SMART.value}. " + f"For other parser types, please use split_file() method instead." + ) # Ensure all chunks are strings and handle encoding processed_chunks = [] @@ -196,6 +203,171 @@ def dummy(prog=None, msg=""): logger.error(f"Error splitting text with parser '{parser_id}': {e}", exc_info=True) raise + def _init_file_chunker_factory(self): + """初始化文件 chunker factory,映射 ParserType 到 rag/app 模块""" + # 延迟导入,避免循环导入 + if not hasattr(self, '_file_chunker_factory'): + self._file_chunker_factory = {} + try: + # 导入 rag/app 模块 + from rag.app import ( + laws, paper, presentation, manual, qa, table, book, resume, + picture, naive, one, audio, email, tag + ) + # 导入 powerrag/app 模块 + from powerrag.app import title as powerrag_title, regex as powerrag_regex, smart as powerrag_smart + + # 映射 ParserType 到对应的 chunk 模块 + self._file_chunker_factory = { + ParserType.NAIVE.value: naive, + ParserType.PAPER.value: paper, + ParserType.BOOK.value: book, + ParserType.PRESENTATION.value: presentation, + ParserType.MANUAL.value: manual, + ParserType.LAWS.value: laws, + ParserType.QA.value: qa, + ParserType.TABLE.value: table, + ParserType.RESUME.value: resume, + 
ParserType.PICTURE.value: picture, + ParserType.ONE.value: one, + ParserType.EMAIL.value: email, + ParserType.KG.value: naive, # knowledge_graph 使用 naive + ParserType.TAG.value: tag, + ParserType.TITLE.value: powerrag_title, # PowerRAG Title Parser + ParserType.REGEX.value: powerrag_regex, # PowerRAG Regex Parser + ParserType.SMART.value: powerrag_smart, # PowerRAG Smart Parser + } + except ImportError as e: + logger.warning(f"Failed to import some rag/app modules: {e}") + # 如果导入失败,至少提供基本的 naive chunker + try: + from rag.app import naive + self._file_chunker_factory = {ParserType.NAIVE.value: naive} + except ImportError: + logger.error("Failed to import naive chunker, file splitting will not work") + self._file_chunker_factory = {} + + def split_file(self, filename: str = None, binary: bytes = None, parser_id: str = "naive", + config: Dict[str, Any] = None, tenant_id: str = None) -> Dict[str, Any]: + """ + Split file into chunks using rag/app chunking methods + + Args: + filename: File path (optional if binary is provided) + binary: File binary content (optional if filename is provided) + parser_id: Parser/chunker ID (e.g., "naive", "book", "title") + config: Chunking configuration (optional) + tenant_id: Tenant ID (required for audio and picture parsers; used for LLM model lookup) + + Returns: + Dict containing chunks and metadata + + Example: + ```python + service = PowerRAGSplitService() + + # Using file path + result = service.split_file( + filename="/path/to/document.pdf", + parser_id="book", + config={"chunk_token_num": 512} + ) + + # Using binary + with open("document.pdf", "rb") as f: + binary = f.read() + result = service.split_file( + filename="document.pdf", + binary=binary, + parser_id="naive", + config={"chunk_token_num": 256} + ) + ``` + """ + if not filename and not binary: + raise ValueError("Either filename or binary must be provided") + + if filename and not binary: + # Read file from path + try: + with open(filename, "rb") as f: + binary = 
f.read()
+            except FileNotFoundError as e:
+                raise FileNotFoundError(
+                    f"Failed to open file '{filename}' for splitting"
+                ) from e
+
+        if not filename:
+            # Generate a temporary filename from binary or use default
+            filename = "temp_file"
+
+        if config is None:
+            config = {}
+
+        # Get chunker module
+        chunker_module = self._file_chunker_factory.get(parser_id.lower())
+        if not chunker_module:
+            logger.warning(f"Chunker '{parser_id}' not found in file chunker factory, using naive")
+            chunker_module = self._file_chunker_factory.get(ParserType.NAIVE.value)
+            if not chunker_module:
+                raise ValueError(f"Chunker '{parser_id}' not found and naive chunker not available")
+
+        # Build parser_config from config
+        parser_config = config.copy()
+        parser_config.setdefault("chunk_token_num", 512)
+        parser_config.setdefault("delimiter", "\n。.;;!!??")
+
+        # Build kwargs
+        kwargs = {
+            "lang": config.get("lang", "Chinese"),
+            "callback": _dummy_callback,
+            "parser_config": parser_config,
+            "from_page": config.get("from_page", 0),
+            "to_page": config.get("to_page", 100000),
+        }
+
+        if tenant_id:
+            kwargs["tenant_id"] = tenant_id
+
+        # Add optional fields
+        if config.get("kb_id"):
+            kwargs["kb_id"] = config["kb_id"]
+        if config.get("doc_id"):
+            kwargs["doc_id"] = config["doc_id"]
+
+        try:
+            # Call chunk function
+            logger.info(f"Calling chunk function for parser '{parser_id}' on file '{filename}'")
+            tokenized_chunks = chunker_module.chunk(filename, binary=binary, **kwargs)
+
+            # Extract text content from tokenized chunks
+            chunks = []
+            for chunk_dict in tokenized_chunks:
+                if isinstance(chunk_dict, dict):
+                    # Extract content_with_weight or content field
+                    content = chunk_dict.get("content_with_weight") or chunk_dict.get("content", "")
+                    if content:
+                        chunks.append(content)
+                elif isinstance(chunk_dict, str):
+                    chunks.append(chunk_dict)
+
+            logger.info(f"Split file '{filename}' with parser '{parser_id}': {len(chunks)} chunks")
+
+            return {
+                "parser_id": parser_id,
+                "chunks": chunks, 
+                "total_chunks": len(chunks),
+                "filename": filename,
+                "metadata": {
+                    "chunker": "rag/app",
+                    "config": config
+                }
+            }
+
+        except Exception as e:
+            logger.error(f"Error splitting file '{filename}' with parser '{parser_id}': {e}", exc_info=True)
+            raise
+

# ==============================================
# Shared utility functions for chunking