实时智能体

实时智能体(Realtime Agent) 用于处理实时交互场景,例如语音对话或实时聊天会话。 AgentScope 中的实时智能体具有以下特性:

  • 集成 OpenAI、DashScope、Gemini 等实时模型 API

  • 统一的事件接口,简化与不同实时模型的交互

  • 支持工具调用能力

  • 支持多智能体交互

备注

实时智能体目前处于活跃开发阶段,欢迎社区贡献、讨论和反馈!如果开发者对实时智能体感兴趣,欢迎加入讨论和开发。

import asyncio
import os
from agentscope.agent import RealtimeAgent
from agentscope.realtime import (
    DashScopeRealtimeModel,
    OpenAIRealtimeModel,
    GeminiRealtimeModel,
)

创建实时模型

AgentScope 目前支持以下实时模型 API:

提供商

类名

支持的模型

输入模态

工具支持

DashScope

DashScopeRealtimeModel

qwen3-omni-flash-realtime

文本、音频、图像

OpenAI

OpenAIRealtimeModel

gpt-4o-realtime-preview

文本、音频

Gemini

GeminiRealtimeModel

gemini-2.5-flash-native-audio-preview-09-2025

文本、音频、图像

以下是初始化不同实时模型的示例:

初始化不同实时模型的示例
# DashScope 实时模型
dashscope_model = DashScopeRealtimeModel(
    model_name="qwen3-omni-flash-realtime",
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    voice="Cherry",  # 可选项: "Cherry", "Serena", "Ethan", "Chelsie"
    enable_input_audio_transcription=True,
)

# OpenAI 实时模型
openai_model = OpenAIRealtimeModel(
    model_name="gpt-4o-realtime-preview",
    api_key=os.getenv("OPENAI_API_KEY"),
    voice="alloy",  # 可选项: "alloy", "echo", "marin", "cedar"
    enable_input_audio_transcription=True,
)

# Gemini 实时模型
gemini_model = GeminiRealtimeModel(
    model_name="gemini-2.5-flash-native-audio-preview-09-2025",
    api_key=os.getenv("GEMINI_API_KEY"),
    voice="Puck",  # 可选项: "Puck", "Charon", "Kore", "Fenrir"
    enable_input_audio_transcription=True,
)

实时模型提供以下核心方法:

方法

描述

connect(outgoing_queue, instructions, tools)

建立与实时模型 API 的 WebSocket 连接

disconnect()

关闭 WebSocket 连接

send(data)

向实时模型发送音频/文本/图像数据进行处理

connect() 方法中的 outgoing_queue 参数是一个 asyncio 队列, 用于将实时模型的事件转发到外部(例如智能体或前端)。

模型事件接口

AgentScope 提供统一的 agentscope.realtime.ModelEvents 接口, 简化与不同实时模型的交互。支持以下事件:

备注

ModelEvents 中的 "session" 指的是实时模型与模型 API 之间的 WebSocket 连接会话,而非前端与后端之间的会话。

事件

描述

ModelEvents.ModelSessionCreatedEvent

会话创建成功

ModelEvents.ModelSessionEndedEvent

会话已结束

ModelEvents.ModelResponseCreatedEvent

模型开始生成响应

ModelEvents.ModelResponseDoneEvent

模型完成响应生成

ModelEvents.ModelResponseAudioDeltaEvent

流式音频数据块

ModelEvents.ModelResponseAudioDoneEvent

音频响应完成

ModelEvents.ModelResponseAudioTranscriptDeltaEvent

流式音频转录文本块

ModelEvents.ModelResponseAudioTranscriptDoneEvent

音频转录完成

ModelEvents.ModelResponseToolUseDeltaEvent

流式工具调用参数

ModelEvents.ModelResponseToolUseDoneEvent

工具调用参数完成

ModelEvents.ModelInputTranscriptionDeltaEvent

流式用户输入转录文本块

ModelEvents.ModelInputTranscriptionDoneEvent

用户输入转录完成

ModelEvents.ModelInputStartedEvent

检测到用户音频输入开始(VAD)

ModelEvents.ModelInputDoneEvent

检测到用户音频输入结束(VAD)

ModelEvents.ModelErrorEvent

发生错误

创建实时智能体

RealtimeAgent 作为桥接层,负责:

  • 将实时模型的 ModelEvents 转换为 ServerEvents,发送给前端和其他智能体

  • 接收来自前端或其他智能体的 ClientEvents,并转发给实时模型 API

  • 管理智能体的生命周期和事件队列

服务端和客户端事件

AgentScope 提供统一的 ServerEventsClientEvents, 用于后端与前端之间的通信:

**ServerEvents**(后端 → 前端):

事件

描述

ServerEvents.ServerSessionCreatedEvent

后端创建会话

ServerEvents.ServerSessionUpdatedEvent

后端更新会话

ServerEvents.ServerSessionEndedEvent

后端结束会话

ServerEvents.AgentReadyEvent

智能体准备接收输入

ServerEvents.AgentEndedEvent

智能体已结束

ServerEvents.AgentResponseCreatedEvent

智能体开始生成响应

ServerEvents.AgentResponseDoneEvent

智能体完成响应生成

ServerEvents.AgentResponseAudioDeltaEvent

智能体流式音频块

ServerEvents.AgentResponseAudioDoneEvent

音频响应完成

ServerEvents.AgentResponseAudioTranscriptDeltaEvent

智能体响应的流式转录

ServerEvents.AgentResponseAudioTranscriptDoneEvent

转录完成

ServerEvents.AgentResponseToolUseDeltaEvent

流式工具调用数据

ServerEvents.AgentResponseToolUseDoneEvent

工具调用完成

ServerEvents.AgentResponseToolResultEvent

工具执行结果

ServerEvents.AgentInputTranscriptionDeltaEvent

用户输入的流式转录

ServerEvents.AgentInputTranscriptionDoneEvent

输入转录完成

ServerEvents.AgentInputStartedEvent

用户音频输入开始

ServerEvents.AgentInputDoneEvent

用户音频输入结束

ServerEvents.AgentErrorEvent

发生错误

**ClientEvents**(前端 → 后端):

事件

描述

ClientEvents.ClientSessionCreateEvent

创建指定配置的新会话

ClientEvents.ClientSessionEndEvent

结束当前会话

ClientEvents.ClientResponseCreateEvent

请求智能体立即生成响应

ClientEvents.ClientResponseCancelEvent

中断智能体的当前响应

ClientEvents.ClientTextAppendEvent

追加文本输入

ClientEvents.ClientAudioAppendEvent

追加音频输入

ClientEvents.ClientAudioCommitEvent

提交音频输入(标志输入结束)

ClientEvents.ClientImageAppendEvent

追加图像输入

ClientEvents.ClientToolResultEvent

发送工具执行结果

初始化实时智能体

以下是创建和使用实时智能体的示例:

async def example_realtime_agent() -> None:
    """创建和使用实时智能体的示例。"""
    agent = RealtimeAgent(
        name="Friday",
        sys_prompt="你是一个名为 Friday 的助手。",
        model=DashScopeRealtimeModel(
            model_name="qwen3-omni-flash-realtime",
            api_key=os.getenv("DASHSCOPE_API_KEY"),
        ),
    )

    # 创建队列接收来自智能体的消息
    outgoing_queue = asyncio.Queue()

    # 智能体现在已准备好处理输入
    # 在独立任务中处理输出消息
    async def handle_agent_messages():
        while True:
            event = await outgoing_queue.get()
            # 处理事件(例如通过 WebSocket 发送到前端)
            print(f"智能体事件: {event.type}")

    # 启动消息处理任务
    asyncio.create_task(handle_agent_messages())

    # 启动智能体(建立连接)
    await agent.start(outgoing_queue)

    # 完成后停止智能体
    await agent.stop()

启动实时对话

下面演示如何在用户和实时智能体之间建立实时对话。

这里以 FastAPI 为例,展示如何搭建实时对话的后端框架。

后端设置(服务端):

后端需要:

  1. 创建 WebSocket 端点接受前端连接

  2. 在会话开始时创建 RealtimeAgent

  3. 将前端的 ClientEvents 转发给智能体

  4. 将智能体的 ServerEvents 转发给前端

from fastapi import FastAPI, WebSocket
from agentscope.agent import RealtimeAgent
from agentscope.realtime import (
    DashScopeRealtimeModel,
    ClientEvents,
    ServerEvents,
)

app = FastAPI()

@app.websocket("/ws/{user_id}/{session_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    user_id: str,
    session_id: str,
):
    await websocket.accept()

    # 创建智能体消息队列
    frontend_queue = asyncio.Queue()

    # 创建智能体
    agent = RealtimeAgent(
        name="Assistant",
        sys_prompt="你是一个有用的助手。",
        model=DashScopeRealtimeModel(
            model_name="qwen3-omni-flash-realtime",
            api_key=os.getenv("DASHSCOPE_API_KEY"),
        ),
    )

    # 启动智能体
    await agent.start(frontend_queue)

    # 将智能体消息转发到前端
    async def send_to_frontend():
        while True:
            msg = await frontend_queue.get()
            await websocket.send_json(msg.model_dump())

    asyncio.create_task(send_to_frontend())

    # 接收前端消息并转发给智能体
    while True:
        data = await websocket.receive_json()
        client_event = ClientEvents.from_json(data)
        await agent.handle_input(client_event)

前端设置(客户端):

前端需要:

  1. 建立与后端的 WebSocket 连接

  2. 发送 CLIENT_SESSION_CREATE 事件初始化会话

  3. 捕获麦克风音频,通过 CLIENT_AUDIO_APPEND 事件发送

  4. 接收并处理 ``ServerEvents``(例如播放音频、显示转录文本)

// 连接 WebSocket
const ws = new WebSocket('ws://localhost:8000/ws/user1/session1');

ws.onopen = () => {
    // 创建会话
    ws.send(JSON.stringify({
        type: 'client_session_create',
        config: {
            instructions: '你是一个有用的助手。',
            user_name: 'User1'
        }
    }));
};

// 处理来自后端的消息
ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    if (data.type === 'response_audio_delta') {
        // 播放音频块
        playAudio(data.delta);
    }
};

// 发送音频数据
function sendAudioChunk(audioData) {
    ws.send(JSON.stringify({
        type: 'client_audio_append',
        session_id: 'session1',
        audio: audioData,  // base64 编码
        format: { encoding: 'pcm16', sample_rate: 16000 }
    }));
}

完整的工作示例请参见 AgentScope 仓库中的 examples/agent/realtime_voice_agent/

多智能体实时对话

AgentScope 通过 ChatRoom 类支持多智能体实时交互。

请注意目前大多数实时模型 API 仅支持单用户交互,但 AgentScope 的架构设计支持多智能体和多用户, 当 API 能力扩展时即可应用到多智能体场景。

实时聊天室

AgentScope 引入 ChatRoom 类来管理共享对话空间中的多个实时智能体。 ChatRoom 提供:

  • 集中管理多个 RealtimeAgent 实例

  • 智能体之间的自动消息广播

  • 统一的前端通信消息队列

  • 房间内所有智能体的生命周期管理

使用 ChatRoom

ChatRoom 的用法与 RealtimeAgent 类似:

async def example_chat_room() -> None:
    """使用 ChatRoom 和多个实时智能体的示例。"""
    from agentscope.pipeline import ChatRoom
    from agentscope.agent import RealtimeAgent
    from agentscope.realtime import DashScopeRealtimeModel

    # 创建多个智能体
    agent1 = RealtimeAgent(
        name="Agent1",
        sys_prompt="你是 Agent1,一个有用的助手。",
        model=DashScopeRealtimeModel(
            model_name="qwen3-omni-flash-realtime",
            api_key=os.getenv("DASHSCOPE_API_KEY"),
        ),
    )

    agent2 = RealtimeAgent(
        name="Agent2",
        sys_prompt="你是 Agent2,一个有用的助手。",
        model=DashScopeRealtimeModel(
            model_name="qwen3-omni-flash-realtime",
            api_key=os.getenv("DASHSCOPE_API_KEY"),
        ),
    )

    # 创建包含多个智能体的聊天室
    chat_room = ChatRoom(agents=[agent1, agent2])

    # 创建队列接收来自所有智能体的消息
    outgoing_queue = asyncio.Queue()

    # 启动聊天室
    await chat_room.start(outgoing_queue)

    # 处理来自前端的输入
    # 聊天室会广播给所有智能体
    from agentscope.realtime import ClientEvents

    client_event = ClientEvents.ClientTextAppendEvent(
        session_id="session1",
        text="大家好!",
    )
    await chat_room.handle_input(client_event)

    # 完成后停止聊天室
    await chat_room.stop()

发展规划

实时智能体功能目前为实验性质,正在积极开发中。未来计划包括:

  • 支持更多实时模型 API

  • 增强对话历史的记忆管理

  • 多用户语音交互支持

  • 改进 VAD(语音活动检测)配置

  • 更好的错误处理和恢复机制

Total running time of the script: (0 minutes 0.001 seconds)

Gallery generated by Sphinx-Gallery