agentscope.pipeline._functional 源代码

# -*- coding: utf-8 -*-
"""Functional counterpart for Pipeline"""
import asyncio
from copy import deepcopy
from typing import Any
from ..agent import AgentBase
from ..message import Msg


[文档] async def sequential_pipeline( agents: list[AgentBase], msg: Msg | list[Msg] | None = None, ) -> Msg | list[Msg] | None: """An async syntactic sugar pipeline that executes a sequence of agents sequentially. The output of the previous agent will be passed as the input to the next agent. The final output will be the output of the last agent. Example: .. code-block:: python agent1 = ReActAgent(...) agent2 = ReActAgent(...) agent3 = ReActAgent(...) msg_input = Msg("user", "Hello", "user") msg_output = await sequential_pipeline( [agent1, agent2, agent3], msg_input ) Args: agents (`list[AgentBase]`): A list of agents. msg (`Msg | list[Msg] | None`, defaults to `None`): The initial input that will be passed to the first agent. Returns: `Msg | list[Msg] | None`: The output of the last agent in the sequence. """ for agent in agents: msg = await agent(msg) return msg
[文档] async def fanout_pipeline( agents: list[AgentBase], msg: Msg | list[Msg] | None = None, enable_gather: bool = True, **kwargs: Any, ) -> list[Msg]: """A fanout pipeline that distributes the same input to multiple agents. This pipeline sends the same message (or a deep copy of it) to all agents and collects their responses. Agents can be executed either concurrently using asyncio.gather() or sequentially depending on the enable_gather parameter. Example: .. code-block:: python agent1 = ReActAgent(...) agent2 = ReActAgent(...) agent3 = ReActAgent(...) msg_input = Msg("user", "Hello", "user") # Concurrent execution (default) results = await fanout_pipeline( [agent1, agent2, agent3], msg_input ) # Sequential execution results = await fanout_pipeline( [agent1, agent2, agent3], msg_input, enable_gather=False ) Args: agents (`list[AgentBase]`): A list of agents. msg (`Msg | list[Msg] | None`, defaults to `None`): The initial input that will be passed to all agents. enable_gather (`bool`, defaults to `True`): Whether to execute agents concurrently using `asyncio.gather()`. If False, agents are executed sequentially. **kwargs (`Any`): Additional keyword arguments passed to each agent during execution. Returns: `list[Msg]`: A list of response messages from each agent. """ if enable_gather: tasks = [ asyncio.create_task(agent(deepcopy(msg), **kwargs)) for agent in agents ] return await asyncio.gather(*tasks) else: return [await agent(deepcopy(msg), **kwargs) for agent in agents]