Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions src/powermem/core/async_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -976,9 +976,23 @@ async def search(
filters: Optional[Dict[str, Any]] = None,
limit: int = 30,
threshold: Optional[float] = None,
sort_by: Optional[Union[str, List[str]]] = None,
) -> Dict[str, Any]:
"""Search for memories asynchronously.

Args:
query: Search query string
user_id: Optional user ID filter
agent_id: Optional agent ID filter
run_id: Optional run ID filter
filters: Optional additional filters dictionary
limit: Maximum number of results to return (default: 30)
threshold: Optional minimum quality score threshold (0-1)
sort_by: Optional sorting criteria. Can be:
- Single value: "relevance" (default), "date_asc", "date_desc",
"importance_asc", "importance_desc", "access_count_desc", "retention_desc"
- List: Multiple sort keys for multi-criteria sorting (e.g., ["relevance", "date_desc"])

Returns:
Dict[str, Any]: A dictionary containing search results with the following structure:
- "results" (List[Dict]): List of memory search results, where each result contains:
Expand Down Expand Up @@ -1065,6 +1079,10 @@ async def search(
transformed_result[key] = result[key]
transformed_results.append(transformed_result)

# Apply sorting if sort_by is specified
if sort_by is not None:
transformed_results = self._sort_search_results(transformed_results, sort_by)

# Log audit event
await self.audit.log_event_async("memory.search", {
"query": query,
Expand Down Expand Up @@ -1094,6 +1112,95 @@ async def search(
logger.error(f"Failed to search memories: {e}")
self.telemetry.capture_event("memory.search.error", {"error": str(e)})
raise

def _sort_search_results(
self,
results: List[Dict[str, Any]],
sort_by: Union[str, List[str]]
) -> List[Dict[str, Any]]:
"""
Sort search results based on specified criteria.

Args:
results: List of search result dictionaries
sort_by: Sorting criteria - single value or list of values.
Options: "relevance" (default), "date_asc", "date_desc",
"importance_asc", "importance_desc",
"access_count_desc", "retention_desc"

Returns:
Sorted list of results
"""
# Normalize sort_by to list
sort_keys = [sort_by] if isinstance(sort_by, str) else sort_by

# Define sort key functions for each criterion
def get_sort_key(result: Dict[str, Any]) -> tuple:
"""Generate sort key tuple for multi-criteria sorting."""
keys = []
for criterion in sort_keys:
criterion = criterion.lower().strip()

if criterion == "relevance" or not criterion:
# Sort by quality score (descending - higher relevance first)
metadata = result.get("metadata", {})
quality_score = metadata.get("_quality_score", result.get("score", 0.0))
keys.append(-float(quality_score) if quality_score else 0)

elif criterion == "date_asc":
# Sort by creation time ascending (oldest first)
created_at = result.get("created_at", "")
keys.append(created_at or "")

elif criterion == "date_desc":
# Sort by creation time descending (newest first)
created_at = result.get("created_at", "")
keys.append(created_at or "0" if not created_at else created_at)

elif criterion == "importance_asc":
# Sort by importance score ascending (lower importance first)
metadata = result.get("metadata", {})
user_metadata = metadata.get("metadata", {}) if metadata else {}
importance = user_metadata.get("importance_score", 0.5)
keys.append(float(importance))

elif criterion == "importance_desc":
# Sort by importance score descending (higher importance first)
metadata = result.get("metadata", {})
user_metadata = metadata.get("metadata", {}) if metadata else {}
importance = user_metadata.get("importance_score", 0.5)
keys.append(-float(importance))

elif criterion == "access_count_desc":
# Sort by access count descending (most accessed first)
metadata = result.get("metadata", {})
user_metadata = metadata.get("metadata", {}) if metadata else {}
access_count = user_metadata.get("access_count", 0)
keys.append(-int(access_count))

elif criterion == "retention_desc":
# Sort by retention score descending (higher retention first)
metadata = result.get("metadata", {})
user_metadata = metadata.get("metadata", {}) if metadata else {}
retention = user_metadata.get("retention_score", 0.0)
keys.append(-float(retention))

else:
# Unknown criterion, use relevance as fallback
logger.warning(f"Unknown sort criterion: {criterion}, using relevance")
quality_score = result.get("metadata", {}).get("_quality_score", result.get("score", 0.0))
keys.append(-float(quality_score) if quality_score else 0)

return tuple(keys)

# Sort results
try:
sorted_results = sorted(results, key=get_sort_key, reverse=False)
logger.debug(f"Sorted {len(results)} results by criteria: {sort_keys}")
return sorted_results
except Exception as e:
logger.error(f"Failed to sort results: {e}")
return results

async def get(
self,
Expand Down
119 changes: 119 additions & 0 deletions src/powermem/core/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,9 +1100,23 @@ def search(
filters: Optional[Dict[str, Any]] = None,
limit: int = 30,
threshold: Optional[float] = None,
sort_by: Optional[Union[str, List[str]]] = None,
) -> Dict[str, Any]:
"""Search for memories.

Args:
query: Search query string
user_id: Optional user ID filter
agent_id: Optional agent ID filter
run_id: Optional run ID filter
filters: Optional additional filters dictionary
limit: Maximum number of results to return (default: 30)
threshold: Optional minimum quality score threshold (0-1)
sort_by: Optional sorting criteria. Can be:
- Single value: "relevance" (default), "date_asc", "date_desc",
"importance_asc", "importance_desc", "access_count_desc", "retention_desc"
- List: Multiple sort keys for multi-criteria sorting (e.g., ["relevance", "date_desc"])

Returns:
Dict[str, Any]: A dictionary containing search results with the following structure:
- "results" (List[Dict]): List of memory search results, where each result contains:
Expand Down Expand Up @@ -1193,6 +1207,10 @@ def search(
transformed_result["memory_id"] = transformed_result["id"]
transformed_results.append(transformed_result)

# Apply sorting if sort_by is specified
if sort_by is not None:
transformed_results = self._sort_search_results(transformed_results, sort_by)

# Log audit event
self.audit.log_event(
"memory.search",
Expand Down Expand Up @@ -1250,6 +1268,107 @@ def search(
logger.error(f"Failed to search memories: {e}")
self.telemetry.capture_event("memory.search.error", {"error": str(e)})
raise

def _sort_search_results(
self,
results: List[Dict[str, Any]],
sort_by: Union[str, List[str]]
) -> List[Dict[str, Any]]:
"""
Sort search results based on specified criteria.

Args:
results: List of search result dictionaries
sort_by: Sorting criteria - single value or list of values.
Options: "relevance" (default), "date_asc", "date_desc",
"importance_asc", "importance_desc",
"access_count_desc", "retention_desc"

Returns:
Sorted list of results
"""
# Normalize sort_by to list
sort_keys = [sort_by] if isinstance(sort_by, str) else sort_by

# Define sort key functions for each criterion
def get_sort_key(result: Dict[str, Any]) -> tuple:
"""Generate sort key tuple for multi-criteria sorting."""
keys = []
for criterion in sort_keys:
criterion = criterion.lower().strip()

if criterion == "relevance" or not criterion:
# Sort by quality score (descending - higher relevance first)
metadata = result.get("metadata", {})
quality_score = metadata.get("_quality_score", result.get("score", 0.0))
keys.append(-float(quality_score) if quality_score else 0)

elif criterion == "date_asc":
# Sort by creation time ascending (oldest first)
created_at = result.get("created_at", "")
keys.append(created_at or "")

elif criterion == "date_desc":
# Sort by creation time descending (newest first)
created_at = result.get("created_at", "")
# For descending, we use negative timestamp for reverse sorting
if created_at:
try:
# Parse ISO format datetime and convert to timestamp
dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
timestamp = dt.timestamp()
keys.append(-timestamp) # Negative for descending
except (ValueError, TypeError):
# If parsing fails, use the string for comparison
keys.append(created_at)
else:
# Empty dates should sort last in descending order
keys.append(float('inf'))

elif criterion == "importance_asc":
# Sort by importance score ascending (lower importance first)
metadata = result.get("metadata", {})
user_metadata = metadata.get("metadata", {}) if metadata else {}
importance = user_metadata.get("importance_score", 0.5) # Default 0.5 if not set
keys.append(float(importance))

elif criterion == "importance_desc":
# Sort by importance score descending (higher importance first)
metadata = result.get("metadata", {})
user_metadata = metadata.get("metadata", {}) if metadata else {}
importance = user_metadata.get("importance_score", 0.5)
keys.append(-float(importance))

elif criterion == "access_count_desc":
# Sort by access count descending (most accessed first)
metadata = result.get("metadata", {})
user_metadata = metadata.get("metadata", {}) if metadata else {}
access_count = user_metadata.get("access_count", 0)
keys.append(-int(access_count))

elif criterion == "retention_desc":
# Sort by retention score descending (higher retention first)
metadata = result.get("metadata", {})
user_metadata = metadata.get("metadata", {}) if metadata else {}
retention = user_metadata.get("retention_score", 0.0)
keys.append(-float(retention))

else:
# Unknown criterion, use relevance as fallback
logger.warning(f"Unknown sort criterion: {criterion}, using relevance")
quality_score = result.get("metadata", {}).get("_quality_score", result.get("score", 0.0))
keys.append(-float(quality_score) if quality_score else 0)

return tuple(keys)

# Sort results - use reverse for descending sorts (handled in get_sort_key)
try:
sorted_results = sorted(results, key=get_sort_key, reverse=False)
logger.debug(f"Sorted {len(results)} results by criteria: {sort_keys}")
return sorted_results
except Exception as e:
logger.error(f"Failed to sort results: {e}")
return results

def get(
self,
Expand Down
5 changes: 4 additions & 1 deletion src/server/api/v1/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Memory search API routes
"""

from typing import Optional
from typing import Optional, Union, List
from fastapi import APIRouter, Depends, Query, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
Expand Down Expand Up @@ -43,6 +43,7 @@ async def search_memories_post(
run_id=body.run_id,
filters=body.filters,
limit=body.limit,
sort_by=body.sort_by,
)

search_results = [
Expand Down Expand Up @@ -76,6 +77,7 @@ async def search_memories_get(
agent_id: Optional[str] = Query(None, description="Filter by agent ID"),
run_id: Optional[str] = Query(None, description="Filter by run ID"),
limit: int = Query(30, ge=1, le=100, description="Maximum number of results"),
sort_by: Optional[str] = Query(None, description="Sorting criteria: 'relevance', 'date_asc', 'date_desc', 'importance_asc', 'importance_desc', 'access_count_desc', 'retention_desc'"),
api_key: str = Depends(verify_api_key),
service: SearchService = Depends(get_search_service),
):
Expand All @@ -87,6 +89,7 @@ async def search_memories_get(
run_id=run_id,
filters=None, # GET method doesn't support complex filters
limit=limit,
sort_by=sort_by,
)

search_results = [
Expand Down
3 changes: 2 additions & 1 deletion src/server/models/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Request models for PowerMem API
"""

from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field


Expand Down Expand Up @@ -72,6 +72,7 @@ class SearchRequest(BaseModel):
run_id: Optional[str] = Field(None, description="Filter by run ID")
filters: Optional[Dict[str, Any]] = Field(None, description="Additional filters")
limit: int = Field(default=30, ge=1, le=100, description="Maximum number of results")
sort_by: Optional[Union[str, List[str]]] = Field(None, description="Sorting criteria: 'relevance', 'date_asc', 'date_desc', 'importance_asc', 'importance_desc', 'access_count_desc', 'retention_desc', or list for multi-criteria sorting")


class UserProfileAddRequest(BaseModel):
Expand Down
7 changes: 6 additions & 1 deletion src/server/services/search_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from powermem import Memory, auto_config
from ..models.errors import ErrorCode, APIError
from ..utils.metrics import get_metrics_collector
Expand Down Expand Up @@ -35,6 +35,7 @@ def search_memories(
run_id: Optional[str] = None,
filters: Optional[Dict[str, Any]] = None,
limit: int = 30,
sort_by: Optional[Union[str, List[str]]] = None,
) -> Dict[str, Any]:
"""
Search memories.
Expand All @@ -46,6 +47,9 @@ def search_memories(
run_id: Filter by run ID
filters: Additional filters
limit: Maximum number of results
sort_by: Sorting criteria: 'relevance', 'date_asc', 'date_desc',
'importance_asc', 'importance_desc', 'access_count_desc',
'retention_desc', or list for multi-criteria sorting

Returns:
Search results dictionary
Expand All @@ -68,6 +72,7 @@ def search_memories(
run_id=run_id,
filters=filters,
limit=limit,
sort_by=sort_by,
)

logger.info(f"Search completed: {len(results.get('results', []))} results")
Expand Down
Loading
Loading