agentscope.pipeline

The pipeline module in AgentScope provides syntactic sugar for complex workflows and multi-agent conversations.

class MsgHub[source]

Bases: object

MsgHub class that controls the subscriptions of the participating agents.

Example

In the following example, the reply messages from agent1, agent2, and agent3 will be broadcast to all the other agents in the MsgHub.

with MsgHub(participants=[agent1, agent2, agent3]):
    agent1()
    agent2()

This has the same effect as the following code, but is simpler and more elegant:

x1 = agent1()
agent2.observe(x1)
agent3.observe(x1)

x2 = agent2()
agent1.observe(x2)
agent3.observe(x2)
__init__(participants, announcement=None, enable_auto_broadcast=True, name=None)[source]

Initialize a MsgHub context manager.

Parameters:
  • participants (list[AgentBase]) -- A list of agents that participate in the MsgHub.

  • announcement (list[Msg] | Msg | None) -- The message that will be broadcast to all participants when entering the MsgHub.

  • enable_auto_broadcast (bool, defaults to True) -- Whether to enable automatic broadcasting of the replied message from any participant to all other participants. If disabled, the MsgHub will only serve as a manual message broadcaster with the announcement argument and the broadcast() method.

  • name (str | None) -- The name of this MsgHub. If not provided, a random ID will be generated.

Return type:

None
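
The automatic broadcasting described above can be sketched with plain Python objects. This is a toy model and not the AgentScope implementation: `ToyAgent`, `speak_and_broadcast`, and the string messages are hypothetical stand-ins, and the sketch assumes that a speaker does not observe its own reply.

```python
import asyncio


class ToyAgent:
    """Hypothetical stand-in for an agent; not an AgentScope class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.observed: list[str] = []  # messages received from others

    async def reply(self) -> str:
        return f"reply from {self.name}"

    async def observe(self, msg: str) -> None:
        self.observed.append(msg)


async def speak_and_broadcast(speaker: ToyAgent, participants: list[ToyAgent]) -> str:
    """Let `speaker` reply, then deliver the reply to every other participant."""
    msg = await speaker.reply()
    for agent in participants:
        if agent is not speaker:  # assumption: no self-observation
            await agent.observe(msg)
    return msg


async def demo() -> dict[str, list[str]]:
    agents = [ToyAgent("agent1"), ToyAgent("agent2"), ToyAgent("agent3")]
    # The announcement reaches every participant before anyone speaks.
    for agent in agents:
        await agent.observe("announcement")
    await speak_and_broadcast(agents[0], agents)
    await speak_and_broadcast(agents[1], agents)
    return {agent.name: agent.observed for agent in agents}


observed = asyncio.run(demo())
print(observed)
```

In this sketch agent3 never speaks, so it ends up with the announcement plus both replies, while agent1 and agent2 each see only the other's reply.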

add(new_participant)[source]

Add one or more new participants to this hub.

Parameters:

new_participant (list[AgentBase] | AgentBase)

Return type:

None

delete(participant)[source]

Remove one or more agents from the participants of this hub.

Parameters:

participant (list[AgentBase] | AgentBase)

Return type:

None

async broadcast(msg)[source]

Broadcast the message to all participants.

Parameters:

msg (list[Msg] | Msg) -- Message(s) to be broadcast among all participants.

Return type:

None
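
How add(), delete(), and broadcast() interact can be illustrated with a small stand-alone model of the participant list. `ToyHub` and `ToyAgent` below are hypothetical and only mirror the documented method names; the duplicate check in `add` is an assumption, not documented behavior.

```python
import asyncio


class ToyAgent:
    """Hypothetical stand-in for an agent; it only records what it observes."""

    def __init__(self, name):
        self.name = name
        self.observed = []

    async def observe(self, msg):
        self.observed.append(msg)


class ToyHub:
    """Illustrative model of participant bookkeeping, not the AgentScope code."""

    def __init__(self, participants):
        self.participants = list(participants)

    @staticmethod
    def _as_list(items):
        # The documented signatures accept either a single item or a list.
        return items if isinstance(items, list) else [items]

    def add(self, new_participant):
        for agent in self._as_list(new_participant):
            if agent not in self.participants:  # assumption: no duplicates
                self.participants.append(agent)

    def delete(self, participant):
        removed = self._as_list(participant)
        self.participants = [a for a in self.participants if a not in removed]

    async def broadcast(self, msg):
        # Deliver one message or a list of messages to every participant.
        for agent in self.participants:
            for m in self._as_list(msg):
                await agent.observe(m)


async def demo():
    a, b, c = ToyAgent("a"), ToyAgent("b"), ToyAgent("c")
    hub = ToyHub([a, b])
    await hub.broadcast("m1")          # reaches a and b
    hub.add(c)
    await hub.broadcast(["m2", "m3"])  # reaches a, b and c
    hub.delete(a)
    await hub.broadcast("m4")          # reaches b and c only
    return {agent.name: agent.observed for agent in (a, b, c)}


observed = asyncio.run(demo())
print(observed)
```

A participant only receives messages broadcast while it is a member, which is why agent c misses "m1" and agent a misses "m4" in this sketch.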

set_auto_broadcast(enable)[source]

Enable or disable automatic broadcasting of the reply message from any participant to all other participants.

Parameters:

enable (bool) -- Whether to enable automatic broadcasting. If disabled, the MsgHub only serves as a manual message broadcaster, using the announcement argument and the broadcast() method.

Return type:

None

class SequentialPipeline[source]

Bases: object

An async sequential pipeline class that executes a list of agents in order. Unlike the functional sequential_pipeline, an instance of this class can be reused.

__init__(agents)[source]

Initialize a sequential pipeline.

Parameters:

agents (list[AgentBase]) -- A list of agents.

Return type:

None

async __call__(msg=None)[source]

Execute the sequential pipeline.

Parameters:

msg (Msg | list[Msg] | None, defaults to None) -- The initial input that will be passed to the first agent.

Return type:

Msg | list[Msg] | None

async sequential_pipeline(agents, msg=None)[source]

An async convenience function that executes a list of agents in order. The output of each agent is passed as the input to the next agent, and the final output is the output of the last agent.

Example

agent1 = ReActAgent(...)
agent2 = ReActAgent(...)
agent3 = ReActAgent(...)

msg_input = Msg("user", "Hello", "user")

msg_output = await sequential_pipeline(
    [agent1, agent2, agent3],
    msg_input
)
Parameters:
  • agents (list[AgentBase]) -- A list of agents.

  • msg (Msg | list[Msg] | None, defaults to None) -- The initial input that will be passed to the first agent.

Returns:

The output of the last agent in the sequence.

Return type:

Msg | list[Msg] | None
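
The chaining behavior, and the difference between the function and the reusable class, can be sketched without AgentScope. Everything below is a hypothetical stand-in: the agents are plain async callables on strings, and the class delegating to the function is an assumption made for illustration, not a statement about the library's internals.

```python
import asyncio


async def toy_sequential_pipeline(agents, msg=None):
    """Feed each agent's output to the next one; return the last output."""
    for agent in agents:
        msg = await agent(msg)
    return msg


class ToySequentialPipeline:
    """Reusable wrapper that remembers the agent list between calls."""

    def __init__(self, agents):
        self.agents = agents

    async def __call__(self, msg=None):
        return await toy_sequential_pipeline(self.agents, msg)


def make_agent(tag):
    """Build a hypothetical async agent that appends its tag to the input."""

    async def agent(msg):
        return f"{msg} -> {tag}"

    return agent


async def demo():
    agents = [make_agent("agent1"), make_agent("agent2"), make_agent("agent3")]
    once = await toy_sequential_pipeline(agents, "Hello")
    pipeline = ToySequentialPipeline(agents)  # configure once ...
    reused = [await pipeline("Hi"), await pipeline("Bye")]  # ... call many times
    return once, reused


once, reused = asyncio.run(demo())
print(once)    # Hello -> agent1 -> agent2 -> agent3
print(reused)
```

The function form needs the agent list on every call, whereas the class form fixes the list at construction time and only takes the message afterwards.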

class FanoutPipeline[source]

Bases: object

An async fanout pipeline class that distributes the same input to multiple agents. Unlike the functional fanout_pipeline, an instance of this class can be reused and configured with default parameters.

__init__(agents, enable_gather=True)[source]

Initialize a fanout pipeline.

Parameters:
  • agents (list[AgentBase]) -- A list of agents to execute.

  • enable_gather (bool, defaults to True) -- Whether to execute agents concurrently using asyncio.gather(). If False, agents are executed sequentially.

Return type:

None

async __call__(msg=None, **kwargs)[source]

Execute the fanout pipeline.

Parameters:
  • msg (Msg | list[Msg] | None, defaults to None) -- The input message that will be distributed to all agents.

  • **kwargs (Any) -- Additional keyword arguments passed to each agent during execution.

Returns:

A list of output messages from all agents.

Return type:

list[Msg]

async fanout_pipeline(agents, msg=None, enable_gather=True, **kwargs)[source]

A fanout pipeline that distributes the same input to multiple agents. This pipeline sends the same message (or a deep copy of it) to all agents and collects their responses. Depending on the enable_gather parameter, agents are executed either concurrently using asyncio.gather() or sequentially.

Example

agent1 = ReActAgent(...)
agent2 = ReActAgent(...)
agent3 = ReActAgent(...)

msg_input = Msg("user", "Hello", "user")

# Concurrent execution (default)
results = await fanout_pipeline(
    [agent1, agent2, agent3],
    msg_input
)

# Sequential execution
results = await fanout_pipeline(
    [agent1, agent2, agent3],
    msg_input,
    enable_gather=False
)
Parameters:
  • agents (list[AgentBase]) -- A list of agents.

  • msg (Msg | list[Msg] | None, defaults to None) -- The initial input that will be passed to all agents.

  • enable_gather (bool, defaults to True) -- Whether to execute agents concurrently using asyncio.gather(). If False, agents are executed sequentially.

  • **kwargs (Any) -- Additional keyword arguments passed to each agent during execution.

Returns:

A list of response messages from each agent.

Return type:

list[Msg]
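
The effect of enable_gather can be made visible with a stand-alone sketch that logs when each agent starts and ends. `toy_fanout_pipeline`, `make_agent`, the dict message, and the `suffix` keyword are all hypothetical; the sketch also assumes that every agent always receives its own deep copy, which the description above leaves open.

```python
import asyncio
import copy


async def toy_fanout_pipeline(agents, msg=None, enable_gather=True, **kwargs):
    """Send an independent copy of `msg` to every agent and collect replies."""
    if enable_gather:
        # Schedule all agents at once; gather() keeps results in input order.
        calls = [agent(copy.deepcopy(msg), **kwargs) for agent in agents]
        return list(await asyncio.gather(*calls))
    # Otherwise wait for each agent to finish before starting the next.
    return [await agent(copy.deepcopy(msg), **kwargs) for agent in agents]


def make_agent(name, log):
    """Build a hypothetical async agent that logs when it starts and ends."""

    async def agent(msg, suffix=""):
        log.append(f"start {name}")
        await asyncio.sleep(0.01)  # simulated work, e.g. a model call
        log.append(f"end {name}")
        msg["seen_by"] = name  # mutates only this agent's private copy
        return f"{name}: {msg['text']}{suffix}"

    return agent


async def demo(enable_gather):
    log = []
    agents = [make_agent(n, log) for n in ("agent1", "agent2", "agent3")]
    msg = {"text": "Hello"}
    results = await toy_fanout_pipeline(
        agents, msg, enable_gather=enable_gather, suffix="!"
    )
    return results, log, msg


concurrent_results, concurrent_log, original = asyncio.run(demo(True))
sequential_results, sequential_log, _ = asyncio.run(demo(False))
print(concurrent_log)
print(sequential_log)
```

With gathering enabled, all three agents start before any of them finishes; with it disabled, each start is followed directly by the matching end. Both modes return the replies in the order of the agent list, and the caller's original message is left unmodified because each agent works on a copy.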