diff --git a/docs/OPENCOG_INTEGRATION.md b/docs/OPENCOG_INTEGRATION.md new file mode 100644 index 00000000..721ac3f1 --- /dev/null +++ b/docs/OPENCOG_INTEGRATION.md @@ -0,0 +1,434 @@ +# OpenCog Systems Integration for OpenManus-RL + +This document describes the OpenCog systems integration in OpenManus-RL, providing symbolic reasoning and cognitive architecture capabilities for reinforcement learning agents. + +## Overview + +The OpenCog integration adds advanced symbolic AI capabilities to OpenManus-RL, enabling agents to: + +- **Symbolic Reasoning**: Perform logical inference and pattern matching +- **Knowledge Representation**: Build and query semantic knowledge graphs +- **Cognitive Architecture**: Implement complete cognitive processing cycles +- **Hybrid AI**: Combine symbolic reasoning with neural learning + +## Components + +### 1. AtomSpace Integration (`atomspace_integration.py`) + +The AtomSpace is OpenCog's hypergraph knowledge representation system. + +```python +from openmanus_rl.opencog_systems import AtomSpaceManager, AtomType + +# Create an AtomSpace +atomspace = AtomSpaceManager() + +# Add knowledge +concept = atomspace.create_concept_node("agent") +predicate = atomspace.create_predicate_node("can_perform") +action = atomspace.create_concept_node("analysis") + +# Create relationships +evaluation = atomspace.create_evaluation_link( + predicate, [concept, action], truth_value=0.8 +) +``` + +**Key Features:** +- Hypergraph knowledge representation +- Truth values and confidence tracking +- Pattern matching and querying +- Import/export capabilities +- Memory management + +### 2. Reasoning Engine (`reasoning_engine.py`) + +Provides symbolic inference capabilities using forward/backward chaining. 
+ +```python +from openmanus_rl.opencog_systems import OpenCogReasoningEngine + +# Create reasoning engine +reasoning_engine = OpenCogReasoningEngine(atomspace) + +# Perform backward chaining +goal = {"type": "evaluation", "predicate": "can_perform", "args": ["agent", "task"]} +result = reasoning_engine.backward_chaining(goal) + +print(f"Confidence: {result.confidence}") +print(f"Reasoning path: {result.reasoning_path}") +``` + +**Reasoning Modes:** +- Forward chaining (data-driven) +- Backward chaining (goal-driven) +- Probabilistic reasoning +- Action consequence reasoning + +### 3. Pattern Matcher (`pattern_matcher.py`) + +Advanced pattern matching for knowledge retrieval and similarity detection. + +```python +from openmanus_rl.opencog_systems import OpenCogPatternMatcher, PatternQuery, MatchType + +# Create pattern matcher +matcher = OpenCogPatternMatcher(atomspace) + +# Define pattern query +query = PatternQuery({ + "type": "evaluation", + "predicate": "located_at", + "args": ["$entity", "laboratory"] +}) + +# Find matches +matches = matcher.match_pattern(query, MatchType.FUZZY) +``` + +**Match Types:** +- Exact matching +- Fuzzy matching (similarity-based) +- Structural matching (graph topology) +- Semantic matching (meaning-based) + +### 4. Cognitive Architecture (`cognitive_architecture.py`) + +Complete cognitive agent implementation with perception, reasoning, planning, and learning. 
+ +```python +from openmanus_rl.opencog_systems import CognitiveAgent + +# Create cognitive agent +agent = CognitiveAgent("research_agent") + +# Cognitive cycle +observations = {"environment": "lab", "task": "analyze_samples"} +result = agent.cognitive_cycle(observations, goal="complete analysis") + +print(f"Cycle {result['cycle_number']} completed") +print(f"Action result: {result['action_result']}") +``` + +**Cognitive Processes:** +- Perception (observation processing) +- Reasoning (inference and analysis) +- Planning (goal-directed action selection) +- Action execution +- Learning from feedback + +### 5. Knowledge Graph (`knowledge_representation.py`) + +High-level knowledge representation and semantic networks. + +```python +from openmanus_rl.opencog_systems import KnowledgeGraph, RelationType + +# Create knowledge graph +kg = KnowledgeGraph(atomspace) + +# Add entities and relationships +lab_id = kg.add_entity("Laboratory", {"type": "location"}) +agent_id = kg.add_entity("Agent", {"type": "actor"}) + +kg.add_relationship(agent_id, lab_id, RelationType.SPATIAL, + properties={"relation": "located_in"}) + +# Query relationships +related = kg.find_related_entities(agent_id, max_distance=2) +``` + +**Knowledge Operations:** +- Entity and relationship management +- Ontology building +- Path finding and traversal +- Similarity detection +- Knowledge inference + +## Configuration + +Configure OpenCog systems using the configuration manager: + +```python +from openmanus_rl.opencog_systems import load_config, update_config + +# Load configuration +config = load_config("path/to/config.yaml") + +# Update parameters +update_config( + reasoning_max_depth=5, + cognitive_learning_rate=0.1, + enable_fuzzy_matching=True +) +``` + +**Configuration Categories:** +- AtomSpace settings (memory, persistence) +- Reasoning parameters (depth, iterations, thresholds) +- Cognitive architecture (attention, memory, learning) +- Pattern matching (similarity thresholds, match types) +- 
Knowledge graph (entities, relationships, inference) + +## Integration with OpenManus-RL + +### Agent Enhancement + +Integrate OpenCog capabilities into existing OpenManus agents: + +```python +from openmanus_rl.llm_agent.openmanus import OpenManusAgent +from openmanus_rl.opencog_systems import CognitiveAgent + +class CognitiveOpenManusAgent(OpenManusAgent): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.cognitive_agent = CognitiveAgent("enhanced_agent") + + def run_llm_loop(self, gen_batch, output_dir=None, global_steps=0): + # Extract observations from batch + observations = self.extract_observations(gen_batch) + + # Run cognitive processing + cognitive_result = self.cognitive_agent.cognitive_cycle(observations) + + # Use cognitive insights to enhance LLM processing + enhanced_batch = self.enhance_with_cognitive_insights( + gen_batch, cognitive_result + ) + + # Continue with original processing + return super().run_llm_loop(enhanced_batch, output_dir, global_steps) +``` + +### Memory Integration + +Enhance memory systems with symbolic knowledge: + +```python +from openmanus_rl.memory.memory import SimpleMemory +from openmanus_rl.opencog_systems import KnowledgeGraph + +class CognitiveMemory(SimpleMemory): + def __init__(self): + super().__init__() + self.atomspace = AtomSpaceManager() + self.knowledge_graph = KnowledgeGraph(self.atomspace) + + def store(self, record): + super().store(record) + + # Also store in knowledge graph + for env_idx, env_record in enumerate(record): + self.add_to_knowledge_graph(env_record, env_idx) + + def fetch_with_reasoning(self, query, history_length): + # Use pattern matching for intelligent retrieval + # ... 
implementation +``` + +### Reward Shaping + +Use symbolic reasoning for reward shaping: + +```python +from openmanus_rl.opencog_systems import OpenCogReasoningEngine + +class CognitiveRewardShaper: + def __init__(self, atomspace): + self.reasoning_engine = OpenCogReasoningEngine(atomspace) + + def shape_reward(self, state, action, reward, next_state): + # Reason about action appropriateness + action_result = self.reasoning_engine.reason_about_action( + action, {"state": state, "next_state": next_state} + ) + + # Adjust reward based on symbolic reasoning + reasoning_bonus = action_result.confidence * 0.1 + return reward + reasoning_bonus +``` + +## Examples + +### Basic Usage + +See `examples/opencog_integration/basic_cognitive_agent.py` for a complete demonstration: + +```bash +cd /path/to/OpenManus-RL +python examples/opencog_integration/basic_cognitive_agent.py +``` + +### Custom Cognitive Agent + +```python +from openmanus_rl.opencog_systems import CognitiveAgent, AttentionMode + +# Create specialized agent +agent = CognitiveAgent("scientific_researcher") +agent.set_attention_mode(AttentionMode.GOAL_DIRECTED) + +# Add domain knowledge +agent.atomspace.create_concept_node("hypothesis") +agent.atomspace.create_concept_node("experiment") +agent.atomspace.create_concept_node("evidence") + +# Create relationships +hypothesis_atom = agent.atomspace.find_atoms(name="hypothesis")[0] +experiment_atom = agent.atomspace.find_atoms(name="experiment")[0] +tests_pred = agent.atomspace.create_predicate_node("tests") + +agent.atomspace.create_evaluation_link( + tests_pred, [experiment_atom, hypothesis_atom], 0.9 +) + +# Run research cycle +observations = { + "lab_equipment": ["microscope", "computer"], + "samples": ["specimen_a", "specimen_b"], + "hypothesis_status": "untested" +} + +result = agent.cognitive_cycle(observations, "test hypothesis with experiments") +``` + +## Testing + +Run tests for OpenCog integration: + +```bash +# Test AtomSpace functionality +python -m 
unittest test.opencog_systems.test_atomspace + +# Test cognitive agent +python -m unittest test.opencog_systems.test_cognitive_agent + +# Run all OpenCog tests +python -m unittest discover test/opencog_systems/ +``` + +## Performance Considerations + +### Memory Management +- AtomSpace size limits (default: 100,000 atoms) +- Garbage collection thresholds +- Working memory constraints (20 active concepts) +- Experience history limits (100 recent experiences) + +### Reasoning Efficiency +- Max reasoning depth (default: 5) +- Max iterations for forward chaining (default: 10) +- Pattern matching result limits (default: 100) +- Rule application caching + +### Cognitive Cycles +- Max cycle time limits (1 second default) +- Parallel processing options +- Attention allocation strategies +- Memory consolidation thresholds + +## Advanced Features + +### Real OpenCog Integration + +To use the real OpenCog system (when available): + +```python +from openmanus_rl.opencog_systems import update_config + +update_config(enable_real_opencog=True) +``` + +### Custom Reasoning Rules + +Add domain-specific reasoning rules: + +```python +from openmanus_rl.opencog_systems import ReasoningRule + +# Define custom rule +rule = ReasoningRule( + name="experimental_evidence", + premises=[ + {"type": "evaluation", "predicate": "tests", "args": ["$experiment", "$hypothesis"]}, + {"type": "evaluation", "predicate": "supports", "args": ["$result", "$hypothesis"]} + ], + conclusion={"type": "evaluation", "predicate": "evidence_for", "args": ["$hypothesis"]}, + confidence=0.8 +) + +reasoning_engine.add_rule(rule) +``` + +### Custom Pattern Constraints + +Add constraints to pattern variables: + +```python +from openmanus_rl.opencog_systems import PatternQuery + +query = PatternQuery({ + "type": "evaluation", + "predicate": "located_at", + "args": ["$agent", "$location"] +}) + +# Add constraint: agent must be of type "researcher" +query.add_constraint("$agent", lambda atom: "researcher" in 
atom.name.lower()) +query.set_variable_type("$location", AtomType.CONCEPT_NODE) +``` + +## Troubleshooting + +### Common Issues + +1. **Import Errors**: Ensure PYTHONPATH includes the project root +2. **Memory Issues**: Adjust AtomSpace size limits in configuration +3. **Slow Reasoning**: Reduce max depth or enable result caching +4. **Pattern Matching Timeout**: Increase match thresholds or limit results + +### Debugging + +Enable detailed logging: + +```python +import logging +logging.basicConfig(level=logging.DEBUG) + +from openmanus_rl.opencog_systems import update_config +update_config(log_level="DEBUG", enable_logging=True) +``` + +### Performance Monitoring + +```python +from openmanus_rl.opencog_systems import CognitiveAgent + +agent = CognitiveAgent("monitored_agent") + +# Get performance statistics +stats = agent.get_cognitive_state() +reasoning_stats = agent.reasoning_engine.get_reasoning_statistics() +kg_stats = agent.knowledge_graph.get_statistics() + +print(f"Success rate: {stats['success_rate']}") +print(f"Average reasoning confidence: {reasoning_stats['average_confidence']}") +print(f"Knowledge graph size: {kg_stats['total_entities']} entities") +``` + +## Future Enhancements + +- Integration with external OpenCog installations +- Distributed reasoning across multiple agents +- Learning of new reasoning rules from experience +- Integration with neural-symbolic architectures +- Real-time knowledge graph updates from environment +- Multi-modal knowledge representation (text, images, sensors) + +## References + +- [OpenCog Framework](http://opencog.org/) +- [AtomSpace Documentation](https://wiki.opencog.org/w/AtomSpace) +- [Pattern Matcher Guide](https://wiki.opencog.org/w/Pattern_matcher) +- [PLN (Probabilistic Logic Networks)](https://wiki.opencog.org/w/PLN) \ No newline at end of file diff --git a/examples/opencog_integration/__init__.py b/examples/opencog_integration/__init__.py new file mode 100644 index 00000000..43ad8420 --- /dev/null +++ 
b/examples/opencog_integration/__init__.py @@ -0,0 +1 @@ +"""OpenCog integration examples for OpenManus-RL.""" \ No newline at end of file diff --git a/examples/opencog_integration/basic_cognitive_agent.py b/examples/opencog_integration/basic_cognitive_agent.py new file mode 100644 index 00000000..1c40278c --- /dev/null +++ b/examples/opencog_integration/basic_cognitive_agent.py @@ -0,0 +1,340 @@ +#!/usr/bin/env python3 +""" +Basic OpenCog Cognitive Agent Example for OpenManus-RL. + +This example demonstrates how to use the OpenCog systems integration +to create a cognitive agent with symbolic reasoning capabilities. +""" + +import logging +import json +import sys +import os + +# Add the project root to Python path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..')) + +from openmanus_rl.opencog_systems import ( + CognitiveAgent, AttentionMode, AtomSpaceManager, + OpenCogReasoningEngine, KnowledgeGraph +) + +# Set up logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +def demonstrate_basic_cognitive_agent(): + """Demonstrate basic cognitive agent functionality.""" + + print("=" * 60) + print("OpenCog Cognitive Agent Demonstration") + print("=" * 60) + + # Create a cognitive agent + agent = CognitiveAgent("demo_agent") + + print(f"\n1. Agent initialized: {agent.agent_name}") + print(f" Initial AtomSpace size: {agent.atomspace.size()}") + print(f" Initial state: {agent.state.value}") + + # Demonstrate perception + print(f"\n2. Perception Phase") + observations = { + "environment": "laboratory", + "objects": ["computer", "microscope", "samples"], + "task": "analyze_samples", + "urgency": "high" + } + + agent.perceive(observations) + print(f" Perceived: {observations}") + print(f" AtomSpace size after perception: {agent.atomspace.size()}") + print(f" Active concepts: {len(agent.memory.active_concepts)}") + + # Demonstrate reasoning + print(f"\n3. 
Reasoning Phase") + reasoning_result = agent.reason("What should I do with the samples?") + print(f" Reasoning query: 'What should I do with the samples?'") + print(f" Reasoning confidence: {reasoning_result.confidence:.3f}") + print(f" Reasoning steps: {len(reasoning_result.reasoning_path)}") + + if reasoning_result.reasoning_path: + print(" Reasoning path:") + for i, step in enumerate(reasoning_result.reasoning_path[:3]): # Show first 3 steps + print(f" {i+1}. {step}") + + # Demonstrate planning + print(f"\n4. Planning Phase") + goal = "analyze the samples efficiently" + plan = agent.plan(goal, {"equipment": ["microscope", "computer"]}) + + print(f" Goal: {goal}") + print(f" Generated plan with {len(plan)} actions:") + + for i, action in enumerate(plan): + print(f" {i+1}. {action.action_type} (confidence: {action.confidence:.3f})") + print(f" Expected outcome: {action.expected_outcome}") + + # Demonstrate action execution + print(f"\n5. Action Execution Phase") + if plan: + action_result = agent.act(plan[0]) + print(f" Executed: {plan[0].action_type}") + print(f" Action result: {action_result}") + print(f" Success rate: {agent.success_rate:.3f}") + + # Demonstrate learning + print(f"\n6. Learning Phase") + feedback = { + "reward": 0.8, + "success": True, + "correction": None, + "outcome": "samples analyzed successfully" + } + + agent.learn(feedback) + print(f" Provided feedback: {feedback}") + print(f" Updated success rate: {agent.success_rate:.3f}") + + # Demonstrate full cognitive cycle + print(f"\n7. 
Complete Cognitive Cycle") + new_observations = { + "environment": "laboratory", + "new_samples": "bacterial_culture", + "equipment_status": "ready", + "previous_results": "positive" + } + + cycle_result = agent.cognitive_cycle(new_observations, "process new bacterial samples") + + print(f" Cycle {cycle_result['cycle_number']} completed in {cycle_result['cycle_time']:.3f}s") + print(f" Current attention focus: {cycle_result.get('attention_focus', 'None')}") + print(f" Active concepts: {cycle_result['active_concepts']}") + + # Show cognitive state + print(f"\n8. Current Cognitive State") + state = agent.get_cognitive_state() + + print(f" State: {state['state']}") + print(f" Attention mode: {state['attention_mode']}") + print(f" Total cycles: {state['cycle_count']}") + print(f" AtomSpace size: {state['atomspace_size']}") + print(f" Recent experiences: {state['recent_experiences']}") + + return agent + + +def demonstrate_knowledge_graph(): + """Demonstrate knowledge graph capabilities.""" + + print("\n" + "=" * 60) + print("OpenCog Knowledge Graph Demonstration") + print("=" * 60) + + # Create AtomSpace and Knowledge Graph + atomspace = AtomSpaceManager() + kg = KnowledgeGraph(atomspace) + + print(f"\n1. Knowledge Graph initialized") + print(f" Initial entities: {len(kg.entities)}") + print(f" Initial relationships: {len(kg.relationships)}") + + # Add domain-specific knowledge + print(f"\n2. 
Adding Domain Knowledge") + + # Add laboratory entities + lab_id = kg.add_entity("Laboratory", {"type": "location", "purpose": "research"}) + computer_id = kg.add_entity("Computer", {"type": "equipment", "function": "data_processing"}) + microscope_id = kg.add_entity("Microscope", {"type": "equipment", "function": "observation"}) + sample_id = kg.add_entity("Sample", {"type": "material", "state": "unknown"}) + + # Add agent + agent_id = kg.add_entity("ResearchAgent", {"type": "agent", "role": "researcher"}) + + print(f" Added 5 entities") + + # Add relationships + from openmanus_rl.opencog_systems.knowledge_representation import RelationType + + # Location relationships + kg.add_relationship(computer_id, lab_id, RelationType.SPATIAL, properties={"relation": "located_in"}) + kg.add_relationship(microscope_id, lab_id, RelationType.SPATIAL, properties={"relation": "located_in"}) + kg.add_relationship(sample_id, lab_id, RelationType.SPATIAL, properties={"relation": "located_in"}) + + # Agent relationships + kg.add_relationship(agent_id, computer_id, RelationType.ASSOCIATION, properties={"relation": "uses"}) + kg.add_relationship(agent_id, microscope_id, RelationType.ASSOCIATION, properties={"relation": "uses"}) + + # Task relationships + analyze_task_id = kg.add_entity("AnalyzeTask", {"type": "task", "priority": "high"}) + kg.add_relationship(agent_id, analyze_task_id, RelationType.ASSOCIATION, properties={"relation": "performs"}) + kg.add_relationship(analyze_task_id, sample_id, RelationType.ASSOCIATION, properties={"relation": "targets"}) + + print(f" Added {len(kg.relationships)} relationships") + + # Query the knowledge graph + print(f"\n3. 
Querying Knowledge Graph") + + # Find entities related to the agent + related = kg.find_related_entities(agent_id, max_distance=2) + print(f" Entities related to ResearchAgent:") + for entity, distance in related[:5]: # Show top 5 + print(f" - {entity.name} (distance: {distance:.2f})") + + # Get neighborhood of laboratory + neighborhood = kg.get_entity_neighborhood(lab_id, radius=1) + print(f" Laboratory neighborhood contains {len(neighborhood['entities'])} entities") + + # Infer new relationships + print(f"\n4. Knowledge Inference") + new_rels = kg.infer_relationships() + print(f" Inferred {len(new_rels)} new relationships") + + # Show statistics + stats = kg.get_statistics() + print(f"\n5. Knowledge Graph Statistics") + print(f" Total entities: {stats['total_entities']}") + print(f" Total relationships: {stats['total_relationships']}") + print(f" Entity types: {list(stats['entity_types'].keys())}") + print(f" Relation types: {list(stats['relation_types'].keys())}") + + return kg + + +def demonstrate_reasoning_engine(): + """Demonstrate reasoning engine capabilities.""" + + print("\n" + "=" * 60) + print("OpenCog Reasoning Engine Demonstration") + print("=" * 60) + + # Create AtomSpace and Reasoning Engine + atomspace = AtomSpaceManager() + reasoning_engine = OpenCogReasoningEngine(atomspace) + + print(f"\n1. Reasoning Engine initialized") + print(f" Number of rules: {len(reasoning_engine.rules)}") + + # Add some domain knowledge + print(f"\n2. 
Adding Domain Knowledge") + + # Create basic facts about the laboratory domain + lab_atom = atomspace.create_concept_node("laboratory") + agent_atom = atomspace.create_concept_node("agent") + sample_atom = atomspace.create_concept_node("sample") + analysis_atom = atomspace.create_concept_node("analysis") + + # Create relationships + located_in_pred = atomspace.create_predicate_node("located_in") + can_perform_pred = atomspace.create_predicate_node("can_perform") + requires_pred = atomspace.create_predicate_node("requires") + + # Agent is located in laboratory + atomspace.create_evaluation_link(located_in_pred, [agent_atom, lab_atom], 0.9) + + # Agent can perform analysis + atomspace.create_evaluation_link(can_perform_pred, [agent_atom, analysis_atom], 0.8) + + # Analysis requires sample + atomspace.create_evaluation_link(requires_pred, [analysis_atom, sample_atom], 0.95) + + print(f" Added domain facts") + print(f" AtomSpace size: {atomspace.size()}") + + # Demonstrate forward chaining + print(f"\n3. Forward Chaining Reasoning") + derived_atoms = reasoning_engine.forward_chaining(max_iterations=3) + print(f" Derived {len(derived_atoms)} new atoms through forward chaining") + + # Demonstrate backward chaining + print(f"\n4. Backward Chaining Reasoning") + + # Query: Can the agent perform analysis? + goal = { + "type": "evaluation", + "predicate": "can_perform", + "args": ["agent", "analysis"] + } + + result = reasoning_engine.backward_chaining(goal, max_depth=3) + print(f" Query: Can agent perform analysis?") + print(f" Result confidence: {result.confidence:.3f}") + print(f" Reasoning steps: {len(result.reasoning_path)}") + + if result.reasoning_path: + print(" Reasoning path:") + for step in result.reasoning_path[:3]: + print(f" - {step}") + + # Demonstrate action reasoning + print(f"\n5. 
Action Reasoning") + action_result = reasoning_engine.reason_about_action( + "perform_analysis", + {"location": "laboratory", "equipment": "microscope"} + ) + + print(f" Action: perform_analysis") + print(f" Context: laboratory with microscope") + print(f" Reasoning confidence: {action_result.confidence:.3f}") + + # Show explanation + if action_result.conclusion: + explanation = reasoning_engine.explain_reasoning(action_result) + print(f"\n6. Reasoning Explanation") + print(explanation) + + # Get statistics + stats = reasoning_engine.get_reasoning_statistics() + print(f"\n7. Reasoning Statistics") + print(f" Total rules: {stats['total_rules']}") + print(f" AtomSpace size: {stats['atomspace_size']}") + print(f" Reasoning history: {stats['reasoning_history_length']}") + print(f" Successful reasonings: {stats['successful_reasonings']}") + print(f" Average confidence: {stats['average_confidence']:.3f}") + + return reasoning_engine + + +def main(): + """Main demonstration function.""" + + print("OpenManus-RL OpenCog Systems Integration Demo") + print("=" * 60) + + try: + # Demonstrate each component + agent = demonstrate_basic_cognitive_agent() + kg = demonstrate_knowledge_graph() + reasoning_engine = demonstrate_reasoning_engine() + + print("\n" + "=" * 60) + print("Integration Demonstration Complete!") + print("=" * 60) + + print(f"\nSummary:") + print(f"- Cognitive Agent: {agent.cycle_count} cycles, {agent.success_rate:.3f} success rate") + print(f"- Knowledge Graph: {kg.get_statistics()['total_entities']} entities, {kg.get_statistics()['total_relationships']} relationships") + print(f"- Reasoning Engine: {len(reasoning_engine.reasoning_history)} reasoning operations") + + # Save cognitive state for inspection + state_file = "/tmp/cognitive_agent_state.json" + try: + with open(state_file, 'w') as f: + json.dump(agent.get_cognitive_state(), f, indent=2, default=str) + print(f"\nCognitive agent state saved to: {state_file}") + except Exception as e: + print(f"\nCould 
not save state: {e}") + + return True + + except Exception as e: + logger.error(f"Demo failed: {e}") + import traceback + traceback.print_exc() + return False + + +if __name__ == "__main__": + success = main() + sys.exit(0 if success else 1) \ No newline at end of file diff --git a/openmanus_rl/__init__.py b/openmanus_rl/__init__.py index e69de29b..9fa6ce07 100644 --- a/openmanus_rl/__init__.py +++ b/openmanus_rl/__init__.py @@ -0,0 +1,11 @@ +""" +OpenManus-RL: Open Platform for Generalist LLM Reasoning Agents with RL optimization. + +This package provides advanced reinforcement learning capabilities for language model agents, +including OpenCog systems for symbolic reasoning and cognitive architectures. +""" + +from . import opencog_systems + +__version__ = "0.1.0" +__all__ = ["opencog_systems"] diff --git a/openmanus_rl/opencog_systems/__init__.py b/openmanus_rl/opencog_systems/__init__.py new file mode 100644 index 00000000..0f2a6c95 --- /dev/null +++ b/openmanus_rl/opencog_systems/__init__.py @@ -0,0 +1,26 @@ +""" +OpenCog integration systems for OpenManus-RL. + +This module provides OpenCog AtomSpace integration for symbolic reasoning, +pattern matching, and cognitive architectures within the OpenManus-RL framework. 
+""" + +from .atomspace_integration import AtomSpaceManager, Atom, AtomType +from .reasoning_engine import OpenCogReasoningEngine, ReasoningResult, ReasoningMode +from .pattern_matcher import OpenCogPatternMatcher, PatternQuery, MatchType +from .cognitive_architecture import CognitiveAgent, CognitiveState, AttentionMode, CognitiveAction +from .knowledge_representation import KnowledgeGraph, RelationType +from .config import ( + OpenCogConfig, ConfigManager, get_config, load_config, update_config, + AtomSpaceConfig, ReasoningConfig, CognitiveConfig, PatternMatchingConfig +) + +__all__ = [ + 'AtomSpaceManager', 'Atom', 'AtomType', + 'OpenCogReasoningEngine', 'ReasoningResult', 'ReasoningMode', + 'OpenCogPatternMatcher', 'PatternQuery', 'MatchType', + 'CognitiveAgent', 'CognitiveState', 'AttentionMode', 'CognitiveAction', + 'KnowledgeGraph', 'RelationType', + 'OpenCogConfig', 'ConfigManager', 'get_config', 'load_config', 'update_config', + 'AtomSpaceConfig', 'ReasoningConfig', 'CognitiveConfig', 'PatternMatchingConfig' +] \ No newline at end of file diff --git a/openmanus_rl/opencog_systems/atomspace_integration.py b/openmanus_rl/opencog_systems/atomspace_integration.py new file mode 100644 index 00000000..d0151a12 --- /dev/null +++ b/openmanus_rl/opencog_systems/atomspace_integration.py @@ -0,0 +1,327 @@ +""" +AtomSpace integration for OpenCog systems in OpenManus-RL. + +This module provides a Python interface to OpenCog's AtomSpace for knowledge +representation and symbolic reasoning within RL agents. 
+""" + +import uuid +from typing import Dict, List, Any, Optional, Union, Set +from enum import Enum +import json +import logging +from dataclasses import dataclass + + +class AtomType(Enum): + """Basic OpenCog atom types.""" + CONCEPT_NODE = "ConceptNode" + PREDICATE_NODE = "PredicateNode" + VARIABLE_NODE = "VariableNode" + LIST_LINK = "ListLink" + EVALUATION_LINK = "EvaluationLink" + INHERITANCE_LINK = "InheritanceLink" + SIMILARITY_LINK = "SimilarityLink" + IMPLICATION_LINK = "ImplicationLink" + AND_LINK = "AndLink" + OR_LINK = "OrLink" + NOT_LINK = "NotLink" + + +@dataclass +class Atom: + """Represents an OpenCog Atom with type, name, and truth value.""" + atom_type: AtomType + name: str + outgoing: Optional[List['Atom']] = None + truth_value: Optional[float] = None + confidence: Optional[float] = None + atom_id: str = None + + def __post_init__(self): + if self.atom_id is None: + self.atom_id = str(uuid.uuid4()) + if self.outgoing is None: + self.outgoing = [] + + +class AtomSpaceManager: + """ + Manages an OpenCog AtomSpace for symbolic knowledge representation. + + This is a lightweight Python implementation that can interface with + the full OpenCog AtomSpace when available, or operate independently. + """ + + def __init__(self): + self.atoms: Dict[str, Atom] = {} + self.name_to_atoms: Dict[str, Set[str]] = {} + self.type_to_atoms: Dict[AtomType, Set[str]] = {} + self.logger = logging.getLogger(__name__) + + # Initialize type mappings + for atom_type in AtomType: + self.type_to_atoms[atom_type] = set() + + def add_atom(self, atom_type: AtomType, name: str, + outgoing: Optional[List[Atom]] = None, + truth_value: float = 1.0, + confidence: float = 1.0) -> Atom: + """ + Add an atom to the AtomSpace. 
+ + Args: + atom_type: Type of the atom to create + name: Name/identifier for the atom + outgoing: List of atoms this atom points to (for links) + truth_value: Truth value (0.0 to 1.0) + confidence: Confidence in the truth value (0.0 to 1.0) + + Returns: + The created Atom object + """ + atom = Atom( + atom_type=atom_type, + name=name, + outgoing=outgoing or [], + truth_value=truth_value, + confidence=confidence + ) + + self.atoms[atom.atom_id] = atom + + # Update indexes + if name not in self.name_to_atoms: + self.name_to_atoms[name] = set() + self.name_to_atoms[name].add(atom.atom_id) + self.type_to_atoms[atom_type].add(atom.atom_id) + + self.logger.debug(f"Added atom: {atom_type.value} '{name}' with ID {atom.atom_id}") + return atom + + def find_atoms(self, atom_type: Optional[AtomType] = None, + name: Optional[str] = None) -> List[Atom]: + """ + Find atoms matching the given criteria. + + Args: + atom_type: Filter by atom type + name: Filter by atom name + + Returns: + List of matching atoms + """ + candidate_ids = set(self.atoms.keys()) + + if atom_type is not None: + candidate_ids &= self.type_to_atoms[atom_type] + + if name is not None: + candidate_ids &= self.name_to_atoms.get(name, set()) + + return [self.atoms[atom_id] for atom_id in candidate_ids] + + def get_atom(self, atom_id: str) -> Optional[Atom]: + """Get an atom by its ID.""" + return self.atoms.get(atom_id) + + def remove_atom(self, atom_id: str) -> bool: + """ + Remove an atom from the AtomSpace. 
+ + Args: + atom_id: ID of the atom to remove + + Returns: + True if atom was removed, False if not found + """ + if atom_id not in self.atoms: + return False + + atom = self.atoms[atom_id] + + # Remove from indexes + self.name_to_atoms[atom.name].discard(atom_id) + if not self.name_to_atoms[atom.name]: + del self.name_to_atoms[atom.name] + + self.type_to_atoms[atom.atom_type].discard(atom_id) + + # Remove the atom + del self.atoms[atom_id] + + self.logger.debug(f"Removed atom {atom_id}") + return True + + def create_concept_node(self, name: str, truth_value: float = 1.0) -> Atom: + """Create a ConceptNode.""" + return self.add_atom(AtomType.CONCEPT_NODE, name, truth_value=truth_value) + + def create_predicate_node(self, name: str, truth_value: float = 1.0) -> Atom: + """Create a PredicateNode.""" + return self.add_atom(AtomType.PREDICATE_NODE, name, truth_value=truth_value) + + def create_variable_node(self, name: str) -> Atom: + """Create a VariableNode.""" + return self.add_atom(AtomType.VARIABLE_NODE, name) + + def create_evaluation_link(self, predicate: Atom, arguments: List[Atom], + truth_value: float = 1.0) -> Atom: + """ + Create an EvaluationLink. 
+ + Args: + predicate: PredicateNode to evaluate + arguments: List of atoms as arguments + truth_value: Truth value of the evaluation + + Returns: + Created EvaluationLink atom + """ + # Create a ListLink for the arguments + list_link = self.add_atom( + AtomType.LIST_LINK, + f"list_{uuid.uuid4().hex[:8]}", + outgoing=arguments + ) + + # Create the EvaluationLink + return self.add_atom( + AtomType.EVALUATION_LINK, + f"eval_{uuid.uuid4().hex[:8]}", + outgoing=[predicate, list_link], + truth_value=truth_value + ) + + def create_inheritance_link(self, child: Atom, parent: Atom, + truth_value: float = 1.0) -> Atom: + """Create an InheritanceLink.""" + return self.add_atom( + AtomType.INHERITANCE_LINK, + f"inherit_{child.name}_{parent.name}", + outgoing=[child, parent], + truth_value=truth_value + ) + + def get_incoming_set(self, atom: Atom) -> List[Atom]: + """ + Get all atoms that have the given atom in their outgoing set. + + Args: + atom: The atom to find incoming links for + + Returns: + List of atoms that point to the given atom + """ + incoming = [] + for candidate in self.atoms.values(): + if atom in candidate.outgoing: + incoming.append(candidate) + return incoming + + def get_outgoing_set(self, atom: Atom) -> List[Atom]: + """Get the outgoing set of an atom.""" + return atom.outgoing.copy() + + def query_pattern(self, pattern: Dict[str, Any]) -> List[Dict[str, Atom]]: + """ + Simple pattern matching query. 
+ + Args: + pattern: Dictionary describing the pattern to match + + Returns: + List of variable bindings that satisfy the pattern + """ + # This is a simplified pattern matcher + # A full implementation would use OpenCog's pattern matcher + results = [] + + if pattern.get("type") == "evaluation": + predicate_name = pattern.get("predicate") + arg_patterns = pattern.get("arguments", []) + + # Find all EvaluationLinks + eval_links = self.find_atoms(AtomType.EVALUATION_LINK) + + for eval_link in eval_links: + if len(eval_link.outgoing) >= 2: + pred_atom = eval_link.outgoing[0] + if pred_atom.name == predicate_name: + # Simple match - can be expanded + bindings = {"evaluation": eval_link} + results.append(bindings) + + return results + + def export_to_dict(self) -> Dict[str, Any]: + """Export the AtomSpace to a dictionary for serialization.""" + export_data = { + "atoms": {}, + "metadata": { + "total_atoms": len(self.atoms), + "atom_types": {atom_type.value: len(ids) + for atom_type, ids in self.type_to_atoms.items()} + } + } + + for atom_id, atom in self.atoms.items(): + export_data["atoms"][atom_id] = { + "type": atom.atom_type.value, + "name": atom.name, + "outgoing": [out_atom.atom_id for out_atom in atom.outgoing], + "truth_value": atom.truth_value, + "confidence": atom.confidence + } + + return export_data + + def import_from_dict(self, data: Dict[str, Any]) -> None: + """Import AtomSpace data from a dictionary.""" + self.clear() + + atoms_data = data.get("atoms", {}) + + # First pass: create all atoms without outgoing sets + atom_id_map = {} + for atom_id, atom_data in atoms_data.items(): + atom_type = AtomType(atom_data["type"]) + atom = Atom( + atom_type=atom_type, + name=atom_data["name"], + truth_value=atom_data.get("truth_value"), + confidence=atom_data.get("confidence"), + atom_id=atom_id + ) + self.atoms[atom_id] = atom + atom_id_map[atom_id] = atom + + # Update indexes + if atom.name not in self.name_to_atoms: + self.name_to_atoms[atom.name] = set() + 
self.name_to_atoms[atom.name].add(atom_id) + self.type_to_atoms[atom_type].add(atom_id) + + # Second pass: set up outgoing relationships + for atom_id, atom_data in atoms_data.items(): + atom = self.atoms[atom_id] + outgoing_ids = atom_data.get("outgoing", []) + atom.outgoing = [atom_id_map[out_id] for out_id in outgoing_ids + if out_id in atom_id_map] + + def clear(self) -> None: + """Clear all atoms from the AtomSpace.""" + self.atoms.clear() + self.name_to_atoms.clear() + for atom_type in AtomType: + self.type_to_atoms[atom_type].clear() + + def size(self) -> int: + """Get the number of atoms in the AtomSpace.""" + return len(self.atoms) + + def __len__(self) -> int: + return self.size() + + def __contains__(self, atom_id: str) -> bool: + return atom_id in self.atoms \ No newline at end of file diff --git a/openmanus_rl/opencog_systems/cognitive_architecture.py b/openmanus_rl/opencog_systems/cognitive_architecture.py new file mode 100644 index 00000000..4e8b570d --- /dev/null +++ b/openmanus_rl/opencog_systems/cognitive_architecture.py @@ -0,0 +1,582 @@ +""" +OpenCog Cognitive Architecture for OpenManus-RL agents. + +This module implements a cognitive architecture using OpenCog components, +providing integrated symbolic reasoning, learning, and decision-making capabilities. 
+""" + +from typing import Dict, List, Any, Optional, Tuple +import logging +from dataclasses import dataclass, field +from enum import Enum +import time + +from .atomspace_integration import AtomSpaceManager, Atom, AtomType +from .reasoning_engine import OpenCogReasoningEngine, ReasoningResult +from .pattern_matcher import OpenCogPatternMatcher, PatternQuery, MatchType +from .knowledge_representation import KnowledgeGraph + + +class CognitiveState(Enum): + """States of the cognitive processing cycle.""" + PERCEIVING = "perceiving" + REASONING = "reasoning" + PLANNING = "planning" + ACTING = "acting" + LEARNING = "learning" + + +class AttentionMode(Enum): + """Modes of attention allocation.""" + FOCUSED = "focused" + DISTRIBUTED = "distributed" + EXPLORATORY = "exploratory" + GOAL_DIRECTED = "goal_directed" + + +@dataclass +class CognitiveMemory: + """Working memory for the cognitive agent.""" + current_goals: List[Atom] = field(default_factory=list) + active_concepts: List[Atom] = field(default_factory=list) + recent_experiences: List[Dict[str, Any]] = field(default_factory=list) + attention_focus: Optional[Atom] = None + confidence_levels: Dict[str, float] = field(default_factory=dict) + + def add_experience(self, experience: Dict[str, Any]): + """Add a new experience to memory.""" + self.recent_experiences.append({ + **experience, + 'timestamp': time.time() + }) + # Keep only recent experiences (last 100) + self.recent_experiences = self.recent_experiences[-100:] + + def get_relevant_experiences(self, context: str, limit: int = 10) -> List[Dict[str, Any]]: + """Get experiences relevant to a context.""" + # Simplified relevance matching - would be more sophisticated in practice + relevant = [] + for exp in reversed(self.recent_experiences): + if context.lower() in str(exp).lower(): + relevant.append(exp) + if len(relevant) >= limit: + break + return relevant + + +@dataclass +class CognitiveAction: + """Represents an action in the cognitive space.""" + 
action_type: str + parameters: Dict[str, Any] + confidence: float + reasoning_path: List[str] + expected_outcome: Optional[str] = None + + def __str__(self): + return f"CognitiveAction({self.action_type}, conf={self.confidence:.3f})" + + +class CognitiveAgent: + """ + OpenCog-based cognitive agent for complex reasoning and decision-making. + + Integrates AtomSpace, reasoning engine, and pattern matcher to provide + a comprehensive cognitive architecture for RL agents. + """ + + def __init__(self, agent_name: str = "cognitive_agent"): + self.agent_name = agent_name + self.logger = logging.getLogger(f"{__name__}.{agent_name}") + + # Core OpenCog components + self.atomspace = AtomSpaceManager() + self.reasoning_engine = OpenCogReasoningEngine(self.atomspace) + self.pattern_matcher = OpenCogPatternMatcher(self.atomspace) + self.knowledge_graph = KnowledgeGraph(self.atomspace) + + # Cognitive state management + self.state = CognitiveState.PERCEIVING + self.memory = CognitiveMemory() + self.attention_mode = AttentionMode.FOCUSED + + # Learning and adaptation + self.learning_rate = 0.1 + self.exploration_factor = 0.2 + + # Performance tracking + self.action_history: List[CognitiveAction] = [] + self.success_rate = 0.0 + self.cycle_count = 0 + + # Initialize basic knowledge + self._initialize_basic_knowledge() + + def _initialize_basic_knowledge(self): + """Initialize the agent with basic knowledge and concepts.""" + + # Basic concepts + self_concept = self.atomspace.create_concept_node("self") + agent_concept = self.atomspace.create_concept_node("agent") + environment_concept = self.atomspace.create_concept_node("environment") + + # Self-identification + self.atomspace.create_inheritance_link(self_concept, agent_concept, 1.0) + + # Basic actions + action_concept = self.atomspace.create_concept_node("action") + think_action = self.atomspace.create_concept_node("think") + observe_action = self.atomspace.create_concept_node("observe") + + 
self.atomspace.create_inheritance_link(think_action, action_concept, 1.0) + self.atomspace.create_inheritance_link(observe_action, action_concept, 1.0) + + # Goal concepts + goal_concept = self.atomspace.create_concept_node("goal") + success_concept = self.atomspace.create_concept_node("success") + + # Add to knowledge graph + self.knowledge_graph.add_entity("self", {"type": "agent", "name": self.agent_name}) + self.knowledge_graph.add_entity("environment", {"type": "context"}) + + self.logger.info("Initialized basic knowledge structures") + + def perceive(self, observations: Dict[str, Any]) -> None: + """ + Process new observations and update internal state. + + Args: + observations: Dictionary of observed information + """ + self.state = CognitiveState.PERCEIVING + self.logger.debug(f"Perceiving: {observations}") + + # Convert observations to atoms + observation_atoms = [] + for key, value in observations.items(): + # Create observation atoms + obs_atom = self.atomspace.create_concept_node(f"obs_{key}_{value}") + observation_atoms.append(obs_atom) + + # Create evaluation link for the observation + obs_predicate = self.atomspace.create_predicate_node("observed") + self.atomspace.create_evaluation_link(obs_predicate, [obs_atom], + truth_value=0.9) + + # Update working memory + self.memory.active_concepts.extend(observation_atoms) + self.memory.add_experience({ + "type": "observation", + "content": observations, + "atoms": [atom.atom_id for atom in observation_atoms] + }) + + # Update attention based on observations + self._update_attention(observation_atoms) + + def reason(self, query: Optional[str] = None) -> ReasoningResult: + """ + Perform reasoning about the current situation. 
+ + Args: + query: Optional specific query to reason about + + Returns: + ReasoningResult with conclusions and reasoning path + """ + self.state = CognitiveState.REASONING + self.logger.debug(f"Reasoning about: {query or 'current situation'}") + + if query: + # Specific query reasoning + query_pattern = self._parse_query_to_pattern(query) + result = self.reasoning_engine.backward_chaining(query_pattern) + else: + # General situation reasoning + result = self._reason_about_current_situation() + + # Update memory with reasoning results + self.memory.add_experience({ + "type": "reasoning", + "query": query, + "result": result, + "confidence": result.confidence + }) + + # Update confidence levels + if result.conclusion: + self.memory.confidence_levels[result.conclusion.name] = result.confidence + + return result + + def plan(self, goal: str, context: Dict[str, Any] = None) -> List[CognitiveAction]: + """ + Create a plan to achieve a goal. + + Args: + goal: Goal description + context: Additional context information + + Returns: + List of cognitive actions forming a plan + """ + self.state = CognitiveState.PLANNING + self.logger.debug(f"Planning for goal: {goal}") + + # Create goal atom + goal_atom = self.atomspace.create_concept_node(f"goal_{goal}") + self.memory.current_goals.append(goal_atom) + + # Use knowledge graph to find relevant actions + relevant_actions = self.knowledge_graph.find_related_entities( + "action", max_distance=2 + ) + + plan = [] + + # Simple planning: create actions based on goal and context + if "explore" in goal.lower(): + action = CognitiveAction( + action_type="explore_environment", + parameters={"strategy": "systematic"}, + confidence=0.8, + reasoning_path=["Goal requires exploration", "Systematic exploration is effective"], + expected_outcome="Increased knowledge of environment" + ) + plan.append(action) + + elif "learn" in goal.lower(): + action = CognitiveAction( + action_type="analyze_patterns", + parameters={"focus": 
context.get("domain", "general")}, + confidence=0.7, + reasoning_path=["Learning goal identified", "Pattern analysis aids learning"], + expected_outcome="Improved understanding" + ) + plan.append(action) + + else: + # Default action: gather more information + action = CognitiveAction( + action_type="gather_information", + parameters={"target": goal}, + confidence=0.6, + reasoning_path=["Goal not fully understood", "More information needed"], + expected_outcome="Better goal understanding" + ) + plan.append(action) + + # Add plan to memory + self.memory.add_experience({ + "type": "planning", + "goal": goal, + "plan": plan, + "context": context + }) + + return plan + + def act(self, action: CognitiveAction) -> Dict[str, Any]: + """ + Execute a cognitive action. + + Args: + action: CognitiveAction to execute + + Returns: + Dictionary with action results + """ + self.state = CognitiveState.ACTING + self.logger.debug(f"Executing action: {action}") + + result = {"success": False, "output": None, "confidence": 0.0} + + try: + if action.action_type == "explore_environment": + result = self._execute_exploration(action) + elif action.action_type == "analyze_patterns": + result = self._execute_pattern_analysis(action) + elif action.action_type == "gather_information": + result = self._execute_information_gathering(action) + else: + # Generic action execution + result = self._execute_generic_action(action) + + # Record action execution + self.action_history.append(action) + + # Update success rate + if result["success"]: + self.success_rate = (self.success_rate * len(self.action_history) + 1) / (len(self.action_history) + 1) + + # Add to memory + self.memory.add_experience({ + "type": "action_execution", + "action": action, + "result": result + }) + + except Exception as e: + self.logger.error(f"Error executing action {action}: {e}") + result["error"] = str(e) + + return result + + def learn(self, feedback: Dict[str, Any]) -> None: + """ + Learn from feedback and update 
knowledge. + + Args: + feedback: Feedback information including rewards, corrections, etc. + """ + self.state = CognitiveState.LEARNING + self.logger.debug(f"Learning from feedback: {feedback}") + + # Extract learning signals + reward = feedback.get("reward", 0.0) + correction = feedback.get("correction") + success = feedback.get("success", False) + + # Update knowledge based on feedback + if success and self.action_history: + # Reinforce successful actions + last_action = self.action_history[-1] + self._reinforce_action_knowledge(last_action, reward) + + if correction: + # Learn from corrections + self._learn_from_correction(correction) + + # Update confidence levels + recent_experiences = self.memory.get_relevant_experiences("action", limit=5) + if recent_experiences: + successful_actions = 0 + for exp in recent_experiences: + if isinstance(exp, dict) and "result" in exp: + result = exp.get("result", {}) + if isinstance(result, dict) and result.get("success", False): + successful_actions += 1 + + avg_success = successful_actions / len(recent_experiences) if recent_experiences else 0 + self.memory.confidence_levels["general_performance"] = avg_success + + # Add learning experience to memory + self.memory.add_experience({ + "type": "learning", + "feedback": feedback, + "adjustment": "knowledge_updated" + }) + + def cognitive_cycle(self, observations: Dict[str, Any], + goal: Optional[str] = None) -> Dict[str, Any]: + """ + Execute a complete cognitive cycle: perceive -> reason -> plan -> act. 
+ + Args: + observations: Current observations + goal: Optional goal to work towards + + Returns: + Dictionary with cycle results + """ + self.cycle_count += 1 + cycle_start = time.time() + + # Perceive + self.perceive(observations) + + # Reason about current situation + reasoning_result = self.reason() + + # Plan if we have a goal + plan = [] + if goal: + plan = self.plan(goal, {"observations": observations}) + + # Act on the first planned action or a default action + action_result = None + if plan: + action_result = self.act(plan[0]) + else: + # Default action: analyze current situation + default_action = CognitiveAction( + action_type="analyze_situation", + parameters={"observations": observations}, + confidence=0.5, + reasoning_path=["No specific plan", "Analyzing situation"] + ) + action_result = self.act(default_action) + + cycle_time = time.time() - cycle_start + + cycle_result = { + "cycle_number": self.cycle_count, + "cycle_time": cycle_time, + "reasoning_result": reasoning_result, + "plan": plan, + "action_result": action_result, + "attention_focus": self.memory.attention_focus.name if self.memory.attention_focus else None, + "active_concepts": len(self.memory.active_concepts), + "success_rate": self.success_rate + } + + self.logger.info(f"Completed cognitive cycle {self.cycle_count} in {cycle_time:.3f}s") + return cycle_result + + def _update_attention(self, new_atoms: List[Atom]): + """Update attention focus based on new information.""" + if not new_atoms: + return + + if self.attention_mode == AttentionMode.FOCUSED: + # Focus on the most salient atom (simplified) + self.memory.attention_focus = new_atoms[0] + elif self.attention_mode == AttentionMode.EXPLORATORY: + # Rotate attention among new atoms + if len(new_atoms) > 1: + self.memory.attention_focus = new_atoms[1] + + # Update active concepts (limited working memory) + self.memory.active_concepts.extend(new_atoms) + self.memory.active_concepts = self.memory.active_concepts[-20:] # Keep last 20 + + 
def _parse_query_to_pattern(self, query: str) -> Dict[str, Any]: + """Parse a natural language query into a pattern for reasoning.""" + # Simplified query parsing - would be more sophisticated in practice + query_lower = query.lower() + + if "what" in query_lower: + return {"type": "evaluation", "predicate": "has_property", "args": ["$entity", "$property"]} + elif "who" in query_lower: + return {"type": "concept", "name": "$person"} + elif "where" in query_lower: + return {"type": "evaluation", "predicate": "located_at", "args": ["$entity", "$location"]} + else: + return {"type": "concept", "name": "$unknown"} + + def _reason_about_current_situation(self) -> ReasoningResult: + """Reason about the current situation based on active concepts.""" + # Create a general situational query + situation_query = { + "type": "evaluation", + "predicate": "current_situation", + "args": ["$state"] + } + + return self.reasoning_engine.backward_chaining(situation_query, max_depth=3) + + def _execute_exploration(self, action: CognitiveAction) -> Dict[str, Any]: + """Execute exploration action.""" + strategy = action.parameters.get("strategy", "random") + + # Use pattern matcher to find unexplored areas + query = PatternQuery({"type": "concept", "name": "$unknown"}) + matches = self.pattern_matcher.match_pattern(query, MatchType.FUZZY) + + result = { + "success": True, + "output": f"Explored using {strategy} strategy", + "confidence": action.confidence, + "discoveries": len(matches) + } + + return result + + def _execute_pattern_analysis(self, action: CognitiveAction) -> Dict[str, Any]: + """Execute pattern analysis action.""" + focus = action.parameters.get("focus", "general") + + # Find patterns in recent experiences + relevant_experiences = self.memory.get_relevant_experiences(focus, limit=10) + + # Simple pattern detection + pattern_count = len(set(exp.get("type") for exp in relevant_experiences)) + + result = { + "success": True, + "output": f"Analyzed patterns in {focus} 
domain", + "confidence": action.confidence, + "patterns_found": pattern_count, + "experience_count": len(relevant_experiences) + } + + return result + + def _execute_information_gathering(self, action: CognitiveAction) -> Dict[str, Any]: + """Execute information gathering action.""" + target = action.parameters.get("target", "general") + + # Query atomspace for relevant information + target_atoms = self.atomspace.find_atoms(name=target) + related_info = [] + + for atom in target_atoms: + incoming = self.atomspace.get_incoming_set(atom) + outgoing = self.atomspace.get_outgoing_set(atom) + related_info.extend(incoming + outgoing) + + result = { + "success": len(related_info) > 0, + "output": f"Gathered information about {target}", + "confidence": action.confidence, + "information_pieces": len(related_info) + } + + return result + + def _execute_generic_action(self, action: CognitiveAction) -> Dict[str, Any]: + """Execute a generic action.""" + result = { + "success": True, + "output": f"Executed {action.action_type}", + "confidence": action.confidence * 0.8 # Lower confidence for generic actions + } + + return result + + def _reinforce_action_knowledge(self, action: CognitiveAction, reward: float): + """Reinforce knowledge about successful actions.""" + # Create or update action-outcome relationships + action_atom = self.atomspace.create_concept_node(f"action_{action.action_type}") + outcome_atom = self.atomspace.create_concept_node("positive_outcome") + + # Create evaluation link with confidence based on reward + leads_to_predicate = self.atomspace.create_predicate_node("leads_to") + self.atomspace.create_evaluation_link( + leads_to_predicate, + [action_atom, outcome_atom], + truth_value=min(1.0, max(0.1, reward)) + ) + + def _learn_from_correction(self, correction: str): + """Learn from correction feedback.""" + # Simple correction learning - create negative examples + correction_atom = self.atomspace.create_concept_node(f"correction_{correction}") + mistake_atom = 
self.atomspace.create_concept_node("mistake") + + self.atomspace.create_inheritance_link(correction_atom, mistake_atom, 0.8) + + def get_cognitive_state(self) -> Dict[str, Any]: + """Get current cognitive state information.""" + return { + "state": self.state.value, + "attention_mode": self.attention_mode.value, + "attention_focus": self.memory.attention_focus.name if self.memory.attention_focus else None, + "active_concepts": len(self.memory.active_concepts), + "current_goals": [goal.name for goal in self.memory.current_goals], + "recent_experiences": len(self.memory.recent_experiences), + "confidence_levels": self.memory.confidence_levels.copy(), + "atomspace_size": self.atomspace.size(), + "success_rate": self.success_rate, + "cycle_count": self.cycle_count + } + + def set_attention_mode(self, mode: AttentionMode): + """Set the attention allocation mode.""" + self.attention_mode = mode + self.logger.debug(f"Attention mode set to: {mode.value}") + + def clear_memory(self): + """Clear working memory while preserving long-term knowledge.""" + self.memory = CognitiveMemory() + self.logger.info("Working memory cleared") \ No newline at end of file diff --git a/openmanus_rl/opencog_systems/config.py b/openmanus_rl/opencog_systems/config.py new file mode 100644 index 00000000..6369e67d --- /dev/null +++ b/openmanus_rl/opencog_systems/config.py @@ -0,0 +1,351 @@ +""" +Configuration settings for OpenCog systems in OpenManus-RL. + +This module provides configuration management for OpenCog components +including AtomSpace, reasoning engines, and cognitive architectures. 
+""" + +from typing import Dict, Any, Optional +from dataclasses import dataclass, field +from enum import Enum +import os +import json +import yaml + + +class ReasoningStrategy(Enum): + """Available reasoning strategies.""" + FORWARD_CHAINING = "forward_chaining" + BACKWARD_CHAINING = "backward_chaining" + HYBRID = "hybrid" + PROBABILISTIC = "probabilistic" + + +class AttentionStrategy(Enum): + """Available attention allocation strategies.""" + FOCUSED = "focused" + DISTRIBUTED = "distributed" + ADAPTIVE = "adaptive" + GOAL_DIRECTED = "goal_directed" + + +@dataclass +class AtomSpaceConfig: + """Configuration for AtomSpace.""" + initial_size_limit: int = 100000 + garbage_collection_threshold: float = 0.8 + truth_value_precision: float = 0.001 + confidence_threshold: float = 0.1 + enable_persistence: bool = False + persistence_file: Optional[str] = None + + def __post_init__(self): + if self.enable_persistence and not self.persistence_file: + self.persistence_file = "/tmp/atomspace_dump.json" + + +@dataclass +class ReasoningConfig: + """Configuration for reasoning engine.""" + default_strategy: ReasoningStrategy = ReasoningStrategy.HYBRID + max_iterations: int = 10 + max_depth: int = 5 + confidence_threshold: float = 0.5 + rule_strength_decay: float = 0.9 + enable_explanation: bool = True + cache_results: bool = True + cache_size_limit: int = 1000 + + # Forward chaining parameters + forward_chaining_enabled: bool = True + forward_max_iterations: int = 5 + + # Backward chaining parameters + backward_chaining_enabled: bool = True + backward_max_depth: int = 3 + + # Probabilistic reasoning parameters + probabilistic_enabled: bool = False + prior_probability: float = 0.1 + evidence_weight: float = 0.8 + + +@dataclass +class PatternMatchingConfig: + """Configuration for pattern matching.""" + match_threshold: float = 0.7 + max_results: int = 100 + enable_fuzzy_matching: bool = True + enable_structural_matching: bool = True + enable_semantic_matching: bool = False + + # 
Fuzzy matching parameters + fuzzy_string_similarity_threshold: float = 0.6 + fuzzy_type_weight: float = 0.3 + fuzzy_name_weight: float = 0.4 + fuzzy_structure_weight: float = 0.3 + + # Structural matching parameters + structural_arity_weight: float = 0.4 + structural_type_weight: float = 0.6 + + # Semantic matching parameters (requires embeddings) + semantic_embedding_model: Optional[str] = None + semantic_similarity_threshold: float = 0.8 + + +@dataclass +class CognitiveConfig: + """Configuration for cognitive architecture.""" + attention_strategy: AttentionStrategy = AttentionStrategy.ADAPTIVE + working_memory_size: int = 20 + experience_history_size: int = 100 + learning_rate: float = 0.1 + exploration_factor: float = 0.2 + + # Cognitive cycle parameters + max_cycle_time: float = 1.0 # seconds + enable_parallel_processing: bool = False + + # Memory management + memory_consolidation_threshold: int = 50 + memory_forgetting_rate: float = 0.01 + + # Goal management + max_active_goals: int = 5 + goal_priority_decay: float = 0.95 + + # Action selection + action_selection_strategy: str = "confidence_based" + action_confidence_threshold: float = 0.3 + + +@dataclass +class KnowledgeGraphConfig: + """Configuration for knowledge representation.""" + max_entities: int = 10000 + max_relationships: int = 50000 + relationship_strength_threshold: float = 0.1 + enable_inference: bool = True + inference_iterations: int = 3 + + # Entity management + entity_similarity_threshold: float = 0.9 + enable_entity_merging: bool = False + + # Relationship management + relationship_decay_rate: float = 0.001 + enable_relationship_pruning: bool = True + + # Ontology parameters + max_ontology_depth: int = 10 + enable_ontology_validation: bool = True + + +@dataclass +class OpenCogConfig: + """Main configuration container for all OpenCog systems.""" + atomspace: AtomSpaceConfig = field(default_factory=AtomSpaceConfig) + reasoning: ReasoningConfig = field(default_factory=ReasoningConfig) + 
pattern_matching: PatternMatchingConfig = field(default_factory=PatternMatchingConfig) + cognitive: CognitiveConfig = field(default_factory=CognitiveConfig) + knowledge_graph: KnowledgeGraphConfig = field(default_factory=KnowledgeGraphConfig) + + # Global settings + enable_logging: bool = True + log_level: str = "INFO" + enable_metrics: bool = False + metrics_output_file: Optional[str] = None + + # Integration settings + integration_mode: str = "standalone" # "standalone" or "openmanus_native" + enable_real_opencog: bool = False # Use real OpenCog if available + opencog_config_file: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert configuration to dictionary.""" + def convert_dataclass(obj): + if hasattr(obj, '__dataclass_fields__'): + return {field: convert_dataclass(getattr(obj, field)) + for field in obj.__dataclass_fields__} + elif isinstance(obj, Enum): + return obj.value + else: + return obj + + return convert_dataclass(self) + + def save_to_file(self, filepath: str, format: str = "yaml"): + """Save configuration to file.""" + config_dict = self.to_dict() + + with open(filepath, 'w') as f: + if format.lower() == "yaml": + yaml.dump(config_dict, f, indent=2, default_flow_style=False) + elif format.lower() == "json": + json.dump(config_dict, f, indent=2) + else: + raise ValueError(f"Unsupported format: {format}") + + @classmethod + def load_from_file(cls, filepath: str) -> 'OpenCogConfig': + """Load configuration from file.""" + with open(filepath, 'r') as f: + if filepath.endswith('.yaml') or filepath.endswith('.yml'): + data = yaml.safe_load(f) + elif filepath.endswith('.json'): + data = json.load(f) + else: + raise ValueError(f"Unsupported file format: {filepath}") + + return cls.from_dict(data) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'OpenCogConfig': + """Create configuration from dictionary.""" + # This is a simplified version - a full implementation would + # recursively construct all nested dataclass 
objects + config = cls() + + # Update top-level fields + for key, value in data.items(): + if hasattr(config, key) and not key.startswith('_'): + setattr(config, key, value) + + return config + + def merge_with(self, other: 'OpenCogConfig') -> 'OpenCogConfig': + """Merge this configuration with another, with other taking precedence.""" + # Simplified merge - a full implementation would handle nested objects + merged_dict = self.to_dict() + other_dict = other.to_dict() + + def deep_merge(base, override): + for key, value in override.items(): + if key in base and isinstance(base[key], dict) and isinstance(value, dict): + deep_merge(base[key], value) + else: + base[key] = value + + deep_merge(merged_dict, other_dict) + return self.from_dict(merged_dict) + + +class ConfigManager: + """Manager for OpenCog configuration.""" + + def __init__(self): + self.config = OpenCogConfig() + self._config_file_path = None + + def load_config(self, config_path: Optional[str] = None) -> OpenCogConfig: + """ + Load configuration from file or environment. 
+ + Args: + config_path: Path to configuration file, or None to use environment + + Returns: + Loaded configuration + """ + if config_path: + self.config = OpenCogConfig.load_from_file(config_path) + self._config_file_path = config_path + else: + # Try to load from environment variables or default locations + env_config_path = os.getenv('OPENMANUS_OPENCOG_CONFIG') + + default_paths = [ + './opencog_config.yaml', + './config/opencog.yaml', + '~/.openmanus/opencog_config.yaml', + '/etc/openmanus/opencog_config.yaml' + ] + + config_path = env_config_path + if not config_path: + for path in default_paths: + expanded_path = os.path.expanduser(path) + if os.path.exists(expanded_path): + config_path = expanded_path + break + + if config_path and os.path.exists(config_path): + self.config = OpenCogConfig.load_from_file(config_path) + self._config_file_path = config_path + else: + # Use default configuration + self.config = OpenCogConfig() + + # Override with environment variables + self._override_from_environment() + + return self.config + + def _override_from_environment(self): + """Override configuration with environment variables.""" + env_mappings = { + 'OPENMANUS_OPENCOG_LOG_LEVEL': ('log_level', str), + 'OPENMANUS_OPENCOG_ENABLE_REAL': ('enable_real_opencog', lambda x: x.lower() == 'true'), + 'OPENMANUS_REASONING_MAX_DEPTH': ('reasoning.max_depth', int), + 'OPENMANUS_COGNITIVE_LEARNING_RATE': ('cognitive.learning_rate', float), + 'OPENMANUS_ATOMSPACE_SIZE_LIMIT': ('atomspace.initial_size_limit', int), + } + + for env_var, (config_path, converter) in env_mappings.items(): + value = os.getenv(env_var) + if value is not None: + try: + converted_value = converter(value) + self._set_nested_attr(self.config, config_path, converted_value) + except (ValueError, TypeError) as e: + print(f"Warning: Invalid value for {env_var}: {value} ({e})") + + def _set_nested_attr(self, obj, attr_path: str, value): + """Set nested attribute using dot notation.""" + parts = 
attr_path.split('.') + for part in parts[:-1]: + obj = getattr(obj, part) + setattr(obj, parts[-1], value) + + def save_config(self, config_path: Optional[str] = None): + """Save current configuration to file.""" + path = config_path or self._config_file_path or './opencog_config.yaml' + self.config.save_to_file(path) + + def get_config(self) -> OpenCogConfig: + """Get current configuration.""" + return self.config + + def update_config(self, **kwargs): + """Update configuration parameters.""" + for key, value in kwargs.items(): + if hasattr(self.config, key): + setattr(self.config, key, value) + + def reset_to_defaults(self): + """Reset configuration to defaults.""" + self.config = OpenCogConfig() + + +# Global configuration manager instance +config_manager = ConfigManager() + + +def get_config() -> OpenCogConfig: + """Get the global OpenCog configuration.""" + return config_manager.get_config() + + +def load_config(config_path: Optional[str] = None) -> OpenCogConfig: + """Load OpenCog configuration from file or environment.""" + return config_manager.load_config(config_path) + + +def update_config(**kwargs): + """Update global configuration parameters.""" + config_manager.update_config(**kwargs) + + +# Default configuration instance for easy access +default_config = OpenCogConfig() \ No newline at end of file diff --git a/openmanus_rl/opencog_systems/knowledge_representation.py b/openmanus_rl/opencog_systems/knowledge_representation.py new file mode 100644 index 00000000..7be6ac95 --- /dev/null +++ b/openmanus_rl/opencog_systems/knowledge_representation.py @@ -0,0 +1,642 @@ +""" +Knowledge Representation module using OpenCog for OpenManus-RL. + +This module provides high-level knowledge representation capabilities +including knowledge graphs, ontologies, and semantic networks. 
+""" + +from typing import Dict, List, Any, Optional, Set, Tuple, Union +import json +import logging +from dataclasses import dataclass, field +from enum import Enum +import uuid + +from .atomspace_integration import AtomSpaceManager, Atom, AtomType + + +class RelationType(Enum): + """Types of relationships in the knowledge graph.""" + INHERITANCE = "inheritance" + SIMILARITY = "similarity" + CAUSATION = "causation" + ASSOCIATION = "association" + COMPOSITION = "composition" + TEMPORAL = "temporal" + SPATIAL = "spatial" + + +@dataclass +class Entity: + """Represents an entity in the knowledge graph.""" + id: str + name: str + entity_type: str + properties: Dict[str, Any] = field(default_factory=dict) + atom: Optional[Atom] = None + + def __post_init__(self): + if self.id is None: + self.id = str(uuid.uuid4()) + + +@dataclass +class Relationship: + """Represents a relationship between entities.""" + id: str + source_id: str + target_id: str + relation_type: RelationType + strength: float = 1.0 + properties: Dict[str, Any] = field(default_factory=dict) + atom: Optional[Atom] = None + + def __post_init__(self): + if self.id is None: + self.id = str(uuid.uuid4()) + + +class KnowledgeGraph: + """ + Knowledge Graph implementation using OpenCog AtomSpace. + + Provides high-level operations for building and querying + semantic knowledge representations. 
+ """ + + def __init__(self, atomspace: AtomSpaceManager): + self.atomspace = atomspace + self.entities: Dict[str, Entity] = {} + self.relationships: Dict[str, Relationship] = {} + self.logger = logging.getLogger(__name__) + + # Indexes for efficient querying + self.entity_by_name: Dict[str, str] = {} + self.entity_by_type: Dict[str, Set[str]] = {} + self.relationships_by_source: Dict[str, Set[str]] = {} + self.relationships_by_target: Dict[str, Set[str]] = {} + + # Initialize ontology structure + self._initialize_ontology() + + def _initialize_ontology(self): + """Initialize basic ontological concepts.""" + + # Basic top-level concepts + self.add_entity("Entity", {"type": "ontology_class", "level": 0}) + self.add_entity("Relationship", {"type": "ontology_class", "level": 0}) + self.add_entity("Property", {"type": "ontology_class", "level": 0}) + + # Agent-related concepts + self.add_entity("Agent", {"type": "ontology_class", "parent": "Entity"}) + self.add_entity("Action", {"type": "ontology_class", "parent": "Entity"}) + self.add_entity("Goal", {"type": "ontology_class", "parent": "Entity"}) + self.add_entity("State", {"type": "ontology_class", "parent": "Entity"}) + + # Environment concepts + self.add_entity("Environment", {"type": "ontology_class", "parent": "Entity"}) + self.add_entity("Object", {"type": "ontology_class", "parent": "Entity"}) + self.add_entity("Location", {"type": "ontology_class", "parent": "Entity"}) + + # Create inheritance relationships + self._create_inheritance_relationships() + + self.logger.info("Initialized knowledge graph ontology") + + def _create_inheritance_relationships(self): + """Create basic inheritance relationships in the ontology.""" + inheritance_pairs = [ + ("Agent", "Entity"), + ("Action", "Entity"), + ("Goal", "Entity"), + ("State", "Entity"), + ("Environment", "Entity"), + ("Object", "Entity"), + ("Location", "Entity") + ] + + for child, parent in inheritance_pairs: + if child in self.entity_by_name and parent in 
self.entity_by_name: + child_id = self.entity_by_name[child] + parent_id = self.entity_by_name[parent] + self.add_relationship(child_id, parent_id, RelationType.INHERITANCE) + + def add_entity(self, name: str, properties: Dict[str, Any] = None, + entity_type: str = "concept") -> str: + """ + Add an entity to the knowledge graph. + + Args: + name: Name of the entity + properties: Dictionary of entity properties + entity_type: Type classification of the entity + + Returns: + Entity ID + """ + properties = properties or {} + + # Create entity + entity = Entity( + id=str(uuid.uuid4()), + name=name, + entity_type=entity_type, + properties=properties + ) + + # Create corresponding atom in atomspace + atom = self.atomspace.create_concept_node(name) + entity.atom = atom + + # Store entity + self.entities[entity.id] = entity + + # Update indexes + self.entity_by_name[name] = entity.id + if entity_type not in self.entity_by_type: + self.entity_by_type[entity_type] = set() + self.entity_by_type[entity_type].add(entity.id) + + # Add properties as evaluation links + for prop_name, prop_value in properties.items(): + self._add_property_atom(entity, prop_name, prop_value) + + self.logger.debug(f"Added entity: {name} ({entity_type})") + return entity.id + + def add_relationship(self, source_id: str, target_id: str, + relation_type: RelationType, + strength: float = 1.0, + properties: Dict[str, Any] = None) -> str: + """ + Add a relationship between two entities. 
+ + Args: + source_id: ID of source entity + target_id: ID of target entity + relation_type: Type of relationship + strength: Strength of the relationship (0.0 to 1.0) + properties: Additional properties of the relationship + + Returns: + Relationship ID + """ + if source_id not in self.entities or target_id not in self.entities: + raise ValueError("Source or target entity not found") + + properties = properties or {} + + # Create relationship + relationship = Relationship( + id=str(uuid.uuid4()), + source_id=source_id, + target_id=target_id, + relation_type=relation_type, + strength=strength, + properties=properties + ) + + # Create corresponding atom in atomspace + source_atom = self.entities[source_id].atom + target_atom = self.entities[target_id].atom + + if relation_type == RelationType.INHERITANCE: + atom = self.atomspace.create_inheritance_link( + source_atom, target_atom, strength + ) + elif relation_type == RelationType.SIMILARITY: + atom = self.atomspace.add_atom( + AtomType.SIMILARITY_LINK, + f"similarity_{source_id}_{target_id}", + outgoing=[source_atom, target_atom], + truth_value=strength + ) + else: + # Generic relationship as evaluation link + predicate = self.atomspace.create_predicate_node(relation_type.value) + atom = self.atomspace.create_evaluation_link( + predicate, [source_atom, target_atom], strength + ) + + relationship.atom = atom + + # Store relationship + self.relationships[relationship.id] = relationship + + # Update indexes + if source_id not in self.relationships_by_source: + self.relationships_by_source[source_id] = set() + if target_id not in self.relationships_by_target: + self.relationships_by_target[target_id] = set() + + self.relationships_by_source[source_id].add(relationship.id) + self.relationships_by_target[target_id].add(relationship.id) + + self.logger.debug(f"Added relationship: {relation_type.value} " + f"({self.entities[source_id].name} -> {self.entities[target_id].name})") + return relationship.id + + def 
get_entity(self, entity_id: str) -> Optional[Entity]: + """Get an entity by ID.""" + return self.entities.get(entity_id) + + def get_entity_by_name(self, name: str) -> Optional[Entity]: + """Get an entity by name.""" + entity_id = self.entity_by_name.get(name) + return self.entities.get(entity_id) if entity_id else None + + def get_entities_by_type(self, entity_type: str) -> List[Entity]: + """Get all entities of a specific type.""" + entity_ids = self.entity_by_type.get(entity_type, set()) + return [self.entities[entity_id] for entity_id in entity_ids] + + def get_relationship(self, relationship_id: str) -> Optional[Relationship]: + """Get a relationship by ID.""" + return self.relationships.get(relationship_id) + + def get_outgoing_relationships(self, entity_id: str) -> List[Relationship]: + """Get all relationships where the entity is the source.""" + rel_ids = self.relationships_by_source.get(entity_id, set()) + return [self.relationships[rel_id] for rel_id in rel_ids] + + def get_incoming_relationships(self, entity_id: str) -> List[Relationship]: + """Get all relationships where the entity is the target.""" + rel_ids = self.relationships_by_target.get(entity_id, set()) + return [self.relationships[rel_id] for rel_id in rel_ids] + + def find_path(self, source_id: str, target_id: str, + max_depth: int = 3) -> List[List[str]]: + """ + Find paths between two entities. 
+ + Args: + source_id: Source entity ID + target_id: Target entity ID + max_depth: Maximum path length + + Returns: + List of paths, where each path is a list of entity IDs + """ + paths = [] + visited = set() + + def dfs(current_id: str, path: List[str], depth: int): + if depth > max_depth: + return + + if current_id == target_id: + paths.append(path + [current_id]) + return + + if current_id in visited: + return + + visited.add(current_id) + + # Explore outgoing relationships + for rel in self.get_outgoing_relationships(current_id): + dfs(rel.target_id, path + [current_id], depth + 1) + + visited.remove(current_id) + + dfs(source_id, [], 0) + return paths + + def find_related_entities(self, entity_id: str, + relation_types: List[RelationType] = None, + max_distance: int = 2) -> List[Tuple[Entity, float]]: + """ + Find entities related to the given entity. + + Args: + entity_id: Entity to find relations for + relation_types: Filter by specific relation types + max_distance: Maximum relationship distance + + Returns: + List of (entity, distance) tuples + """ + if entity_id not in self.entities: + return [] + + related = {} + queue = [(entity_id, 0)] + visited = set() + + while queue: + current_id, distance = queue.pop(0) + + if current_id in visited or distance > max_distance: + continue + + visited.add(current_id) + + # Get all relationships + outgoing = self.get_outgoing_relationships(current_id) + incoming = self.get_incoming_relationships(current_id) + + for rel in outgoing + incoming: + # Filter by relation type if specified + if relation_types and rel.relation_type not in relation_types: + continue + + # Get the other entity + other_id = rel.target_id if rel.source_id == current_id else rel.source_id + + if other_id != entity_id and other_id not in visited: + # Calculate weighted distance + weighted_distance = distance + (1.0 - rel.strength) + + if other_id not in related or weighted_distance < related[other_id]: + related[other_id] = weighted_distance + 
queue.append((other_id, distance + 1)) + + # Convert to list of (entity, distance) tuples + result = [(self.entities[eid], dist) for eid, dist in related.items()] + result.sort(key=lambda x: x[1]) # Sort by distance + + return result + + def query_entities(self, **criteria) -> List[Entity]: + """ + Query entities based on various criteria. + + Args: + **criteria: Query criteria (name, type, properties, etc.) + + Returns: + List of matching entities + """ + candidates = list(self.entities.values()) + + # Filter by name + if 'name' in criteria: + name_filter = criteria['name'] + if isinstance(name_filter, str): + candidates = [e for e in candidates if name_filter.lower() in e.name.lower()] + + # Filter by type + if 'type' in criteria: + type_filter = criteria['type'] + candidates = [e for e in candidates if e.entity_type == type_filter] + + # Filter by properties + for key, value in criteria.items(): + if key not in ['name', 'type']: + candidates = [e for e in candidates + if key in e.properties and e.properties[key] == value] + + return candidates + + def get_entity_neighborhood(self, entity_id: str, + radius: int = 1) -> Dict[str, Any]: + """ + Get the neighborhood of an entity (nearby entities and relationships). 
+ + Args: + entity_id: Entity to get neighborhood for + radius: Neighborhood radius + + Returns: + Dictionary with neighborhood information + """ + if entity_id not in self.entities: + return {} + + neighborhood = { + "center_entity": self.entities[entity_id], + "entities": {}, + "relationships": [] + } + + # Get related entities within radius + related = self.find_related_entities(entity_id, max_distance=radius) + + for entity, distance in related: + neighborhood["entities"][entity.id] = { + "entity": entity, + "distance": distance + } + + # Get all relationships in the neighborhood + all_entity_ids = {entity_id} | set(neighborhood["entities"].keys()) + + for eid in all_entity_ids: + for rel in self.get_outgoing_relationships(eid): + if rel.target_id in all_entity_ids: + neighborhood["relationships"].append(rel) + + return neighborhood + + def add_temporal_relationship(self, event1_id: str, event2_id: str, + temporal_type: str = "before", + time_distance: float = 1.0) -> str: + """ + Add a temporal relationship between two events/entities. + + Args: + event1_id: First event entity ID + event2_id: Second event entity ID + temporal_type: Type of temporal relation (before, after, during, etc.) + time_distance: Temporal distance measure + + Returns: + Relationship ID + """ + properties = { + "temporal_type": temporal_type, + "time_distance": time_distance + } + + return self.add_relationship( + event1_id, event2_id, + RelationType.TEMPORAL, + strength=1.0 - min(0.9, time_distance / 10.0), # Closer in time = stronger + properties=properties + ) + + def add_causal_relationship(self, cause_id: str, effect_id: str, + causation_strength: float = 0.8, + evidence: List[str] = None) -> str: + """ + Add a causal relationship between entities. 
+ + Args: + cause_id: Cause entity ID + effect_id: Effect entity ID + causation_strength: Strength of causal relationship + evidence: List of evidence supporting the causation + + Returns: + Relationship ID + """ + properties = { + "causation_type": "direct", + "evidence": evidence or [] + } + + return self.add_relationship( + cause_id, effect_id, + RelationType.CAUSATION, + strength=causation_strength, + properties=properties + ) + + def infer_relationships(self) -> List[str]: + """ + Infer new relationships based on existing knowledge. + + Returns: + List of new relationship IDs + """ + new_relationships = [] + + # Transitivity inference for inheritance + for entity_id in self.entities: + parents = [] + for rel in self.get_outgoing_relationships(entity_id): + if rel.relation_type == RelationType.INHERITANCE: + parents.append(rel.target_id) + + # Find grandparents + for parent_id in parents: + for rel in self.get_outgoing_relationships(parent_id): + if rel.relation_type == RelationType.INHERITANCE: + grandparent_id = rel.target_id + + # Check if direct relationship already exists + existing = any( + r.target_id == grandparent_id and r.relation_type == RelationType.INHERITANCE + for r in self.get_outgoing_relationships(entity_id) + ) + + if not existing: + # Infer transitive inheritance + new_rel_id = self.add_relationship( + entity_id, grandparent_id, + RelationType.INHERITANCE, + strength=0.6 # Lower strength for inferred relationships + ) + new_relationships.append(new_rel_id) + + # Similarity symmetry inference + for rel in self.relationships.values(): + if rel.relation_type == RelationType.SIMILARITY: + # Check if reverse relationship exists + reverse_exists = any( + r.source_id == rel.target_id and r.target_id == rel.source_id + and r.relation_type == RelationType.SIMILARITY + for r in self.relationships.values() + ) + + if not reverse_exists: + new_rel_id = self.add_relationship( + rel.target_id, rel.source_id, + RelationType.SIMILARITY, + 
strength=rel.strength + ) + new_relationships.append(new_rel_id) + + self.logger.info(f"Inferred {len(new_relationships)} new relationships") + return new_relationships + + def _add_property_atom(self, entity: Entity, prop_name: str, prop_value: Any): + """Add a property as an evaluation link in the atomspace.""" + prop_predicate = self.atomspace.create_predicate_node(f"has_{prop_name}") + value_atom = self.atomspace.create_concept_node(str(prop_value)) + + self.atomspace.create_evaluation_link( + prop_predicate, + [entity.atom, value_atom], + truth_value=0.9 + ) + + def export_to_dict(self) -> Dict[str, Any]: + """Export the knowledge graph to a dictionary.""" + export_data = { + "entities": {}, + "relationships": {}, + "metadata": { + "total_entities": len(self.entities), + "total_relationships": len(self.relationships), + "entity_types": list(self.entity_by_type.keys()) + } + } + + # Export entities + for entity_id, entity in self.entities.items(): + export_data["entities"][entity_id] = { + "id": entity.id, + "name": entity.name, + "type": entity.entity_type, + "properties": entity.properties + } + + # Export relationships + for rel_id, rel in self.relationships.items(): + export_data["relationships"][rel_id] = { + "id": rel.id, + "source_id": rel.source_id, + "target_id": rel.target_id, + "relation_type": rel.relation_type.value, + "strength": rel.strength, + "properties": rel.properties + } + + return export_data + + def import_from_dict(self, data: Dict[str, Any]): + """Import knowledge graph from a dictionary.""" + # Clear existing data + self.entities.clear() + self.relationships.clear() + self.entity_by_name.clear() + self.entity_by_type.clear() + self.relationships_by_source.clear() + self.relationships_by_target.clear() + + # Import entities + entities_data = data.get("entities", {}) + for entity_data in entities_data.values(): + entity_id = self.add_entity( + entity_data["name"], + entity_data.get("properties", {}), + entity_data.get("type", "concept") 
+ ) + # Update the ID to match imported data + old_id = self.entity_by_name[entity_data["name"]] + if old_id != entity_data["id"]: + # Remap the entity + entity = self.entities.pop(old_id) + entity.id = entity_data["id"] + self.entities[entity_data["id"]] = entity + self.entity_by_name[entity_data["name"]] = entity_data["id"] + + # Import relationships + relationships_data = data.get("relationships", {}) + for rel_data in relationships_data.values(): + self.add_relationship( + rel_data["source_id"], + rel_data["target_id"], + RelationType(rel_data["relation_type"]), + rel_data.get("strength", 1.0), + rel_data.get("properties", {}) + ) + + def get_statistics(self) -> Dict[str, Any]: + """Get knowledge graph statistics.""" + entity_type_counts = {etype: len(eids) + for etype, eids in self.entity_by_type.items()} + + relation_type_counts = {} + for rel in self.relationships.values(): + rtype = rel.relation_type.value + relation_type_counts[rtype] = relation_type_counts.get(rtype, 0) + 1 + + return { + "total_entities": len(self.entities), + "total_relationships": len(self.relationships), + "entity_types": entity_type_counts, + "relation_types": relation_type_counts, + "atomspace_size": self.atomspace.size() + } \ No newline at end of file diff --git a/openmanus_rl/opencog_systems/pattern_matcher.py b/openmanus_rl/opencog_systems/pattern_matcher.py new file mode 100644 index 00000000..9e4d1744 --- /dev/null +++ b/openmanus_rl/opencog_systems/pattern_matcher.py @@ -0,0 +1,509 @@ +""" +OpenCog Pattern Matcher for symbolic pattern recognition and matching. + +This module provides pattern matching capabilities using OpenCog's AtomSpace, +enabling complex query processing and knowledge retrieval. 
+""" + +from typing import List, Dict, Any, Optional, Set, Union, Callable +import re +from dataclasses import dataclass +from enum import Enum + +from .atomspace_integration import AtomSpaceManager, Atom, AtomType + + +class MatchType(Enum): + """Types of pattern matching.""" + EXACT = "exact" + FUZZY = "fuzzy" + STRUCTURAL = "structural" + SEMANTIC = "semantic" + + +@dataclass +class PatternVariable: + """Represents a pattern variable with constraints.""" + name: str + atom_type: Optional[AtomType] = None + constraints: Optional[List[Callable]] = None + + def matches(self, atom: Atom) -> bool: + """Check if an atom satisfies this variable's constraints.""" + if self.atom_type and atom.atom_type != self.atom_type: + return False + + if self.constraints: + for constraint in self.constraints: + if not constraint(atom): + return False + + return True + + +@dataclass +class MatchResult: + """Result of a pattern matching operation.""" + bindings: Dict[str, Atom] + confidence: float + match_type: MatchType + + def __str__(self): + return f"Match(confidence={self.confidence:.3f}, bindings={len(self.bindings)})" + + +class PatternQuery: + """Represents a pattern query with variables and constraints.""" + + def __init__(self, pattern_dict: Dict[str, Any]): + self.pattern = pattern_dict + self.variables: Dict[str, PatternVariable] = {} + self._parse_variables() + + def _parse_variables(self): + """Extract variables from the pattern.""" + self._extract_variables_recursive(self.pattern) + + def _extract_variables_recursive(self, obj): + """Recursively extract variables from a nested structure.""" + if isinstance(obj, dict): + for key, value in obj.items(): + if isinstance(value, str) and value.startswith('$'): + if value not in self.variables: + self.variables[value] = PatternVariable(value) + elif isinstance(value, (dict, list)): + self._extract_variables_recursive(value) + elif isinstance(obj, list): + for item in obj: + self._extract_variables_recursive(item) + + def 
add_constraint(self, var_name: str, constraint: Callable[[Atom], bool]): + """Add a constraint to a variable.""" + if var_name in self.variables: + if self.variables[var_name].constraints is None: + self.variables[var_name].constraints = [] + self.variables[var_name].constraints.append(constraint) + + def set_variable_type(self, var_name: str, atom_type: AtomType): + """Set the required atom type for a variable.""" + if var_name in self.variables: + self.variables[var_name].atom_type = atom_type + + +class OpenCogPatternMatcher: + """ + OpenCog Pattern Matcher for complex symbolic pattern recognition. + + Provides sophisticated pattern matching capabilities including exact matching, + fuzzy matching, and structural similarity detection. + """ + + def __init__(self, atomspace: AtomSpaceManager): + self.atomspace = atomspace + self.match_threshold = 0.7 + self.max_results = 100 + + def match_pattern(self, query: PatternQuery, + match_type: MatchType = MatchType.EXACT) -> List[MatchResult]: + """ + Match a pattern query against the atomspace. 
+ + Args: + query: Pattern query to match + match_type: Type of matching to perform + + Returns: + List of match results sorted by confidence + """ + if match_type == MatchType.EXACT: + return self._exact_match(query) + elif match_type == MatchType.FUZZY: + return self._fuzzy_match(query) + elif match_type == MatchType.STRUCTURAL: + return self._structural_match(query) + elif match_type == MatchType.SEMANTIC: + return self._semantic_match(query) + else: + raise ValueError(f"Unknown match type: {match_type}") + + def _exact_match(self, query: PatternQuery) -> List[MatchResult]: + """Perform exact pattern matching.""" + results = [] + + # Get all possible starting atoms based on the query + candidate_atoms = self._get_candidate_atoms(query.pattern) + + for atom in candidate_atoms: + bindings = {} + if self._try_match_atom(atom, query.pattern, bindings, query.variables): + result = MatchResult( + bindings=bindings.copy(), + confidence=1.0, + match_type=MatchType.EXACT + ) + results.append(result) + + return results[:self.max_results] + + def _fuzzy_match(self, query: PatternQuery) -> List[MatchResult]: + """Perform fuzzy pattern matching with similarity scoring.""" + results = [] + + candidate_atoms = self._get_candidate_atoms(query.pattern) + + for atom in candidate_atoms: + bindings = {} + similarity = self._calculate_fuzzy_similarity(atom, query.pattern, bindings, query.variables) + + if similarity >= self.match_threshold: + result = MatchResult( + bindings=bindings, + confidence=similarity, + match_type=MatchType.FUZZY + ) + results.append(result) + + # Sort by confidence (similarity) + results.sort(key=lambda r: r.confidence, reverse=True) + return results[:self.max_results] + + def _structural_match(self, query: PatternQuery) -> List[MatchResult]: + """Match based on structural similarity (link structure).""" + results = [] + + # Focus on link atoms for structural matching + link_atoms = [atom for atom in self.atomspace.atoms.values() + if atom.outgoing] + + for 
atom in link_atoms: + bindings = {} + structural_score = self._calculate_structural_similarity( + atom, query.pattern, bindings, query.variables + ) + + if structural_score >= self.match_threshold: + result = MatchResult( + bindings=bindings, + confidence=structural_score, + match_type=MatchType.STRUCTURAL + ) + results.append(result) + + results.sort(key=lambda r: r.confidence, reverse=True) + return results[:self.max_results] + + def _semantic_match(self, query: PatternQuery) -> List[MatchResult]: + """Match based on semantic similarity.""" + results = [] + + # Simplified semantic matching - in a full implementation, + # this would use word embeddings or semantic networks + + candidate_atoms = self._get_candidate_atoms(query.pattern) + + for atom in candidate_atoms: + bindings = {} + semantic_score = self._calculate_semantic_similarity( + atom, query.pattern, bindings, query.variables + ) + + if semantic_score >= self.match_threshold: + result = MatchResult( + bindings=bindings, + confidence=semantic_score, + match_type=MatchType.SEMANTIC + ) + results.append(result) + + results.sort(key=lambda r: r.confidence, reverse=True) + return results[:self.max_results] + + def _get_candidate_atoms(self, pattern: Dict[str, Any]) -> List[Atom]: + """Get candidate atoms that could potentially match the pattern.""" + pattern_type = pattern.get("type") + + if pattern_type == "concept": + return self.atomspace.find_atoms(AtomType.CONCEPT_NODE) + elif pattern_type == "predicate": + return self.atomspace.find_atoms(AtomType.PREDICATE_NODE) + elif pattern_type == "evaluation": + return self.atomspace.find_atoms(AtomType.EVALUATION_LINK) + elif pattern_type == "inheritance": + return self.atomspace.find_atoms(AtomType.INHERITANCE_LINK) + elif pattern_type == "similarity": + return self.atomspace.find_atoms(AtomType.SIMILARITY_LINK) + else: + # Return all atoms if type not specified + return list(self.atomspace.atoms.values()) + + def _try_match_atom(self, atom: Atom, pattern: 
Dict[str, Any], + bindings: Dict[str, Atom], + variables: Dict[str, PatternVariable]) -> bool: + """Try to match an atom against a pattern.""" + + # Handle variable patterns + if isinstance(pattern, str) and pattern.startswith('$'): + return self._bind_variable(pattern, atom, bindings, variables) + + # Handle dictionary patterns + if isinstance(pattern, dict): + pattern_type = pattern.get("type") + + # Check atom type matches + if pattern_type == "concept" and atom.atom_type != AtomType.CONCEPT_NODE: + return False + elif pattern_type == "evaluation" and atom.atom_type != AtomType.EVALUATION_LINK: + return False + + # Check name if specified + pattern_name = pattern.get("name") + if pattern_name: + if isinstance(pattern_name, str): + if pattern_name.startswith('$'): + if not self._bind_variable(pattern_name, atom, bindings, variables): + return False + elif pattern_name != atom.name: + return False + + # Check outgoing set for links + pattern_outgoing = pattern.get("outgoing") + if pattern_outgoing and len(pattern_outgoing) != len(atom.outgoing): + return False + + if pattern_outgoing: + for i, out_pattern in enumerate(pattern_outgoing): + if not self._try_match_atom(atom.outgoing[i], out_pattern, bindings, variables): + return False + + return True + + return False + + def _bind_variable(self, var_name: str, atom: Atom, + bindings: Dict[str, Atom], + variables: Dict[str, PatternVariable]) -> bool: + """Bind a variable to an atom if constraints are satisfied.""" + + # Check if variable already bound + if var_name in bindings: + return bindings[var_name] == atom + + # Check variable constraints + if var_name in variables: + variable = variables[var_name] + if not variable.matches(atom): + return False + + # Bind the variable + bindings[var_name] = atom + return True + + def _calculate_fuzzy_similarity(self, atom: Atom, pattern: Dict[str, Any], + bindings: Dict[str, Atom], + variables: Dict[str, PatternVariable]) -> float: + """Calculate fuzzy similarity between an 
atom and pattern.""" + + # Simplified fuzzy matching - would be more sophisticated in practice + base_score = 0.0 + + if isinstance(pattern, dict): + pattern_type = pattern.get("type") + + # Type similarity + if pattern_type == "concept" and atom.atom_type == AtomType.CONCEPT_NODE: + base_score += 0.3 + elif pattern_type == "evaluation" and atom.atom_type == AtomType.EVALUATION_LINK: + base_score += 0.3 + + # Name similarity + pattern_name = pattern.get("name") + if pattern_name and not pattern_name.startswith('$'): + name_similarity = self._string_similarity(atom.name, pattern_name) + base_score += 0.4 * name_similarity + + # Structural similarity for links + pattern_outgoing = pattern.get("outgoing") + if pattern_outgoing and atom.outgoing: + struct_similarity = min(len(pattern_outgoing), len(atom.outgoing)) / max(len(pattern_outgoing), len(atom.outgoing)) + base_score += 0.3 * struct_similarity + + return min(1.0, base_score) + + def _calculate_structural_similarity(self, atom: Atom, pattern: Dict[str, Any], + bindings: Dict[str, Atom], + variables: Dict[str, PatternVariable]) -> float: + """Calculate structural similarity based on graph structure.""" + + if not atom.outgoing: + return 0.1 # Low score for nodes in structural matching + + pattern_outgoing = pattern.get("outgoing", []) + if not pattern_outgoing: + return 0.1 + + # Compare outgoing structures + similarity = 0.0 + + # Arity similarity + arity_similarity = min(len(atom.outgoing), len(pattern_outgoing)) / max(len(atom.outgoing), len(pattern_outgoing)) + similarity += 0.4 * arity_similarity + + # Type distribution similarity + atom_types = [out.atom_type for out in atom.outgoing] + pattern_types = [] + for out_pattern in pattern_outgoing: + if isinstance(out_pattern, dict): + ptype = out_pattern.get("type") + if ptype == "concept": + pattern_types.append(AtomType.CONCEPT_NODE) + elif ptype == "predicate": + pattern_types.append(AtomType.PREDICATE_NODE) + + if pattern_types: + type_intersection = 
len(set(atom_types) & set(pattern_types)) + type_union = len(set(atom_types) | set(pattern_types)) + type_similarity = type_intersection / max(1, type_union) + similarity += 0.6 * type_similarity + + return min(1.0, similarity) + + def _calculate_semantic_similarity(self, atom: Atom, pattern: Dict[str, Any], + bindings: Dict[str, Atom], + variables: Dict[str, PatternVariable]) -> float: + """Calculate semantic similarity using name analysis.""" + + pattern_name = pattern.get("name") + if not pattern_name or pattern_name.startswith('$'): + return 0.5 # Default similarity for variables + + # Simple semantic similarity based on string matching + # In a full implementation, this would use word embeddings + return self._string_similarity(atom.name, pattern_name) + + def _string_similarity(self, str1: str, str2: str) -> float: + """Calculate string similarity using various metrics.""" + + # Exact match + if str1 == str2: + return 1.0 + + # Case-insensitive match + if str1.lower() == str2.lower(): + return 0.95 + + # Substring match + if str1 in str2 or str2 in str1: + return 0.8 + + # Edit distance similarity + edit_distance = self._levenshtein_distance(str1.lower(), str2.lower()) + max_len = max(len(str1), len(str2)) + if max_len == 0: + return 1.0 + + similarity = 1.0 - (edit_distance / max_len) + return max(0.0, similarity) + + def _levenshtein_distance(self, s1: str, s2: str) -> int: + """Calculate Levenshtein edit distance between two strings.""" + + if len(s1) < len(s2): + s1, s2 = s2, s1 + + if len(s2) == 0: + return len(s1) + + previous_row = list(range(len(s2) + 1)) + for i, c1 in enumerate(s1): + current_row = [i + 1] + for j, c2 in enumerate(s2): + insertions = previous_row[j + 1] + 1 + deletions = current_row[j] + 1 + substitutions = previous_row[j] + (c1 != c2) + current_row.append(min(insertions, deletions, substitutions)) + previous_row = current_row + + return previous_row[-1] + + def query_by_example(self, example_atom: Atom, similarity_threshold: float 
= 0.8) -> List[MatchResult]: + """Find atoms similar to an example atom.""" + results = [] + + for atom in self.atomspace.atoms.values(): + if atom == example_atom: + continue + + similarity = self._calculate_atom_similarity(example_atom, atom) + if similarity >= similarity_threshold: + result = MatchResult( + bindings={"$match": atom}, + confidence=similarity, + match_type=MatchType.FUZZY + ) + results.append(result) + + results.sort(key=lambda r: r.confidence, reverse=True) + return results[:self.max_results] + + def _calculate_atom_similarity(self, atom1: Atom, atom2: Atom) -> float: + """Calculate overall similarity between two atoms.""" + + # Type similarity + type_similarity = 1.0 if atom1.atom_type == atom2.atom_type else 0.3 + + # Name similarity + name_similarity = self._string_similarity(atom1.name, atom2.name) + + # Structure similarity for links + struct_similarity = 1.0 + if atom1.outgoing and atom2.outgoing: + if len(atom1.outgoing) == len(atom2.outgoing): + struct_similarity = 1.0 + else: + struct_similarity = min(len(atom1.outgoing), len(atom2.outgoing)) / max(len(atom1.outgoing), len(atom2.outgoing)) + elif atom1.outgoing or atom2.outgoing: + struct_similarity = 0.5 + + # Weighted combination + overall_similarity = (0.4 * type_similarity + + 0.4 * name_similarity + + 0.2 * struct_similarity) + + return overall_similarity + + def find_paths(self, start_atom: Atom, end_atom: Atom, + max_depth: int = 3) -> List[List[Atom]]: + """Find paths between two atoms in the knowledge graph.""" + paths = [] + + def dfs_path(current_atom: Atom, target_atom: Atom, + current_path: List[Atom], depth: int): + if depth > max_depth: + return + + if current_atom == target_atom: + paths.append(current_path + [current_atom]) + return + + if current_atom in current_path: + return # Avoid cycles + + # Explore outgoing links + for out_atom in current_atom.outgoing: + dfs_path(out_atom, target_atom, current_path + [current_atom], depth + 1) + + # Explore incoming links + 
incoming = self.atomspace.get_incoming_set(current_atom) + for in_atom in incoming: + dfs_path(in_atom, target_atom, current_path + [current_atom], depth + 1) + + dfs_path(start_atom, end_atom, [], 0) + return paths + + def set_match_threshold(self, threshold: float): + """Set the minimum similarity threshold for fuzzy matching.""" + self.match_threshold = max(0.0, min(1.0, threshold)) + + def set_max_results(self, max_results: int): + """Set the maximum number of results to return.""" + self.max_results = max(1, max_results) \ No newline at end of file diff --git a/openmanus_rl/opencog_systems/reasoning_engine.py b/openmanus_rl/opencog_systems/reasoning_engine.py new file mode 100644 index 00000000..0e910895 --- /dev/null +++ b/openmanus_rl/opencog_systems/reasoning_engine.py @@ -0,0 +1,413 @@ +""" +OpenCog Reasoning Engine for symbolic reasoning in OpenManus-RL. + +This module provides reasoning capabilities using OpenCog's AtomSpace, +including forward chaining, backward chaining, and probabilistic reasoning. 
"""
OpenCog Reasoning Engine for symbolic reasoning in OpenManus-RL.

This module provides reasoning capabilities using OpenCog's AtomSpace,
including forward chaining, backward chaining, and probabilistic reasoning.
"""

from typing import List, Dict, Any, Optional, Set, Tuple
import logging
from dataclasses import dataclass
from enum import Enum

from .atomspace_integration import AtomSpaceManager, Atom, AtomType


class ReasoningMode(Enum):
    """Different modes of reasoning."""
    FORWARD_CHAINING = "forward_chaining"
    BACKWARD_CHAINING = "backward_chaining"
    PROBABILISTIC = "probabilistic"
    PATTERN_MATCHING = "pattern_matching"


@dataclass
class ReasoningRule:
    """Represents a reasoning rule with premise and conclusion patterns."""
    name: str
    premises: List[Dict[str, Any]]   # Patterns that must all hold
    conclusion: Dict[str, Any]       # Pattern derived when premises hold
    confidence: float = 1.0          # Rule strength, multiplied into conclusions

    def __str__(self):
        return f"Rule({self.name}): {self.premises} -> {self.conclusion}"


@dataclass
class ReasoningResult:
    """Result of a reasoning operation."""
    conclusion: Optional[Atom]        # Proven atom, or None on failure
    confidence: float                 # Combined confidence of the proof
    reasoning_path: List[str]         # Human-readable trace of the attempt
    used_rules: List[str]             # Names of rules applied
    intermediate_results: List[Atom]  # Atoms created while proving


class OpenCogReasoningEngine:
    """
    OpenCog-based reasoning engine for symbolic inference.

    Provides various reasoning modes including forward/backward chaining
    and probabilistic reasoning using the AtomSpace knowledge base.
    """

    def __init__(self, atomspace: AtomSpaceManager):
        """Create an engine bound to *atomspace*, pre-loaded with basic rules."""
        self.atomspace = atomspace
        self.rules: List[ReasoningRule] = []
        self.logger = logging.getLogger(__name__)
        self.reasoning_history: List[ReasoningResult] = []

        # Initialize with some basic reasoning rules
        self._initialize_basic_rules()

    def _initialize_basic_rules(self):
        """Initialize the engine with basic reasoning rules.

        Installs three domain-agnostic rules: inheritance transitivity,
        similarity symmetry, and action-consequence propagation.
        """
        # Transitivity rule for inheritance: X isa Y, Y isa Z => X isa Z
        transitivity_rule = ReasoningRule(
            name="inheritance_transitivity",
            premises=[
                {"type": "inheritance", "child": "$X", "parent": "$Y"},
                {"type": "inheritance", "child": "$Y", "parent": "$Z"}
            ],
            conclusion={"type": "inheritance", "child": "$X", "parent": "$Z"},
            confidence=0.9
        )

        # Similarity symmetry rule: sim(X, Y) => sim(Y, X)
        similarity_symmetry = ReasoningRule(
            name="similarity_symmetry",
            premises=[
                {"type": "similarity", "a": "$X", "b": "$Y"}
            ],
            conclusion={"type": "similarity", "a": "$Y", "b": "$X"},
            confidence=1.0
        )

        # Action consequence rule: taken action with known effect => expect effect
        action_consequence = ReasoningRule(
            name="action_consequence",
            premises=[
                {"type": "evaluation", "predicate": "action_taken", "args": ["$action"]},
                {"type": "evaluation", "predicate": "action_leads_to", "args": ["$action", "$outcome"]}
            ],
            conclusion={"type": "evaluation", "predicate": "expected_outcome", "args": ["$outcome"]},
            confidence=0.8
        )

        self.add_rule(transitivity_rule)
        self.add_rule(similarity_symmetry)
        self.add_rule(action_consequence)

    def add_rule(self, rule: ReasoningRule):
        """Add a reasoning rule to the engine."""
        self.rules.append(rule)
        self.logger.debug(f"Added reasoning rule: {rule.name}")

    def remove_rule(self, rule_name: str) -> bool:
        """Remove a reasoning rule by name.

        Returns:
            True if at least one rule with that name was removed.
        """
        original_len = len(self.rules)
        self.rules = [r for r in self.rules if r.name != rule_name]
        removed = len(self.rules) < original_len
        if removed:
            self.logger.debug(f"Removed reasoning rule: {rule_name}")
        return removed

    def forward_chaining(self, max_iterations: int = 10) -> List[Atom]:
        """
        Perform forward chaining inference to derive new conclusions.

        Repeatedly applies all rules until a fixpoint (atomspace size stops
        growing) or max_iterations is reached.

        Args:
            max_iterations: Maximum number of inference iterations

        Returns:
            List of newly derived atoms
        """
        derived_atoms = []
        iteration = 0

        while iteration < max_iterations:
            initial_count = self.atomspace.size()
            new_atoms_this_iteration = []

            for rule in self.rules:
                new_atoms = self._apply_rule_forward(rule)
                new_atoms_this_iteration.extend(new_atoms)
                derived_atoms.extend(new_atoms)

            iteration += 1

            # Stop if no new atoms were derived (fixpoint reached)
            if self.atomspace.size() == initial_count:
                break

            self.logger.debug(f"Forward chaining iteration {iteration}: "
                              f"derived {len(new_atoms_this_iteration)} new atoms")

        self.logger.info(f"Forward chaining completed after {iteration} iterations, "
                         f"derived {len(derived_atoms)} total atoms")
        return derived_atoms

    def backward_chaining(self, goal: Dict[str, Any], max_depth: int = 5) -> ReasoningResult:
        """
        Perform backward chaining to prove a goal.

        Recursively tries existing facts first, then any rule whose
        conclusion matches the goal, proving each premise in turn and
        multiplying confidences along the proof chain.

        Args:
            goal: Goal pattern to prove
            max_depth: Maximum reasoning depth

        Returns:
            ReasoningResult with proof information
        """
        reasoning_path = []
        used_rules = []
        intermediate_results = []

        def prove_goal(current_goal, depth):
            if depth > max_depth:
                return None, 0.0

            reasoning_path.append(f"Trying to prove: {current_goal}")

            # Check if goal is already in atomspace
            existing_atoms = self._find_matching_atoms(current_goal)
            if existing_atoms:
                best_atom = max(existing_atoms, key=lambda a: a.truth_value or 0)
                reasoning_path.append(f"Found existing fact: {best_atom.name}")
                # Fix: only substitute 1.0 when truth_value is unset (None).
                # The previous `or 1.0` mapped a legitimate 0.0 truth value
                # to full confidence.
                confidence = 1.0 if best_atom.truth_value is None else best_atom.truth_value
                return best_atom, confidence

            # Try to prove using rules
            for rule in self.rules:
                if self._goal_matches_rule_conclusion(current_goal, rule.conclusion):
                    reasoning_path.append(f"Trying rule: {rule.name}")
                    used_rules.append(rule.name)

                    # Try to prove all premises
                    premise_confidence = 1.0
                    premise_atoms = []

                    for premise in rule.premises:
                        premise_atom, confidence = prove_goal(premise, depth + 1)
                        if premise_atom is None:
                            premise_confidence = 0.0
                            break
                        premise_atoms.append(premise_atom)
                        premise_confidence *= confidence

                    if premise_confidence > 0:
                        # Create conclusion atom
                        conclusion_atom = self._create_atom_from_pattern(
                            rule.conclusion, premise_atoms
                        )
                        if conclusion_atom:
                            final_confidence = premise_confidence * rule.confidence
                            conclusion_atom.truth_value = final_confidence
                            intermediate_results.append(conclusion_atom)
                            reasoning_path.append(f"Proved using {rule.name} with confidence {final_confidence}")
                            return conclusion_atom, final_confidence

            reasoning_path.append(f"Failed to prove: {current_goal}")
            return None, 0.0

        conclusion, confidence = prove_goal(goal, 0)

        return ReasoningResult(
            conclusion=conclusion,
            confidence=confidence,
            reasoning_path=reasoning_path,
            used_rules=used_rules,
            intermediate_results=intermediate_results
        )

    def probabilistic_reasoning(self, query: Dict[str, Any],
                                evidence: List[Dict[str, Any]]) -> float:
        """
        Perform probabilistic reasoning given evidence.

        Args:
            query: Query to evaluate
            evidence: List of evidence facts

        Returns:
            Probability of the query being true
        """
        # Simplified probabilistic reasoning
        # In a full implementation, this would use PLN (Probabilistic Logic Networks)

        base_probability = 0.1  # Prior probability

        # Add evidence to atomspace temporarily
        evidence_atoms = []
        for fact in evidence:
            atom = self._create_atom_from_pattern(fact)
            if atom:
                evidence_atoms.append(atom)

        # Try backward chaining with evidence
        result = self.backward_chaining(query)

        if result.conclusion:
            # Evidence supports the query; cap below certainty
            probability = min(0.95, base_probability + result.confidence * 0.8)
        else:
            # Look for contradictory evidence
            probability = base_probability

        # Clean up temporary evidence atoms.
        # NOTE(review): if _create_atom_from_pattern returns a pre-existing
        # atom (deduplication), this removal would delete prior knowledge —
        # confirm against AtomSpaceManager's create_* semantics.
        for atom in evidence_atoms:
            if atom.atom_id in self.atomspace:
                self.atomspace.remove_atom(atom.atom_id)

        return probability

    def explain_reasoning(self, result: ReasoningResult) -> str:
        """
        Generate a human-readable explanation of the reasoning process.

        Args:
            result: ReasoningResult to explain

        Returns:
            Formatted explanation string
        """
        explanation_lines = []
        explanation_lines.append("=== Reasoning Explanation ===")

        if result.conclusion:
            explanation_lines.append(f"Conclusion: {result.conclusion.name}")
            explanation_lines.append(f"Confidence: {result.confidence:.3f}")
        else:
            explanation_lines.append("No conclusion could be reached")

        explanation_lines.append("\nReasoning Path:")
        for step in result.reasoning_path:
            explanation_lines.append(f"  • {step}")

        if result.used_rules:
            explanation_lines.append("\nRules Applied:")
            for rule_name in result.used_rules:
                explanation_lines.append(f"  • {rule_name}")

        if result.intermediate_results:
            explanation_lines.append("\nIntermediate Results:")
            for atom in result.intermediate_results:
                # Fix: guard against an unset truth_value — formatting None
                # with ':.3f' raises TypeError.
                tv = atom.truth_value if atom.truth_value is not None else 0.0
                explanation_lines.append(f"  • {atom.name} (confidence: {tv:.3f})")

        return "\n".join(explanation_lines)

    def reason_about_action(self, action: str, context: Dict[str, Any]) -> ReasoningResult:
        """
        Reason about the consequences and appropriateness of an action.

        Adds the action and its context to the atomspace, then backward-chains
        on an "action_leads_to" query. The result is appended to
        reasoning_history.

        Args:
            action: Action to reason about
            context: Context information for the reasoning

        Returns:
            ReasoningResult with action analysis
        """
        # Create atoms for the action and context
        action_atom = self.atomspace.create_concept_node(f"action_{action}")

        # Add context information
        for key, value in context.items():
            context_atom = self.atomspace.create_concept_node(f"context_{key}_{value}")
            self.atomspace.create_evaluation_link(
                self.atomspace.create_predicate_node("has_context"),
                [action_atom, context_atom]
            )

        # Query for action consequences
        consequence_query = {
            "type": "evaluation",
            "predicate": "action_leads_to",
            "args": [action, "$outcome"]
        }

        result = self.backward_chaining(consequence_query)

        # Add action reasoning to history
        self.reasoning_history.append(result)

        return result

    def _apply_rule_forward(self, rule: ReasoningRule) -> List[Atom]:
        """Apply a rule in forward chaining mode.

        For every variable binding of the rule's premises, creates the
        conclusion atom unless an equivalent atom already exists.
        """
        new_atoms = []

        # Find all possible variable bindings for the rule premises
        bindings_list = self._find_rule_bindings(rule)

        for bindings in bindings_list:
            # Check if conclusion already exists
            conclusion_pattern = self._substitute_variables(rule.conclusion, bindings)
            existing = self._find_matching_atoms(conclusion_pattern)

            if not existing:
                # Create new conclusion atom
                conclusion_atom = self._create_atom_from_pattern(conclusion_pattern)
                if conclusion_atom:
                    # Fix: store the rule strength in truth_value — the
                    # attribute every other code path (backward chaining,
                    # explain_reasoning, tests) reads. Assigning to
                    # `.confidence` created a stray attribute and lost the value.
                    conclusion_atom.truth_value = rule.confidence
                    new_atoms.append(conclusion_atom)
                    self.logger.debug(f"Applied rule {rule.name}: created {conclusion_atom.name}")

        return new_atoms

    def _find_rule_bindings(self, rule: ReasoningRule) -> List[Dict[str, str]]:
        """Find all possible variable bindings for a rule's premises."""
        # Simplified binding finder - would be more sophisticated in full implementation
        return []

    def _find_matching_atoms(self, pattern: Dict[str, Any]) -> List[Atom]:
        """Find atoms matching a pattern.

        Only "concept" (matched by name) and "evaluation" (matched by type
        only) patterns are supported; everything else yields no matches.
        """
        if pattern.get("type") == "concept":
            return self.atomspace.find_atoms(AtomType.CONCEPT_NODE, pattern.get("name"))
        elif pattern.get("type") == "evaluation":
            return self.atomspace.find_atoms(AtomType.EVALUATION_LINK)
        return []

    def _goal_matches_rule_conclusion(self, goal: Dict[str, Any],
                                      conclusion: Dict[str, Any]) -> bool:
        """Check if a goal matches a rule's conclusion pattern (type-level only)."""
        return goal.get("type") == conclusion.get("type")

    def _create_atom_from_pattern(self, pattern: Dict[str, Any],
                                  context_atoms: List[Atom] = None) -> Optional[Atom]:
        """Create an atom from a pattern description.

        Supports "concept", "evaluation", and "inheritance" pattern types;
        returns None for anything else.
        """
        pattern_type = pattern.get("type")

        if pattern_type == "concept":
            return self.atomspace.create_concept_node(pattern.get("name", "unknown"))
        elif pattern_type == "evaluation":
            predicate = self.atomspace.create_predicate_node(
                pattern.get("predicate", "unknown_predicate")
            )
            # Simplified - would handle arguments properly in full implementation
            args = [self.atomspace.create_concept_node("arg")]
            return self.atomspace.create_evaluation_link(predicate, args)
        elif pattern_type == "inheritance":
            child = self.atomspace.create_concept_node(pattern.get("child", "child"))
            parent = self.atomspace.create_concept_node(pattern.get("parent", "parent"))
            return self.atomspace.create_inheritance_link(child, parent)

        return None

    def _substitute_variables(self, pattern: Dict[str, Any],
                              bindings: Dict[str, str]) -> Dict[str, Any]:
        """Substitute variables in a pattern with their bindings.

        Only top-level string values starting with '$' are substituted;
        unbound variables are left as-is.
        """
        substituted = pattern.copy()
        for key, value in substituted.items():
            if isinstance(value, str) and value.startswith("$"):
                substituted[key] = bindings.get(value, value)
        return substituted

    def get_reasoning_statistics(self) -> Dict[str, Any]:
        """Get statistics about reasoning operations."""
        return {
            "total_rules": len(self.rules),
            "atomspace_size": self.atomspace.size(),
            "reasoning_history_length": len(self.reasoning_history),
            "successful_reasonings": sum(1 for r in self.reasoning_history if r.conclusion),
            # max(1, ...) guards the division when no reasoning has run yet
            "average_confidence": sum(r.confidence for r in self.reasoning_history) / max(1, len(self.reasoning_history))
        }
"""
Tests for OpenCog AtomSpace integration.
"""

import unittest
from openmanus_rl.opencog_systems.atomspace_integration import (
    AtomSpaceManager, AtomType, Atom
)


class TestAtomSpaceManager(unittest.TestCase):
    """Unit tests for the AtomSpace manager."""

    def setUp(self):
        """Create a fresh AtomSpace for every test."""
        self.atomspace = AtomSpaceManager()

    def test_create_concept_node(self):
        """Concept nodes carry the expected type, name, and default truth value."""
        node = self.atomspace.create_concept_node("test_concept")

        self.assertIsInstance(node, Atom)
        self.assertEqual(node.atom_type, AtomType.CONCEPT_NODE)
        self.assertEqual(node.name, "test_concept")
        self.assertEqual(node.truth_value, 1.0)

    def test_create_predicate_node(self):
        """Predicate nodes carry the expected type and name."""
        node = self.atomspace.create_predicate_node("test_predicate")

        self.assertIsInstance(node, Atom)
        self.assertEqual(node.atom_type, AtomType.PREDICATE_NODE)
        self.assertEqual(node.name, "test_predicate")

    def test_create_evaluation_link(self):
        """Evaluation links record their truth value and wrap predicate + arguments."""
        likes = self.atomspace.create_predicate_node("likes")
        alice = self.atomspace.create_concept_node("Alice")
        bob = self.atomspace.create_concept_node("Bob")

        link = self.atomspace.create_evaluation_link(
            likes, [alice, bob], truth_value=0.8
        )

        self.assertEqual(link.atom_type, AtomType.EVALUATION_LINK)
        self.assertEqual(link.truth_value, 0.8)
        # Outgoing set holds the predicate and the argument list link.
        self.assertEqual(len(link.outgoing), 2)

    def test_create_inheritance_link(self):
        """Inheritance links preserve child/parent ordering and truth value."""
        dog = self.atomspace.create_concept_node("dog")
        animal = self.atomspace.create_concept_node("animal")

        link = self.atomspace.create_inheritance_link(
            dog, animal, truth_value=0.9
        )

        self.assertEqual(link.atom_type, AtomType.INHERITANCE_LINK)
        self.assertEqual(link.truth_value, 0.9)
        self.assertEqual(len(link.outgoing), 2)
        self.assertEqual(link.outgoing[0], dog)
        self.assertEqual(link.outgoing[1], animal)

    def test_find_atoms(self):
        """find_atoms filters correctly by type, by name, and by both."""
        concept1 = self.atomspace.create_concept_node("concept1")
        concept2 = self.atomspace.create_concept_node("concept2")
        predicate1 = self.atomspace.create_predicate_node("predicate1")

        # By type: both concepts match, the predicate does not.
        concepts = self.atomspace.find_atoms(AtomType.CONCEPT_NODE)
        self.assertIn(concept1, concepts)
        self.assertIn(concept2, concepts)
        self.assertNotIn(predicate1, concepts)

        # By name only.
        by_name = self.atomspace.find_atoms(name="concept1")
        self.assertIn(concept1, by_name)
        self.assertNotIn(concept2, by_name)

        # By type and name: exactly one hit.
        by_both = self.atomspace.find_atoms(AtomType.CONCEPT_NODE, "concept2")
        self.assertEqual(len(by_both), 1)
        self.assertEqual(by_both[0], concept2)

    def test_get_incoming_set(self):
        """Both endpoints of an inheritance link see it in their incoming set."""
        child = self.atomspace.create_concept_node("child")
        parent = self.atomspace.create_concept_node("parent")
        link = self.atomspace.create_inheritance_link(child, parent)

        self.assertIn(link, self.atomspace.get_incoming_set(parent))
        self.assertIn(link, self.atomspace.get_incoming_set(child))

    def test_export_import(self):
        """A round-trip through export/import restores atoms and truth values."""
        self.atomspace.create_concept_node("test_concept", truth_value=0.8)
        self.atomspace.create_predicate_node("test_predicate")

        snapshot = self.atomspace.export_to_dict()

        # Wipe everything, then restore from the snapshot.
        self.atomspace.clear()
        self.assertEqual(self.atomspace.size(), 0)

        self.atomspace.import_from_dict(snapshot)

        self.assertEqual(self.atomspace.size(), 2)
        restored = self.atomspace.find_atoms(AtomType.CONCEPT_NODE, "test_concept")
        self.assertEqual(len(restored), 1)
        self.assertEqual(restored[0].truth_value, 0.8)

    def test_atomspace_size(self):
        """size() grows with each node created and resets on clear()."""
        before = self.atomspace.size()

        self.atomspace.create_concept_node("test1")
        self.assertEqual(self.atomspace.size(), before + 1)

        self.atomspace.create_concept_node("test2")
        self.assertEqual(self.atomspace.size(), before + 2)

        self.atomspace.clear()
        self.assertEqual(self.atomspace.size(), 0)

    def test_remove_atom(self):
        """remove_atom deletes existing atoms and reports failure for unknown ids."""
        victim = self.atomspace.create_concept_node("to_remove")
        victim_id = victim.atom_id
        size_before = self.atomspace.size()

        self.assertIn(victim_id, self.atomspace)

        self.assertTrue(self.atomspace.remove_atom(victim_id))
        self.assertEqual(self.atomspace.size(), size_before - 1)
        self.assertNotIn(victim_id, self.atomspace)

        # Removing an id that was never added must report False.
        self.assertFalse(self.atomspace.remove_atom("non_existent"))


if __name__ == '__main__':
    unittest.main()
"""
Tests for OpenCog Cognitive Agent.
"""

import unittest
from openmanus_rl.opencog_systems.cognitive_architecture import (
    CognitiveAgent, CognitiveState, AttentionMode, CognitiveAction
)


class TestCognitiveAgent(unittest.TestCase):
    """Unit tests for the Cognitive Agent."""

    def setUp(self):
        """Create a fresh agent for every test."""
        self.agent = CognitiveAgent("test_agent")

    def test_initialization(self):
        """A new agent starts perceiving, focused, and pre-loaded with knowledge."""
        self.assertEqual(self.agent.agent_name, "test_agent")
        self.assertEqual(self.agent.state, CognitiveState.PERCEIVING)
        self.assertEqual(self.agent.attention_mode, AttentionMode.FOCUSED)

        # Basic knowledge must already be in the atomspace...
        self.assertGreater(self.agent.atomspace.size(), 0)

        # ...including the agent's self-concept.
        self_atoms = self.agent.atomspace.find_atoms(name="self")
        self.assertGreater(len(self_atoms), 0)

    def test_perceive(self):
        """Perception stores observations as atoms and as a memory experience."""
        size_before = self.agent.atomspace.size()

        obs = {
            "location": "room1",
            "objects": ["chair", "table"],
            "state": "exploring"
        }

        self.agent.perceive(obs)

        self.assertEqual(self.agent.state, CognitiveState.PERCEIVING)
        self.assertGreater(self.agent.atomspace.size(), size_before)

        # The most recent memory entry is the observation just perceived.
        self.assertGreater(len(self.agent.memory.recent_experiences), 0)
        newest = self.agent.memory.recent_experiences[-1]
        self.assertEqual(newest["type"], "observation")
        self.assertEqual(newest["content"], obs)

    def test_reasoning(self):
        """reason() transitions state and produces a structured result."""
        # Seed the agent with some knowledge first.
        self.agent.perceive({"environment": "test_env", "goal": "explore"})

        general = self.agent.reason()

        self.assertEqual(self.agent.state, CognitiveState.REASONING)
        self.assertIsNotNone(general)
        self.assertIsInstance(general.confidence, float)
        self.assertIsInstance(general.reasoning_path, list)

        # Reasoning over a specific query also yields a result.
        specific = self.agent.reason("what is the current goal?")
        self.assertIsNotNone(specific)

    def test_planning(self):
        """plan() yields a list of well-formed CognitiveActions."""
        plan = self.agent.plan("explore the environment")

        self.assertEqual(self.agent.state, CognitiveState.PLANNING)
        self.assertIsInstance(plan, list)

        if plan:
            first = plan[0]
            self.assertIsInstance(first, CognitiveAction)
            self.assertIsInstance(first.action_type, str)
            self.assertIsInstance(first.confidence, float)
            self.assertIsInstance(first.reasoning_path, list)

    def test_action_execution(self):
        """act() executes an action, returns a result dict, and logs the action."""
        probe = CognitiveAction(
            action_type="explore_environment",
            parameters={"strategy": "systematic"},
            confidence=0.8,
            reasoning_path=["Test action"],
            expected_outcome="Knowledge gained"
        )

        outcome = self.agent.act(probe)

        self.assertEqual(self.agent.state, CognitiveState.ACTING)
        self.assertIsInstance(outcome, dict)
        self.assertIn("success", outcome)
        self.assertIn("confidence", outcome)

        self.assertIn(probe, self.agent.action_history)

    def test_learning(self):
        """learn() records feedback about a prior action in memory."""
        # Execute an action so there is something to learn from.
        probe = CognitiveAction(
            action_type="test_action",
            parameters={},
            confidence=0.7,
            reasoning_path=["Test reasoning"]
        )
        self.agent.act(probe)

        self.agent.learn({
            "reward": 1.0,
            "success": True,
            "correction": None
        })

        self.assertEqual(self.agent.state, CognitiveState.LEARNING)

        # At least one learning experience must have been stored.
        learned = [
            exp for exp in self.agent.memory.recent_experiences
            if exp.get("type") == "learning"
        ]
        self.assertGreater(len(learned), 0)

    def test_cognitive_cycle(self):
        """A full cycle returns all expected fields and increments the counter."""
        obs = {
            "location": "start_room",
            "visible_objects": ["door", "key"],
            "goal_status": "incomplete"
        }

        first = self.agent.cognitive_cycle(obs, "find and use the key")

        self.assertIsInstance(first, dict)
        for field in ("cycle_number", "cycle_time", "reasoning_result",
                      "plan", "action_result"):
            self.assertIn(field, first)

        self.assertEqual(first["cycle_number"], 1)

        # A second cycle (goal omitted) bumps the counter.
        second = self.agent.cognitive_cycle(obs)
        self.assertEqual(second["cycle_number"], 2)

    def test_attention_management(self):
        """Attention mode can be switched, and perception updates active concepts."""
        self.agent.set_attention_mode(AttentionMode.EXPLORATORY)
        self.assertEqual(self.agent.attention_mode, AttentionMode.EXPLORATORY)

        self.agent.perceive({"new_item": "interesting_object"})
        self.assertGreater(len(self.agent.memory.active_concepts), 0)

    def test_memory_management(self):
        """Memory accumulates experiences, supports retrieval, and can be cleared."""
        for step in range(5):
            self.agent.perceive({"step": step, "data": f"test_data_{step}"})

        self.assertGreater(len(self.agent.memory.recent_experiences), 0)

        # Retrieval honors the limit argument.
        hits = self.agent.memory.get_relevant_experiences("step", limit=3)
        self.assertLessEqual(len(hits), 3)

        self.agent.clear_memory()
        self.assertEqual(len(self.agent.memory.recent_experiences), 0)
        self.assertEqual(len(self.agent.memory.current_goals), 0)

    def test_cognitive_state_reporting(self):
        """get_cognitive_state() exposes all fields with the right types."""
        # Generate some activity to report on.
        self.agent.perceive({"test": "data"})
        self.agent.plan("test goal")

        report = self.agent.get_cognitive_state()

        self.assertIsInstance(report, dict)
        for field in ("state", "attention_mode", "active_concepts",
                      "atomspace_size", "success_rate", "cycle_count"):
            self.assertIn(field, report)

        self.assertIsInstance(report["active_concepts"], int)
        self.assertIsInstance(report["atomspace_size"], int)
        self.assertIsInstance(report["success_rate"], float)
        self.assertIsInstance(report["cycle_count"], int)

    def test_knowledge_persistence(self):
        """Knowledge added by perception survives reasoning and planning."""
        size_initial = self.agent.atomspace.size()

        self.agent.perceive({"fact1": "value1", "fact2": "value2"})

        size_after = self.agent.atomspace.size()
        self.assertGreater(size_after, size_initial)

        # Reasoning may add knowledge but must never shrink it.
        self.agent.reason("test query")
        self.assertGreaterEqual(self.agent.atomspace.size(), size_after)

        # Planning adds goal knowledge; still no shrinkage.
        self.agent.plan("test goal")
        self.assertGreaterEqual(self.agent.atomspace.size(), size_after)


if __name__ == '__main__':
    unittest.main()