@wenjin272 (Collaborator) commented Nov 20, 2025

Linked issue: #331

Purpose of change

Introduce the long-term memory interface in Python, and provide an implementation based on Chroma.

This is the first of three PRs that introduce long-term memory in Python:

  1. interface and one implementation
  2. support using long-term memory in action
  3. async interface and execution

Tests

Unit test

API

Yes, this adds long-term memory related APIs.

Documentation

  • doc-needed

@github-actions bot added the priority/major, fixVersion/0.2.0, and doc-needed labels on Nov 20, 2025
@wenjin272 changed the title from "Long term memory" to "[api][runtime] Introduce long-term memory in python" on Nov 20, 2025
from flink_agents.api.prompts.prompt import Prompt


class ReduceStrategy(Enum):
Contributor

Not sure about the name Reduce. I think Compact might be better.

SUMMARIZE = "summarize"


class ReduceSetup(BaseModel):
Contributor

I'd suggest naming this CompactionStrategy and making it an abstract class for which we can provide different implementations, so that we can strictly limit which arguments must be specified for each strategy. We can call the current ReduceStrategy CompactionStrategyType.

Contributor

I think CompactionStrategy.trim(n) might be more straightforward for users, compared to ReduceSetup.trim_setup(n).
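
One possible shape for this suggestion is sketched below. It is only an illustration of the idea, not the code in this PR: `TrimStrategy`, the `TRIM` enum member, and the `apply` method are hypothetical names, and plain Python classes stand in for the pydantic models used in the diff.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import List


class CompactionStrategyType(Enum):
    """Renamed ReduceStrategy, as proposed; the TRIM member is assumed."""

    TRIM = "trim"
    SUMMARIZE = "summarize"


class CompactionStrategy(ABC):
    """Abstract base; each subclass declares only the arguments it needs."""

    @property
    @abstractmethod
    def type(self) -> CompactionStrategyType:
        """The kind of compaction this strategy performs."""

    @staticmethod
    def trim(n: int) -> "TrimStrategy":
        """Factory so users can write CompactionStrategy.trim(n)."""
        return TrimStrategy(n)


class TrimStrategy(CompactionStrategy):
    """Hypothetical strategy that drops the n oldest items."""

    def __init__(self, n: int) -> None:
        if n <= 0:
            raise ValueError("n must be positive")
        self.n = n

    @property
    def type(self) -> CompactionStrategyType:
        return CompactionStrategyType.TRIM

    def apply(self, items: List[str]) -> List[str]:
        # Keep everything except the first (oldest) n items.
        return items[self.n:]
```

With this layout, a strategy that needs no `n` (for example a summarizing one) would simply not accept it, instead of reading optional keys from a shared arguments dictionary.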

id: str
value: Any
compacted: bool = False
created_time: DatetimeRange
Contributor

Suggested change
- created_time: DatetimeRange
+ created_time: DatetimeRange | datetime

size: int = 0
capacity: int
reduce_setup: ReduceSetup
item_ids: List[str] = Field(default_factory=list)
Contributor

Why do we need to store ids of all the items?

capacity: int
reduce_setup: ReduceSetup
item_ids: List[str] = Field(default_factory=list)
reduced: bool = False
Contributor

And why do we need to know whether a memory set has been reduced/compacted?

)

# Connection configuration
persist_directory: str | None = Field(
Contributor

What is this directory for?

Comment on lines +200 to +202
if memory_set.size >= memory_set.capacity:
    # trigger reduce operation to manage memory set size.
    self._reduce(memory_set)
Contributor

This can be extremely slow. We should proactively do the compaction.
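
One way to read "proactively" is to start compaction before the hard capacity is reached and to run it off the caller's thread, so that `add` does not block on it. The sketch below illustrates that reading only. `ProactiveMemorySet`, `compact_at`, and `trim_n` are hypothetical names, and a list trim stands in for the real reduce operation.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from threading import Lock
from typing import List, Optional


class ProactiveMemorySet:
    """Toy memory set that compacts in the background at a soft threshold."""

    def __init__(self, capacity: int, compact_at: int, trim_n: int) -> None:
        if not 0 < compact_at <= capacity:
            raise ValueError("compact_at must be in (0, capacity]")
        self.capacity = capacity      # hard limit (not enforced in this sketch)
        self.compact_at = compact_at  # soft threshold that triggers compaction
        self.trim_n = trim_n
        self.items: List[str] = []
        self._lock = Lock()
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._pending: Optional[Future] = None

    def add(self, item: str) -> None:
        with self._lock:
            self.items.append(item)
            idle = self._pending is None or self._pending.done()
            if len(self.items) >= self.compact_at and idle:
                # Schedule compaction; the caller returns without waiting.
                self._pending = self._executor.submit(self._compact)

    def _compact(self) -> None:
        # Stand-in for the real reduce step: drop the oldest trim_n items.
        with self._lock:
            del self.items[: self.trim_n]

    def close(self) -> None:
        # Wait for any scheduled compaction and release the worker thread.
        self._executor.shutdown(wait=True)
```

A real implementation would still need a fallback for the case where items arrive faster than the background compaction can remove them.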

self.client.delete_collection(name=name)

@override
def add(self, memory_set: MemorySet, memory_item: str | ChatMessage) -> None:
Contributor

I have a feeling that adding items to long-term memory can take time because of the embedding step. We should probably also provide async APIs.
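
As a rough illustration of what an async variant could look like, the sketch below wraps a blocking `add` with `asyncio.to_thread` (Python 3.9+). `BlockingMemory`, `AsyncMemory`, and `async_add` are hypothetical names, and the sleep only simulates embedding latency; the actual async interface is planned for a later PR and may look different.

```python
import asyncio
import time
from typing import List


class BlockingMemory:
    """Hypothetical stand-in for a store whose add() blocks on embedding."""

    def __init__(self) -> None:
        self.items: List[str] = []

    def add(self, item: str) -> None:
        time.sleep(0.01)  # simulate the embedding call
        self.items.append(item)


class AsyncMemory:
    """Thin async facade that runs the blocking call in a worker thread."""

    def __init__(self, inner: BlockingMemory) -> None:
        self._inner = inner

    async def async_add(self, item: str) -> None:
        await asyncio.to_thread(self._inner.add, item)


async def demo() -> List[str]:
    memory = BlockingMemory()
    wrapper = AsyncMemory(memory)
    # The three adds overlap instead of running back to back.
    await asyncio.gather(*(wrapper.async_add(f"m{i}") for i in range(3)))
    return sorted(memory.items)
```

Calling `asyncio.run(demo())` runs the three additions concurrently; the event loop stays free while each worker thread waits on the simulated embedding.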

return self.slice(memory_set=memory_set, offset=offset, n=n)

@override
def search(
Contributor

Same here.

Comment on lines +411 to +416
def _trim(self, memory_set: MemorySet) -> None:
    reduce_setup: ReduceSetup = memory_set.reduce_setup
    n = reduce_setup.arguments.get("n")
    self.delete(memory_set=memory_set, offset=0, n=n)

def _summarize(self, memory_set: MemorySet) -> None:
Contributor

Are these methods specialized for this class?

@wenjin272 (Collaborator, Author) commented Dec 1, 2025

Hi, @alnzng. There's a design issue related to the vector store that I'd appreciate your help reviewing.

As described in the design doc #339, the long-term memory of flink-agents is also based on a vector store. Currently, I provide an implementation based on Chroma. In this implementation, I use the Chroma client directly rather than the flink-agents BaseVectorStore, because BaseVectorStore does not provide some of the interfaces that long-term memory needs.

@xintongsong believes that we can build directly on the Flink-Agents BaseVectorStore, so that any vector store that is already supported can serve as the backend for long-term memory.

I think this makes sense, but it requires adding some interfaces to BaseVectorStore, which might look like:

def get_or_create_collection(self, name: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Get a collection, create if it doesn't already exist."""
    
def get_collection(self, name: str) -> None:
    """Get an existing collection."""
    
def update_collection(self, name: str, metadata: Dict[str, Any]) -> None:
    """Update an existing collection."""

def delete_collection(self, name: str) -> bool:
    """Delete a collection."""
    
def add(self, document: Document, collection_name: str | None = None) -> None:
    """Add a document to the collection."""

def update(self, document: Document, collection_name: str | None = None) -> None:
    """Update a document, can only update metadata."""

def delete(self, offset: int | None, limit: int | None, ids: List[int] | None = None, **kwargs: Any) -> bool:
    """Delete documents from collection."""

def get(self, offset: int | None, limit: int | None, collection_name: str | None = None, **kwargs: Any) -> List[Document]:
    """Get documents from collection."""

These interfaces may not be achievable for every vector store; I will do further research and refine them afterward.
WDYT?
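
To make the collection-level part of the proposal concrete, here is a toy in-memory sketch. It is not the Flink-Agents BaseVectorStore: `InMemoryCollections` is a hypothetical class, collections are reduced to their metadata dictionaries, and `get_collection` returning the metadata is an assumption made for illustration.

```python
from typing import Any, Dict


class InMemoryCollections:
    """Toy stand-in that tracks only collection names and metadata."""

    def __init__(self) -> None:
        self._metadata: Dict[str, Dict[str, Any]] = {}

    def get_or_create_collection(
        self, name: str, metadata: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Return the existing metadata, or store a copy of the given one."""
        return self._metadata.setdefault(name, dict(metadata))

    def get_collection(self, name: str) -> Dict[str, Any]:
        """Return the metadata of an existing collection (assumed return type)."""
        if name not in self._metadata:
            raise KeyError(f"collection {name!r} does not exist")
        return self._metadata[name]

    def update_collection(self, name: str, metadata: Dict[str, Any]) -> None:
        """Merge new metadata into an existing collection."""
        self.get_collection(name).update(metadata)

    def delete_collection(self, name: str) -> bool:
        """Delete a collection; report whether it existed."""
        return self._metadata.pop(name, None) is not None
```

A long-term memory implementation could then keep per-memory-set settings such as capacity in the collection metadata, provided the backing vector store supports metadata updates.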
