Source code for agentscope.pipeline._class

# -*- coding: utf-8 -*-
"""Pipeline classes."""
from ._functional import sequential_pipeline
from ..agent import AgentBase
from ..message import Msg


[docs] class SequentialPipeline: """An async sequential pipeline class, which executes a sequence of agents sequentially. Compared with functional pipeline, this class can be re-used."""
[docs] def __init__( self, agents: list[AgentBase], ) -> None: """Initialize a sequential pipeline class Args: agents (`list[AgentBase]`): A list of agents. """ self.agents = agents
[docs] async def __call__( self, msg: Msg | list[Msg] | None = None, ) -> Msg | list[Msg] | None: """Execute the sequential pipeline Args: msg (`Msg | list[Msg] | None`, defaults to `None`): The initial input that will be passed to the first agent. """ return await sequential_pipeline(agents=self.agents, msg=msg)