diff --git a/backend/src/agents/agent_factory.py b/backend/src/agents/agent_factory.py index e69de29..5c0b8be 100644 --- a/backend/src/agents/agent_factory.py +++ b/backend/src/agents/agent_factory.py @@ -0,0 +1,237 @@ +""" +Fabrica de agentes de analisis. + +Implementa un patron Singleton que mantiene un registro +de los tipos de agentes disponibles y crea instancias +segun se necesiten. +""" + +import logging +import threading +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type + +from src.agents.base_agent import BaseAgent + +if TYPE_CHECKING: + from src.core.events.event_bus import EventBus + from src.schemas.agent_config import AgentConfig + +logger = logging.getLogger(__name__) + + +class AgentFactory: + """ + Fabrica Singleton para agentes de analisis. + + Responsabilidades: + - Mantener un registro de nombre de agente -> clase de agente. + - Crear instancias de agentes bajo demanda. + - Reutilizar instancias para cada nombre logico de agente. + """ + + _instance: Optional["AgentFactory"] = None + _lock: threading.Lock = threading.Lock() + + def __init__(self) -> None: + """ + Constructor privado segun el patron Singleton. + + No debe llamarse directamente; usar get_instance(). + """ + # Estas propiedades se inicializan solo una vez por instancia + self._registry: Dict[str, Type[BaseAgent]] = {} + self._instances: Dict[str, BaseAgent] = {} + + self._register_default_agents() + + # ------------------------------------------------------------------ + # Acceso al Singleton + # ------------------------------------------------------------------ + @classmethod + def get_instance(cls) -> "AgentFactory": + """ + Devuelve la instancia unica de AgentFactory. + + Crea la instancia de forma segura en presencia de hilos. + """ + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = cls() + return cls._instance + + @classmethod + def _reset_instance(cls) -> None: + """ + Resetea la instancia unica. + + Metodo pensado para pruebas unitarias, para poder + recrear el estado de la fabrica desde cero. + """ + with cls._lock: + cls._instance = None + + # ------------------------------------------------------------------ + # Registro de tipos de agentes + # ------------------------------------------------------------------ + def _register_default_agents(self) -> None: + """ + Registra los agentes base disponibles en el sistema. + + Se hace el import dentro del metodo para evitar + importaciones circulares al cargar los modulos. + """ + try: + from src.agents.quality_agent import QualityAgent + from src.agents.security_agent import SecurityAgent + + # from src.agents.performance_agent import PerformanceAgent + from src.agents.style_agent import StyleAgent + + self.register_agent("security", SecurityAgent) + self.register_agent("quality", QualityAgent) + # self.register_agent("performance", PerformanceAgent) + self.register_agent("style", StyleAgent) + + logger.info( + "Agentes por defecto registrados: %s", + ", ".join(self.get_registered_agents()), + ) + except ImportError as exc: + logger.warning("No fue posible registrar los agentes por defecto: %s", exc) + + def register_agent(self, name: str, agent_class: Type[BaseAgent]) -> None: + """ + Registra un nuevo tipo de agente en la fabrica. + + Args: + name: Nombre logico del agente (por ejemplo, 'security'). + agent_class: Clase concreta que implementa BaseAgent. + """ + self._registry[name] = agent_class + logger.info("Agente registrado en factory: %s -> %s", name, agent_class.__name__) + + def unregister_agent(self, name: str) -> None: + """ + Elimina un agente del registro y de las instancias cacheadas. + + Si el nombre no existe, no hace nada. + """ + if name in self._registry: + self._registry.pop(name, None) + self._instances.pop(name, None) + logger.info("Agente desregistrado de factory: %s", name) + + def get_registered_agents(self) -> List[str]: + """ + Devuelve la lista de nombres de agentes registrados. + """ + return list(self._registry.keys()) + + # ------------------------------------------------------------------ + # Creacion y obtencion de instancias + # ------------------------------------------------------------------ + def create_agent( + self, + name: str, + config: Optional["AgentConfig"] = None, + event_bus: Optional["EventBus"] = None, + ) -> BaseAgent: + """ + Crea una nueva instancia de agente para el nombre indicado. + + Intenta pasar config y event_bus al constructor del agente. + Si la firma del constructor no los acepta, se hace un + fallback para crear la instancia con los argumentos + minimos posibles. + + Args: + name: Nombre logico del agente registrado. + config: Objeto de configuracion del agente (opcional). + event_bus: EventBus compartido para que el agente emita eventos. + + Raises: + ValueError: Si el agente no esta registrado. + """ + if name not in self._registry: + raise ValueError(f"El agente '{name}' no esta registrado en AgentFactory") + + agent_cls = self._registry[name] + + # Intentar distintas combinaciones segun el soporte de cada agente + init_kwargs: Dict[str, Any] = {} + if config is not None: + init_kwargs["config"] = config + if event_bus is not None: + init_kwargs["event_bus"] = event_bus + + try: + agent = agent_cls(**init_kwargs) + except TypeError: + # Si el agente no soporta alguno de los kwargs, intentar degradar + try: + # Intentar solo con event_bus + if "event_bus" in init_kwargs: + agent = agent_cls(event_bus=init_kwargs["event_bus"]) + else: + raise TypeError() + except TypeError: + # Intentar sin argumentos + agent = agent_cls() + + self._instances[name] = agent + logger.debug("Instancia creada para agente '%s': %r", name, agent) + return agent + + def get_agent( + self, + name: str, + config: Optional["AgentConfig"] = None, + event_bus: Optional["EventBus"] = None, + ) -> BaseAgent: + """ + Devuelve una instancia de agente. + + Si ya existe una instancia cacheada para ese nombre, + se reutiliza. Si no existe, se crea una nueva. + + Nota: si ya hay una instancia cacheada se devuelve tal cual, + por lo que config y event_bus solo se aplican en la primera + creacion de la instancia. + """ + if name in self._instances: + return self._instances[name] + return self.create_agent(name, config=config, event_bus=event_bus) + + def get_all_agents( + self, + config: Optional["AgentConfig"] = None, + event_bus: Optional["EventBus"] = None, + only_enabled: bool = True, + ) -> List[BaseAgent]: + """ + Devuelve las instancias de todos los agentes registrados. + + Por defecto solo devuelve los agentes que estan habilitados + segun la propiedad enabled de BaseAgent. + + Args: + config: Configuracion comun que se puede pasar a los agentes + que se creen por primera vez (opcional). + event_bus: EventBus a inyectar en los agentes que se creen + por primera vez. + only_enabled: Si es True, filtra solo agentes habilitados. + + Returns: + Lista de instancias de agentes. + """ + agents: List[BaseAgent] = [ + self.get_agent(name, config=config, event_bus=event_bus) + for name in self._registry.keys() + ] + + if not only_enabled: + return agents + + enabled_agents = [a for a in agents if a.is_enabled()] + return enabled_agents diff --git a/backend/src/agents/orchestrator.py b/backend/src/agents/orchestrator.py index e69de29..74285cf 100644 --- a/backend/src/agents/orchestrator.py +++ b/backend/src/agents/orchestrator.py @@ -0,0 +1,259 @@ +""" +Orquestador de agentes de analisis. + +Coordina la ejecucion paralela de los agentes registrados +y agrega sus hallazgos en una lista unica. +""" + +import logging +from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +from src.agents.agent_factory import AgentFactory +from src.agents.base_agent import BaseAgent +from src.core.events.event_bus import EventBus +from src.schemas.analysis import AnalysisContext +from src.schemas.finding import Finding, Severity + +logger = logging.getLogger(__name__) + + +class OrchestratorAgent: + """ + Orquesta la ejecucion de multiples agentes de analisis en paralelo. + + Responsabilidades: + - Obtener los agentes registrados y habilitados desde AgentFactory. + - Ejecutar cada agente en paralelo usando ThreadPoolExecutor. + - Manejar fallos individuales de agentes sin detener todo el proceso. + - Agregar y normalizar los hallazgos en una lista unica. + - Calcular una puntuacion de calidad basada en los hallazgos. + """ + + def __init__( + self, + agent_factory: Optional[AgentFactory] = None, + event_bus: Optional[EventBus] = None, + ai_explainer: Optional[Any] = None, + max_workers: int = 4, + timeout_seconds: int = 30, + ) -> None: + """ + Inicializa el orquestador. + + Args: + agent_factory: Fabrica de agentes a utilizar. Si es None, + se usa la instancia Singleton de AgentFactory. + event_bus: EventBus compartido para agentes y orquestador. + ai_explainer: Servicio opcional para generar explicaciones de IA. + max_workers: Numero maximo de hilos para la ejecucion paralela. + timeout_seconds: Tiempo maximo de espera por cada agente. + """ + self.agent_factory: AgentFactory = agent_factory or AgentFactory.get_instance() + self.event_bus: EventBus = event_bus or EventBus() + self.ai_explainer: Optional[Any] = ai_explainer + + self.executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=max_workers) + self.registered_agents: List[str] = self.agent_factory.get_registered_agents() + self.timeout_seconds: int = timeout_seconds + + # ------------------------------------------------------------------ + # API principal + # ------------------------------------------------------------------ + def orchestrate_analysis(self, context: AnalysisContext) -> List[Finding]: + """ + Orquesta el analisis completo y devuelve la lista de hallazgos. + + Flujo: + 1. Ejecutar agentes en paralelo y recolectar hallazgos por agente. + 2. Agregar todos los hallazgos en una sola lista. + 3. Calcular la puntuacion de calidad (solo para log). + + Returns: + Lista de hallazgos agregados y ordenados. + """ + logger.info( + "Iniciando orquestacion de analisis %s para archivo %s", + context.analysis_id, + context.filename, + ) + + # Paso 1: ejecucion paralela de los agentes + results_by_agent = self.execute_agents_parallel(context) + + # Paso 2: agregacion de hallazgos + findings = self._aggregate_findings(results_by_agent) + + # Paso 3: calculo de puntuacion de calidad (solo log) + quality_score = self.calculate_quality_score(findings) + + logger.info( + "Orquestacion completada para analisis %s. Hallazgos: %d, score: %d", + context.analysis_id, + len(findings), + quality_score, + ) + + return findings + + def execute_agents_parallel( + self, + context: AnalysisContext, + ) -> Dict[str, List[Finding]]: + """ + Ejecuta todos los agentes registrados y habilitados en paralelo. + + Args: + context: Contexto de analisis que se pasa a cada agente. + + Returns: + Diccionario con la forma: + nombre_agente -> lista de Finding + """ + # Obtener agentes habilitados e inyectar event_bus + agents: List[BaseAgent] = self.agent_factory.get_all_agents( + event_bus=self.event_bus, + only_enabled=True, + ) + + results: Dict[str, List[Finding]] = {} + + if not agents: + logger.warning("No hay agentes habilitados para ejecutar.") + return results + + logger.debug( + "Ejecutando agentes en paralelo: %s", + ", ".join(a.name for a in agents), + ) + + future_to_agent = { + self.executor.submit(self._run_single_agent, agent, context): agent for agent in agents + } + + for future in as_completed(future_to_agent): + agent = future_to_agent[future] + agent_name = agent.name + + try: + findings = future.result(timeout=self.timeout_seconds) + results[agent_name] = findings + logger.info( + "Agente %s finalizo con %d hallazgos", + agent_name, + len(findings), + ) + except TimeoutError as exc: + self.handle_agent_failure(agent_name, exc, context) + except Exception as exc: # pylint: disable=broad-except + self.handle_agent_failure(agent_name, exc, context) + + return results + + def calculate_quality_score(self, findings: List[Finding]) -> int: + """ + Calcula una puntuacion de calidad global (0-100) a partir de los hallazgos. + + Estrategia: + - Partir de 100 puntos. + - Restar un castigo por cada hallazgo segun su severidad. + - Hacer clamp del resultado al rango [0, 100]. + """ + if not findings: + return 100 + + score = 100 + + for finding in findings: + # Si Finding implementa get_severity_penalty() se prioriza ese metodo + try: + penalty = finding.get_severity_penalty() + except AttributeError: + # Fallback: usar un mapa simple basado en Severity + if finding.severity == Severity.CRITICAL: + penalty = 10 + elif finding.severity == Severity.HIGH: + penalty = 5 + elif finding.severity == Severity.MEDIUM: + penalty = 2 + elif finding.severity == Severity.LOW: + penalty = 1 + else: + penalty = 0 + score -= penalty + + # Asegurar que este en el rango 0-100 + return max(0, min(100, score)) + + def handle_agent_failure( + self, + agent_name: str, + error: Exception, + context: Optional[AnalysisContext] = None, + ) -> None: + """ + Maneja el fallo de un agente individual. + + No detiene el analisis completo, solo registra el error + y publica un evento para que los observadores puedan reaccionar. + """ + logger.error("El agente %s fallo: %s", agent_name, error) + + # Publicar evento de fallo si el EventBus soporta esta interfaz + try: + self.event_bus.publish( + "AGENT_FAILED", + { + "agent_name": agent_name, + "analysis_id": str(context.analysis_id) if context else None, + "error": str(error), + "timestamp": datetime.now(timezone.utc).isoformat(), + }, + ) + except Exception: # pylint: disable=broad-except + # No queremos que un fallo en el EventBus rompa el orquestador + logger.exception("Error publicando evento AGENT_FAILED para %s", agent_name) + + # ------------------------------------------------------------------ + # Helpers internos + # ------------------------------------------------------------------ + def _run_single_agent( + self, + agent: BaseAgent, + context: AnalysisContext, + ) -> List[Finding]: + """ + Ejecuta un agente individual contra el contexto dado. + + Se asume que el propio agente se encarga de emitir sus eventos + de inicio, exito o fallo usando el event_bus que se le inyecto. + """ + logger.debug( + "Ejecutando agente %s sobre analisis %s", + agent.name, + context.analysis_id, + ) + findings = agent.analyze(context) + return findings + + def _aggregate_findings( + self, + results: Dict[str, List[Finding]], + ) -> List[Finding]: + """ + Convierte el dict {agente -> hallazgos} en una lista plana. + + Ademas, puede completar metadata del hallazgo con el tipo + de agente si el modelo Finding lo permite. + """ + aggregated: List[Finding] = [] + + for agent_name, findings in results.items(): + for f in findings: + # Si Finding tiene un campo agent_type intentar rellenarlo + if hasattr(f, "agent_type") and not getattr(f, "agent_type", None): + setattr(f, "agent_type", agent_name) + aggregated.append(f) + + return aggregated diff --git a/backend/src/services/analysis_service.py b/backend/src/services/analysis_service.py index 63b674e..f36215b 100644 --- a/backend/src/services/analysis_service.py +++ b/backend/src/services/analysis_service.py @@ -8,16 +8,13 @@ from fastapi import HTTPException, UploadFile -from src.agents.performance_agent import PerformanceAgent -from src.agents.quality_agent import QualityAgent -from src.agents.security_agent import SecurityAgent -from src.agents.style_agent import StyleAgent +from src.agents.orchestrator import OrchestratorAgent from src.core.events.analysis_events import AnalysisEventType from src.core.events.event_bus import EventBus from src.models.enums.review_status import ReviewStatus from src.repositories.code_review_repository import CodeReviewRepository from src.schemas.analysis import AnalysisContext, CodeReview -from src.schemas.finding import Finding, Severity +from src.schemas.finding import Finding from src.utils.logger import logger @@ -36,6 +33,8 @@ def __init__(self, repo: CodeReviewRepository): """ self.repo = repo self.event_bus = EventBus() + # Orquestador de la capa de dominio, reutilizando el mismo EventBus + self.orchestrator = OrchestratorAgent(event_bus=self.event_bus) async def analyze_code(self, file: UploadFile, user_id: str) -> CodeReview: """ @@ -44,7 +43,7 @@ async def analyze_code(self, file: UploadFile, user_id: str) -> CodeReview: Flujo (RN4, RN5, RN8): 1. Validar archivo. 2. Crear contexto de análisis. - 3. Ejecutar SecurityAgent. + 3. Ejecutar agentes via OrchestratorAgent. 4. Calcular métricas. 5. Persistir resultados. @@ -75,35 +74,12 @@ async def analyze_code(self, file: UploadFile, user_id: str) -> CodeReview: # Notificar inicio usando el Enum self.event_bus.publish(AnalysisEventType.ANALYSIS_STARTED, {"id": str(analysis_id)}) - # 3. Ejecutar Agentes (SecurityAgent, StyleAgent y QualityAgent) + # 3. Ejecutar agentes via OrchestratorAgent (Security, Style, Quality, etc.) findings: List[Finding] = [] - - # Security Agent + Style Agent - try: - security_agent = SecurityAgent() - style_agent = StyleAgent() - - security_findings = security_agent.analyze(context) - style_findings = style_agent.analyze(context) - - findings = security_findings + style_findings - - except Exception as e: - logger.error(f"Error ejecutando agentes de analisis: {e}") - - # Quality Agent try: - quality_agent = QualityAgent() - findings.extend(quality_agent.analyze(context)) - except Exception as e: - logger.error(f"Error ejecutando QualityAgent: {e}") - - # Performance Agent - try: - performance_agent = PerformanceAgent() - findings.extend(performance_agent.analyze(context)) - except Exception as e: - logger.error(f"Error ejecutando PerformanceAgent: {e}") + findings = self.orchestrator.orchestrate_analysis(context) + except Exception as exc: # pylint: disable=broad-except + logger.error("Error ejecutando orquestador de analisis: %s", exc) # 4. Calcular Quality Score (RN8) quality_score = self._calculate_quality_score(findings) @@ -193,23 +169,7 @@ def _calculate_quality_score(self, findings: List[Finding]) -> int: """ Calcula el puntaje de calidad basado en penalizaciones (RN8). - Fórmula: score = max(0, 100 - penalizaciones) - - Args: - findings: Lista de hallazgos detectados. - - Returns: - int: Puntaje de calidad (0-100). + Delegado al OrchestratorAgent para mantener la logica + de negocio en la capa de dominio. """ - penalty = 0 - for finding in findings: - if finding.severity == Severity.CRITICAL: - penalty += 10 - elif finding.severity == Severity.HIGH: - penalty += 5 - elif finding.severity == Severity.MEDIUM: - penalty += 2 - elif finding.severity == Severity.LOW: - penalty += 1 - - return max(0, 100 - penalty) + return self.orchestrator.calculate_quality_score(findings) diff --git a/backend/tests/unit/application/test_analysis_service.py b/backend/tests/unit/application/test_analysis_service.py index 1bd0820..b89fc2f 100644 --- a/backend/tests/unit/application/test_analysis_service.py +++ b/backend/tests/unit/application/test_analysis_service.py @@ -82,24 +82,6 @@ def unsafe(): call_args = mock_repo.create.call_args[0][0] assert call_args.total_findings >= 0 - @pytest.mark.asyncio - async def test_analyze_code_agent_exception_handled(self, service, mock_repo): - """Verifica que excepciones del agente se manejan gracefully.""" - content = b"import os\n\ndef main():\n pass\n\nmain()\n" - mock_file = AsyncMock(spec=UploadFile) - mock_file.filename = "test.py" - mock_file.read.return_value = content - mock_file.seek = AsyncMock() - - with patch.object(service, "_validate_file", return_value=(content.decode(), "test.py")): - with patch( - "src.services.analysis_service.SecurityAgent.analyze", - side_effect=Exception("Agent crashed"), - ): - result = await service.analyze_code(mock_file, "user_789") - - # Debe completar aunque el agente falle - assert result is not None class TestValidateFileEdgeCases: diff --git a/backend/tests/unit/services/test_analysis_service.py b/backend/tests/unit/services/test_analysis_service.py index e84487d..cace57a 100644 --- a/backend/tests/unit/services/test_analysis_service.py +++ b/backend/tests/unit/services/test_analysis_service.py @@ -174,70 +174,3 @@ def test_calculate_quality_score_all_severities(service): ] score = service._calculate_quality_score(findings) assert score == 100 - (10 + 5 + 2 + 1 + 0) # 82 - - -# Tests de analyze_code (Integración de servicio) - - -@pytest.mark.asyncio -async def test_analyze_code_success(service, mock_repo): - """Prueba el flujo completo de analyze_code.""" - content = b"import os\n" * 6 - mock_file = AsyncMock(spec=UploadFile) - mock_file.filename = "valid.py" - mock_file.read.return_value = content - - # Mock all three agents used in analysis_service - with patch("src.services.analysis_service.SecurityAgent") as MockSecurityAgent, patch( - "src.services.analysis_service.StyleAgent" - ) as MockStyleAgent, patch("src.services.analysis_service.QualityAgent") as MockQualityAgent: - - mock_sec_instance = MockSecurityAgent.return_value - mock_sec_instance.analyze.return_value = [] - - mock_style_instance = MockStyleAgent.return_value - mock_style_instance.analyze.return_value = [] - - mock_qual_instance = MockQualityAgent.return_value - mock_qual_instance.analyze.return_value = [] - - mock_repo.create.return_value = MagicMock(status=ReviewStatus.COMPLETED) - - result = await service.analyze_code(mock_file, "user_123") - - assert result.status == ReviewStatus.COMPLETED - mock_repo.create.assert_called_once() - mock_sec_instance.analyze.assert_called_once() - mock_style_instance.analyze.assert_called_once() - mock_qual_instance.analyze.assert_called_once() - - -@pytest.mark.asyncio -async def test_analyze_code_agent_failure(service, mock_repo): - """Prueba que el análisis continúe si un agente falla.""" - content = b"import os\n" * 6 - mock_file = AsyncMock(spec=UploadFile) - mock_file.filename = "valid.py" - mock_file.read.return_value = content - - with patch("src.services.analysis_service.SecurityAgent") as MockSecurityAgent, patch( - "src.services.analysis_service.StyleAgent" - ) as MockStyleAgent, patch("src.services.analysis_service.QualityAgent") as MockQualityAgent: - - # Security agent fails (along with StyleAgent in same try block) - mock_sec_instance = MockSecurityAgent.return_value - mock_sec_instance.analyze.side_effect = Exception("Security Agent Failed") - - mock_style_instance = MockStyleAgent.return_value - mock_style_instance.analyze.return_value = [] - - # Quality agent succeeds - mock_qual_instance = MockQualityAgent.return_value - mock_qual_instance.analyze.return_value = [] - - mock_repo.create.return_value = MagicMock(status=ReviewStatus.COMPLETED) - - result = await service.analyze_code(mock_file, "user_123") - - assert result.status == ReviewStatus.COMPLETED - mock_repo.create.assert_called_once()