agentscope.pipeline¶
The pipeline module in AgentScope, that provides syntactic sugar for complex workflows and multi-agent conversations.
- class MsgHub[source]¶
Bases:
object
MsgHub class that controls the subscription of the participated agents.
Example
In the following example, the reply message from agent1, agent2, and agent3 will be broadcasted to all the other agents in the MsgHub.
with MsgHub(participant=[agent1, agent2, agent3]): agent1() agent2()
Actually, it has the same effect as the following code, but much more easy and elegant!
x1 = agent1() agent2.observe(x1) agent3.observe(x1) x2 = agent2() agent1.observe(x2) agent3.observe(x2)
- __init__(participants, announcement=None, enable_auto_broadcast=True, name=None)[source]¶
Initialize a MsgHub context manager.
- Parameters:
participants (list[AgentBase]) – A list of agents that participate in the MsgHub.
None) (announcement (list[Msg] | Msg |) – The message that will be broadcast to all participants when entering the MsgHub.
enable_auto_broadcast (bool, defaults to True) – Whether to enable automatic broadcasting of the replied message from any participant to all other participants. If disabled, the MsgHub will only serve as a manual message broadcaster with the announcement argument and the broadcast() method.
name (str | None) – The name of this MsgHub. If not provided, a random ID will be generated.
- Return type:
None
- async broadcast(msg)[source]¶
Broadcast the message to all participants.
- Parameters:
msg (list[Msg] | Msg) – Message(s) to be broadcast among all participants.
- Return type:
None
- set_auto_broadcast(enable)[source]¶
Enable automatic broadcasting of the replied message from any participant to all other participants.
- Parameters:
enable (bool) – Whether to enable automatic broadcasting. If disabled, the MsgHub will only serve as a manual message broadcaster with the announcement argument and the broadcast() method.
- Return type:
None
- class SequentialPipeline[source]¶
Bases:
object
An async sequential pipeline class, which executes a sequence of agents sequentially. Compared with functional pipeline, this class can be re-used.
- async sequential_pipeline(agents, msg=None)[source]¶
An async syntactic sugar pipeline that executes a sequence of agents sequentially. The output of the previous agent will be passed as the input to the next agent. The final output will be the output of the last agent.
Example
agent1 = ReActAgent(...) agent2 = ReActAgent(...) agent3 = ReActAgent(...) msg_input = Msg("user", "Hello", "user") msg_output = await sequential_pipeline( [agent1, agent2, agent3], msg_input )
- Parameters:
agents (list[AgentBase]) – A list of agents.
msg (Msg | list[Msg] | None, defaults to None) – The initial input that will be passed to the first agent.
- Returns:
The output of the last agent in the sequence.
- Return type:
Msg | list[Msg] | None
- class FanoutPipeline[source]¶
Bases:
object
An async fanout pipeline class, which distributes the same input to multiple agents. Compared with functional pipeline, this class can be re-used and configured with default parameters.
- __init__(agents, enable_gather=True)[source]¶
Initialize a fanout pipeline class
- Parameters:
agents (list[AgentBase]) – A list of agents to execute.
enable_gather (bool, defaults to True) – Whether to execute agents concurrently using asyncio.gather(). If False, agents are executed sequentially.
- Return type:
None
- async __call__(msg=None, **kwargs)[source]¶
Execute the fanout pipeline
- Parameters:
msg (Msg | list[Msg] | None, defaults to None) – The input message that will be distributed to all agents.
**kwargs (Any) – Additional keyword arguments passed to each agent during execution.
- Returns:
A list of output messages from all agents.
- Return type:
list[Msg]
- async fanout_pipeline(agents, msg=None, enable_gather=True, **kwargs)[source]¶
A fanout pipeline that distributes the same input to multiple agents. This pipeline sends the same message (or a deep copy of it) to all agents and collects their responses. Agents can be executed either concurrently using asyncio.gather() or sequentially depending on the enable_gather parameter.
Example
agent1 = ReActAgent(...) agent2 = ReActAgent(...) agent3 = ReActAgent(...) msg_input = Msg("user", "Hello", "user") # Concurrent execution (default) results = await fanout_pipeline( [agent1, agent2, agent3], msg_input ) # Sequential execution results = await fanout_pipeline( [agent1, agent2, agent3], msg_input, enable_gather=False )
- Parameters:
agents (list[AgentBase]) – A list of agents.
msg (Msg | list[Msg] | None, defaults to None) – The initial input that will be passed to all agents.
enable_gather (bool, defaults to True) – Whether to execute agents concurrently using asyncio.gather(). If False, agents are executed sequentially.
**kwargs (Any) – Additional keyword arguments passed to each agent during execution.
- Returns:
A list of response messages from each agent.
- Return type:
list[Msg]