Source code for agentscope.pipeline._functional

# -*- coding: utf-8 -*-
"""Functional counterpart for Pipeline"""
from ..agent import AgentBase
from ..message import Msg


async def sequential_pipeline(
    agents: list[AgentBase],
    msg: Msg | list[Msg] | None = None,
) -> Msg | list[Msg] | None:
    """Run the given agents one after another, chaining their outputs.

    Each agent is awaited in list order. The first agent receives ``msg``;
    every later agent receives whatever the agent before it returned. When
    ``agents`` is empty no agent is called and ``msg`` is returned as-is.

    Example:
        .. code-block:: python

            agent1 = ReActAgent(...)
            agent2 = ReActAgent(...)
            agent3 = ReActAgent(...)

            msg_input = Msg("user", "Hello", "user")

            msg_output = await sequential_pipeline(
                [agent1, agent2, agent3],
                msg_input
            )

    Args:
        agents (`list[AgentBase]`):
            The agents to execute, in the order they should run.
        msg (`Msg | list[Msg] | None`, defaults to `None`):
            The input handed to the first agent.

    Returns:
        `Msg | list[Msg] | None`:
            The value returned by the last agent, or ``msg`` itself when
            ``agents`` is empty.
    """
    # Keep the running result in its own local so the parameter is not
    # rebound while the chain is being walked.
    current = msg
    for stage in agents:
        # Agents are awaited strictly one at a time: each call needs the
        # previous result as its input.
        current = await stage(current)
    return current