# -*- coding: utf-8 -*-"""Functional counterpart for Pipeline"""importasynciofromcopyimportdeepcopyfromtypingimportAnyfrom..agentimportAgentBasefrom..messageimportMsg
[文档]asyncdefsequential_pipeline(agents:list[AgentBase],msg:Msg|list[Msg]|None=None,)->Msg|list[Msg]|None:"""An async syntactic sugar pipeline that executes a sequence of agents sequentially. The output of the previous agent will be passed as the input to the next agent. The final output will be the output of the last agent. Example: .. code-block:: python agent1 = ReActAgent(...) agent2 = ReActAgent(...) agent3 = ReActAgent(...) msg_input = Msg("user", "Hello", "user") msg_output = await sequential_pipeline( [agent1, agent2, agent3], msg_input ) Args: agents (`list[AgentBase]`): A list of agents. msg (`Msg | list[Msg] | None`, defaults to `None`): The initial input that will be passed to the first agent. Returns: `Msg | list[Msg] | None`: The output of the last agent in the sequence. """foragentinagents:msg=awaitagent(msg)returnmsg
[文档]asyncdeffanout_pipeline(agents:list[AgentBase],msg:Msg|list[Msg]|None=None,enable_gather:bool=True,**kwargs:Any,)->list[Msg]:"""A fanout pipeline that distributes the same input to multiple agents. This pipeline sends the same message (or a deep copy of it) to all agents and collects their responses. Agents can be executed either concurrently using asyncio.gather() or sequentially depending on the enable_gather parameter. Example: .. code-block:: python agent1 = ReActAgent(...) agent2 = ReActAgent(...) agent3 = ReActAgent(...) msg_input = Msg("user", "Hello", "user") # Concurrent execution (default) results = await fanout_pipeline( [agent1, agent2, agent3], msg_input ) # Sequential execution results = await fanout_pipeline( [agent1, agent2, agent3], msg_input, enable_gather=False ) Args: agents (`list[AgentBase]`): A list of agents. msg (`Msg | list[Msg] | None`, defaults to `None`): The initial input that will be passed to all agents. enable_gather (`bool`, defaults to `True`): Whether to execute agents concurrently using `asyncio.gather()`. If False, agents are executed sequentially. **kwargs (`Any`): Additional keyword arguments passed to each agent during execution. Returns: `list[Msg]`: A list of response messages from each agent. """ifenable_gather:tasks=[asyncio.create_task(agent(deepcopy(msg),**kwargs))foragentinagents]returnawaitasyncio.gather(*tasks)else:return[awaitagent(deepcopy(msg),**kwargs)foragentinagents]