Routing

There are two simple ways to implement routing in AgentScope:

  • Routing by structured output

  • Routing by tool calls

Tip

Because there is no unified standard or definition for agent routing, we follow the setting in "Building effective agents".

Routing by Structured Output

In this approach, we use the structured output of the routing agent directly to determine which downstream agent should receive the message.

Initialize a routing agent

import asyncio
import json
import os
from typing import Literal

from pydantic import BaseModel, Field

from agentscope.agent import ReActAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.message import Msg
from agentscope.model import DashScopeChatModel
from agentscope.tool import Toolkit, ToolResponse

router = ReActAgent(
    name="Router",
    sys_prompt="You're a routing agent. Your target is to route the user query to the right follow-up task.",
    model=DashScopeChatModel(
        model_name="qwen-max",
        api_key=os.environ["DASHSCOPE_API_KEY"],
        stream=False,
    ),
    formatter=DashScopeChatFormatter(),
)


# Use structured output to specify the routing task
class RoutingChoice(BaseModel):
    your_choice: Literal[
        "Content Generation",
        "Programming",
        "Information Retrieval",
        None,
    ] = Field(
        description="Choose the right follow-up task, and choose ``None`` if the task is too simple or no suitable task",
    )
    task_description: str | None = Field(
        description="The task description",
        default=None,
    )


async def example_router_explicit() -> None:
    """Example of explicit routing with structured output."""
    msg_user = Msg(
        "user",
        "Help me to write a poem",
        "user",
    )

    # Route the query
    msg_res = await router(
        msg_user,
        structured_model=RoutingChoice,
    )

    # The structured output is stored in the metadata field
    print("The structured output:")
    print(json.dumps(msg_res.metadata, indent=4, ensure_ascii=False))


asyncio.run(example_router_explicit())
/home/runner/work/agentscope/agentscope/src/agentscope/model/_dashscope_model.py:232: DeprecationWarning: 'required' is not supported by DashScope API. It will be converted to 'auto'.
  warnings.warn(
Router: {
    "type": "tool_use",
    "name": "generate_response",
    "input": {
        "your_choice": "Content Generation",
        "task_description": "Write a poem"
    },
    "id": "call_4aeaf7d2cc5b49bdb66680"
}
system: {
    "type": "tool_result",
    "id": "call_4aeaf7d2cc5b49bdb66680",
    "name": "generate_response",
    "output": [
        {
            "type": "text",
            "text": "Successfully generated response."
        }
    ]
}
Router: I understand you'd like some help with writing a poem. That sounds like a wonderful creative endeavor! To get started, could you share a bit more about what kind of poem you're envisioning? For example:

- Do you have a specific theme or subject in mind?
- Is there a particular style or form of poetry you're interested in (like haiku, sonnet, free verse)?
- Are there any emotions or messages you want the poem to convey?

The more details you can provide, the better I can assist you in crafting a poem that meets your vision.
The structured output:
{
    "your_choice": "Content Generation",
    "task_description": "Write a poem"
}
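Once the routing choice is available in the metadata, the application still has to forward the task to the matching downstream handler. The following is a minimal sketch of that step in plain Python, not part of the AgentScope API: the handler functions, the `HANDLERS` table, and `dispatch` are hypothetical names introduced for illustration, and the input dictionary mirrors the structured output printed above.

```python
import asyncio
from typing import Awaitable, Callable, Optional


# Hypothetical downstream handlers; in a real application each one would
# call its own agent or pipeline instead of returning a tagged string.
async def handle_content(task: str) -> str:
    return f"[content] {task}"


async def handle_programming(task: str) -> str:
    return f"[programming] {task}"


async def handle_retrieval(task: str) -> str:
    return f"[retrieval] {task}"


# Keys match the labels declared in ``RoutingChoice.your_choice``
HANDLERS: dict[str, Callable[[str], Awaitable[str]]] = {
    "Content Generation": handle_content,
    "Programming": handle_programming,
    "Information Retrieval": handle_retrieval,
}


async def dispatch(metadata: dict) -> Optional[str]:
    """Forward the task to the handler selected by the router, if any."""
    choice = metadata.get("your_choice")
    if choice is None:
        # The router judged that no follow-up task is needed
        return None
    handler = HANDLERS.get(choice)
    if handler is None:
        raise ValueError(f"Unknown routing choice: {choice!r}")
    return await handler(metadata.get("task_description") or "")


routed = asyncio.run(
    dispatch(
        {
            "your_choice": "Content Generation",
            "task_description": "Write a poem",
        },
    ),
)
print(routed)  # [content] Write a poem
```

Keeping the label-to-handler mapping in one table makes it easy to check that every label in the structured model has a handler, and the ``None`` branch covers queries that need no follow-up task.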

Routing by Tool Calls

Another way is to wrap each downstream agent in a tool function, so that the routing agent decides which tool to call based on the user query.

We first define several tool functions:

async def generate_python(demand: str) -> ToolResponse:
    """Generate Python code based on the demand.

    Args:
        demand (``str``):
            The demand for the Python code.
    """
    # An example downstream agent
    python_agent = ReActAgent(
        name="PythonAgent",
        sys_prompt="You're a Python expert, your target is to generate Python code based on the demand.",
        model=DashScopeChatModel(
            model_name="qwen-max",
            api_key=os.environ["DASHSCOPE_API_KEY"],
            stream=False,
        ),
        memory=InMemoryMemory(),
        formatter=DashScopeChatFormatter(),
        toolkit=Toolkit(),
    )
    msg_res = await python_agent(Msg("user", demand, "user"))

    return ToolResponse(
        content=msg_res.get_content_blocks("text"),
    )


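# Placeholder tool functions for demonstration purposes. Each returns a fixed
# text block so that a call from the router does not fail on a ``None`` result.
from agentscope.message import TextBlock


async def generate_poem(demand: str) -> ToolResponse:
    """Generate a poem based on the demand.

    Args:
        demand (``str``):
            The demand for the poem.
    """
    return ToolResponse(
        content=[TextBlock(type="text", text="Placeholder: no poem generated.")],
    )


async def web_search(query: str) -> ToolResponse:
    """Search the web for the query.

    Args:
        query (``str``):
            The query to search.
    """
    return ToolResponse(
        content=[TextBlock(type="text", text="Placeholder: no search performed.")],
    )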
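With tool-based routing, the model can only choose among tools according to what it is told about them, so the function names, parameters, and docstrings act as the routing labels. The sketch below shows, using only the standard library, how such a description could be derived from a function. It is an illustration under our own assumptions, not AgentScope's actual schema generation: `describe_tool` is a hypothetical helper, and the stub `web_search` here returns a plain string rather than a `ToolResponse`.

```python
import inspect
from typing import Callable


# Stand-in tool with the same docstring layout as the functions above
async def web_search(query: str) -> str:
    """Search the web for the query.

    Args:
        query (``str``):
            The query to search.
    """
    return f"results for {query}"


def describe_tool(fn: Callable) -> dict:
    """Build a small description of a tool from its signature and docstring."""
    doc = inspect.getdoc(fn) or ""
    # The first paragraph of the docstring serves as the tool summary
    summary = doc.split("\n\n", 1)[0].replace("\n", " ")
    params = {}
    for name, param in inspect.signature(fn).parameters.items():
        annotation = param.annotation
        if annotation is inspect.Parameter.empty:
            params[name] = "any"
        else:
            params[name] = getattr(annotation, "__name__", str(annotation))
    return {"name": fn.__name__, "description": summary, "parameters": params}


description = describe_tool(web_search)
print(description)
```

A vague or missing docstring leaves the router with little to distinguish one tool from another, so it is worth writing the summary line as carefully as a routing label.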

After that, we define a routing agent and equip it with the above tool functions.

toolkit = Toolkit()
toolkit.register_tool_function(generate_python)
toolkit.register_tool_function(generate_poem)
toolkit.register_tool_function(web_search)

# Initialize the routing agent with the toolkit
router_implicit = ReActAgent(
    name="Router",
    sys_prompt="You're a routing agent. Your target is to route the user query to the right follow-up task.",
    model=DashScopeChatModel(
        model_name="qwen-max",
        api_key=os.environ["DASHSCOPE_API_KEY"],
        stream=False,
    ),
    formatter=DashScopeChatFormatter(),
    toolkit=toolkit,
    memory=InMemoryMemory(),
)


async def example_router_implicit() -> None:
    """Example of implicit routing with tool calls."""
    msg_user = Msg(
        "user",
        "Help me to generate a quick sort function in Python",
        "user",
    )

    # Route the query
    await router_implicit(msg_user)


asyncio.run(example_router_implicit())
Router: {
    "type": "tool_use",
    "name": "generate_python",
    "input": {
        "demand": "a quick sort function"
    },
    "id": "call_6846f6a9dd65495597d582"
}
PythonAgent: Certainly! Quick sort is a popular and efficient sorting algorithm that uses a divide-and-conquer approach. Here's a Python implementation of the quick sort algorithm:

```python
def quick_sort(arr):
    if len(arr) <= 1:
        return arr
    else:
        pivot = arr[len(arr) // 2]
        left = [x for x in arr if x < pivot]
        middle = [x for x in arr if x == pivot]
        right = [x for x in arr if x > pivot]
        return quick_sort(left) + middle + quick_sort(right)

# Example usage:
arr = [3, 6, 8, 10, 1, 2, 1]
sorted_arr = quick_sort(arr)
print("Sorted array:", sorted_arr)
```

### Explanation:
1. **Base Case**: If the array has 0 or 1 elements, it is already sorted, so we return it as is.
2. **Pivot Selection**: We choose the pivot element. In this implementation, we use the middle element of the array.
3. **Partitioning**:
   - `left` contains all elements less than the pivot.
   - `middle` contains all elements equal to the pivot.
   - `right` contains all elements greater than the pivot.
4. **Recursive Sorting**: We recursively apply the `quick_sort` function to the `left` and `right` subarrays and concatenate the results with the `middle` array.

This implementation is simple and easy to understand, but it may not be the most efficient in terms of space complexity due to the use of additional lists. For an in-place version, you can modify the implementation as follows:

```python
def quick_sort_in_place(arr, low, high):
    if low < high:
        pi = partition(arr, low, high)
        quick_sort_in_place(arr, low, pi - 1)
        quick_sort_in_place(arr, pi + 1, high)

def partition(arr, low, high):
    pivot = arr[high]
    i = low - 1
    for j in range(low, high):
        if arr[j] < pivot:
            i += 1
            arr[i], arr[j] = arr[j], arr[i]
    arr[i + 1], arr[high] = arr[high], arr[i + 1]
    return i + 1

# Example usage:
arr = [3, 6, 8, 10, 1, 2, 1]
quick_sort_in_place(arr, 0, len(arr) - 1)
print("Sorted array:", arr)
```

### Explanation:
1. **In-Place Partitioning**: The `partition` function rearranges the elements in the array such that elements less than the pivot are on the left, and elements greater than the pivot are on the right. It returns the index of the pivot after partitioning.
2. **Recursive Sorting**: The `quick_sort_in_place` function recursively sorts the subarrays before and after the pivot.

This in-place version is more space-efficient as it does not require additional lists.
system: {
    "type": "tool_result",
    "id": "call_6846f6a9dd65495597d582",
    "name": "generate_python",
    "output": [
        {
            "type": "text",
            "text": "Certainly! Quick sort is a popular and efficient sorting algorithm that uses a divide-and-conquer approach. Here's a Python implementation of the quick sort algorithm:\n\n```python\ndef quick_sort(arr):\n    if len(arr) <= 1:\n        return arr\n    else:\n        pivot = arr[len(arr) // 2]\n        left = [x for x in arr if x < pivot]\n        middle = [x for x in arr if x == pivot]\n        right = [x for x in arr if x > pivot]\n        return quick_sort(left) + middle + quick_sort(right)\n\n# Example usage:\narr = [3, 6, 8, 10, 1, 2, 1]\nsorted_arr = quick_sort(arr)\nprint(\"Sorted array:\", sorted_arr)\n```\n\n### Explanation:\n1. **Base Case**: If the array has 0 or 1 elements, it is already sorted, so we return it as is.\n2. **Pivot Selection**: We choose the pivot element. In this implementation, we use the middle element of the array.\n3. **Partitioning**:\n   - `left` contains all elements less than the pivot.\n   - `middle` contains all elements equal to the pivot.\n   - `right` contains all elements greater than the pivot.\n4. **Recursive Sorting**: We recursively apply the `quick_sort` function to the `left` and `right` subarrays and concatenate the results with the `middle` array.\n\nThis implementation is simple and easy to understand, but it may not be the most efficient in terms of space complexity due to the use of additional lists. For an in-place version, you can modify the implementation as follows:\n\n```python\ndef quick_sort_in_place(arr, low, high):\n    if low < high:\n        pi = partition(arr, low, high)\n        quick_sort_in_place(arr, low, pi - 1)\n        quick_sort_in_place(arr, pi + 1, high)\n\ndef partition(arr, low, high):\n    pivot = arr[high]\n    i = low - 1\n    for j in range(low, high):\n        if arr[j] < pivot:\n            i += 1\n            arr[i], arr[j] = arr[j], arr[i]\n    arr[i + 1], arr[high] = arr[high], arr[i + 1]\n    return i + 1\n\n# Example usage:\narr = [3, 6, 8, 10, 1, 2, 1]\nquick_sort_in_place(arr, 0, len(arr) - 1)\nprint(\"Sorted array:\", arr)\n```\n\n### Explanation:\n1. **In-Place Partitioning**: The `partition` function rearranges the elements in the array such that elements less than the pivot are on the left, and elements greater than the pivot are on the right. It returns the index of the pivot after partitioning.\n2. **Recursive Sorting**: The `quick_sort_in_place` function recursively sorts the subarrays before and after the pivot.\n\nThis in-place version is more space-efficient as it does not require additional lists."
        }
    ]
}
Router: Here is a Python implementation of the quick sort algorithm:

```python
def quick_sort(arr):
    if len(arr) <= 1:
        return arr
    else:
        pivot = arr[len(arr) // 2]
        left = [x for x in arr if x < pivot]
        middle = [x for x in arr if x == pivot]
        right = [x for x in arr if x > pivot]
        return quick_sort(left) + middle + quick_sort(right)

# Example usage:
arr = [3, 6, 8, 10, 1, 2, 1]
sorted_arr = quick_sort(arr)
print("Sorted array:", sorted_arr)
```

### Explanation:
- **Base Case**: If the array has 0 or 1 elements, it is already sorted, so we return it as is.
- **Pivot Selection**: We choose the pivot element. In this implementation, we use the middle element of the array.
- **Partitioning**:
  - `left` contains all elements less than the pivot.
  - `middle` contains all elements equal to the pivot.
  - `right` contains all elements greater than the pivot.
- **Recursive Sorting**: We recursively apply the `quick_sort` function to the `left` and `right` subarrays and concatenate the results with the `middle` array.

This implementation is simple and easy to understand, but it may not be the most efficient in terms of space complexity due to the use of additional lists. For an in-place version, you can modify the implementation as follows:

```python
def quick_sort_in_place(arr, low, high):
    if low < high:
        pi = partition(arr, low, high)
        quick_sort_in_place(arr, low, pi - 1)
        quick_sort_in_place(arr, pi + 1, high)

def partition(arr, low, high):
    pivot = arr[high]
    i = low - 1
    for j in range(low, high):
        if arr[j] < pivot:
            i += 1
            arr[i], arr[j] = arr[j], arr[i]
    arr[i + 1], arr[high] = arr[high], arr[i + 1]
    return i + 1

# Example usage:
arr = [3, 6, 8, 10, 1, 2, 1]
quick_sort_in_place(arr, 0, len(arr) - 1)
print("Sorted array:", arr)
```

### Explanation:
- **In-Place Partitioning**: The `partition` function rearranges the elements in the array such that elements less than the pivot are on the left, and elements greater than the pivot are on the right. It returns the index of the pivot after partitioning.
- **Recursive Sorting**: The `quick_sort_in_place` function recursively sorts the subarrays before and after the pivot.

This in-place version is more space-efficient as it does not require additional lists.
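The trace above follows a fixed pattern: the router emits a `tool_use` block, the named function runs with the given input, and the result comes back as a `tool_result` block with the same `id`. The following sketch imitates that cycle in plain Python to make the mechanism concrete. It is a conceptual illustration only and does not reflect how AgentScope implements tool execution; `REGISTRY`, `call_tool`, and the stub tools are hypothetical.

```python
import asyncio


# Stub tools standing in for the agent-backed tool functions
async def generate_python(demand: str) -> str:
    return f"# code for: {demand}"


async def web_search(query: str) -> str:
    return f"results for: {query}"


# Tools are looked up by function name, as in the ``name`` field of a block
REGISTRY = {fn.__name__: fn for fn in (generate_python, web_search)}


async def call_tool(tool_use: dict) -> dict:
    """Run the tool named in a ``tool_use`` block and wrap its result."""
    fn = REGISTRY.get(tool_use["name"])
    if fn is None:
        text = f"Unknown tool: {tool_use['name']}"
    else:
        # The ``input`` mapping supplies the keyword arguments of the tool
        text = await fn(**tool_use["input"])
    return {
        "type": "tool_result",
        "id": tool_use["id"],
        "name": tool_use["name"],
        "output": [{"type": "text", "text": text}],
    }


tool_result = asyncio.run(
    call_tool(
        {
            "type": "tool_use",
            "name": "generate_python",
            "input": {"demand": "a quick sort function"},
            "id": "call_1",
        },
    ),
)
print(tool_result["output"][0]["text"])  # # code for: a quick sort function
```

Returning an error message for an unknown tool name, instead of raising, lets the calling model see the failure and choose a different tool.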
