# -*- coding: utf-8 -*-
""" Base class for Rpc Agent """
from typing import Type, Optional, Union, Sequence
from agentscope.agents.agent import AgentBase
from agentscope.message import Msg
from agentscope.message import PlaceholderMessage
from agentscope.rpc import RpcAgentClient
from agentscope.serialize import serialize
from agentscope.server.launcher import RpcAgentServerLauncher
from agentscope.studio._client import _studio_client
[docs]
class RpcAgent(AgentBase):
    """A wrapper to extend an AgentBase into a gRPC Client.

    The instance either launches its own rpc agent server through
    `RpcAgentServerLauncher` (when no port is available) or connects to an
    existing server at `host:port`. Calls to `reply` and `observe` are
    forwarded to the server through an `RpcAgentClient`.
    """

    def __init__(
        self,
        name: str,
        host: str = "localhost",
        port: Optional[int] = None,
        agent_class: Optional[Type[AgentBase]] = None,
        agent_configs: Optional[dict] = None,
        max_pool_size: int = 8192,
        max_timeout_seconds: int = 7200,
        local_mode: bool = True,
        lazy_launch: bool = False,
        agent_id: Optional[str] = None,
        connect_existing: bool = False,
    ) -> None:
        """Initialize a RpcAgent instance.

        Args:
            name (`str`): the name of the agent.
            host (`str`, defaults to `localhost`):
                Hostname of the rpc agent server.
            port (`int`, defaults to `None`):
                Port of the rpc agent server. If `None` (and the studio
                does not provide a live server), a server is launched
                locally.
            agent_class (`Type[AgentBase]`):
                the AgentBase subclass of the source agent.
            agent_configs (`dict`): The args used to
                init configs of the agent, generated by `_AgentMeta`.
            max_pool_size (`int`, defaults to `8192`):
                Max number of task results that the server can accommodate.
            max_timeout_seconds (`int`, defaults to `7200`):
                Timeout for task results.
            local_mode (`bool`, defaults to `True`):
                Whether the started gRPC server only listens to local
                requests.
            lazy_launch (`bool`, defaults to `False`):
                Only launch the server when the agent is called.
            agent_id (`str`, defaults to `None`):
                The agent id of this instance. If `None`, it will
                be generated randomly.
            connect_existing (`bool`, defaults to `False`):
                Set to `True`, if the agent is already running on the agent
                server.
        """
        super().__init__(name=name)
        self.agent_class = agent_class
        self.agent_configs = agent_configs
        self.host = host
        self.port = port
        self.server_launcher = None
        self.client = None
        self.connect_existing = connect_existing
        if agent_id is not None:
            self._agent_id = agent_id

        # check studio first: when no port is given and the studio client is
        # active, ask it for a server and reuse that server if it is alive
        if self.port is None and _studio_client.active:
            server = _studio_client.alloc_server()
            if "host" in server:
                if RpcAgentClient(
                    host=server["host"],
                    port=server["port"],
                ).is_alive():
                    self.host = server["host"]
                    self.port = server["port"]

        launch_server = self.port is None
        if launch_server:
            # still no port available, launch the server locally
            self.host = "localhost"
            studio_url = None
            if _studio_client.active:
                studio_url = _studio_client.studio_url
            self.server_launcher = RpcAgentServerLauncher(
                host=self.host,
                # always `None` in this branch; the actual port is read back
                # from the launcher in `_launch_server`
                port=port,
                max_pool_size=max_pool_size,
                max_timeout_seconds=max_timeout_seconds,
                local_mode=local_mode,
                custom_agent_classes=[agent_class],
                studio_url=studio_url,
            )
            if not lazy_launch:
                self._launch_server()
        else:
            # a server address is known, only a client is needed
            self.client = RpcAgentClient(
                host=self.host,
                port=self.port,
                agent_id=self.agent_id,
            )
            if not self.connect_existing:
                self.client.create_agent(
                    agent_configs,
                )

    def _launch_server(self) -> None:
        """Launch a rpc server and update the port and the client"""
        self.server_launcher.launch()
        self.port = self.server_launcher.port
        self.client = RpcAgentClient(
            host=self.host,
            port=self.port,
            agent_id=self.agent_id,
        )
        self.client.create_agent(self.agent_configs)

    def reply(self, x: Optional[Union[Msg, Sequence[Msg]]] = None) -> Msg:
        """Forward the input to the rpc server and return a placeholder.

        If the server has not been launched yet (lazy launch), it is
        launched first.

        Args:
            x (`Optional[Union[Msg, Sequence[Msg]]]`, defaults to `None`):
                The input message(s) passed on to the placeholder.

        Returns:
            `Msg`: A `PlaceholderMessage` built from the client of this
            agent and `x`.
        """
        if self.client is None:
            self._launch_server()
        return PlaceholderMessage(
            client=self.client,
            x=x,
        )

    def observe(self, x: Union[Msg, Sequence[Msg]]) -> None:
        """Send the message(s) to the `_observe` function on the server.

        If the server has not been launched yet (lazy launch), it is
        launched first.

        Args:
            x (`Union[Msg, Sequence[Msg]]`):
                The message(s) to observe; serialized before being sent.
        """
        if self.client is None:
            self._launch_server()
        self.client.call_agent_func(
            func_name="_observe",
            value=serialize(x),
        )

    def clone_instances(
        self,
        num_instances: int,
        including_self: bool = True,
    ) -> Sequence[AgentBase]:
        """
        Clone a series of this instance with different agent_id and
        return them as a list.

        Args:
            num_instances (`int`): The number of instances in the returned
                list. A non-positive value yields an empty list.
            including_self (`bool`): Whether to include the instance calling
                this method in the returned list.

        Returns:
            `Sequence[AgentBase]`: A list of agent instances.
        """
        # nothing requested: honour the documented list length instead of
        # returning `[self]`, and avoid launching a server for no reason
        if num_instances <= 0:
            return []

        generated_instance_number = (
            num_instances - 1 if including_self else num_instances
        )
        generated_instances = []

        # launch the server before clone instances
        if self.client is None:
            self._launch_server()

        # put itself as the first element of the returned list
        if including_self:
            generated_instances.append(self)

        # clone instances without agent server
        for _ in range(generated_instance_number):
            new_agent_id = self.client.clone_agent(self.agent_id)
            generated_instances.append(
                RpcAgent(
                    name=self.name,
                    host=self.host,
                    port=self.port,
                    agent_id=new_agent_id,
                    connect_existing=True,
                ),
            )
        return generated_instances

    def stop(self) -> None:
        """Stop the RpcAgent and the rpc server."""
        # `stop` is also reached through `__del__`, which runs even for an
        # instance whose `__init__` raised before `server_launcher` was
        # assigned, so the attribute may be missing
        launcher = getattr(self, "server_launcher", None)
        if launcher is not None:
            launcher.shutdown()

    def __del__(self) -> None:
        """Shut down the launched server (if any) on garbage collection."""
        self.stop()