Routing

在 AgentScope 中有两种实现 Routing 的方法,都简单易实现:

  • 使用结构化输出的显式 routing

  • 使用工具调用的隐式 routing

小技巧

考虑到智能体 routing 没有统一的标准/定义,我们遵循 Building effective agents 中的设置

显式 Routing

在显式 routing 中,我们可以直接使用智能体的结构化输出来确定将消息路由到哪个智能体。

初始化 routing 智能体

import asyncio
import json
import os
from typing import Literal

from pydantic import BaseModel, Field

from agentscope.agent import ReActAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.message import Msg
from agentscope.model import DashScopeChatModel
from agentscope.tool import Toolkit, ToolResponse

router = ReActAgent(
    name="Router",
    sys_prompt="你是一个路由智能体。你的目标是将用户查询路由到正确的后续任务,注意你不需要回答用户的问题。",
    model=DashScopeChatModel(
        model_name="qwen-max",
        api_key=os.environ["DASHSCOPE_API_KEY"],
        stream=False,
    ),
    formatter=DashScopeChatFormatter(),
)


# 使用结构化输出指定路由任务
class RoutingChoice(BaseModel):
    your_choice: Literal[
        "Content Generation",
        "Programming",
        "Information Retrieval",
        None,
    ] = Field(
        description="选择正确的后续任务,如果任务太简单或没有合适的任务,则选择 ``None``",
    )
    task_description: str | None = Field(
        description="任务描述",
        default=None,
    )


async def example_router_explicit() -> None:
    """使用结构化输出进行显式路由的示例。"""
    msg_user = Msg(
        "user",
        "帮我写一首诗",
        "user",
    )

    # 路由查询
    msg_res = await router(
        msg_user,
        structured_model=RoutingChoice,
    )

    # 结构化输出存储在 metadata 字段中
    print("结构化输出:")
    print(json.dumps(msg_res.metadata, indent=4, ensure_ascii=False))


asyncio.run(example_router_explicit())
/home/runner/work/agentscope/agentscope/src/agentscope/model/_dashscope_model.py:232: DeprecationWarning: 'required' is not supported by DashScope API. It will be converted to 'auto'.
  warnings.warn(
Router: {
    "type": "tool_use",
    "name": "generate_response",
    "input": {
        "your_choice": "Content Generation",
        "task_description": "用户请求帮助写一首诗"
    },
    "id": "call_da595ea6c75349df871785"
}
system: {
    "type": "tool_result",
    "id": "call_da595ea6c75349df871785",
    "name": "generate_response",
    "output": [
        {
            "type": "text",
            "text": "Successfully generated response."
        }
    ]
}
Router: 我将为您转接到我们的诗歌创作助手,它会根据您的需求帮助您创作一首诗。请告诉我您想要这首诗表达的主题或情感色彩是什么样的?例如,它可以是关于自然、爱情、友情等主题,或者希望传达快乐、忧郁等情绪。如果您有特定的格式要求(如五言绝句、七律等),也请一并告知。
结构化输出:
{
    "your_choice": "Content Generation",
    "task_description": "用户请求帮助写一首诗"
}

隐式 Routing

另一种方法是将下游智能体包装成工具函数,这样路由智能体就可以根据用户查询决定调用哪个工具。

我们首先定义几个工具函数:

async def generate_python(demand: str) -> ToolResponse:
    """根据需求生成 Python 代码。

    Args:
        demand (``str``):
            对 Python 代码的需求。
    """
    # 示例需求智能体
    python_agent = ReActAgent(
        name="PythonAgent",
        sys_prompt="你是一个 Python 专家,你的目标是根据需求生成 Python 代码。",
        model=DashScopeChatModel(
            model_name="qwen-max",
            api_key=os.environ["DASHSCOPE_API_KEY"],
            stream=False,
        ),
        memory=InMemoryMemory(),
        formatter=DashScopeChatFormatter(),
        toolkit=Toolkit(),
    )
    msg_res = await python_agent(Msg("user", demand, "user"))

    return ToolResponse(
        content=msg_res.get_content_blocks("text"),
    )


# 为演示目的模拟一些其他工具函数
async def generate_poem(demand: str) -> ToolResponse:
    """根据需求生成诗歌。

    Args:
        demand (``str``):
            对诗歌的需求。
    """
    pass


async def web_search(query: str) -> ToolResponse:
    """在网络上搜索查询。

    Args:
        query (``str``):
            要搜索的查询。
    """
    pass

之后,我们定义一个路由智能体并为其配备上述工具函数。

toolkit = Toolkit()
toolkit.register_tool_function(generate_python)
toolkit.register_tool_function(generate_poem)
toolkit.register_tool_function(web_search)

# 使用工具模块初始化路由智能体
router_implicit = ReActAgent(
    name="Router",
    sys_prompt="你是一个路由智能体。你的目标是将用户查询路由到正确的后续任务。",
    model=DashScopeChatModel(
        model_name="qwen-max",
        api_key=os.environ["DASHSCOPE_API_KEY"],
        stream=False,
    ),
    formatter=DashScopeChatFormatter(),
    toolkit=toolkit,
    memory=InMemoryMemory(),
)


async def example_router_implicit() -> None:
    """使用工具调用进行隐式路由的示例。"""
    msg_user = Msg(
        "user",
        "帮我在 Python 中生成一个快速排序函数",
        "user",
    )

    # 路由查询
    await router_implicit(msg_user)


asyncio.run(example_router_implicit())
Router: {
    "type": "tool_use",
    "name": "generate_python",
    "input": {
        "demand": "生成一个快速排序函数"
    },
    "id": "call_8808308ee93e4b289c06b2"
}
PythonAgent: 当然!快速排序是一种非常高效的排序算法,采用分治策略来对一个数组进行排序。下面是一个用 Python 实现的快速排序函数:

```python
def quicksort(arr):
    if len(arr) <= 1:
        return arr
    else:
        pivot = arr[len(arr) // 2]
        left = [x for x in arr if x < pivot]
        middle = [x for x in arr if x == pivot]
        right = [x for x in arr if x > pivot]
        return quicksort(left) + middle + quicksort(right)

# 示例使用
arr = [3, 6, 8, 10, 1, 2, 1]
sorted_arr = quicksort(arr)
print("Sorted array:", sorted_arr)
```

这个实现中,我们选择数组中间的元素作为基准(pivot),然后将数组分为三部分:
- `left`:所有小于基准的元素
- `middle`:所有等于基准的元素
- `right`:所有大于基准的元素

然后递归地对 `left` 和 `right` 部分进行快速排序,并将结果合并。

如果你需要一个更传统的原地排序版本,可以使用以下代码:

```python
def quicksort_inplace(arr, low, high):
    if low < high:
        pivot_index = partition(arr, low, high)
        quicksort_inplace(arr, low, pivot_index - 1)
        quicksort_inplace(arr, pivot_index + 1, high)

def partition(arr, low, high):
    pivot = arr[high]
    i = low - 1
    for j in range(low, high):
        if arr[j] <= pivot:
            i += 1
            arr[i], arr[j] = arr[j], arr[i]
    arr[i + 1], arr[high] = arr[high], arr[i + 1]
    return i + 1

# 示例使用
arr = [3, 6, 8, 10, 1, 2, 1]
quicksort_inplace(arr, 0, len(arr) - 1)
print("Sorted array:", arr)
```

在这个版本中,`partition` 函数用于将数组分区,并返回基准的位置。`quicksort_inplace` 函数则递归地对左右子数组进行排序。这样可以在原地完成排序,不需要额外的空间。
system: {
    "type": "tool_result",
    "id": "call_8808308ee93e4b289c06b2",
    "name": "generate_python",
    "output": [
        {
            "type": "text",
            "text": "当然!快速排序是一种非常高效的排序算法,采用分治策略来对一个数组进行排序。下面是一个用 Python 实现的快速排序函数:\n\n```python\ndef quicksort(arr):\n    if len(arr) <= 1:\n        return arr\n    else:\n        pivot = arr[len(arr) // 2]\n        left = [x for x in arr if x < pivot]\n        middle = [x for x in arr if x == pivot]\n        right = [x for x in arr if x > pivot]\n        return quicksort(left) + middle + quicksort(right)\n\n# 示例使用\narr = [3, 6, 8, 10, 1, 2, 1]\nsorted_arr = quicksort(arr)\nprint(\"Sorted array:\", sorted_arr)\n```\n\n这个实现中,我们选择数组中间的元素作为基准(pivot),然后将数组分为三部分:\n- `left`:所有小于基准的元素\n- `middle`:所有等于基准的元素\n- `right`:所有大于基准的元素\n\n然后递归地对 `left` 和 `right` 部分进行快速排序,并将结果合并。\n\n如果你需要一个更传统的原地排序版本,可以使用以下代码:\n\n```python\ndef quicksort_inplace(arr, low, high):\n    if low < high:\n        pivot_index = partition(arr, low, high)\n        quicksort_inplace(arr, low, pivot_index - 1)\n        quicksort_inplace(arr, pivot_index + 1, high)\n\ndef partition(arr, low, high):\n    pivot = arr[high]\n    i = low - 1\n    for j in range(low, high):\n        if arr[j] <= pivot:\n            i += 1\n            arr[i], arr[j] = arr[j], arr[i]\n    arr[i + 1], arr[high] = arr[high], arr[i + 1]\n    return i + 1\n\n# 示例使用\narr = [3, 6, 8, 10, 1, 2, 1]\nquicksort_inplace(arr, 0, len(arr) - 1)\nprint(\"Sorted array:\", arr)\n```\n\n在这个版本中,`partition` 函数用于将数组分区,并返回基准的位置。`quicksort_inplace` 函数则递归地对左右子数组进行排序。这样可以在原地完成排序,不需要额外的空间。"
        }
    ]
}
Router: 生成的快速排序函数有两个版本,第一个是使用额外空间的简洁版本,第二个是在原地完成排序的传统版本。

这里是简洁版本的快速排序函数:

```python
def quicksort(arr):
    if len(arr) <= 1:
        return arr
    else:
        pivot = arr[len(arr) // 2]
        left = [x for x in arr if x < pivot]
        middle = [x for x in arr if x == pivot]
        right = [x for x in arr if x > pivot]
        return quicksort(left) + middle + quicksort(right)

# 示例使用
arr = [3, 6, 8, 10, 1, 2, 1]
sorted_arr = quicksort(arr)
print("Sorted array:", sorted_arr)
```

这是更传统的原地排序版本,它不需要额外的空间来存储数组的部分:

```python
def quicksort_inplace(arr, low, high):
    if low < high:
        pivot_index = partition(arr, low, high)
        quicksort_inplace(arr, low, pivot_index - 1)
        quicksort_inplace(arr, pivot_index + 1, high)

def partition(arr, low, high):
    pivot = arr[high]
    i = low - 1
    for j in range(low, high):
        if arr[j] <= pivot:
            i += 1
            arr[i], arr[j] = arr[j], arr[i]
    arr[i + 1], arr[high] = arr[high], arr[i + 1]
    return i + 1

# 示例使用
arr = [3, 6, 8, 10, 1, 2, 1]
quicksort_inplace(arr, 0, len(arr) - 1)
print("Sorted array:", arr)
```

您可以根据需要选择使用哪一个。如果您想要节省内存,那么建议使用原地排序版本。如果您更倾向于代码的简洁性,那么可以使用第一个版本。您可以用这些代码对您的数组进行排序。

Total running time of the script: (2 minutes 9.476 seconds)

Gallery generated by Sphinx-Gallery