# documents/adapter/input/web/document_router.py
# HTTP entry points for document analysis, RAG querying and sentiment analysis.
# (The pre-existing upload/analyze endpoints outside the visible hunk are unchanged.)
from fastapi import APIRouter, UploadFile, File, Depends

from documents.adapter.input.web.request.rag_query_request import RAGQueryRequest
from documents.adapter.input.web.request.sentiment_request import SentimentRequest
from documents.application.factory.analyze_document_usecase_factory import get_analyze_document_usecase
from documents.application.factory.rag_query_usecase_factory import get_rag_query_usecase
from documents.application.factory.sentiment_analysis_usecase_factory import get_sentiment_analysis_usecase
from documents.application.factory.upload_file_usecase_factory import get_upload_document_usecase

documentRouter = APIRouter()


@documentRouter.post("/rag/query")
async def rag_query(
    request: RAGQueryRequest,
    usecase = Depends(get_rag_query_usecase)
):
    """Answer a natural-language question against the RAG pipeline.

    NOTE(review): `request.top_k` is accepted by the schema but never
    forwarded to the use case — confirm whether it should be wired through.
    """
    result = usecase.ask(request.query)
    return {"result": result}


@documentRouter.post("/sentiment")
async def sentiment_analysis(
    request: SentimentRequest,
    usecase = Depends(get_sentiment_analysis_usecase)
):
    """Classify the sentiment of a free-text snippet."""
    result = usecase.analyze(request.text)
    return {"result": result}
# documents/adapter/input/web/request/rag_query_request.py
from pydantic import BaseModel


class RAGQueryRequest(BaseModel):
    """Payload for POST /rag/query."""
    query: str      # natural-language question
    top_k: int = 5  # number of passages to retrieve


# documents/adapter/input/web/request/sentiment_request.py
class SentimentRequest(BaseModel):
    """Payload for POST /sentiment."""
    text: str


# documents/adapter/output/ai/multi_agent_analyzer.py — state schema + PDF reader
class MyStateSchema:
    """LangGraph-compatible state flags tracking pipeline progress."""
    bullet_summary_done: bool = False
    abstract_summary_done: bool = False
    casual_summary_done: bool = False
    final_summary_done: bool = False
    answer_generated: bool = False


def read_pdf(file_path: str) -> str:
    """Extract plain text from every page of a PDF, pages joined by newlines.

    FIX: the original never closed the fitz document, leaking the OS file
    handle; a `with` block closes it deterministically (PyMuPDF Documents
    are context managers).
    """
    with fitz.open(file_path) as doc:
        pages = [page.get_text() for page in doc]
    return "\n".join(pages).strip()
class TokenChunker:
    """Split text into token-bounded chunks that fit a model's encoder window."""

    def __init__(self, tokenizer, max_input_tokens: int = 1024, margin_tokens: int = 50):
        # max_input_tokens: encoder limit (e.g. ~1024 for bart-large-cnn);
        # margin_tokens: headroom for special tokens / prompt text.
        self.tokenizer = tokenizer
        self.max_tokens = max_input_tokens
        self.margin = margin_tokens
        # Never drop below 64 usable tokens per chunk.
        self.chunk_size = max(64, self.max_tokens - self.margin)

    def chunk_text(self, text: str) -> List[str]:
        """Encode, slice into chunk_size windows, decode each window back to text."""
        ids = self.tokenizer.encode(text, add_special_tokens=False)
        if not ids:
            return []
        step = self.chunk_size
        return [
            self.tokenizer.decode(ids[offset:offset + step], skip_special_tokens=True).strip()
            for offset in range(0, len(ids), step)
        ]
logger.info("Initializing tokenizer and summarizer pipeline...") + + # ✅ 핵심 해결: fast tokenizer 제거 → Already borrowed 방지 + self.tokenizer = AutoTokenizer.from_pretrained( + model_name, + use_fast=False + ) + + # ✅ summarizer 초기화 + self.summarizer = pipeline( + "summarization", + model=model_name, + tokenizer=self.tokenizer, + device=device + ) + self.encoder_max_tokens = encoder_max_tokens or 1024 - self.chunker = TokenChunker(self.tokenizer, max_input_tokens=self.encoder_max_tokens, margin_tokens=50) - # 동시성 제어: thread pool 호출 폭주 방지 - self.semaphore = asyncio.Semaphore(max_concurrent_workers) - async def _summarize_single_chunk(self, prompt: str, max_length: int = 250, min_length: int = 20) -> str: + # ✅ token chunker 생성 + self.chunker = TokenChunker( + self.tokenizer, + max_input_tokens=self.encoder_max_tokens, + margin_tokens=50 + ) + + # ✅ 핵심 해결: pipeline + tokenizer thread-safe 보장을 위해 concurrency=1 + self.semaphore = asyncio.Semaphore(1) + + # 안전한 truncate (fast tokenizer 제거로 안정성 향상) + def _safe_truncate(self, text: str) -> str: + ids = self.tokenizer.encode( + text, + truncation=True, + max_length=self.chunker.chunk_size + ) + return self.tokenizer.decode(ids, skip_special_tokens=True) + + async def _summarize_single_chunk(self, prompt: str, max_length: int, min_length: int) -> str: loop = asyncio.get_event_loop() - # 파이프라인 호출을 thread executor로 감싸서 비동기에서 안전 사용 + def sync_call(): - # try/except로 safety net (예상치 못한 model error시 fallback) try: - out = self.summarizer(prompt, max_length=max_length, min_length=min_length, do_sample=False) - # pipeline은 리스트를 반환 - if isinstance(out, list) and len(out) > 0 and isinstance(out[0], dict): + # ✅ prompt 전체를 모델 입력 한도 이하로 강제 truncate + safe_ids = self.tokenizer.encode( + prompt, + truncation=True, + max_length=900, # ← BART 안정값 (1024보다 충분히 낮게) + ) + safe_prompt = self.tokenizer.decode(safe_ids, skip_special_tokens=True) + + out = self.summarizer( + safe_prompt, + max_length=max_length, + min_length=min_length, + 
do_sample=False + ) + if isinstance(out, list) and out and isinstance(out[0], dict): return out[0].get("summary_text", "").strip() - # fallback return str(out).strip() + except Exception as e: logger.exception("Summarizer failed on chunk: %s", e) - # 안전하게 입력을 잘라서 다시 시도 (토큰량 강제 제한) - encoded = self.tokenizer.encode(prompt, truncation=True, max_length=self.chunker.chunk_size) - truncated = self.tokenizer.decode(encoded, skip_special_tokens=True) - out2 = self.summarizer(truncated, max_length=max_length, min_length=min_length, do_sample=False) + + # ✅ fallback: 600 토큰까지 더 줄여서 재시도 + fallback_ids = self.tokenizer.encode( + prompt, + truncation=True, + max_length=600, + ) + fallback_prompt = self.tokenizer.decode(fallback_ids, skip_special_tokens=True) + + out2 = self.summarizer( + fallback_prompt, + max_length=max_length, + min_length=min_length, + do_sample=False + ) return out2[0].get("summary_text", "").strip() - # semaphore로 동시성 제어 async with self.semaphore: - result = await loop.run_in_executor(None, sync_call) - return result + return await loop.run_in_executor(None, sync_call) async def summarize_token_chunks(self, text: str, prompt_type: str) -> str: - """ - prompt_type: "bullet", "abstract", "casual" - returns: concatenated chunk summaries (joined by newline) - """ if not text: return "" chunks = self.chunker.chunk_text(text) - logger.info("Text split into %d chunks (token-based).", len(chunks)) + logger.info("Token chunks = %d", len(chunks)) + tasks = [] for chunk in chunks: if prompt_type == "bullet": - prompt = f"Summarize the following text in concise bullet points (use '-' or '*' per bullet):\n\n{chunk}" - max_len = 180 - min_len = 10 + prompt = f"Summarize as bullet points:\n\n{chunk}" + max_len, min_len = 180, 10 elif prompt_type == "abstract": - prompt = f"Write an academic-style abstract for the following text (single-paragraph):\n\n{chunk}" - max_len = 200 - min_len = 40 - elif prompt_type == "casual": - prompt = f"Write a short, casual, 
easy-to-understand summary for general readers:\n\n{chunk}" - max_len = 160 - min_len = 20 - else: - prompt = chunk - max_len = 150 - min_len = 20 - - tasks.append(self._summarize_single_chunk(prompt, max_length=max_len, min_length=min_len)) - - # chunk별 요약을 병렬 실행 (세마포어로 동시성 제한) - chunk_summaries = await asyncio.gather(*tasks) - # 정리: 중복 공백 제거 - cleaned = [s for s in (c.strip() for c in chunk_summaries) if s] - return "\n".join(cleaned) + prompt = f"Write an academic abstract:\n\n{chunk}" + max_len, min_len = 200, 40 + else: # casual + prompt = f"Write a casual summary:\n\n{chunk}" + max_len, min_len = 160, 20 + + tasks.append(self._summarize_single_chunk(prompt, max_len, min_len)) + + results = await asyncio.gather(*tasks) + return "\n".join([r for r in results if r.strip()]) async def generate_final_summary(self, bullet: str, abstract: str, casual: str) -> str: prompt = ( - "Merge the following summaries into a single, clear comprehensive overview. " - "Keep it concise (one or two paragraphs), avoid repeating phrases, and keep factual tone.\n\n" - f"Bullet Summary:\n{bullet}\n\nAbstract:\n{abstract}\n\nCasual Summary:\n{casual}\n" + "Merge these summaries into one clear overview, avoiding repetition.\n\n" + f"Bullet:\n{bullet}\n\nAbstract:\n{abstract}\n\nCasual:\n{casual}" ) - return await self._summarize_single_chunk(prompt, max_length=300, min_length=80) + return await self._summarize_single_chunk(prompt, 300, 80) async def generate_qa(self, text: str, num_qa: int = 6) -> str: - """ - Generate potential user Q&A pairs from the full text. - """ - # 안전: 길면 chunk 하나로 줄여서 진행 (QA는 전체 내용의 요약으로 생성) - # 먼저 간단한 doc-level summary (short) short_summary = await self._summarize_single_chunk( - f"Summarize the following text in 2-3 sentences:\n\n{text}", max_length=120, min_length=30 + f"Summarize in 2–3 sentences:\n\n{text}", + 120, 30 ) + prompt = ( - f"Based on the summary and content below, generate up to {num_qa} useful user questions and short answers. 
" - "Format each as: Q: ... A: ...\n\n" - f"Short summary:\n{short_summary}\n\nContent excerpt:\n{text[:4000]}" # 앞 부분만 넣음 (안정성) + f"Generate {num_qa} Q&A pairs based on:\n\n" + f"Summary:\n{short_summary}\n\n" + f"Content excerpt:\n{text[:4000]}" ) - return await self._summarize_single_chunk(prompt, max_length=300, min_length=60) + return await self._summarize_single_chunk(prompt, 300, 60) async def run(self, local_path: str): - # 1) PDF 읽기 text_content = read_pdf(local_path) if not text_content: - raise ValueError("PDF content is empty or not readable.") + raise ValueError("PDF content empty") - # 2) LangGraph compatible StateGraph (type + reducer) graph = StateGraph(Annotated[MyStateSchema, "reducer"]) state = MyStateSchema() - # 3) Parallel chunked summaries - bullet_task = self.summarize_token_chunks(text_content, "bullet") - abstract_task = self.summarize_token_chunks(text_content, "abstract") - casual_task = self.summarize_token_chunks(text_content, "casual") - - bullet_summary, abstract_summary, casual_summary = await asyncio.gather( - bullet_task, abstract_task, casual_task + bullet, abstract, casual = await asyncio.gather( + self.summarize_token_chunks(text_content, "bullet"), + self.summarize_token_chunks(text_content, "abstract"), + self.summarize_token_chunks(text_content, "casual"), ) + state.bullet_summary_done = True state.abstract_summary_done = True state.casual_summary_done = True - # 4) Final merged summary - final_summary = await self.generate_final_summary(bullet_summary, abstract_summary, casual_summary) + final_summary = await self.generate_final_summary(bullet, abstract, casual) state.final_summary_done = True - # 5) QA generation (uses short summary + excerpt) - answer = await self.generate_qa(text_content, num_qa=6) + qa = await self.generate_qa(text_content) state.answer_generated = True - # 6) 결과 - state_dict = { - "bullet_summary_done": state.bullet_summary_done, - "abstract_summary_done": state.abstract_summary_done, - 
"casual_summary_done": state.casual_summary_done, - "final_summary_done": state.final_summary_done, - "answer_generated": state.answer_generated, - } - return { "summaries": { - "bullet": bullet_summary, - "abstract": abstract_summary, - "casual": casual_summary, + "bullet": bullet, + "abstract": abstract, + "casual": casual, }, "final_summary": final_summary, - "answer": answer, - "state": state_dict + "answer": qa, + "state": { + "bullet_summary_done": state.bullet_summary_done, + "abstract_summary_done": state.abstract_summary_done, + "casual_summary_done": state.casual_summary_done, + "final_summary_done": state.final_summary_done, + "answer_generated": state.answer_generated, + } } - -# --------------------------- -# 사용 예시 -# --------------------------- -# if __name__ == "__main__": -# import asyncio -# analyzer = MultiAgentAnalyzer(device=-1) # CPU -# res = asyncio.run(analyzer.run("downloaded_docs/sample.pdf")) -# from pprint import pprint -# pprint(res) diff --git a/documents/adapter/output/ai/rag_pipeline_adapter.py b/documents/adapter/output/ai/rag_pipeline_adapter.py new file mode 100644 index 0000000..335319d --- /dev/null +++ b/documents/adapter/output/ai/rag_pipeline_adapter.py @@ -0,0 +1,109 @@ +import unicodedata +from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline +from documents.domain.port.rag_port import RAGPort + + +# 하드코딩 문서 +RAG_BASE_DOCS = [ + """ + 도메인 분리 기준: + 1. 변경 이유가 다르면 분리한다. + 2. 비즈니스 규칙은 도메인에만 있어야 한다. + 3. 인프라(저장소, 외부 API)가 도메인을 참조하면 안 된다. + 4. 도메인 객체는 서로 과도하게 의존하면 안 된다. + """, + + """ + 도메인 설계 원칙: + 1. 고수준 정책과 저수준 구현을 분리한다. + 2. 도메인 서비스는 도메인 로직만 담당한다. + 3. 애플리케이션 서비스는 도메인을 조합하는 역할이다. + 4. 도메인은 기술 세부 사항을 몰라야 한다. + """, + + """ + 도메인 분리 실수: + 1. 도메인에 저장소 로직을 넣는 것. + 2. 엔티티/밸류가 너무 많은 기능을 가지는 것. + 3. 유틸리티나 헬퍼가 비즈니스 규칙을 알게 되는 것. + 4. CQRS에서 Read/Write 모델이 섞여버리는 것. 
# documents/adapter/output/ai/rag_pipeline_adapter.py
# NOTE(review): the Korean doc strings below are runtime data; their internal
# whitespace was reconstructed from a mangled diff — verify against the commit.
RAG_BASE_DOCS = [
    """
    도메인 분리 기준:
    1. 변경 이유가 다르면 분리한다.
    2. 비즈니스 규칙은 도메인에만 있어야 한다.
    3. 인프라(저장소, 외부 API)가 도메인을 참조하면 안 된다.
    4. 도메인 객체는 서로 과도하게 의존하면 안 된다.
    """,

    """
    도메인 설계 원칙:
    1. 고수준 정책과 저수준 구현을 분리한다.
    2. 도메인 서비스는 도메인 로직만 담당한다.
    3. 애플리케이션 서비스는 도메인을 조합하는 역할이다.
    4. 도메인은 기술 세부 사항을 몰라야 한다.
    """,

    """
    도메인 분리 실수:
    1. 도메인에 저장소 로직을 넣는 것.
    2. 엔티티/밸류가 너무 많은 기능을 가지는 것.
    3. 유틸리티나 헬퍼가 비즈니스 규칙을 알게 되는 것.
    4. CQRS에서 Read/Write 모델이 섞여버리는 것.
    """
]


def clean_text(text: str) -> str:
    """NFKC-normalize, drop non-printable characters, strip edge whitespace."""
    if not isinstance(text, str):
        text = str(text)
    text = unicodedata.normalize("NFKC", text)
    text = "".join(ch for ch in text if ch.isprintable())
    return text.strip()


class RAGPipelineAdapter(RAGPort):
    """Retrieval (FAISS + built-in docs) + Korean T5 generation."""

    def __init__(self, vector_db_adapter):
        self.vector_db = vector_db_adapter
        self.retriever = vector_db_adapter.as_retriever()

        # Korean text-to-text model used as the generator.
        model_name = "lcw99/t5-large-korean-text-summary"
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

        self.llm = pipeline(
            "text2text-generation",
            model=model,
            tokenizer=tokenizer,
            max_length=256,
            num_beams=5,
            device=-1  # CPU
        )

    def _safe_retrieve_faiss(self, query: str) -> str:
        """Best-effort FAISS retrieval; returns "" on any retriever failure."""
        try:
            docs = self.retriever.invoke(query)
            if isinstance(docs, list):
                return "\n\n".join(
                    clean_text(d.page_content) if hasattr(d, "page_content") else clean_text(str(d))
                    for d in docs
                )
            if hasattr(docs, "page_content"):
                # FIX: was `clean_text(d.page_content)` — NameError, since `d`
                # only exists inside the list branch above.
                return clean_text(docs.page_content)
            return clean_text(str(docs))
        except Exception:
            # FIX: was a bare `except:` which also swallowed SystemExit /
            # KeyboardInterrupt.
            return ""

    def _search_hardcoded_docs(self, query: str) -> str:
        """Return the built-in docs mentioning core design keywords.

        FIX: removed the unused `q = query.lower()`. NOTE(review): the filter
        inspects doc content only and ignores the query, so every base doc
        currently matches — confirm whether query matching was intended.
        """
        matched = [
            doc for doc in RAG_BASE_DOCS
            if "도메인" in doc or "분리" in doc or "설계" in doc
        ]
        return "\n\n".join(matched)

    def answer(self, query: str) -> str:
        """Assemble context from both retrievers and generate a Korean answer."""
        context = "\n\n".join([
            self._safe_retrieve_faiss(query),
            self._search_hardcoded_docs(query)
        ]).strip()

        if not context:
            return "관련된 정보를 찾을 수 없습니다."

        prompt = f"""
        아래 내용을 기반으로 질문에 답하세요.

        [Context]
        {context}

        [Question]
        {query}

        자연스럽고 명확한 한국어로 설명하시오.
        """

        out = self.llm(prompt)[0]["generated_text"]
        return clean_text(out)

    def query(self, query: str) -> str:
        """Name used by RAGQueryUsecase; delegates to answer()."""
        return self.answer(query)
# documents/adapter/output/ai/sentiment_agent.py
class SentimentAgent(SentimentAnalysisPort):
    """Korean sentiment-analysis agent using a public (no-login) model."""

    def __init__(self):
        model_name = "nlpai-lab/korean-sentiment-analysis"

        try:
            self.analyzer = pipeline(
                "sentiment-analysis",
                model=model_name,
            )
        except Exception:
            # Fallback: transformers' default English sentiment model.
            self.analyzer = pipeline("sentiment-analysis")

    def analyze(self, text: str) -> dict:
        """Classify `text`; never raises — always returns a label/score dict."""
        if not text:
            return {"label": "unknown", "score": 0.0}

        clipped = text[:2000]  # guard against very long inputs

        try:
            outputs = self.analyzer(clipped)
        except Exception:
            return {"label": "error", "score": 0.0}

        if isinstance(outputs, list) and len(outputs) > 0:
            return outputs[0]

        return {"label": "unknown", "score": 0.0}
# documents/adapter/output/ai/vector_db_adapter.py
class FAISSVectorDBAdapter(VectorDBPort):
    """FAISS-backed vector store using MiniLM sentence embeddings."""

    def __init__(self):
        model_name = "sentence-transformers/all-MiniLM-L6-v2"
        self.embedding = HuggingFaceEmbeddings(model_name=model_name)

        # FAISS.from_texts needs at least one text; seed with a dummy and
        # remove it immediately so it never appears in search results.
        self.db = FAISS.from_texts(["dummy"], self.embedding)

        try:
            if len(self.db.index_to_docstore_id) > 0:
                # FIX: FAISS.delete(ids) expects *docstore ids* — the dict's
                # VALUES — not the integer index positions (its keys). The
                # original passed a key, delete() raised, the exception was
                # swallowed, and the dummy doc polluted every search.
                dummy_doc_id = next(iter(self.db.index_to_docstore_id.values()))
                self.db.delete([dummy_doc_id])
        except Exception as e:
            print("Dummy delete skipped:", e)

    def add_document(self, doc_id: str, content: str):
        """Index one document under an explicit id."""
        self.db.add_texts([content], ids=[doc_id])

    def search_similar(self, query: str, top_k: int = 5):
        """Top-k similarity search over the index."""
        return self.db.similarity_search(query, k=top_k)

    def as_retriever(self):
        """Expose the LangChain retriever interface."""
        return self.db.as_retriever()


# documents/application/factory/analyze_document_usecase_factory.py
def get_analyze_document_usecase():
    """Assemble the analyze-document use case with its adapters."""
    document_repo = DocumentRepositoryAdapter()
    storage_adapter = S3StorageAdapter(bucket_name)
    analyzer = MultiAgentAnalyzer()
    vector_db_adapter = FAISSVectorDBAdapter()

    return AnalyzeDocumentUseCase(
        document_repo,
        storage_adapter,
        analyzer,
        vector_db_adapter
    )


# documents/application/factory/rag_query_usecase_factory.py
def get_rag_query_usecase():
    """Assemble the RAG query use case.

    NOTE(review): as a FastAPI dependency this runs per request, building a
    fresh FAISS index and reloading the T5 model every call — documents added
    via /analyze are therefore invisible here. Consider a module-level
    singleton.
    """
    vector_db = FAISSVectorDBAdapter()
    rag_adapter = RAGPipelineAdapter(vector_db)
    return RAGQueryUsecase(rag_adapter)


# documents/application/factory/sentiment_analysis_usecase_factory.py
def get_sentiment_analysis_usecase():
    """Assemble the sentiment-analysis use case."""
    sentiment_agent = SentimentAgent()
    return SentimentAnalysisUseCase(sentiment_agent)
# documents/application/usecase/analyze_document_usecase.py
class AnalyzeDocumentUseCase:
    """Download a stored document, run multi-agent analysis, index summaries."""

    def __init__(self, document_repo, storage_adapter, analyzer, vector_db_adapter):
        self.document_repo = document_repo
        self.storage_adapter = storage_adapter
        self.analyzer = analyzer
        self.vector_db = vector_db_adapter

    async def execute(self, document_id: int):
        """Fetch -> download -> analyze -> push combined summaries to vector DB."""
        # 1. Look the document up in the repository.
        document: Document = self.document_repo.find_by_id(document_id)
        if not document:
            raise ValueError(f"Document with id={document_id} not found")

        s3_url = str(document.path.s3_url)
        print(f"Downloading from S3: {s3_url}")

        # 2. Download the file locally from S3.
        local_path = await self.storage_adapter.download_file(s3_url)
        print(f"Downloaded to: {local_path}")

        # 3. Run the multi-agent analysis.
        result = await self.analyzer.run(local_path)

        # 4. Concatenate the bullet/abstract/casual summaries and index them
        #    for RAG, skipping the write when everything came back empty.
        summaries = result.get("summaries", {})
        combined = "\n".join(
            summaries.get(key, "") for key in ("bullet", "abstract", "casual")
        ).strip()

        if combined:
            print(f"🔍 Adding document {document_id} to vector DB...")
            self.vector_db.add_document(str(document_id), combined)

        return result


# documents/application/usecase/rag_query_usecase.py
class RAGQueryUsecase:
    """Thin application service over the RAG port."""

    def __init__(self, rag_service):
        self.rag_service = rag_service

    def ask(self, query: str) -> str:
        return self.rag_service.query(query)


# documents/application/usecase/sentiment_analysis_usecase.py
class SentimentAnalysisUseCase:
    """Thin application service over the sentiment-analysis port."""

    def __init__(self, sentiment_agent: SentimentAnalysisPort):
        self.sentiment_agent = sentiment_agent

    def analyze(self, text: str) -> dict:
        return self.sentiment_agent.analyze(text)
# documents/domain/port/rag_port.py
class RAGPort(ABC):
    """Outbound port for retrieval-augmented generation."""

    @abstractmethod
    def answer(self, query: str) -> str:
        """Return an answer using RAG (Retriever + Generator)."""

    # FIX (consistency, backward-compatible): RAGQueryUsecase calls
    # `rag_service.query(...)`, which this port never declared. The default
    # delegates to answer(); adapters that already define query() override it.
    def query(self, query: str) -> str:
        return self.answer(query)


# documents/domain/port/sentiment_analysis_port.py
class SentimentAnalysisPort(ABC):
    """Outbound port for sentiment analysis."""

    @abstractmethod
    def analyze(self, text: str) -> dict:
        """Return at least {'label': str, 'score': float}."""


# documents/domain/port/vector_db_port.py
class VectorDBPort(ABC):
    """Outbound port for the vector store."""

    @abstractmethod
    def add_document(self, doc_id: str, content: str): ...

    @abstractmethod
    def search_similar(self, query: str, top_k: int = 5): ...


# documents/infrastructure/config/vector_db_config.py
# NOTE(review): `from pydantic import BaseSettings` only exists in pydantic v1;
# on v2 it must come from the separate `pydantic-settings` package — confirm
# the project's pinned pydantic version before merging.
class VectorDBConfig(BaseSettings):
    vector_db_path: str = "./vector_db"  # on-disk location of the index

    class Config:
        env_file = ".env"