# -*- coding: utf-8 -*-"""MsgHub is designed to share messages among a group of agents."""fromtypingimportAnyimportshortuuidfrom.._loggingimportloggerfrom..agentimportAgentBasefrom..messageimportMsg
[文档]classMsgHub:"""MsgHub class that controls the subscription of the participated agents. Example: In the following example, the reply message from `agent1`, `agent2`, and `agent3` will be broadcasted to all the other agents in the MsgHub. .. code-block:: python with MsgHub(participant=[agent1, agent2, agent3]): agent1() agent2() Actually, it has the same effect as the following code, but much more easy and elegant! .. code-block:: python x1 = agent1() agent2.observe(x1) agent3.observe(x1) x2 = agent2() agent1.observe(x2) agent3.observe(x2) """
[文档]def__init__(self,participants:list[AgentBase],announcement:list[Msg]|Msg|None=None,enable_auto_broadcast:bool=True,name:str|None=None,)->None:"""Initialize a MsgHub context manager. Args: participants (`list[AgentBase]`): A list of agents that participate in the MsgHub. announcement (`list[Msg] | Msg | None`): The message that will be broadcast to all participants when entering the MsgHub. enable_auto_broadcast (`bool`, defaults to `True`): Whether to enable automatic broadcasting of the replied message from any participant to all other participants. If disabled, the MsgHub will only serve as a manual message broadcaster with the `announcement` argument and the `broadcast()` method. name (`str | None`): The name of this MsgHub. If not provided, a random ID will be generated. """self.name=nameorshortuuid.uuid()self.participants=participantsself.announcement=announcementself.enable_auto_broadcast=enable_auto_broadcast
asyncdef__aenter__(self)->"MsgHub":"""Will be called when entering the MsgHub."""self._reset_subscriber()# broadcast the input message to all participantsifself.announcementisnotNone:awaitself.broadcast(msg=self.announcement)returnselfasyncdef__aexit__(self,*args:Any,**kwargs:Any)->None:"""Will be called when exiting the MsgHub."""ifself.enable_auto_broadcast:foragentinself.participants:agent.remove_subscribers(self.name)def_reset_subscriber(self)->None:"""Reset the subscriber for agent in `self.participant`"""ifself.enable_auto_broadcast:foragentinself.participants:agent.reset_subscribers(self.name,self.participants)
[文档]defadd(self,new_participant:list[AgentBase]|AgentBase,)->None:"""Add new participant into this hub"""ifisinstance(new_participant,AgentBase):new_participant=[new_participant]foragentinnew_participant:ifagentnotinself.participants:self.participants.append(agent)self._reset_subscriber()
[文档]defdelete(self,participant:list[AgentBase]|AgentBase,)->None:"""Delete agents from participant."""ifisinstance(participant,AgentBase):participant=[participant]foragentinparticipant:ifagentinself.participants:# remove agent from self.participantself.participants.pop(self.participants.index(agent))else:logger.warning("Cannot find the agent with ID %s, skip its deletion.",agent.id,)# Remove this agent from the subscriber of other agentsself._reset_subscriber()
[文档]asyncdefbroadcast(self,msg:list[Msg]|Msg)->None:"""Broadcast the message to all participants. Args: msg (`list[Msg] | Msg`): Message(s) to be broadcast among all participants. """foragentinself.participants:awaitagent.observe(msg)
[文档]defset_auto_broadcast(self,enable:bool)->None:"""Enable automatic broadcasting of the replied message from any participant to all other participants. Args: enable (`bool`): Whether to enable automatic broadcasting. If disabled, the MsgHub will only serve as a manual message broadcaster with the `announcement` argument and the `broadcast()` method. """ifenable:self.enable_auto_broadcast=Trueself._reset_subscriber()else:self.enable_auto_broadcast=Falseforagentinself.participants:agent.remove_subscribers(self.name)