# -*- coding: utf-8 -*-"""MsgHub is designed to share messages among a group of agents."""from__future__importannotationsfromtypingimportAny,Optional,Union,Sequencefromloguruimportloggerfrom.agentsimportAgentBasefrom.messageimportMsg
[docs]classMsgHubManager:"""MsgHub manager class for sharing dialog among a group of agents."""def__init__(self,participants:Sequence[AgentBase],announcement:Optional[Union[Sequence[Msg],Msg]]=None,)->None:"""Initialize a msghub manager from the given arguments. Args: participants (`Sequence[AgentBase]`): The Sequence of participants in the msghub. announcement (`Optional[Union[list[Msg], Msg]]`, defaults to `None`): The message that will be broadcast to all participants at the first without requiring response. """self.participants=participantsself.announcement=announcementdef__enter__(self)->MsgHubManager:"""Will be called when entering the msghub."""name_participants=[agent.nameforagentinself.participants]logger.debug("Enter msghub with participants: {}",", ".join(name_participants,),)self._reset_audience()# broadcast the input message to all participantsifself.announcementisnotNone:foragentinself.participants:agent.observe(self.announcement)returnselfdef__exit__(self,*args:Any,**kwargs:Any)->None:"""Will be called when exiting the msghub."""foragentinself.participants:agent.clear_audience()def_reset_audience(self)->None:"""Reset the audience for agent in `self.participant`"""foragentinself.participants:agent.reset_audience(self.participants)
[docs]defadd(self,new_participant:Union[Sequence[AgentBase],AgentBase],)->None:"""Add new participant into this hub"""ifisinstance(new_participant,AgentBase):new_participant=[new_participant]foragentinnew_participant:ifagentnotinself.participants:self.participants.append(agent)else:logger.warning(f"Skip adding agent [{agent.name}] for it has ""already joined in.",)self._reset_audience()
[docs]defdelete(self,participant:Union[Sequence[AgentBase],AgentBase],)->None:"""Delete agents from participant."""ifisinstance(participant,AgentBase):participant=[participant]foragentinparticipant:ifagentinself.participants:# Clear the audience of the deleted agent firstlyagent.clear_audience()# remove agent from self.participantself.participants.pop(self.participants.index(agent))else:logger.warning(f"Cannot find agent [{agent.name}], skip its"f" deletion.",)# Remove this agent from the audience of other agentsself._reset_audience()
[docs]defbroadcast(self,msg:Union[Msg,Sequence[Msg]])->None:"""Broadcast the message to all participants. Args: msg (`Union[Msg, Sequence[Msg]]`): One or a list of dict messages to broadcast among all participants. """foragentinself.participants:agent.observe(msg)
[docs]defmsghub(participants:Sequence[AgentBase],announcement:Optional[Union[Sequence[Msg],Msg]]=None,)->MsgHubManager:"""msghub is used to share messages among a group of agents. Args: participants (`Sequence[AgentBase]`): A Sequence of participated agents in the msghub. announcement (`Optional[Union[list[Msg], Msg]]`, defaults to `None`): The message that will be broadcast to all participants at the very beginning without requiring response. Example: In the following code, we create a msghub with three agents, and each message output by `agent1`, `agent2`, `agent3` will be passed to all other agents, that's what we mean msghub. .. code-block:: python with msghub(participant=[agent1, agent2, agent3]): agent1() agent2() Actually, it has the same effect as the following code, but much more easy and elegant! .. code-block:: python x1 = agent1() agent2.observe(x1) agent3.observe(x1) x2 = agent2() agent1.observe(x2) agent3.observe(x2) """returnMsgHubManager(participants,announcement)