备注
Go to the end to download the full example code.
实时智能体¶
实时智能体(Realtime Agent) 用于处理实时交互场景,例如语音对话或实时聊天会话。 AgentScope 中的实时智能体具有以下特性:
集成 OpenAI、DashScope、Gemini 等实时模型 API
统一的事件接口,简化与不同实时模型的交互
支持工具调用能力
支持多智能体交互
备注
实时智能体目前处于活跃开发阶段,欢迎社区贡献、讨论和反馈!如果开发者对实时智能体感兴趣,欢迎加入讨论和开发。
import asyncio
import os
from agentscope.agent import RealtimeAgent
from agentscope.realtime import (
DashScopeRealtimeModel,
OpenAIRealtimeModel,
GeminiRealtimeModel,
)
创建实时模型¶
AgentScope 目前支持以下实时模型 API:
提供商 |
类名 |
支持的模型 |
输入模态 |
工具支持 |
|---|---|---|---|---|
DashScope |
|
|
文本、音频、图像 |
否 |
OpenAI |
|
|
文本、音频 |
是 |
Gemini |
|
|
文本、音频、图像 |
是 |
以下是初始化不同实时模型的示例:
# DashScope 实时模型
dashscope_model = DashScopeRealtimeModel(
model_name="qwen3-omni-flash-realtime",
api_key=os.getenv("DASHSCOPE_API_KEY"),
voice="Cherry", # 可选项: "Cherry", "Serena", "Ethan", "Chelsie"
enable_input_audio_transcription=True,
)
# OpenAI 实时模型
openai_model = OpenAIRealtimeModel(
model_name="gpt-4o-realtime-preview",
api_key=os.getenv("OPENAI_API_KEY"),
voice="alloy", # 可选项: "alloy", "echo", "marin", "cedar"
enable_input_audio_transcription=True,
)
# Gemini 实时模型
gemini_model = GeminiRealtimeModel(
model_name="gemini-2.5-flash-native-audio-preview-09-2025",
api_key=os.getenv("GEMINI_API_KEY"),
voice="Puck", # 可选项: "Puck", "Charon", "Kore", "Fenrir"
enable_input_audio_transcription=True,
)
实时模型提供以下核心方法:
方法 |
描述 |
|---|---|
|
建立与实时模型 API 的 WebSocket 连接 |
|
关闭 WebSocket 连接 |
|
向实时模型发送音频/文本/图像数据进行处理 |
connect() 方法中的 outgoing_queue 参数是一个 asyncio 队列,
用于将实时模型的事件转发到外部(例如智能体或前端)。
模型事件接口¶
AgentScope 提供统一的 agentscope.realtime.ModelEvents 接口,
简化与不同实时模型的交互。支持以下事件:
备注
ModelEvents 中的 "session" 指的是实时模型与模型 API 之间的 WebSocket 连接会话,而非前端与后端之间的会话。
事件 |
描述 |
|---|---|
|
会话创建成功 |
|
会话已结束 |
|
模型开始生成响应 |
|
模型完成响应生成 |
|
流式音频数据块 |
|
音频响应完成 |
|
流式音频转录文本块 |
|
音频转录完成 |
|
流式工具调用参数 |
|
工具调用参数完成 |
|
流式用户输入转录文本块 |
|
用户输入转录完成 |
|
检测到用户音频输入开始(VAD) |
|
检测到用户音频输入结束(VAD) |
|
发生错误 |
创建实时智能体¶
RealtimeAgent 作为桥接层,负责:
将实时模型的
ModelEvents转换为ServerEvents,发送给前端和其他智能体接收来自前端或其他智能体的
ClientEvents,并转发给实时模型 API管理智能体的生命周期和事件队列
服务端和客户端事件¶
AgentScope 提供统一的 ServerEvents 和 ClientEvents,
用于后端与前端之间的通信:
**ServerEvents**(后端 → 前端):
事件 |
描述 |
|---|---|
|
后端创建会话 |
|
后端更新会话 |
|
后端结束会话 |
|
智能体准备接收输入 |
|
智能体已结束 |
|
智能体开始生成响应 |
|
智能体完成响应生成 |
|
智能体流式音频块 |
|
音频响应完成 |
|
智能体响应的流式转录 |
|
转录完成 |
|
流式工具调用数据 |
|
工具调用完成 |
|
工具执行结果 |
|
用户输入的流式转录 |
|
输入转录完成 |
|
用户音频输入开始 |
|
用户音频输入结束 |
|
发生错误 |
**ClientEvents**(前端 → 后端):
事件 |
描述 |
|---|---|
|
创建指定配置的新会话 |
|
结束当前会话 |
|
请求智能体立即生成响应 |
|
中断智能体的当前响应 |
|
追加文本输入 |
|
追加音频输入 |
|
提交音频输入(标志输入结束) |
|
追加图像输入 |
|
发送工具执行结果 |
初始化实时智能体¶
以下是创建和使用实时智能体的示例:
async def example_realtime_agent() -> None:
"""创建和使用实时智能体的示例。"""
agent = RealtimeAgent(
name="Friday",
sys_prompt="你是一个名为 Friday 的助手。",
model=DashScopeRealtimeModel(
model_name="qwen3-omni-flash-realtime",
api_key=os.getenv("DASHSCOPE_API_KEY"),
),
)
# 创建队列接收来自智能体的消息
outgoing_queue = asyncio.Queue()
# 智能体现在已准备好处理输入
# 在独立任务中处理输出消息
async def handle_agent_messages():
while True:
event = await outgoing_queue.get()
# 处理事件(例如通过 WebSocket 发送到前端)
print(f"智能体事件: {event.type}")
# 启动消息处理任务
asyncio.create_task(handle_agent_messages())
# 启动智能体(建立连接)
await agent.start(outgoing_queue)
# 完成后停止智能体
await agent.stop()
启动实时对话¶
下面演示如何在用户和实时智能体之间建立实时对话。
这里以 FastAPI 为例,展示如何搭建实时对话的后端框架。
后端设置(服务端):
后端需要:
创建 WebSocket 端点接受前端连接
在会话开始时创建
RealtimeAgent将前端的
ClientEvents转发给智能体将智能体的
ServerEvents转发给前端
from fastapi import FastAPI, WebSocket
from agentscope.agent import RealtimeAgent
from agentscope.realtime import (
DashScopeRealtimeModel,
ClientEvents,
ServerEvents,
)
app = FastAPI()
@app.websocket("/ws/{user_id}/{session_id}")
async def websocket_endpoint(
websocket: WebSocket,
user_id: str,
session_id: str,
):
await websocket.accept()
# 创建智能体消息队列
frontend_queue = asyncio.Queue()
# 创建智能体
agent = RealtimeAgent(
name="Assistant",
sys_prompt="你是一个有用的助手。",
model=DashScopeRealtimeModel(
model_name="qwen3-omni-flash-realtime",
api_key=os.getenv("DASHSCOPE_API_KEY"),
),
)
# 启动智能体
await agent.start(frontend_queue)
# 将智能体消息转发到前端
async def send_to_frontend():
while True:
msg = await frontend_queue.get()
await websocket.send_json(msg.model_dump())
asyncio.create_task(send_to_frontend())
# 接收前端消息并转发给智能体
while True:
data = await websocket.receive_json()
client_event = ClientEvents.from_json(data)
await agent.handle_input(client_event)
前端设置(客户端):
前端需要:
建立与后端的 WebSocket 连接
发送
CLIENT_SESSION_CREATE事件初始化会话捕获麦克风音频,通过
CLIENT_AUDIO_APPEND事件发送接收并处理 ``ServerEvents``(例如播放音频、显示转录文本)
// 连接 WebSocket
const ws = new WebSocket('ws://localhost:8000/ws/user1/session1');
ws.onopen = () => {
// 创建会话
ws.send(JSON.stringify({
type: 'client_session_create',
config: {
instructions: '你是一个有用的助手。',
user_name: 'User1'
}
}));
};
// 处理来自后端的消息
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'response_audio_delta') {
// 播放音频块
playAudio(data.delta);
}
};
// 发送音频数据
function sendAudioChunk(audioData) {
ws.send(JSON.stringify({
type: 'client_audio_append',
session_id: 'session1',
audio: audioData, // base64 编码
format: { encoding: 'pcm16', sample_rate: 16000 }
}));
}
完整的工作示例请参见 AgentScope 仓库中的
examples/agent/realtime_voice_agent/。
多智能体实时对话¶
AgentScope 通过 ChatRoom 类支持多智能体实时交互。
请注意目前大多数实时模型 API 仅支持单用户交互,但 AgentScope 的架构设计支持多智能体和多用户, 当 API 能力扩展时即可应用到多智能体场景。
实时聊天室¶
AgentScope 引入 ChatRoom 类来管理共享对话空间中的多个实时智能体。
ChatRoom 提供:
集中管理多个
RealtimeAgent实例智能体之间的自动消息广播
统一的前端通信消息队列
房间内所有智能体的生命周期管理
使用 ChatRoom¶
ChatRoom 的用法与 RealtimeAgent 类似:
async def example_chat_room() -> None:
"""使用 ChatRoom 和多个实时智能体的示例。"""
from agentscope.pipeline import ChatRoom
from agentscope.agent import RealtimeAgent
from agentscope.realtime import DashScopeRealtimeModel
# 创建多个智能体
agent1 = RealtimeAgent(
name="Agent1",
sys_prompt="你是 Agent1,一个有用的助手。",
model=DashScopeRealtimeModel(
model_name="qwen3-omni-flash-realtime",
api_key=os.getenv("DASHSCOPE_API_KEY"),
),
)
agent2 = RealtimeAgent(
name="Agent2",
sys_prompt="你是 Agent2,一个有用的助手。",
model=DashScopeRealtimeModel(
model_name="qwen3-omni-flash-realtime",
api_key=os.getenv("DASHSCOPE_API_KEY"),
),
)
# 创建包含多个智能体的聊天室
chat_room = ChatRoom(agents=[agent1, agent2])
# 创建队列接收来自所有智能体的消息
outgoing_queue = asyncio.Queue()
# 启动聊天室
await chat_room.start(outgoing_queue)
# 处理来自前端的输入
# 聊天室会广播给所有智能体
from agentscope.realtime import ClientEvents
client_event = ClientEvents.ClientTextAppendEvent(
session_id="session1",
text="大家好!",
)
await chat_room.handle_input(client_event)
# 完成后停止聊天室
await chat_room.stop()
发展规划¶
实时智能体功能目前为实验性质,正在积极开发中。未来计划包括:
支持更多实时模型 API
增强对话历史的记忆管理
多用户语音交互支持
改进 VAD(语音活动检测)配置
更好的错误处理和恢复机制
Total running time of the script: (0 minutes 0.001 seconds)