.. DO NOT EDIT. .. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY. .. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE: .. "tutorial/task_realtime.py" .. LINE NUMBERS ARE GIVEN BELOW. .. only:: html .. note:: :class: sphx-glr-download-link-note :ref:`Go to the end ` to download the full example code. .. rst-class:: sphx-glr-example-title .. _sphx_glr_tutorial_task_realtime.py: .. _realtime: 实时智能体 ==================== **实时智能体(Realtime Agent)** 用于处理实时交互场景,例如语音对话或实时聊天会话。 AgentScope 中的实时智能体具有以下特性: - 集成 OpenAI、DashScope、Gemini 等实时模型 API - 统一的事件接口,简化与不同实时模型的交互 - 支持工具调用能力 - 支持多智能体交互 .. note:: 实时智能体目前处于活跃开发阶段,欢迎社区贡献、讨论和反馈!如果开发者对实时智能体感兴趣,欢迎加入讨论和开发。 .. GENERATED FROM PYTHON SOURCE LINES 19-29 .. code-block:: Python import asyncio import os from agentscope.agent import RealtimeAgent from agentscope.realtime import ( DashScopeRealtimeModel, OpenAIRealtimeModel, GeminiRealtimeModel, ) .. GENERATED FROM PYTHON SOURCE LINES 30-252 创建实时模型 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ AgentScope 目前支持以下实时模型 API: .. list-table:: :header-rows: 1 :widths: 15 25 25 15 20 * - 提供商 - 类名 - 支持的模型 - 输入模态 - 工具支持 * - DashScope - ``DashScopeRealtimeModel`` - ``qwen3-omni-flash-realtime`` - 文本、音频、图像 - 否 * - OpenAI - ``OpenAIRealtimeModel`` - ``gpt-4o-realtime-preview`` - 文本、音频 - 是 * - Gemini - ``GeminiRealtimeModel`` - ``gemini-2.5-flash-native-audio-preview-09-2025`` - 文本、音频、图像 - 是 以下是初始化不同实时模型的示例: .. code-block:: python :caption: 初始化不同实时模型的示例 # DashScope 实时模型 dashscope_model = DashScopeRealtimeModel( model_name="qwen3-omni-flash-realtime", api_key=os.getenv("DASHSCOPE_API_KEY"), voice="Cherry", # 可选项: "Cherry", "Serena", "Ethan", "Chelsie" enable_input_audio_transcription=True, ) # OpenAI 实时模型 openai_model = OpenAIRealtimeModel( model_name="gpt-4o-realtime-preview", api_key=os.getenv("OPENAI_API_KEY"), voice="alloy", # 可选项: "alloy", "echo", "marin", "cedar" enable_input_audio_transcription=True, ) # Gemini 实时模型 gemini_model = GeminiRealtimeModel( model_name="gemini-2.5-flash-native-audio-preview-09-2025", api_key=os.getenv("GEMINI_API_KEY"), voice="Puck", # 可选项: "Puck", "Charon", "Kore", "Fenrir" enable_input_audio_transcription=True, ) 实时模型提供以下核心方法: .. list-table:: :header-rows: 1 :widths: 30 70 * - 方法 - 描述 * - ``connect(outgoing_queue, instructions, tools)`` - 建立与实时模型 API 的 WebSocket 连接 * - ``disconnect()`` - 关闭 WebSocket 连接 * - ``send(data)`` - 向实时模型发送音频/文本/图像数据进行处理 ``connect()`` 方法中的 ``outgoing_queue`` 参数是一个 asyncio 队列, 用于将实时模型的事件转发到外部(例如智能体或前端)。 模型事件接口 ----------------------- AgentScope 提供统一的 ``agentscope.realtime.ModelEvents`` 接口, 简化与不同实时模型的交互。支持以下事件: .. note:: ModelEvents 中的 "session" 指的是实时模型与模型 API 之间的 WebSocket 连接会话,而非前端与后端之间的会话。 .. list-table:: :header-rows: 1 :widths: 40 60 * - 事件 - 描述 * - ``ModelEvents.ModelSessionCreatedEvent`` - 会话创建成功 * - ``ModelEvents.ModelSessionEndedEvent`` - 会话已结束 * - ``ModelEvents.ModelResponseCreatedEvent`` - 模型开始生成响应 * - ``ModelEvents.ModelResponseDoneEvent`` - 模型完成响应生成 * - ``ModelEvents.ModelResponseAudioDeltaEvent`` - 流式音频数据块 * - ``ModelEvents.ModelResponseAudioDoneEvent`` - 音频响应完成 * - ``ModelEvents.ModelResponseAudioTranscriptDeltaEvent`` - 流式音频转录文本块 * - ``ModelEvents.ModelResponseAudioTranscriptDoneEvent`` - 音频转录完成 * - ``ModelEvents.ModelResponseToolUseDeltaEvent`` - 流式工具调用参数 * - ``ModelEvents.ModelResponseToolUseDoneEvent`` - 工具调用参数完成 * - ``ModelEvents.ModelInputTranscriptionDeltaEvent`` - 流式用户输入转录文本块 * - ``ModelEvents.ModelInputTranscriptionDoneEvent`` - 用户输入转录完成 * - ``ModelEvents.ModelInputStartedEvent`` - 检测到用户音频输入开始(VAD) * - ``ModelEvents.ModelInputDoneEvent`` - 检测到用户音频输入结束(VAD) * - ``ModelEvents.ModelErrorEvent`` - 发生错误 创建实时智能体 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ``RealtimeAgent`` 作为桥接层,负责: - 将实时模型的 ``ModelEvents`` 转换为 ``ServerEvents``,发送给前端和其他智能体 - 接收来自前端或其他智能体的 ``ClientEvents``,并转发给实时模型 API - 管理智能体的生命周期和事件队列 服务端和客户端事件 ------------------------- AgentScope 提供统一的 ``ServerEvents`` 和 ``ClientEvents``, 用于后端与前端之间的通信: **ServerEvents**(后端 → 前端): .. list-table:: :header-rows: 1 :widths: 40 60 * - 事件 - 描述 * - ``ServerEvents.ServerSessionCreatedEvent`` - 后端创建会话 * - ``ServerEvents.ServerSessionUpdatedEvent`` - 后端更新会话 * - ``ServerEvents.ServerSessionEndedEvent`` - 后端结束会话 * - ``ServerEvents.AgentReadyEvent`` - 智能体准备接收输入 * - ``ServerEvents.AgentEndedEvent`` - 智能体已结束 * - ``ServerEvents.AgentResponseCreatedEvent`` - 智能体开始生成响应 * - ``ServerEvents.AgentResponseDoneEvent`` - 智能体完成响应生成 * - ``ServerEvents.AgentResponseAudioDeltaEvent`` - 智能体流式音频块 * - ``ServerEvents.AgentResponseAudioDoneEvent`` - 音频响应完成 * - ``ServerEvents.AgentResponseAudioTranscriptDeltaEvent`` - 智能体响应的流式转录 * - ``ServerEvents.AgentResponseAudioTranscriptDoneEvent`` - 转录完成 * - ``ServerEvents.AgentResponseToolUseDeltaEvent`` - 流式工具调用数据 * - ``ServerEvents.AgentResponseToolUseDoneEvent`` - 工具调用完成 * - ``ServerEvents.AgentResponseToolResultEvent`` - 工具执行结果 * - ``ServerEvents.AgentInputTranscriptionDeltaEvent`` - 用户输入的流式转录 * - ``ServerEvents.AgentInputTranscriptionDoneEvent`` - 输入转录完成 * - ``ServerEvents.AgentInputStartedEvent`` - 用户音频输入开始 * - ``ServerEvents.AgentInputDoneEvent`` - 用户音频输入结束 * - ``ServerEvents.AgentErrorEvent`` - 发生错误 **ClientEvents**(前端 → 后端): .. list-table:: :header-rows: 1 :widths: 40 60 * - 事件 - 描述 * - ``ClientEvents.ClientSessionCreateEvent`` - 创建指定配置的新会话 * - ``ClientEvents.ClientSessionEndEvent`` - 结束当前会话 * - ``ClientEvents.ClientResponseCreateEvent`` - 请求智能体立即生成响应 * - ``ClientEvents.ClientResponseCancelEvent`` - 中断智能体的当前响应 * - ``ClientEvents.ClientTextAppendEvent`` - 追加文本输入 * - ``ClientEvents.ClientAudioAppendEvent`` - 追加音频输入 * - ``ClientEvents.ClientAudioCommitEvent`` - 提交音频输入(标志输入结束) * - ``ClientEvents.ClientImageAppendEvent`` - 追加图像输入 * - ``ClientEvents.ClientToolResultEvent`` - 发送工具执行结果 初始化实时智能体 ------------------------------ 以下是创建和使用实时智能体的示例: .. GENERATED FROM PYTHON SOURCE LINES 252-286 .. code-block:: Python async def example_realtime_agent() -> None: """创建和使用实时智能体的示例。""" agent = RealtimeAgent( name="Friday", sys_prompt="你是一个名为 Friday 的助手。", model=DashScopeRealtimeModel( model_name="qwen3-omni-flash-realtime", api_key=os.getenv("DASHSCOPE_API_KEY"), ), ) # 创建队列接收来自智能体的消息 outgoing_queue = asyncio.Queue() # 智能体现在已准备好处理输入 # 在独立任务中处理输出消息 async def handle_agent_messages(): while True: event = await outgoing_queue.get() # 处理事件(例如通过 WebSocket 发送到前端) print(f"智能体事件: {event.type}") # 启动消息处理任务 asyncio.create_task(handle_agent_messages()) # 启动智能体(建立连接) await agent.start(outgoing_queue) # 完成后停止智能体 await agent.stop() .. GENERATED FROM PYTHON SOURCE LINES 287-398 启动实时对话 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 下面演示如何在用户和实时智能体之间建立实时对话。 这里以 FastAPI 为例,展示如何搭建实时对话的后端框架。 **后端设置(服务端):** 后端需要: 1. 创建 WebSocket 端点接受前端连接 2. 在会话开始时创建 ``RealtimeAgent`` 3. 将前端的 ``ClientEvents`` 转发给智能体 4. 将智能体的 ``ServerEvents`` 转发给前端 .. code-block:: python from fastapi import FastAPI, WebSocket from agentscope.agent import RealtimeAgent from agentscope.realtime import ( DashScopeRealtimeModel, ClientEvents, ServerEvents, ) app = FastAPI() @app.websocket("/ws/{user_id}/{session_id}") async def websocket_endpoint( websocket: WebSocket, user_id: str, session_id: str, ): await websocket.accept() # 创建智能体消息队列 frontend_queue = asyncio.Queue() # 创建智能体 agent = RealtimeAgent( name="Assistant", sys_prompt="你是一个有用的助手。", model=DashScopeRealtimeModel( model_name="qwen3-omni-flash-realtime", api_key=os.getenv("DASHSCOPE_API_KEY"), ), ) # 启动智能体 await agent.start(frontend_queue) # 将智能体消息转发到前端 async def send_to_frontend(): while True: msg = await frontend_queue.get() await websocket.send_json(msg.model_dump()) asyncio.create_task(send_to_frontend()) # 接收前端消息并转发给智能体 while True: data = await websocket.receive_json() client_event = ClientEvents.from_json(data) await agent.handle_input(client_event) **前端设置(客户端):** 前端需要: 1. 建立与后端的 WebSocket 连接 2. 发送 ``CLIENT_SESSION_CREATE`` 事件初始化会话 3. 捕获麦克风音频,通过 ``CLIENT_AUDIO_APPEND`` 事件发送 4. 接收并处理 ``ServerEvents``(例如播放音频、显示转录文本) .. code-block:: javascript // 连接 WebSocket const ws = new WebSocket('ws://localhost:8000/ws/user1/session1'); ws.onopen = () => { // 创建会话 ws.send(JSON.stringify({ type: 'client_session_create', config: { instructions: '你是一个有用的助手。', user_name: 'User1' } })); }; // 处理来自后端的消息 ws.onmessage = (event) => { const data = JSON.parse(event.data); if (data.type === 'response_audio_delta') { // 播放音频块 playAudio(data.delta); } }; // 发送音频数据 function sendAudioChunk(audioData) { ws.send(JSON.stringify({ type: 'client_audio_append', session_id: 'session1', audio: audioData, // base64 编码 format: { encoding: 'pcm16', sample_rate: 16000 } })); } 完整的工作示例请参见 AgentScope 仓库中的 ``examples/agent/realtime_voice_agent/``。 .. GENERATED FROM PYTHON SOURCE LINES 400-424 多智能体实时对话 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ AgentScope 通过 ``ChatRoom`` 类支持多智能体实时交互。 请注意目前大多数实时模型 API 仅支持单用户交互,但 AgentScope 的架构设计支持多智能体和多用户, 当 API 能力扩展时即可应用到多智能体场景。 实时聊天室 ---------------------------- AgentScope 引入 ``ChatRoom`` 类来管理共享对话空间中的多个实时智能体。 ChatRoom 提供: - 集中管理多个 ``RealtimeAgent`` 实例 - 智能体之间的自动消息广播 - 统一的前端通信消息队列 - 房间内所有智能体的生命周期管理 使用 ChatRoom -------------- ``ChatRoom`` 的用法与 ``RealtimeAgent`` 类似: .. GENERATED FROM PYTHON SOURCE LINES 424-474 .. code-block:: Python async def example_chat_room() -> None: """使用 ChatRoom 和多个实时智能体的示例。""" from agentscope.pipeline import ChatRoom from agentscope.agent import RealtimeAgent from agentscope.realtime import DashScopeRealtimeModel # 创建多个智能体 agent1 = RealtimeAgent( name="Agent1", sys_prompt="你是 Agent1,一个有用的助手。", model=DashScopeRealtimeModel( model_name="qwen3-omni-flash-realtime", api_key=os.getenv("DASHSCOPE_API_KEY"), ), ) agent2 = RealtimeAgent( name="Agent2", sys_prompt="你是 Agent2,一个有用的助手。", model=DashScopeRealtimeModel( model_name="qwen3-omni-flash-realtime", api_key=os.getenv("DASHSCOPE_API_KEY"), ), ) # 创建包含多个智能体的聊天室 chat_room = ChatRoom(agents=[agent1, agent2]) # 创建队列接收来自所有智能体的消息 outgoing_queue = asyncio.Queue() # 启动聊天室 await chat_room.start(outgoing_queue) # 处理来自前端的输入 # 聊天室会广播给所有智能体 from agentscope.realtime import ClientEvents client_event = ClientEvents.ClientTextAppendEvent( session_id="session1", text="大家好!", ) await chat_room.handle_input(client_event) # 完成后停止聊天室 await chat_room.stop() .. GENERATED FROM PYTHON SOURCE LINES 475-487 发展规划 ~~~~~~~~~~~~~~~~~~~~~~~~~~~ 实时智能体功能目前为实验性质,正在积极开发中。未来计划包括: - 支持更多实时模型 API - 增强对话历史的记忆管理 - 多用户语音交互支持 - 改进 VAD(语音活动检测)配置 - 更好的错误处理和恢复机制 .. rst-class:: sphx-glr-timing **Total running time of the script:** (0 minutes 0.001 seconds) .. _sphx_glr_download_tutorial_task_realtime.py: .. only:: html .. container:: sphx-glr-footer sphx-glr-footer-example .. container:: sphx-glr-download sphx-glr-download-jupyter :download:`Download Jupyter notebook: task_realtime.ipynb ` .. container:: sphx-glr-download sphx-glr-download-python :download:`Download Python source code: task_realtime.py ` .. container:: sphx-glr-download sphx-glr-download-zip :download:`Download zipped: task_realtime.zip ` .. only:: html .. rst-class:: sphx-glr-signature `Gallery generated by Sphinx-Gallery `_