class SequentialPipeline:
    """An async sequential pipeline class, which executes a sequence of
    agents sequentially.

    Compared with the functional pipeline (``sequential_pipeline``), an
    instance of this class keeps its agent list and can therefore be
    re-used across multiple calls.
    """

    def __init__(
        self,
        agents: list[AgentBase],
    ) -> None:
        """Initialize a sequential pipeline.

        Args:
            agents (`list[AgentBase]`):
                A list of agents. The list is stored by reference (not
                copied), so later mutations made by the caller are visible
                to this pipeline.
        """
        self.agents = agents

    async def __call__(
        self,
        msg: Msg | list[Msg] | None = None,
    ) -> Msg | list[Msg] | None:
        """Execute the sequential pipeline.

        Args:
            msg (`Msg | list[Msg] | None`, defaults to `None`):
                The initial input that will be passed to the first agent.

        Returns:
            `Msg | list[Msg] | None`:
                Whatever ``sequential_pipeline`` returns for the stored
                agents and the given input.
        """
        # All execution logic lives in the functional pipeline; this class
        # only binds the agent list so the pipeline can be invoked repeatedly.
        return await sequential_pipeline(
            agents=self.agents,
            msg=msg,
        )