# -*- coding: utf-8 -*-
"""The ReAct Agent V2, which use the tools API of the LLM service providers
rather than assembling the prompt manually."""
from typing import Union, Optional, Tuple, Type, Any
from pydantic import BaseModel, ValidationError
from ._agent import AgentBase
from ..manager import ModelManager
from ..message import Msg, ToolUseBlock, TextBlock, ContentBlock
from ..models import (
OpenAIChatWrapper,
DashScopeChatWrapper,
AnthropicChatWrapper,
)
from ..service import ServiceToolkit, ServiceResponse, ServiceExecStatus
class ReActAgentV2(AgentBase):
"""The ReAct Agent V2, which use the tools API of the LLM service providers
rather than assembling the prompt manually.
The currently supported LLM providers include:
- Anthropic
- DashScope
- OpenAI
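
    Example:
        A minimal usage sketch. The model config name and how the toolkit
        is populated are illustrative assumptions, not part of this
        module::

            toolkit = ServiceToolkit()
            # toolkit.add(my_tool_function)  # register your own tools here

            agent = ReActAgentV2(
                name="assistant",
                model_config_name="my_openai_config",  # assumed config name
                service_toolkit=toolkit,
            )
            reply = agent(Msg("user", "What is 2 + 2?", "user"))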
"""
_finish_function: str = "generate_response"
"""The function name used to finish replying and return a response"""
def __init__(
self,
name: str,
model_config_name: str,
service_toolkit: ServiceToolkit,
sys_prompt: str = "You're a helpful assistant named {name}.",
max_iters: int = 10,
verbose: bool = True,
exit_reply_without_tool_calls: bool = True,
) -> None:
"""Initial the ReAct agent with the given name, model config name and
tools.
Args:
name (`str`):
The name of the agent.
model_config_name (`str`):
The name of the model config, which is used to load model from
configuration.
service_toolkit (`ServiceToolkit`):
The service toolkit object that contains the tool functions.
            sys_prompt (`str`):
                The system prompt of the agent, where the placeholder
                `{name}` (if present) will be replaced with the agent's
                name.
max_iters (`int`, defaults to `10`):
The maximum number of iterations of the reasoning-acting loops.
            verbose (`bool`, defaults to `True`):
                Whether to print detailed information during the reasoning
                and acting steps. If `False`, only the content in the speak
                field will be printed out.
exit_reply_without_tool_calls (`bool`, defaults to `True`):
Whether to exit the reply function when no tool calls are
generated. If `True`, the agent is allowed to generate a
response without calling the `generate_response` function.
"""
super().__init__(name=name)
self.sys_prompt: str = sys_prompt.format(name=self.name)
self.model = ModelManager.get_instance().get_model_by_config_name(
model_config_name,
)
assert self.model.model_type in [
OpenAIChatWrapper.model_type,
DashScopeChatWrapper.model_type,
AnthropicChatWrapper.model_type,
], (
f"The model type {self.model.model_type} is not supported yet by "
f"the ReActAgentV2. Only {OpenAIChatWrapper.model_type}, "
f"{DashScopeChatWrapper.model_type} and "
f"{AnthropicChatWrapper.model_type} are supported."
)
self.service_toolkit = service_toolkit
self.service_toolkit.add(
self.generate_response,
include_var_keyword=False,
)
self.verbose = verbose
self.max_iters = max_iters
self.exit_reply_without_tool_calls = exit_reply_without_tool_calls
# Used to store the structured output in the current reply
self._current_structured_output = None
self._current_structured_model: Union[Type[BaseModel], None] = None
# Clear the structured output after each reply
self.register_hook(
"post_reply",
"as_clear_structured_output",
self._clear_structured_output_hook,
)
def reply(
self,
x: Optional[Union[Msg, list[Msg]]] = None,
structured_model: Optional[Type[BaseModel]] = None,
) -> Msg:
"""The reply method of the agent.
Args:
x (`Optional[Union[Msg, list[Msg]]]`):
The input message(s) to the agent.
structured_model (`Optional[Type[BaseModel]]`,
defaults to `None`):
                A Pydantic model class that defines the structured output.
                If provided, the schema of the structured output will be
                merged into the schema of the `generate_response` function,
                and the argument `exit_reply_without_tool_calls` will be
                ignored, i.e. the agent must call the `generate_response`
                function to end the reply with a structured output.
Returns:
`Msg`:
The response message from the agent.
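        Example:
            A sketch of requesting structured output; the `Choice` model
            below is an illustrative assumption::

                class Choice(BaseModel):
                    answer: str
                    confidence: float

                msg = agent.reply(
                    Msg("user", "Pick an option.", "user"),
                    structured_model=Choice,
                )
                # The validated fields are stored in `msg.metadata`,
                # e.g. {"answer": "...", "confidence": 0.9}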
"""
# Merge the schema of the structured output with the finish function
if structured_model:
self._prepare_structured_output_schema(
structured_model,
)
self.memory.add(x)
force_call_finish_func = False
for _ in range(self.max_iters):
# Reasoning to generate tool calls
tool_calls, msg_reasoning = self._reasoning(force_call_finish_func)
            # Exit when 1) no tool calls are generated,
            # 2) exit_reply_without_tool_calls is True, and
            # 3) no structured output is required
if (
tool_calls is None
and self.exit_reply_without_tool_calls
and structured_model is None
):
self.memory.add(msg_reasoning)
return msg_reasoning
# If exit_reply_without_tool_calls is False, or structured output
# is required, we drop the last reasoning message and
# force the LLM to call the finish function
if tool_calls is None:
force_call_finish_func = True
continue
self.memory.add(msg_reasoning)
# Acting based on the tool calls
msg_response = self._acting(tool_calls)
if msg_response:
return msg_response
# Generate a response when exceeding the maximum iterations
return self._summarizing()
def _reasoning(
self,
force_call_finish_func: bool = False,
) -> Tuple[Union[list[ToolUseBlock], None], Msg]:
"""The reasoning process of the agent.
Args:
            force_call_finish_func (`bool`, defaults to `False`):
                Whether to force the LLM to call the finish function. Set
                to `True` when the last reasoning step generated no tool
                calls but one is still required, e.g. because
                `exit_reply_without_tool_calls` is `False` or a structured
                output is required.
Returns:
`Tuple[Union[list[ToolUseBlock], None], Msg]`:
Return the tool calls (`None` if empty) and reasoning message.
"""
prompt = self.model.format(
Msg(
"system",
self.sys_prompt,
role="system",
),
self.memory.get_memory(),
# TODO: Support multi-agent mode in the future
multi_agent_mode=False,
)
raw_response = self.model(
prompt,
tools=self.model.format_tools_json_schemas(
self.service_toolkit.json_schemas,
),
tool_choice=self._finish_function
if force_call_finish_func
else None,
)
if self.verbose:
self.speak(
raw_response.stream or raw_response.text,
tool_calls=raw_response.tool_calls,
)
# Prepare the content for the msg
content: list[ContentBlock] = []
if raw_response.text:
content.append(TextBlock(type="text", text=raw_response.text))
if raw_response.tool_calls:
content.extend(raw_response.tool_calls)
msg_reasoning = Msg(
self.name,
content,
role="assistant",
)
return raw_response.tool_calls, msg_reasoning
def _acting(self, tool_calls: list[ToolUseBlock]) -> Union[None, Msg]:
"""The acting process of the agent, which takes a tool use block as
input, execute the function and return a message if the
`generate_response`
function is called.
Args:
tool_calls (`ToolUseBlock`):
The tool use block to be executed.
Returns:
`Union[None, Msg]`:
Return `None` if the function is not `generate_response`,
otherwise return a message to the user.
"""
msg_response: Union[None, Msg] = None
for tool_call in tool_calls:
# Execute the function
msg_execution = self.service_toolkit.parse_and_call_func(
tool_call,
tools_api_mode=True,
)
# Print and remember the execution result
if self.verbose:
self.speak(msg_execution)
self.memory.add(msg_execution)
            # When the finish function is called, return a message if no
            # structured output is required, or if the structured output
            # has been generated successfully
            if tool_call["name"] == self._finish_function and (
                self._current_structured_model is None
                or self._current_structured_output is not None
            ):
                # This message won't be recorded in memory, as it would
                # duplicate the tool result block above
msg_response = Msg(
self.name,
str(tool_call["input"]["response"]),
"assistant",
metadata=self._current_structured_output,
)
self.speak(msg_response)
return msg_response
def _summarizing(self) -> Msg:
"""Generate a response when the agent fails to solve the problem in
the maximum iterations."""
hint_msg = Msg(
"user",
"You have failed to generate response within the maximum "
"iterations. Now respond directly by summarizing the current "
"situation.",
role="user",
echo=self.verbose,
)
# Generate a reply by summarizing the current situation
prompt = self.model.format(
self.memory.get_memory(),
hint_msg,
)
res = self.model(prompt)
self.speak(res.stream or res.text)
res_msg = Msg(self.name, res.text, "assistant")
return res_msg
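    # NOTE: The docstring of `generate_response` below also serves as the
    # tool description that the service toolkit exposes to the LLM, so it
    # is phrased as an instruction to the model. When a structured model
    # is active, its fields are merged into this tool's schema and arrive
    # here through `**kwargs`.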
def generate_response(
self,
response: str, # pylint: disable=unused-argument
**kwargs: Any,
) -> ServiceResponse:
"""Generate a response. You must call this function to interact with
others (e.g., users).
Args:
response (`str`):
The response to the user.
"""
if self._current_structured_model:
try:
self._current_structured_model.model_validate(kwargs)
self._current_structured_output = kwargs
except ValidationError as e:
return ServiceResponse(
status=ServiceExecStatus.ERROR,
content=f"Validation error: {e.errors()}",
)
return ServiceResponse(
status=ServiceExecStatus.SUCCESS,
content="Success",
)
def _prepare_structured_output_schema(
self,
structured_output: Type[BaseModel],
) -> None:
"""Merge the structured output into the schema of the finish function
as additional parameters.
Args:
structured_output (`Type[BaseModel]`):
A Pydantic model class that defines the structured output.
Returns:
`dict`:
The merged function schema with the structured output.
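        Example:
            A sketch of the merge; the `Plan` model below is an
            illustrative assumption::

                class Plan(BaseModel):
                    title: str
                    steps: list[str] = []

                # After `self._prepare_structured_output_schema(Plan)`,
                # the JSON schema of `generate_response` exposes the
                # parameters `response`, `title` and `steps`, and `title`
                # is added to the required list.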
"""
        if not (
            isinstance(structured_output, type)
            and issubclass(structured_output, BaseModel)
        ):
            raise TypeError(
                "structured_output must be a Pydantic model class, "
                f"but got {structured_output!r}",
            )
self._current_structured_model = structured_output
output_schema = structured_output.model_json_schema()
# Update the schema of the finish function by inserting the
# structured output schema
finish_function_schema = self.service_toolkit.json_schemas[
self._finish_function
]
for key, value in output_schema["properties"].items():
if (
key
in finish_function_schema["function"]["parameters"][
"properties"
]
):
raise ValueError(
f"The key '{key}' in structured output already exists in "
f"the finish function '{self._finish_function}'. "
"Try to use another key name.",
)
finish_function_schema["function"]["parameters"]["properties"][
key
] = value
            # "required" may be absent when every field has a default value
            if key in output_schema.get("required", []):
finish_function_schema["function"]["parameters"][
"required"
].append(key)
def _clear_structured_output_hook( # type: ignore
self, # pylint: disable=unused-argument
*args,
**kwargs,
) -> None:
"""Clear the structured output."""
self._current_structured_output = None
self._current_structured_model = None
self.service_toolkit.remove(self._finish_function)
self.service_toolkit.add(
self.generate_response,
include_var_keyword=False,
)