Source code for agentscope.memory._mem0_long_term_memory

# -*- coding: utf-8 -*-
"""Long-term memory implementation using mem0 library.

This module provides a long-term memory implementation that integrates
with the mem0 library to provide persistent memory storage and retrieval
capabilities for AgentScope agents.
"""
import json
from typing import Any, TYPE_CHECKING
from importlib import metadata
from packaging import version


from pydantic import field_validator

from ..embedding import EmbeddingModelBase
from ._long_term_memory_base import LongTermMemoryBase
from ..message import Msg, TextBlock
from ..model import ChatModelBase
from ..tool import ToolResponse


if TYPE_CHECKING:
    from mem0.configs.base import MemoryConfig
    from mem0.vector_stores.configs import VectorStoreConfig
else:
    MemoryConfig = Any
    VectorStoreConfig = Any


def _create_agentscope_config_classes() -> tuple:
    """Create custom config classes for agentscope providers."""
    from mem0.embeddings.configs import EmbedderConfig
    from mem0.llms.configs import LlmConfig

    class _ASLlmConfig(LlmConfig):
        """Custom LLM config class that updates the validate_config method.

        Attention: mem0's validate_config hardcodes the supported
        providers, so we override it here to accept the agentscope
        providers. We will follow up with the mem0 maintainers to
        improve this.
        """

        @field_validator("config")
        @classmethod
        def validate_config(cls, v: Any, values: Any) -> Any:
            """Validate the LLM configuration."""
            from mem0.utils.factory import LlmFactory

            provider = values.data.get("provider")
            if provider in LlmFactory.provider_to_class:
                return v
            raise ValueError(f"Unsupported LLM provider: {provider}")

    class _ASEmbedderConfig(EmbedderConfig):
        """Custom embedder config class that updates the validate_config
        method."""

        @field_validator("config")
        @classmethod
        def validate_config(cls, v: Any, values: Any) -> Any:
            """Validate the embedder configuration."""
            from mem0.utils.factory import EmbedderFactory

            provider = values.data.get("provider")
            if provider in EmbedderFactory.provider_to_class:
                return v
            raise ValueError(f"Unsupported Embedder provider: {provider}")

    return _ASLlmConfig, _ASEmbedderConfig
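

For illustration, here is a minimal sketch of how these classes behave once
the "agentscope" providers are registered with mem0's factories (the
registration is performed by Mem0LongTermMemory.__init__ below; importing
the private helper directly is for demonstration only):

from mem0.utils.factory import EmbedderFactory, LlmFactory

from agentscope.memory._mem0_long_term_memory import (
    _create_agentscope_config_classes,
)

# Mirror the registration performed in Mem0LongTermMemory.__init__
# (old-style string registration, for mem0 <= 0.1.115).
LlmFactory.provider_to_class[
    "agentscope"
] = "agentscope.memory._mem0_utils.AgentScopeLLM"
EmbedderFactory.provider_to_class[
    "agentscope"
] = "agentscope.memory._mem0_utils.AgentScopeEmbedding"

_ASLlmConfig, _ASEmbedderConfig = _create_agentscope_config_classes()

# Passes: "agentscope" is now a known provider. In real use the config
# dict carries {"model": <ChatModelBase instance>}.
llm_config = _ASLlmConfig(provider="agentscope", config={})

# Fails: unknown providers are rejected by the overridden validator.
try:
    _ASLlmConfig(provider="no_such_provider", config={})
except ValueError as err:
    print(err)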


class Mem0LongTermMemory(LongTermMemoryBase):
    """A class that implements the LongTermMemoryBase interface using
    mem0."""
    def __init__(
        self,
        agent_name: str | None = None,
        user_name: str | None = None,
        run_name: str | None = None,
        model: ChatModelBase | None = None,
        embedding_model: EmbeddingModelBase | None = None,
        vector_store_config: VectorStoreConfig | None = None,
        mem0_config: MemoryConfig | None = None,
        default_memory_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the Mem0LongTermMemory instance.

        Args:
            agent_name (`str | None`, optional):
                The name of the agent. Defaults to None.
            user_name (`str | None`, optional):
                The name of the user. Defaults to None.
            run_name (`str | None`, optional):
                The name of the run/session. Defaults to None.

                .. note::

                    1. At least one of `agent_name`, `user_name`, or
                       `run_name` is required.
                    2. During memory recording, these parameters become
                       metadata for the stored memories.
                    3. During memory retrieval, only memories with
                       matching metadata values will be returned.

            model (`ChatModelBase | None`, optional):
                The chat model to use for the long-term memory. If
                `mem0_config` is provided, this overrides its LLM
                configuration; if `mem0_config` is None, this is required.
            embedding_model (`EmbeddingModelBase | None`, optional):
                The embedding model to use for the long-term memory. If
                `mem0_config` is provided, this overrides its embedder
                configuration; if `mem0_config` is None, this is required.
            vector_store_config (`VectorStoreConfig | None`, optional):
                The vector store config to use for the long-term memory.
                If `mem0_config` is provided, this overrides its vector
                store configuration. If `mem0_config` is None and this is
                not provided, defaults to Qdrant with `on_disk=True`.
            mem0_config (`MemoryConfig | None`, optional):
                The mem0 config to use for the long-term memory. If
                provided, the individual `model`/`embedding_model`/
                `vector_store_config` parameters override the
                corresponding configurations in it. If None, a new
                `MemoryConfig` is created from the provided parameters.
            default_memory_type (`str | None`, optional):
                The type of memory to use. Defaults to None, which
                creates a semantic memory.

        Raises:
            `ValueError`:
                If `mem0_config` is None and either `model` or
                `embedding_model` is None.
        """
""" super().__init__() try: import mem0 from mem0.configs.llms.base import BaseLlmConfig from mem0.utils.factory import LlmFactory, EmbedderFactory # Check mem0 version current_version = metadata.version("mem0ai") is_mem0_version_low = version.parse( current_version, ) <= version.parse("0.1.115") # Register the agentscope providers with mem0 EmbedderFactory.provider_to_class[ "agentscope" ] = "agentscope.memory._mem0_utils.AgentScopeEmbedding" if is_mem0_version_low: # For mem0 version <= 0.1.115, use the old style LlmFactory.provider_to_class[ "agentscope" ] = "agentscope.memory._mem0_utils.AgentScopeLLM" else: # For mem0 version > 0.1.115, use the new style LlmFactory.provider_to_class["agentscope"] = ( "agentscope.memory._mem0_utils.AgentScopeLLM", BaseLlmConfig, ) except ImportError as e: raise ImportError( "Please install the mem0 library by `pip install mem0ai`", ) from e # Create the custom config classes for agentscope providers dynamically _ASLlmConfig, _ASEmbedderConfig = _create_agentscope_config_classes() if agent_name is None and user_name is None and run_name is None: raise ValueError( "at least one of agent_name, user_name, and run_name is " "required", ) # Store agent and user identifiers for memory management self.agent_id = agent_name self.user_id = user_name self.run_id = run_name # Configuration logic: Handle mem0_config parameter if mem0_config is not None: # Case 1: mem0_config is provided - override specific # configurations if individual params are given # Override LLM configuration if model is provided if model is not None: mem0_config.llm = _ASLlmConfig( provider="agentscope", config={"model": model}, ) # Override embedder configuration if embedding_model is provided if embedding_model is not None: mem0_config.embedder = _ASEmbedderConfig( provider="agentscope", config={"model": embedding_model}, ) # Override vector store configuration if vector_store_config is # provided if vector_store_config is not None: mem0_config.vector_store = vector_store_config else: # Case 2: mem0_config is not provided - create new configuration # from individual parameters # Validate that required parameters are provided if model is None or embedding_model is None: raise ValueError( "model and embedding_model are required if mem0_config " "is not provided", ) # Create new MemoryConfig with provided LLM and embedder mem0_config = mem0.configs.base.MemoryConfig( llm=_ASLlmConfig( provider="agentscope", config={"model": model}, ), embedder=_ASEmbedderConfig( provider="agentscope", config={"model": embedding_model}, ), ) # Set vector store configuration if vector_store_config is not None: # Use provided vector store configuration mem0_config.vector_store = vector_store_config else: # Use default Qdrant configuration with on-disk storage for # persistence set on_disk to True to enable persistence, # otherwise it will be in memory only on_disk = kwargs.get("on_disk", True) mem0_config.vector_store = ( mem0.vector_stores.configs.VectorStoreConfig( config={"on_disk": on_disk}, ) ) # Initialize the async memory instance with the configured settings self.long_term_working_memory = mem0.AsyncMemory(mem0_config) # Store the default memory type for future use self.default_memory_type = default_memory_type
    async def record_to_memory(
        self,
        thinking: str,
        content: list[str],
        **kwargs: Any,
    ) -> ToolResponse:
        """Use this function to record important information that you may
        need later. The target content should be specific and concise,
        e.g. who, when, where, do what, why, how, etc.

        Args:
            thinking (`str`):
                Your thinking and reasoning about what to record.
            content (`list[str]`):
                The content to remember, which is a list of strings.
        """
        try:
            if thinking:
                content = [thinking] + content
            results = await self._mem0_record(
                [
                    {
                        "role": "assistant",
                        "content": "\n".join(content),
                        "name": "assistant",
                    },
                ],
                **kwargs,
            )
            return ToolResponse(
                content=[
                    TextBlock(
                        type="text",
                        text=f"Successfully recorded content to memory "
                        f"{results}",
                    ),
                ],
            )
        except Exception as e:
            return ToolResponse(
                content=[
                    TextBlock(
                        type="text",
                        text=f"Error recording memory: {str(e)}",
                    ),
                ],
            )
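
A hypothetical direct invocation (agents normally call this method as a
registered tool function; `memory` is the instance built in the sketch
above):

import asyncio


async def demo_record_tool() -> None:
    response = await memory.record_to_memory(
        thinking="Alice stated a seating preference worth keeping.",
        content=["Alice prefers window seats on long-haul flights."],
    )
    # ToolResponse.content holds TextBlock dicts; print the status text.
    print(response.content[0]["text"])


asyncio.run(demo_record_tool())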
    async def retrieve_from_memory(
        self,
        keywords: list[str],
        limit: int = 5,
        **kwargs: Any,
    ) -> ToolResponse:
        """Retrieve the memory based on the given keywords.

        Args:
            keywords (`list[str]`):
                The keywords to search for in the memory, which should be
                specific and concise, e.g. the person's name, the date,
                the location, etc.
            limit (`int`, optional):
                The maximum number of memories to retrieve per keyword.

        Returns:
            `ToolResponse`:
                A ToolResponse containing the retrieved memories, joined
                by newlines.
        """
        try:
            results = []
            for keyword in keywords:
                result = await self.long_term_working_memory.search(
                    query=keyword,
                    agent_id=self.agent_id,
                    user_id=self.user_id,
                    run_id=self.run_id,
                    limit=limit,
                )
                if result:
                    results.extend(
                        [item["memory"] for item in result["results"]],
                    )
            return ToolResponse(
                content=[
                    TextBlock(
                        type="text",
                        text="\n".join(results),
                    ),
                ],
            )
        except Exception as e:
            return ToolResponse(
                content=[
                    TextBlock(
                        type="text",
                        text=f"Error retrieving memory: {str(e)}",
                    ),
                ],
            )
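
The matching keyword search, run the same way as the sketch above:

async def demo_keyword_search() -> None:
    # Each keyword triggers one vector search; results are concatenated.
    response = await memory.retrieve_from_memory(
        keywords=["window seat", "Alice"],
        limit=3,
    )
    print(response.content[0]["text"])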
    async def record(
        self,
        msgs: Msg | list[Msg | None],
        memory_type: str | None = None,
        infer: bool = True,
        **kwargs: Any,
    ) -> None:
        """Record the content to the long-term memory.

        Args:
            msgs (`Msg | list[Msg | None]`):
                The message(s) to record to memory; a single Msg is
                wrapped into a list, and None entries are filtered out.
            memory_type (`str | None`, optional):
                The type of memory to use. Defaults to None, which
                creates a semantic memory; pass "procedural_memory"
                explicitly for procedural memories.
            infer (`bool`, optional):
                Whether to infer memory from the content. Defaults to
                True.
            **kwargs (`Any`):
                Additional keyword arguments for the mem0 recording.
        """
        if isinstance(msgs, Msg):
            msgs = [msgs]

        # Filter out None entries
        msg_list = [_ for _ in msgs if _]

        if not all(isinstance(_, Msg) for _ in msg_list):
            raise TypeError(
                "The input messages must be a list of Msg objects.",
            )

        messages = [
            {
                "role": "assistant",
                "content": "\n".join([str(_.content) for _ in msg_list]),
                "name": "assistant",
            },
        ]

        await self._mem0_record(
            messages,
            memory_type=memory_type,
            infer=infer,
            **kwargs,
        )
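
A sketch of recording conversation messages directly, e.g. flushing an
agent's working memory (`memory` as above; the Msg constructor takes
name, content, and role):

from agentscope.message import Msg


async def demo_record() -> None:
    msgs = [
        Msg("Alice", "I am vegetarian.", "user"),
        Msg("Friday", "Noted, I will remember that.", "assistant"),
    ]
    # infer=True (the default) lets mem0's LLM distill facts from the
    # text; infer=False stores the joined content verbatim.
    await memory.record(msgs)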
    async def _mem0_record(
        self,
        messages: str | list[dict],
        memory_type: str | None,
        infer: bool = True,
        **kwargs: Any,
    ) -> dict:
        """Record the content to the long-term memory.

        Args:
            messages (`str | list[dict]`):
                The content to remember: a string, or a list of
                dictionaries representing messages.
            memory_type (`str | None`, optional):
                The type of memory to use. Defaults to None, which
                creates a semantic memory; pass "procedural_memory"
                explicitly for procedural memories.
            infer (`bool`, optional):
                Whether to infer memory from the content. Defaults to
                True.
            **kwargs (`Any`):
                Additional keyword arguments.

        Returns:
            `dict`:
                The result of the memory recording operation.
        """
        results = await self.long_term_working_memory.add(
            messages=messages,
            agent_id=self.agent_id,
            user_id=self.user_id,
            run_id=self.run_id,
            memory_type=(
                memory_type
                if memory_type is not None
                else self.default_memory_type
            ),
            infer=infer,
            **kwargs,
        )
        return results
    async def retrieve(
        self,
        msg: Msg | list[Msg] | None,
        limit: int = 5,
        **kwargs: Any,
    ) -> str:
        """Retrieve the content from the long-term memory.

        Args:
            msg (`Msg | list[Msg] | None`):
                The message(s) to search for in the memory, whose content
                should be specific and concise, e.g. the person's name,
                the date, the location, etc.
            limit (`int`, optional):
                The maximum number of memories to retrieve per message.
            **kwargs (`Any`):
                Additional keyword arguments.

        Returns:
            `str`:
                The retrieved memories, joined by newlines.
        """
        if isinstance(msg, Msg):
            msg = [msg]

        if not isinstance(msg, list) or not all(
            isinstance(_, Msg) for _ in msg
        ):
            raise TypeError(
                "The input message must be a Msg or a list of Msg objects.",
            )

        msg_strs = [
            json.dumps(_.to_dict()["content"], ensure_ascii=False)
            for _ in msg
        ]

        results = []
        for query in msg_strs:
            result = await self.long_term_working_memory.search(
                query=query,
                agent_id=self.agent_id,
                user_id=self.user_id,
                run_id=self.run_id,
                limit=limit,
            )
            if result:
                results.extend(
                    [item["memory"] for item in result["results"]],
                )

        return "\n".join(results)
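
And the matching retrieval path, again with the hypothetical `memory`
instance from the construction sketch:

async def demo_retrieve() -> None:
    query = Msg("Alice", "What do you know about my diet?", "user")
    memories = await memory.retrieve(query, limit=5)
    print(memories)  # newline-separated memory strings; may be empty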