Source code for agentscope.memory._reme._reme_task_long_term_memory

# -*- coding: utf-8 -*-
"""Task memory implementation using ReMe library.

This module provides a task memory implementation that integrates
with the ReMe library to learn from execution trajectories and
retrieve relevant task experiences.

Requirements:
    Python 3.12 or greater is required to use ReMe.
"""
from typing import Any

from ._reme_long_term_memory_base import ReMeLongTermMemoryBase
from ..._logging import logger
from ...message import Msg, TextBlock
from ...tool import ToolResponse


[docs] class ReMeTaskLongTermMemory(ReMeLongTermMemoryBase): """Task memory implementation using ReMe library. Task memory learns from execution trajectories and provides retrieval of relevant task experiences. Requirements: Python 3.12 or greater is required to use ReMe. """
[docs] async def record_to_memory( self, thinking: str, content: list[str], **kwargs: Any, ) -> ToolResponse: """Record task execution experiences and learnings. Record task execution experiences and learnings to long-term memory. Use this function to save valuable task-related knowledge that can help with future similar tasks. This enables learning from experience and improving over time. When to record: - After solving technical problems or completing tasks - When discovering useful techniques or approaches - After implementing solutions with specific steps - When learning best practices or important lessons What to record: Be detailed and actionable. Include: - Task description and context - Step-by-step execution details - Specific techniques and methods used - Results, outcomes, and effectiveness - Lessons learned and considerations Args: thinking (`str`): Your reasoning about why this task experience is valuable and what makes it worth remembering for future reference. content (`list[str]`): List of specific task insights to remember. Each string should be a clear, actionable piece of information. Examples: ["Add indexes on WHERE clause columns to speed up queries", "Use EXPLAIN ANALYZE to identify missing indexes"]. **kwargs (`Any`): Additional keyword arguments. Can include 'score' (float) to indicate the quality/success of this approach (default: 1.0). Returns: `ToolResponse`: Confirmation message indicating successful memory recording. """ logger.info( "[ReMeTaskMemory] Entering record_to_memory - " "thinking: %s, content: %s, kwargs: %s", thinking, content, kwargs, ) if not self._app_started: raise RuntimeError( "ReMeApp context not started. " "Please use 'async with' to initialize the app.", ) try: # Prepare messages for task memory recording messages = [] # Add thinking as a user message if provided if thinking: messages.append( { "role": "user", "content": thinking, }, ) # Add content items as user-assistant pairs for item in content: messages.append( { "role": "user", "content": item, }, ) # Add a simple assistant acknowledgment messages.append( { "role": "assistant", "content": "Task information recorded.", }, ) result = await self.app.async_execute( name="summary_task_memory", workspace_id=self.workspace_id, trajectories=[ { "messages": messages, "score": kwargs.pop("score", 1.0), }, ], **kwargs, ) # Extract metadata if available summary_text = ( f"Successfully recorded {len(content)} task memory/memories." ) return ToolResponse( content=[ TextBlock( type="text", text=summary_text, ), ], metadata={"result": result}, ) except Exception as e: logger.exception("Error recording task memory: %s", str(e)) return ToolResponse( content=[ TextBlock( type="text", text=f"Error recording task memory: {str(e)}", ), ], )
[docs] async def retrieve_from_memory( self, keywords: list[str], **kwargs: Any, ) -> ToolResponse: """Search and retrieve relevant task experiences. Search and retrieve relevant task experiences from long-term memory. IMPORTANT: You should call this function BEFORE attempting to solve problems or answer technical questions. This ensures you leverage experiences and proven solutions rather than starting from scratch. Use this when: - Asked to solve a technical problem or implement a solution - Asked for recommendations, best practices, or approaches - Asked "what do you know about...?" or "have you seen this before?" - Dealing with tasks that may be similar to experiences - Need to recall specific techniques or methods Benefits of retrieving first: - Learn from past successes and mistakes - Provide more accurate, battle-tested solutions - Avoid reinventing the wheel - Give consistent, informed recommendations Args: keywords (`list[str]`): Keywords describing the task or problem domain. Be specific and use technical terms. Examples: ["database optimization", "slow queries"], ["API design", "rate limiting"], ["code refactoring", "Python"]. **kwargs (`Any`): Additional keyword arguments. Can include 'top_k' (int) to specify number of experiences to retrieve (default: 3). Returns: `ToolResponse`: Retrieved task experiences and learnings. If no relevant experiences found, you'll receive a message indicating that. """ logger.info( "[ReMeTaskMemory] Entering retrieve_from_memory - " "keywords: %s, kwargs: %s", keywords, kwargs, ) if not self._app_started: raise RuntimeError( "ReMeApp context not started. " "Please use 'async with' to initialize the app.", ) try: results = [] # Search for each keyword top_k = kwargs.get("top_k", 3) for keyword in keywords: result = await self.app.async_execute( name="retrieve_task_memory", workspace_id=self.workspace_id, query=keyword, top_k=top_k, **kwargs, ) # Extract the answer from the result answer = result.get("answer", "") if answer: results.append(f"Keyword '{keyword}':\n{answer}") # Combine all results if results: combined_text = "\n\n".join(results) else: combined_text = ( "No task experiences found for the given keywords." ) return ToolResponse( content=[ TextBlock( type="text", text=combined_text, ), ], ) except Exception as e: logger.exception("Error retrieving task memory: %s", str(e)) return ToolResponse( content=[ TextBlock( type="text", text=f"Error retrieving task memory: {str(e)}", ), ], )
[docs] async def record( self, msgs: list[Msg | None], **kwargs: Any, ) -> None: """Record the content to the task memory. This method converts AgentScope messages to ReMe's format and records them as a task execution trajectory. Args: msgs (`list[Msg | None]`): The messages to record to memory. **kwargs (`Any`): Additional keyword arguments for the recording. Can include 'score' (float) for trajectory scoring (default: 1.0). """ if isinstance(msgs, Msg): msgs = [msgs] # Filter out None msg_list = [_ for _ in msgs if _] if not msg_list: return if not all(isinstance(_, Msg) for _ in msg_list): raise TypeError( "The input messages must be a list of Msg objects.", ) if not self._app_started: raise RuntimeError( "ReMeApp context not started. " "Please use 'async with' to initialize the app.", ) try: # Convert AgentScope messages to ReMe format messages = [] for msg in msg_list: # Extract content as string if isinstance(msg.content, str): content_str = msg.content elif isinstance(msg.content, list): # Join content blocks into a single string content_parts = [] for block in msg.content: if isinstance(block, dict) and "text" in block: content_parts.append(block["text"]) elif isinstance(block, dict) and "thinking" in block: content_parts.append(block["thinking"]) content_str = "\n".join(content_parts) else: content_str = str(msg.content) messages.append( { "role": msg.role, "content": content_str, }, ) # Extract score from kwargs if provided, default to 1.0 score = kwargs.pop("score", 1.0) await self.app.async_execute( name="summary_task_memory", workspace_id=self.workspace_id, trajectories=[ { "messages": messages, "score": score, }, ], **kwargs, ) except Exception as e: # Log the error but don't raise to maintain compatibility logger.exception( "Error recording messages to task memory: %s", str(e), ) import warnings warnings.warn( f"Error recording messages to task memory: {str(e)}", )
[docs] async def retrieve( self, msg: Msg | list[Msg] | None, **kwargs: Any, ) -> str: """Retrieve relevant task experiences from memory. Args: msg (`Msg | list[Msg] | None`): The message to search for relevant task experiences. **kwargs (`Any`): Additional keyword arguments. Returns: `str`: The retrieved task experiences as a string. """ if msg is None: return "" if isinstance(msg, Msg): msg = [msg] if not isinstance(msg, list) or not all( isinstance(_, Msg) for _ in msg ): raise TypeError( "The input message must be a Msg or a list of Msg objects.", ) if not self._app_started: raise RuntimeError( "ReMeApp context not started. " "Please use 'async with' to initialize the app.", ) try: # Only use the last message's content for retrieval last_msg = msg[-1] query = "" if isinstance(last_msg.content, str): query = last_msg.content elif isinstance(last_msg.content, list): # Extract text from content blocks content_parts = [] for block in last_msg.content: if isinstance(block, dict) and "text" in block: content_parts.append(block["text"]) elif isinstance(block, dict) and "thinking" in block: content_parts.append(block["thinking"]) query = "\n".join(content_parts) if not query: return "" # Retrieve using the query from the last message top_k = kwargs.get("top_k", 3) result = await self.app.async_execute( name="retrieve_task_memory", workspace_id=self.workspace_id, query=query, top_k=top_k, **kwargs, ) return result.get("answer", "") except Exception as e: logger.exception("Error retrieving task memory: %s", str(e)) import warnings warnings.warn(f"Error retrieving task memory: {str(e)}") return ""