Note
Go to the end to download the full example code.
Pipeline¶
For multi-agent orchestration, AgentScope provides the agentscope.pipeline
module
as syntax sugar for chaining agents together, including
MsgHub: a message hub for broadcasting messages among multiple agents
sequential_pipeline and SequentialPipeline: a functional and class-based implementation that chains agents in a sequential manner
fanout_pipeline and FanoutPipeline: a functional and class-based implementation that distributes the same input to multiple agents
import os, asyncio
from agentscope.formatter import DashScopeMultiAgentFormatter
from agentscope.message import Msg
from agentscope.model import DashScopeChatModel
from agentscope.agent import ReActAgent
from agentscope.pipeline import MsgHub
Broadcasting with MsgHub¶
The MsgHub
class is an async context manager, receiving a list of agents as its participants.
When one participant generates a replying message, all other participants will receive this message by calling their observe
method.
That means within a MsgHub
context, developers don’t need to manually send a replying message from one agent to another.
The broadcasting is automatically handled.
Here we create four agents: Alice, Bob, Charlie and David. Then we start a meeting with Alice, Bob and Charlie by introducing themselves. Note David is not included in this meeting.
def create_agent(name: str, age: int, career: str) -> ReActAgent:
"""Create agent object by the given information."""
return ReActAgent(
name=name,
sys_prompt=f"You're {name}, a {age}-year-old {career}",
model=DashScopeChatModel(
model_name="qwen-max",
api_key=os.environ["DASHSCOPE_API_KEY"],
),
formatter=DashScopeMultiAgentFormatter(),
)
alice = create_agent("Alice", 50, "teacher")
bob = create_agent("Bob", 35, "engineer")
charlie = create_agent("Charlie", 28, "designer")
david = create_agent("David", 30, "developer")
Then we start a meeting and let them introduce themselves without manual message passing:
Hint
The message in announcement
will be broadcasted to all participants when entering the MsgHub
context.
async def example_broadcast_message():
"""Example of broadcasting messages with MsgHub."""
# Create a message hub
async with MsgHub(
participants=[alice, bob, charlie],
announcement=Msg(
"user",
"Now introduce yourself in one sentence, including your name, age and career.",
"user",
),
) as hub:
# Group chat without manual message passing
await alice()
await bob()
await charlie()
asyncio.run(example_broadcast_message())
Alice: Hello, I'm Alice, a 50-year-old teacher with a passion for education and fostering young minds.
Bob: Hello, I'm Bob, a 35-year-old engineer with a knack for solving complex problems and designing innovative solutions.
Charlie: Hello, I'm Charlie, a 28-year-old designer with a creative flair for bringing ideas to life through visual and interactive designs.
Now let’s check if Bob, Charlie and David received Alice’s message.
async def check_broadcast_message():
"""Check if the messages are broadcast correctly."""
user_msg = Msg(
"user",
"Do you know who's Alice, and what she does? Answer me briefly.",
"user",
)
await bob(user_msg)
await charlie(user_msg)
await david(user_msg)
asyncio.run(check_broadcast_message())
Bob: Alice is a 50-year-old teacher who is passionate about education and fostering young minds.
Charlie: Alice is a 50-year-old teacher who is passionate about education and nurturing young minds.
David: I don't have specific information about an Alice you're referring to, as there could be many people with that name. Could you provide more context or details about which Alice you're asking about?
Now we observe that Bob and Charlie know Alice and her profession, while David has no idea
about Alice since he is not included in the MsgHub
context.
Dynamic Participant Management¶
Additionally, MsgHub
supports to dynamically manage participants by the following methods:
add
: add one or multiple agents as new participantsdelete
: remove one or multiple agents from participants, and they will no longer receive broadcasted messagesbroadcast
: broadcast a message to all current participants
Note
The newly added participants will not receive the previous messages.
async with MsgHub(participants=[alice]) as hub:
# Add new participants
hub.add(david)
# Remove participants
hub.delete(alice)
# Broadcast to all current participants
await hub.broadcast(
Msg("system", "Now we begin to ...", "system"),
)
Pipeline¶
Pipeline serves as a syntax sugar for multi-agent orchestration.
Currently, AgentScope provides two main pipeline implementations:
Sequential Pipeline: Execute agents one by one in a predefined order
Fanout Pipeline: Distribute the same input to multiple agents and collect their responses
Sequential Pipeline¶
The sequential pipeline executes agents one by one, where the output of the previous agent becomes the input of the next agent.
For example, the two following code snippets are equivalent:
msg = None
msg = await alice(msg)
msg = await bob(msg)
msg = await charlie(msg)
msg = await david(msg)
from agentscope.pipeline import sequential_pipeline
msg = await sequential_pipeline(
# List of agents to be executed in order
agents=[alice, bob, charlie, david],
# The first input message, can be None
msg=None
)
Fanout Pipeline¶
The fanout pipeline distributes the same input message to multiple agents simultaneously and collects all their responses. This is useful when you want to gather different perspectives or expertise on the same topic.
For example, the two following code snippets are equivalent:
from copy import deepcopy
msgs = []
msg = None
for agent in [alice, bob, charlie, david]:
msgs.append(await agent(deepcopy(msg)))
from agentscope.pipeline import fanout_pipeline
msgs = await fanout_pipeline(
# List of agents to be executed in order
agents=[alice, bob, charlie, david],
# The first input message, can be None
msg=None,
enable_gather=False,
)
Note
The enable_gather
parameter controls the execution mode of the fanout pipeline:
enable_gather=True
(default): Executes all agents concurrently usingasyncio.gather()
. This provides better performance for I/O-bound operations like API calls, as agents run in parallel.enable_gather=False
: Executes agents sequentially one by one. This is useful when you need deterministic execution order or want to avoid overwhelming external services with concurrent requests.
Choose concurrent execution for better performance, or sequential execution for predictable ordering and resource control.
Tip
By combining MsgHub
and sequential_pipeline
or fanout_pipeline
, you can create more complex workflows very easily.
Advanced Pipeline Features¶
Additionally, for reusability, we also provide a class-based implementation:
from agentscope.pipeline import SequentialPipeline
# Create a pipeline object
pipeline = SequentialPipeline(agents=[alice, bob, charlie, david])
# Call the pipeline
msg = await pipeline(msg=None)
# Reuse the pipeline with different input
msg = await pipeline(msg=Msg("user", "Hello!", "user"))
from agentscope.pipeline import FanoutPipeline
# Create a pipeline object
pipeline = FanoutPipeline(agents=[alice, bob, charlie, david])
# Call the pipeline
msgs = await pipeline(msg=None)
# Reuse the pipeline with different input
msgs = await pipeline(msg=Msg("user", "Hello!", "user"))
Total running time of the script: (0 minutes 12.710 seconds)