agentscope.pipeline

The pipeline module in AgentScope provides syntactic sugar for complex workflows and multi-agent conversations.

class MsgHub[source]

Bases: object

A context manager that controls message subscription among the participating agents.

Example

In the following example, each reply from agent1, agent2, and agent3 is broadcast to all the other agents in the MsgHub.

with MsgHub(participants=[agent1, agent2, agent3]):
    agent1()
    agent2()

This has the same effect as the following code, but is simpler and more concise:

x1 = agent1()
agent2.observe(x1)
agent3.observe(x1)

x2 = agent2()
agent1.observe(x2)
agent3.observe(x2)
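
The equivalence above can be checked with a small self-contained sketch. `StubAgent` and `ToyHub` are hypothetical stand-ins, not AgentScope classes; they only imitate the behavior described here, where each reply is forwarded to every other participant through `observe`. The sketch is synchronous to mirror the example as written; real agents and the real hub may need to be awaited.

```python
class StubAgent:
    """Hypothetical stand-in for an agent; not an AgentScope class."""

    def __init__(self, name):
        self.name = name
        self.observed = []     # messages received from other agents
        self.subscribers = []  # agents that should see this agent's replies

    def observe(self, msg):
        self.observed.append(msg)

    def __call__(self):
        reply = f"reply from {self.name}"
        # Forward the reply to every subscriber set up by the hub.
        for other in self.subscribers:
            other.observe(reply)
        return reply


class ToyHub:
    """Toy context manager imitating the subscription behavior (assumption)."""

    def __init__(self, participants):
        self.participants = list(participants)

    def __enter__(self):
        # Each participant subscribes to every other participant.
        for agent in self.participants:
            agent.subscribers = [a for a in self.participants if a is not agent]
        return self

    def __exit__(self, exc_type, exc, tb):
        # Leaving the hub removes all subscriptions.
        for agent in self.participants:
            agent.subscribers = []
        return False


agent1, agent2, agent3 = (StubAgent(n) for n in ("agent1", "agent2", "agent3"))
with ToyHub(participants=[agent1, agent2, agent3]):
    agent1()
    agent2()
```

After the block exits, `agent3.observed` holds both replies, while `agent1` and `agent2` each hold only the other's reply, which matches the manual `observe` calls listed above.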

__init__(participants, announcement=None)[source]

Initialize a MsgHub context manager.

Parameters:
  • participants (list[AgentBase]) – A list of agents that participate in the MsgHub.

  • announcement (list[Msg] | Msg | None) – The message that will be broadcast to all participants when entering the MsgHub.

Return type:

None

add(new_participant)[source]

Add one or more new participants to this hub.

Parameters:

new_participant (list[AgentBase] | AgentBase)

Return type:

None

delete(participant)[source]

Delete one or more agents from the participants.

Parameters:

participant (list[AgentBase] | AgentBase)

Return type:

None

async broadcast(msg)[source]

Broadcast the message to all participants.

Parameters:

msg (list[Msg] | Msg) – Message(s) to be broadcast among all participants.

Return type:

None
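
How `add`, `delete`, and `broadcast` interact can be illustrated with a minimal sketch. `StubAgent` and `ToyHub` are hypothetical stand-ins written for this example, not the AgentScope implementation; the sketch assumes only what is documented above: both membership methods accept a single agent or a list, and `broadcast` is a coroutine that delivers to the current participants.

```python
import asyncio


class StubAgent:
    """Hypothetical stand-in for an agent; it only records what it observes."""

    def __init__(self, name):
        self.name = name
        self.observed = []

    async def observe(self, msg):
        self.observed.append(msg)


class ToyHub:
    """Toy hub imitating add, delete, and broadcast (assumption, not the real class)."""

    def __init__(self, participants):
        self.participants = list(participants)

    def add(self, new_participant):
        # Accept either a single agent or a list of agents.
        agents = new_participant if isinstance(new_participant, list) else [new_participant]
        self.participants.extend(agents)

    def delete(self, participant):
        agents = participant if isinstance(participant, list) else [participant]
        self.participants = [a for a in self.participants if a not in agents]

    async def broadcast(self, msg):
        # Deliver the message to whoever is a participant right now.
        for agent in self.participants:
            await agent.observe(msg)


async def main():
    a, b, c = StubAgent("a"), StubAgent("b"), StubAgent("c")
    hub = ToyHub([a, b])
    await hub.broadcast("first")   # reaches a and b
    hub.add(c)
    hub.delete([a])
    await hub.broadcast("second")  # reaches b and c
    return a.observed, b.observed, c.observed


seen_a, seen_b, seen_c = asyncio.run(main())
```

In this sketch an agent removed with `delete` stops receiving later broadcasts, and an agent added with `add` receives only the messages sent after it joined.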

class SequentialPipeline[source]

Bases: object

An async pipeline class that executes a sequence of agents one after another. Unlike the functional pipeline, an instance of this class can be reused.

__init__(agents)[source]

Initialize a sequential pipeline.

Parameters:

agents (list[AgentBase]) – A list of agents.

Return type:

None

async __call__(msg=None)[source]

Execute the sequential pipeline.

Parameters:

msg (Msg | list[Msg] | None, defaults to None) – The initial input that will be passed to the first agent.

Return type:

Msg | list[Msg] | None
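
The reuse that distinguishes the class from the functional form can be sketched as follows. `SuffixAgent` and `ToySequentialPipeline` are hypothetical stand-ins, and plain strings stand in for `Msg` objects; this is an illustration of the documented call pattern, not the library's implementation.

```python
import asyncio


class SuffixAgent:
    """Hypothetical stand-in agent that appends its tag to a string message."""

    def __init__(self, tag):
        self.tag = tag

    async def __call__(self, msg=None):
        return f"{msg or ''}{self.tag}"


class ToySequentialPipeline:
    """Toy pipeline: stores the agents once, then can be awaited many times."""

    def __init__(self, agents):
        self.agents = agents

    async def __call__(self, msg=None):
        # Feed each agent's output to the next agent in the list.
        for agent in self.agents:
            msg = await agent(msg)
        return msg


async def main():
    pipeline = ToySequentialPipeline([SuffixAgent("A"), SuffixAgent("B")])
    first = await pipeline("x")   # same pipeline object ...
    second = await pipeline("y")  # ... reused with a different input
    return first, second


first, second = asyncio.run(main())
```

The agent list is fixed at construction time, so the same pipeline object can be awaited with different inputs without rebuilding it.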

async sequential_pipeline(agents, msg=None)[source]

Async syntactic sugar that executes a sequence of agents in order. The output of each agent is passed as the input to the next agent, and the final output is the output of the last agent.

Example

agent1 = ReActAgent(...)
agent2 = ReActAgent(...)
agent3 = ReActAgent(...)

msg_input = Msg("user", "Hello", "user")

msg_output = await sequential_pipeline(
    [agent1, agent2, agent3],
    msg_input
)

Parameters:
  • agents (list[AgentBase]) – A list of agents.

  • msg (Msg | list[Msg] | None, defaults to None) – The initial input that will be passed to the first agent.

Returns:

The output of the last agent in the sequence.

Return type:

Msg | list[Msg] | None
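
The chaining rule can be traced with a small sketch that records what each agent receives. `RecordingAgent` and `toy_sequential_pipeline` are hypothetical names used only for illustration, with strings in place of `Msg` objects; the sketch assumes nothing beyond the behavior described above, including the default `msg=None` for the first agent.

```python
import asyncio


class RecordingAgent:
    """Hypothetical stand-in agent that logs its input and returns a fixed output."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    async def __call__(self, msg=None):
        self.log.append((self.name, msg))  # record what this agent received
        return f"{self.name}-out"


async def toy_sequential_pipeline(agents, msg=None):
    # Each agent's output becomes the next agent's input.
    for agent in agents:
        msg = await agent(msg)
    return msg


log = []
agents = [RecordingAgent(n, log) for n in ("agent1", "agent2", "agent3")]
final = asyncio.run(toy_sequential_pipeline(agents))
```

Here the first agent receives `None` because no initial message is given, each later agent receives its predecessor's output, and `final` is the output of `agent3`.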