Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 115 additions & 5 deletions refactron/llm/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@
import os
import re
from pathlib import Path
from typing import List, Optional, Union
from typing import Dict, List, Optional, Union

from refactron.core.models import CodeIssue, IssueCategory, IssueLevel
from refactron.llm.backend_client import BackendLLMClient
from refactron.llm.client import GroqClient
from refactron.llm.models import RefactoringSuggestion, SuggestionStatus
from refactron.llm.prompts import DOCUMENTATION_PROMPT, SUGGESTION_PROMPT, SYSTEM_PROMPT
from refactron.llm.prompts import (
BATCH_TRIAGE_PROMPT,
BATCH_TRIAGE_SYSTEM_PROMPT,
DOCUMENTATION_PROMPT,
SUGGESTION_PROMPT,
SYSTEM_PROMPT,
)
from refactron.llm.safety import SafetyGate
from refactron.rag.retriever import ContextRetriever

Expand Down Expand Up @@ -43,12 +49,15 @@ def __init__(

self.safety_gate = safety_gate or SafetyGate()

def generate_suggestion(self, issue: CodeIssue, original_code: str) -> RefactoringSuggestion:
def generate_suggestion(
self, issue: CodeIssue, original_code: str, language: str = "python"
) -> RefactoringSuggestion:
"""Generate a refactoring suggestion for a code issue.

Args:
issue: The code issue to fix
original_code: The failing code snippet
language: The programming language of the code (default: "python")

Returns:
A validated refactoring suggestion
Expand All @@ -75,6 +84,7 @@ def generate_suggestion(self, issue: CodeIssue, original_code: str) -> Refactori
severity=issue.level.value,
original_code=original_code,
rag_context=rag_context,
language=language,
)

# 3. Call LLM
Expand Down Expand Up @@ -156,13 +166,14 @@ def generate_suggestion(self, issue: CodeIssue, original_code: str) -> Refactori
return suggestion

def generate_documentation(
self, code: str, file_path: str = "unknown"
self, code: str, file_path: str = "unknown", language: str = "python"
) -> RefactoringSuggestion:
"""Generate documentation for the provided code.

Args:
code: The code to document
file_path: Optional file path for context
language: The programming language of the code (default: "python")

Returns:
A suggestion containing the documented code
Expand All @@ -189,7 +200,9 @@ def generate_documentation(
rag_context = "\n\n".join(context_snippets) if context_snippets else "No context available."

# 2. Construct Prompt
prompt = DOCUMENTATION_PROMPT.format(original_code=code, rag_context=rag_context)
prompt = DOCUMENTATION_PROMPT.format(
original_code=code, rag_context=rag_context, language=language
)

# 3. Call LLM
try:
Expand Down Expand Up @@ -246,6 +259,103 @@ def generate_documentation(
status=SuggestionStatus.FAILED,
)

def evaluate_issues_batch(
    self,
    issues: List[CodeIssue],
    source_code: str,
    language: str = "python",
) -> Dict[str, float]:
    """Evaluate a batch of issues for a single file to suppress false positives.

    Builds a JSON payload describing every issue, asks the LLM (optionally
    augmented with RAG context) to score each one, and returns a confidence
    map. Any failure in retrieval or parsing degrades gracefully rather than
    raising: unparseable scores and total LLM failures fall back to a
    neutral 0.5 confidence.

    Args:
        issues: List of CodeIssues found in the file
        source_code: The full source code of the file
        language: The programming language of the code (default: "python")

    Returns:
        Dict mapping issue IDs (rule_id:line:index) to confidence scores,
        clamped to the range [0.0, 1.0].
    """
    if not issues:
        return {}

    # 1. Retrieve Context (best-effort: triage still works without RAG)
    context_snippets = []
    if self.retriever:
        try:
            # Only the first 1000 chars serve as the retrieval query to keep
            # the embedding input small.
            results = self.retriever.retrieve_similar(source_code[:1000], top_k=3)
            context_snippets = [r.content for r in results]
        except Exception as e:
            logger.warning(f"Context retrieval failed: {e}")

    rag_context = (
        "\n\n".join(context_snippets) if context_snippets else "No context available."
    )

    # 2. Construct JSON for issues, keyed by a stable unique ID
    issues_data = {}
    for i, issue in enumerate(issues):
        base_id = getattr(issue, "rule_id", None) or "issue"
        line_number = getattr(issue, "line_number", None)
        id_parts = [str(base_id)]
        if line_number is not None:
            id_parts.append(str(line_number))
        id_parts.append(str(i))
        issue_id = ":".join(id_parts)

        # Ensure uniqueness in case of unexpected collisions (the enumerate
        # index should already make IDs unique, so this is a belt-and-braces
        # guard that also logs when it fires).
        unique_id = issue_id
        suffix = 1
        while unique_id in issues_data:
            suffix += 1
            unique_id = f"{issue_id}_{suffix}"
            logger.warning(
                f"Issue ID collision detected for '{issue_id}', using '{unique_id}'"
            )

        issues_data[unique_id] = {
            "rule_id": getattr(issue, "rule_id", None),
            "message": issue.message,
            # Reuse the defensively-read local so a missing line_number
            # attribute cannot raise here (previously this accessed
            # issue.line_number directly, bypassing the getattr guard above).
            "line": line_number,
            "category": (
                issue.category.value
                if hasattr(issue.category, "value")
                else str(issue.category)
            ),
            "severity": (
                issue.level.value if hasattr(issue.level, "value") else str(issue.level)
            ),
        }

    # 3. Construct Prompt
    prompt = BATCH_TRIAGE_PROMPT.format(
        source_code=source_code,
        rag_context=rag_context,
        issues_json=json.dumps(issues_data, indent=2),
        language=language,
    )

    # 4. Call LLM with dedicated batch triage system prompt
    try:
        # Low temperature: triage scoring should be as deterministic as the
        # backend allows.
        response_text = self.client.generate(
            prompt=prompt, system=BATCH_TRIAGE_SYSTEM_PROMPT, temperature=0.1
        )
        clean_text = self._clean_json_response(response_text)
        data = json.loads(clean_text, strict=False)

        # Ensure we return a Dict[str, float] with scores in [0.0, 1.0]
        result: Dict[str, float] = {}
        for k, v in data.items():
            try:
                score = float(v)
            except (ValueError, TypeError):
                score = 0.5  # Neutral fallback for unparseable values
            # Clamp: LLMs occasionally emit out-of-range scores, and the
            # documented contract is [0.0, 1.0].
            result[str(k)] = min(1.0, max(0.0, score))
        return result

    except Exception as e:
        logger.error(f"Batch triage failed: {e}")
        # Fallback: neutral confidence for every issue we were asked about
        return {str(k): 0.5 for k in issues_data.keys()}

def _clean_json_response(self, text: str) -> str:
"""Clean LLM response to extract JSON."""
text = text.strip()
Expand Down
42 changes: 38 additions & 4 deletions refactron/llm/prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
Severity: {severity}

Original Code:
```python
```{language}
{original_code}
```

Expand All @@ -42,7 +42,7 @@
SAFETY_CHECK_PROMPT = """
Analyze the following code patch for safety risks:

```python
```{language}
{proposed_code}
```

Expand All @@ -62,10 +62,10 @@
"""

DOCUMENTATION_PROMPT = """
Analyze the following Python code and generate a comprehensive MARKDOWN documentation file.
Analyze the following {language} code and generate a comprehensive MARKDOWN documentation file.

Original Code:
```python
```{language}
{original_code}
```

Expand All @@ -92,3 +92,37 @@
The complete Markdown documentation content including the mermaid diagram
@@@END@@@
"""

# System prompt for the batch false-positive triage flow. It pins the model
# to a strict output contract — a raw, flat JSON object mapping issue-ID
# strings to confidence floats — so the response can be fed to json.loads
# without markdown fence stripping.
BATCH_TRIAGE_SYSTEM_PROMPT = """You are a code triage expert.
Your goal is to evaluate code issues and determine whether each is a true positive
(requiring fixing) or a false positive.

RESPONSE FORMAT:
You must output ONLY valid JSON.
- Do not output markdown code blocks, just the raw JSON object.
- The JSON must be a flat map where keys are issue IDs (strings) and values are
confidence scores (floats between 0.0 and 1.0).
- A score of 0.0 means the issue is very likely a false positive.
- A score of 1.0 means the issue is very likely a true positive requiring a fix.
"""

# User prompt template for batch triage. Filled via str.format with:
#   {language}    — language tag for the fenced code block
#   {source_code} — full text of the file under review
#   {rag_context} — retrieved context snippets (or a "no context" placeholder)
#   {issues_json} — JSON object describing the issues to score, keyed by issue ID
# The model must echo the issue IDs back as keys of its JSON response.
BATCH_TRIAGE_PROMPT = """
Evaluate the following list of code issues found in a single file and determine
the confidence that each is a true positive (requiring fixing) rather than a
false positive.

File Source Code:
```{language}
{source_code}
```

Relevant Context (RAG):
{rag_context}

Issues to evaluate:
{issues_json}

Return ONLY a JSON map where the keys are the issue IDs and the values are the
confidence scores (float between 0.0 and 1.0).
Do NOT return anything except the JSON object.
"""
Loading