agentscope.pipeline

The pipeline module in AgentScope, that provides syntactic sugar for complex workflows and multi-agent conversations.

class MsgHub[source]

Bases: object

MsgHub class that controls the subscription of the participated agents.

Example

In the following example, the reply message from agent1, agent2, and agent3 will be broadcasted to all the other agents in the MsgHub.

with MsgHub(participant=[agent1, agent2, agent3]):
    agent1()
    agent2()

Actually, it has the same effect as the following code, but much more easy and elegant!

x1 = agent1()
agent2.observe(x1)
agent3.observe(x1)

x2 = agent2()
agent1.observe(x2)
agent3.observe(x2)
__init__(participants, announcement=None, enable_auto_broadcast=True, name=None)[source]

Initialize a MsgHub context manager.

Parameters:
  • participants (list[AgentBase]) – A list of agents that participate in the MsgHub.

  • None) (announcement (list[Msg] | Msg |) – The message that will be broadcast to all participants when entering the MsgHub.

  • enable_auto_broadcast (bool, defaults to True) – Whether to enable automatic broadcasting of the replied message from any participant to all other participants. If disabled, the MsgHub will only serve as a manual message broadcaster with the announcement argument and the broadcast() method.

  • name (str | None) – The name of this MsgHub. If not provided, a random ID will be generated.

  • announcement (Msg | list[Msg] | None)

Return type:

None

add(new_participant)[source]

Add new participant into this hub

Parameters:

new_participant (list[AgentBase] | AgentBase)

Return type:

None

delete(participant)[source]

Delete agents from participant.

Parameters:

participant (list[AgentBase] | AgentBase)

Return type:

None

async broadcast(msg)[source]

Broadcast the message to all participants.

Parameters:

msg (list[Msg] | Msg) – Message(s) to be broadcast among all participants.

Return type:

None

set_auto_broadcast(enable)[source]

Enable automatic broadcasting of the replied message from any participant to all other participants.

Parameters:

enable (bool) – Whether to enable automatic broadcasting. If disabled, the MsgHub will only serve as a manual message broadcaster with the announcement argument and the broadcast() method.

Return type:

None

class SequentialPipeline[source]

Bases: object

An async sequential pipeline class, which executes a sequence of agents sequentially. Compared with functional pipeline, this class can be re-used.

__init__(agents)[source]

Initialize a sequential pipeline class

Parameters:

agents (list[AgentBase]) – A list of agents.

Return type:

None

async __call__(msg=None)[source]

Execute the sequential pipeline

Parameters:

msg (Msg | list[Msg] | None, defaults to None) – The initial input that will be passed to the first agent.

Return type:

Msg | list[Msg] | None

async sequential_pipeline(agents, msg=None)[source]

An async syntactic sugar pipeline that executes a sequence of agents sequentially. The output of the previous agent will be passed as the input to the next agent. The final output will be the output of the last agent.

Example

agent1 = ReActAgent(...)
agent2 = ReActAgent(...)
agent3 = ReActAgent(...)

msg_input = Msg("user", "Hello", "user")

msg_output = await sequential_pipeline(
    [agent1, agent2, agent3],
    msg_input
)
Parameters:
  • agents (list[AgentBase]) – A list of agents.

  • msg (Msg | list[Msg] | None, defaults to None) – The initial input that will be passed to the first agent.

Returns:

The output of the last agent in the sequence.

Return type:

Msg | list[Msg] | None

class FanoutPipeline[source]

Bases: object

An async fanout pipeline class, which distributes the same input to multiple agents. Compared with functional pipeline, this class can be re-used and configured with default parameters.

__init__(agents, enable_gather=True)[source]

Initialize a fanout pipeline class

Parameters:
  • agents (list[AgentBase]) – A list of agents to execute.

  • enable_gather (bool, defaults to True) – Whether to execute agents concurrently using asyncio.gather(). If False, agents are executed sequentially.

Return type:

None

async __call__(msg=None, **kwargs)[source]

Execute the fanout pipeline

Parameters:
  • msg (Msg | list[Msg] | None, defaults to None) – The input message that will be distributed to all agents.

  • **kwargs (Any) – Additional keyword arguments passed to each agent during execution.

Returns:

A list of output messages from all agents.

Return type:

list[Msg]

async fanout_pipeline(agents, msg=None, enable_gather=True, **kwargs)[source]

A fanout pipeline that distributes the same input to multiple agents. This pipeline sends the same message (or a deep copy of it) to all agents and collects their responses. Agents can be executed either concurrently using asyncio.gather() or sequentially depending on the enable_gather parameter.

Example

agent1 = ReActAgent(...)
agent2 = ReActAgent(...)
agent3 = ReActAgent(...)

msg_input = Msg("user", "Hello", "user")

# Concurrent execution (default)
results = await fanout_pipeline(
    [agent1, agent2, agent3],
    msg_input
)

# Sequential execution
results = await fanout_pipeline(
    [agent1, agent2, agent3],
    msg_input,
    enable_gather=False
)
Parameters:
  • agents (list[AgentBase]) – A list of agents.

  • msg (Msg | list[Msg] | None, defaults to None) – The initial input that will be passed to all agents.

  • enable_gather (bool, defaults to True) – Whether to execute agents concurrently using asyncio.gather(). If False, agents are executed sequentially.

  • **kwargs (Any) – Additional keyword arguments passed to each agent during execution.

Returns:

A list of response messages from each agent.

Return type:

list[Msg]