3 changes: 2 additions & 1 deletion src/praisonai-agents/praisonaiagents/memory/__init__.py
@@ -11,5 +11,6 @@
"""

from .memory import Memory
from .tools import MemoryTools, get_memory_tools

__all__ = ["Memory"]
__all__ = ["Memory", "MemoryTools", "get_memory_tools"]
334 changes: 334 additions & 0 deletions src/praisonai-agents/praisonaiagents/memory/memory.py
@@ -117,6 +117,32 @@ def __init__(self, config: Dict[str, Any], verbose: int = 0):
        self.use_rag = (self.provider.lower() == "rag") and CHROMADB_AVAILABLE and self.cfg.get("use_embedding", False)
        self.graph_enabled = False # Initialize graph support flag

        # Initialize session summary configuration
        self.session_summary_config = self.cfg.get("session_summary_config", {})
        self.session_enabled = self.session_summary_config.get("enabled", False)
        self.update_after_n_turns = self.session_summary_config.get("update_after_n_turns", 5)
        self.summary_model = self.session_summary_config.get("model", "gpt-4o-mini")
        self.include_in_context = self.session_summary_config.get("include_in_context", True)

        # Initialize agentic memory configuration
        self.agentic_config = self.cfg.get("agentic_config", {})
        self.agentic_enabled = self.agentic_config.get("enabled", False)
        self.auto_classify = self.agentic_config.get("auto_classify", True)
        self.confidence_threshold = self.agentic_config.get("confidence_threshold", 0.7)
        self.management_model = self.agentic_config.get("management_model", "gpt-4o")

        # Initialize memory reference configuration
        self.reference_config = self.cfg.get("reference_config", {})
        self.include_references = self.reference_config.get("include_references", False)
        self.reference_format = self.reference_config.get("reference_format", "inline")
        self.max_references = self.reference_config.get("max_references", 5)
        self.show_confidence = self.reference_config.get("show_confidence", False)

        # Session tracking for summaries
        self.turn_counter = 0
        self.session_history = []
        self.current_session_summary = None

        # Extract embedding model from config
        self.embedder_config = self.cfg.get("embedder", {})
        if isinstance(self.embedder_config, dict):
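
Taken together, the options read above correspond to a config shape like the one below. This is an illustrative sketch assembled from the keys and defaults in this diff (plus the existing `provider`/`use_embedding` keys), not code from the PR itself:

    memory_config = {
        "provider": "rag",                     # existing key; "rag" + use_embedding enables the vector store
        "use_embedding": True,
        "session_summary_config": {
            "enabled": True,
            "update_after_n_turns": 5,         # default: summarize every 5 turns
            "model": "gpt-4o-mini",            # default summarization model
            "include_in_context": True         # store summaries in long-term memory
        },
        "agentic_config": {
            "enabled": True,
            "auto_classify": True,             # rate importance with an LLM before storing
            "confidence_threshold": 0.7,       # facts scoring below this are not stored
            "management_model": "gpt-4o"
        },
        "reference_config": {
            "include_references": True,
            "reference_format": "inline",      # "inline", "footnote", or "metadata"
            "max_references": 5,
            "show_confidence": False
        }
    }
    memory = Memory(config=memory_config)
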
@@ -1144,3 +1170,311 @@ def search_with_quality(
        logger.info(f"After quality filter: {len(filtered)} results")

        return filtered

    # -------------------------------------------------------------------------
    # Session Summary Methods
    # -------------------------------------------------------------------------
    def add_to_session(self, role: str, content: str) -> None:
        """Add a conversation turn to the session history"""
        if not self.session_enabled:
            return

        self.session_history.append({
            "role": role,
            "content": content,
            "timestamp": time.time()
        })
        self.turn_counter += 1

        # Check if we need to update the session summary
        if self.turn_counter % self.update_after_n_turns == 0:
            self._update_session_summary()

    def _update_session_summary(self) -> None:
        """Update the session summary using the configured model"""
        if not self.session_history:
            return

        # Create conversation text for summarization
        conversation_text = "\n".join([
            f"{turn['role']}: {turn['content']}"
            for turn in self.session_history[-self.update_after_n_turns:]
        ])

        summary_prompt = f"""
        Summarize the following conversation, focusing on:
        1. Key topics discussed
        2. Important decisions made
        3. Relevant context for future conversations
        4. User preferences and requirements mentioned

        Conversation:
        {conversation_text}

        Provide a concise summary in JSON format with keys: "text", "topics", "key_points"
        """

        try:
            if LITELLM_AVAILABLE:
                import litellm
                response = litellm.completion(
                    model=self.summary_model,
                    messages=[{"role": "user", "content": summary_prompt}],
                    response_format={"type": "json_object"},
                    temperature=0.3
                )
                summary_data = json.loads(response.choices[0].message.content)
            elif OPENAI_AVAILABLE:
                from openai import OpenAI
                client = OpenAI()
                response = client.chat.completions.create(
                    model=self.summary_model,
                    messages=[{"role": "user", "content": summary_prompt}],
                    response_format={"type": "json_object"},
                    temperature=0.3
                )
                summary_data = json.loads(response.choices[0].message.content)
            else:
                self._log_verbose("No LLM available for session summary", logging.WARNING)
                return

            self.current_session_summary = summary_data

            # Store summary in long-term memory if enabled
            if self.include_in_context:
                self.store_long_term(
                    text=summary_data.get("text", ""),
                    metadata={
                        "type": "session_summary",
                        "topics": summary_data.get("topics", []),
                        "key_points": summary_data.get("key_points", []),
                        "turn_count": self.turn_counter
                    }
                )

        except Exception as e:
            self._log_verbose(f"Error updating session summary: {e}", logging.ERROR)

    async def aget_session_summary(self) -> Optional[Dict[str, Any]]:
        """Get the current session summary (async version)"""
        return self.current_session_summary

    def get_session_summary(self) -> Optional[Dict[str, Any]]:
        """Get the current session summary"""
        return self.current_session_summary
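
A minimal usage sketch of the session-summary methods above (illustrative only; assumes a `memory` instance configured with `session_summary_config` enabled, as in the earlier config sketch):

    # Each turn is recorded; every update_after_n_turns turns the summary is
    # regenerated with the configured summary model and, if include_in_context
    # is set, also written to long-term memory.
    memory.add_to_session("user", "I prefer dark mode and concise answers.")
    memory.add_to_session("assistant", "Noted: dark mode and concise answers.")

    summary = memory.get_session_summary()  # None until the first summary has been generated
    if summary:
        print(summary.get("text"), summary.get("topics"), summary.get("key_points"))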

    # -------------------------------------------------------------------------
    # Agentic Memory Management Methods
    # -------------------------------------------------------------------------
    def remember(self, fact: str, metadata: Optional[Dict[str, Any]] = None) -> bool:
        """Store important information with agentic classification"""
        if not self.agentic_enabled:
            # Fallback to regular long-term storage
            self.store_long_term(fact, metadata=metadata)
            return True

        # Auto-classify the importance if enabled
        if self.auto_classify:
            importance_score = self._classify_importance(fact)
            if importance_score < self.confidence_threshold:
                self._log_verbose(f"Fact importance {importance_score} below threshold {self.confidence_threshold}")
                return False

        # Store with agentic metadata
        agentic_metadata = metadata or {}
        agentic_metadata.update({
            "stored_by": "agentic_memory",
            "importance_score": importance_score if self.auto_classify else 1.0,
            "auto_classified": self.auto_classify
        })

        self.store_long_term(fact, metadata=agentic_metadata)
        return True

    def update_memory(self, memory_id: str, new_fact: str) -> bool:
        """Update existing memory by ID"""
        try:
            # Update in SQLite
            conn = sqlite3.connect(self.long_db)
            c = conn.cursor()
            c.execute(
                "UPDATE long_mem SET content = ?, meta = ? WHERE id = ?",
                (new_fact, json.dumps({"updated": True, "updated_at": time.time()}), memory_id)
            )
Comment on lines +1300 to +1303 (Contributor, severity: high):

The UPDATE statement overwrites the existing metadata. This will cause any pre-existing metadata associated with the memory entry to be lost. To prevent this data loss, fetch the existing metadata, update it, and then write it back.

            # First, fetch existing metadata
            c.execute("SELECT meta FROM long_mem WHERE id = ?", (memory_id,))
            row = c.fetchone()
            if not row:
                conn.close()
                return False # Or handle as an error; close the connection before returning

            existing_meta = json.loads(row[0] or '{}')
            existing_meta.update({"updated": True, "updated_at": time.time()})

            # Then, update the record with the merged metadata
            c.execute(
                "UPDATE long_mem SET content = ?, meta = ? WHERE id = ?",
                (new_fact, json.dumps(existing_meta), memory_id)
            )

            updated = c.rowcount > 0
            conn.commit()
            conn.close()

            # Update in vector store if available
            if self.use_rag and hasattr(self, "chroma_col"):
                try:
                    # ChromaDB doesn't support direct updates, so we delete and re-add
                    self.chroma_col.delete(ids=[memory_id])
                    if LITELLM_AVAILABLE:
                        import litellm
                        response = litellm.embedding(
                            model=self.embedding_model,
                            input=new_fact
                        )
                        embedding = response.data[0]["embedding"]
                    elif OPENAI_AVAILABLE:
                        from openai import OpenAI
                        client = OpenAI()
                        response = client.embeddings.create(
                            input=new_fact,
                            model=self.embedding_model
                        )
                        embedding = response.data[0].embedding
                    else:
                        return updated

                    self.chroma_col.add(
                        documents=[new_fact],
                        metadatas=[{"updated": True, "updated_at": time.time()}],
                        ids=[memory_id],
                        embeddings=[embedding]
                    )

Comment on the metadatas argument of chroma_col.add above (Contributor, severity: high):

Similar to the SQLite update, the metadata for the ChromaDB entry is being overwritten here. When you delete and re-add the document, preserve the original metadata and merge it with the update information to avoid data loss.

                    # get existing metadata (note: this lookup must happen before the
                    # chroma_col.delete() call above, otherwise the old record is already gone)
                    existing_metadata = self.chroma_col.get(ids=[memory_id], include=["metadatas"])
                    if existing_metadata and existing_metadata['metadatas']:
                        # Merge existing metadata with new metadata
                        updated_metadata = existing_metadata['metadatas'][0]
                        updated_metadata.update({"updated": True, "updated_at": time.time()})
                    else:
                        updated_metadata = {"updated": True, "updated_at": time.time()}

                    self.chroma_col.add(
                        documents=[new_fact],
                        metadatas=[updated_metadata],
                        ids=[memory_id],
                        embeddings=[embedding]
                    )

                except Exception as e:
                    self._log_verbose(f"Error updating in ChromaDB: {e}", logging.ERROR)

            return updated

        except Exception as e:
            self._log_verbose(f"Error updating memory: {e}", logging.ERROR)
            return False

    def forget(self, memory_id: str) -> bool:
        """Remove a memory by ID"""
        try:
            # Delete from SQLite
            conn = sqlite3.connect(self.long_db)
            c = conn.cursor()
            c.execute("DELETE FROM long_mem WHERE id = ?", (memory_id,))
            deleted = c.rowcount > 0
            conn.commit()
            conn.close()

            # Delete from vector store if available
            if self.use_rag and hasattr(self, "chroma_col"):
                try:
                    self.chroma_col.delete(ids=[memory_id])
                except Exception as e:
                    self._log_verbose(f"Error deleting from ChromaDB: {e}", logging.ERROR)

            return deleted

        except Exception as e:
            self._log_verbose(f"Error forgetting memory: {e}", logging.ERROR)
            return False

    def search_memories(self, query: str, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
        """Search memories with agentic filtering"""
        # Use existing search method but add agentic filtering
        results = self.search_long_term(query, limit=limit, **kwargs)

        # Filter by agentic metadata if enabled
        if self.agentic_enabled:
            results = [
                r for r in results
                if r.get("metadata", {}).get("stored_by") == "agentic_memory"
            ]

        return results

    def _classify_importance(self, fact: str) -> float:
        """Classify the importance of a fact using LLM"""
        classification_prompt = f"""
        Rate the importance of storing this information in long-term memory on a scale of 0.0 to 1.0:
        - 1.0: Critical information (user preferences, key decisions, important facts)
        - 0.7: Important information (useful context, relevant details)
        - 0.5: Moderate information (might be useful later)
        - 0.3: Low importance (casual conversation, temporary info)
        - 0.0: Not worth storing (greetings, small talk)

        Information: {fact}

        Return only a number between 0.0 and 1.0.
        """

        try:
            if LITELLM_AVAILABLE:
                import litellm
                response = litellm.completion(
                    model=self.management_model,
                    messages=[{"role": "user", "content": classification_prompt}],
                    temperature=0.1
                )
                score_text = response.choices[0].message.content.strip()
            elif OPENAI_AVAILABLE:
                from openai import OpenAI
                client = OpenAI()
                response = client.chat.completions.create(
                    model=self.management_model,
                    messages=[{"role": "user", "content": classification_prompt}],
                    temperature=0.1
                )
                score_text = response.choices[0].message.content.strip()
            else:
                return 0.5 # Default moderate importance

            return float(score_text)

        except Exception as e:
            self._log_verbose(f"Error classifying importance: {e}", logging.ERROR)
            return 0.5 # Default moderate importance
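
For orientation, a short sketch of how the agentic surface above fits together (illustrative only; the `mem_123` id is a made-up placeholder, since the IDs assigned by `store_long_term` are not shown in this diff):

    # remember() stores the fact only when the LLM-rated importance clears
    # confidence_threshold (or unconditionally when agentic memory is disabled).
    memory.remember("User's deployment target is Kubernetes.")

    # search_memories() reuses search_long_term() and, with agentic memory enabled,
    # keeps only entries tagged stored_by == "agentic_memory".
    hits = memory.search_memories("deployment target", limit=3)

    # update_memory() and forget() operate on a known memory id.
    memory.update_memory("mem_123", "Deployment target moved to a self-managed cluster.")
    memory.forget("mem_123")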

    # -------------------------------------------------------------------------
    # Memory Reference Methods
    # -------------------------------------------------------------------------
    def search_with_references(self, query: str, limit: int = 5, **kwargs) -> Dict[str, Any]:
        """Search with memory references included"""
        results = self.search_long_term(query, limit=limit, **kwargs)

        if not self.include_references or not results:
            return {
                "content": "",
                "references": []
            }

        # Format results with references
        content_parts = []
        references = []

        for i, result in enumerate(results[:self.max_references], 1):
            text = result.get("text", "")
            metadata = result.get("metadata", {})
            confidence = result.get("score", 0.0)

            if self.reference_format == "inline":
                content_parts.append(f"{text} [{i}]")
            elif self.reference_format == "footnote":
                content_parts.append(f"{text}")
            else: # metadata format
                content_parts.append(text)

            ref_entry = {
                "id": i,
                "text": text,
                "metadata": metadata
            }

            if self.show_confidence:
                ref_entry["confidence"] = confidence

            references.append(ref_entry)

        content = " ".join(content_parts)

        # Add footnotes if using footnote format
        if self.reference_format == "footnote":
            footnotes = [
                f"[{ref['id']}] {ref['text']}" +
                (f" (confidence: {ref['confidence']:.2f})" if self.show_confidence else "")
                for ref in references
            ]
            content += "\n\nReferences:\n" + "\n".join(footnotes)

        return {
            "content": content,
            "references": references
        }
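
Finally, a sketch of the reference-aware search above (illustrative only; assumes `include_references` is enabled in `reference_config`):

    result = memory.search_with_references("user preferences", limit=5)
    print(result["content"])                 # matched text, e.g. "... [1] ... [2]" with the "inline" format
    for ref in result["references"]:
        # "confidence" is present only when show_confidence is enabled
        print(ref["id"], ref["text"], ref.get("confidence"))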