Source code for agentscope.formatter._dashscope_formatter
# -*- coding: utf-8 -*-
# pylint: disable=too-many-branches
"""The dashscope formatter module."""
import json
import os.path
from typing import Any
from ._truncated_formatter_base import TruncatedFormatterBase
from .._logging import logger
from .._utils._common import _is_accessible_local_file
from ..message import (
Msg,
TextBlock,
ImageBlock,
AudioBlock,
ToolUseBlock,
ToolResultBlock,
)
from ..token import TokenCounterBase
def _reformat_messages(
messages: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Reformat the content to be compatible with HuggingFaceTokenCounter.
This function processes a list of messages and converts multi-part
text content into single string content when all parts are plain text.
This is necessary for compatibility with HuggingFaceTokenCounter which
expects simple string content rather than structured content with
multiple parts.
Args:
messages (list[dict[str, Any]]):
A list of message dictionaries where each message may contain a
"content" field. The content can be either:
- A string (unchanged)
- A list of content items, where each item is a dict that may
contain "text", "type", and other fields
Returns:
list[dict[str, Any]]:
A list of reformatted messages. For messages where all content
items are plain text (have "text" field and either no "type"
field or "type" == "text"), the content list is converted to a
single newline-joined string. Other messages remain unchanged.
Example:
.. code-block:: python
# Case 1: All text content - will be converted
messages = [
{
"role": "user",
"content": [
{"text": "Hello", "type": "text"},
{"text": "World", "type": "text"}
]
}
]
result = _reformat_messages(messages)
print(result[0]["content"])
# Output: "Hello\nWorld"
# Case 2: Mixed content - will remain unchanged
messages = [
{
"role": "user",
"content": [
{"text": "Hello", "type": "text"},
{"image_url": "...", "type": "image"}
]
}
]
result = _reformat_messages(messages) # remain unchanged
print(type(result[0]["content"]))
# Output: <class 'list'>
"""
for message in messages:
content = message.get("content", [])
is_all_text = True
texts = []
for item in content:
if not isinstance(item, dict) or "text" not in item:
is_all_text = False
break
if "type" in item and item["type"] != "text":
is_all_text = False
break
if item["text"]:
texts.append(item["text"])
if is_all_text and texts:
message["content"] = "\n".join(texts)
return messages
[docs]
class DashScopeChatFormatter(TruncatedFormatterBase):
    """Formatter that turns ``Msg`` objects into DashScope chat messages."""

    support_tools_api: bool = True
    """Whether the tools API is supported"""

    support_multiagent: bool = False
    """Whether multi-agent conversations are supported"""

    support_vision: bool = True
    """Whether vision data is supported"""

    supported_blocks: list[type] = [
        TextBlock,
        ImageBlock,
        AudioBlock,
        ToolUseBlock,
        ToolResultBlock,
    ]

    async def _format(
        self,
        msgs: list[Msg],
    ) -> list[dict[str, Any]]:
        """Format message objects into DashScope API format.

        Every content block of every message is translated as follows:

        - ``text`` blocks become ``{"text": ...}`` content parts.
        - ``image`` / ``audio`` blocks become ``{<type>: <uri>}`` parts,
          where the URI is a ``file://`` path for accessible local files,
          the URL itself otherwise, or a ``data:`` URI for base64 sources.
        - ``tool_use`` blocks are collected into the ``tool_calls`` field
          of the message, with the input serialized as JSON.
        - ``tool_result`` blocks are emitted as separate ``"tool"`` role
          messages, appended as soon as they are encountered (i.e. before
          the message carrying the other blocks of the same ``Msg``).
        - Any other block type is skipped with a warning.

        A message that ends up with neither content nor tool calls is
        dropped. The result is finally passed through
        ``_reformat_messages``.

        Args:
            msgs (`list[Msg]`):
                The list of message objects to format.

        Returns:
            `list[dict[str, Any]]`:
                The formatted messages as a list of dictionaries.

        Raises:
            `NotImplementedError`:
                If an image/audio block has a source type other than
                ``"url"`` or ``"base64"``.
        """
        self.assert_list_of_msgs(msgs)

        results: list[dict] = []

        for msg in msgs:
            parts = []
            calls = []

            for block in msg.get_content_blocks():
                typ = block.get("type")

                if typ == "text":
                    parts.append({"text": block.get("text")})
                    continue

                if typ in ("image", "audio"):
                    source = block["source"]
                    source_type = source["type"]

                    if source_type == "url":
                        url = source["url"]
                        # Local files need an absolute file:// URI;
                        # anything else is passed through as a web URL.
                        if _is_accessible_local_file(url):
                            url = "file://" + os.path.abspath(url)
                        parts.append({typ: url})

                    elif source_type == "base64":
                        media_type = source["media_type"]
                        base64_data = source["data"]
                        data_uri = f"data:{media_type};base64,{base64_data}"
                        parts.append({typ: data_uri})

                    else:
                        raise NotImplementedError(
                            f"Unsupported source type '{source.get('type')}' "
                            f"for {typ} block.",
                        )
                    continue

                if typ == "tool_use":
                    function_spec = {
                        "name": block.get("name"),
                        "arguments": json.dumps(
                            block.get("input", {}),
                            ensure_ascii=False,
                        ),
                    }
                    calls.append(
                        {
                            "id": block.get("id"),
                            "type": "function",
                            "function": function_spec,
                        },
                    )
                    continue

                if typ == "tool_result":
                    result_text = self.convert_tool_result_to_string(
                        block.get("output"),  # type: ignore[arg-type]
                    )
                    # Tool results are standalone messages in the output.
                    results.append(
                        {
                            "role": "tool",
                            "tool_call_id": block.get("id"),
                            "content": result_text,
                            "name": block.get("name"),
                        },
                    )
                    continue

                logger.warning(
                    "Unsupported block type %s in the message, skipped.",
                    typ,
                )

            # An empty content list is replaced by a placeholder part.
            content = parts or [{"text": None}]
            entry = {
                "role": msg.role,
                "content": content,
            }
            if calls:
                entry["tool_calls"] = calls

            # Drop the message when it carries nothing but the placeholder
            # and has no tool calls either.
            if calls or content != [{"text": None}]:
                results.append(entry)

        return _reformat_messages(results)
[docs]
class DashScopeMultiAgentFormatter(TruncatedFormatterBase):
    """DashScope formatter for multi-agent conversations, where more than
    a user and an agent are involved.

    .. note:: This formatter will combine previous messages (except tool
     calls/results) into a history section, wrapped in
     ``<history></history>`` tags, inside a single user message. The first
     such message is prefixed with the conversation history prompt.

    .. note:: For tool calls/results, they will be presented as separate
     messages as required by the DashScope API. Therefore, the tool calls/
     results messages are expected to be placed at the end of the input
     messages.

    .. tip:: Telling the assistant's name in the system prompt is very
     important in multi-agent conversations. So that LLM can know who it
     is playing as.
    """

    support_tools_api: bool = True
    """Whether support tools API"""

    support_multiagent: bool = True
    """Whether support multi-agent conversations"""

    support_vision: bool = True
    """Whether support vision data"""

    supported_blocks: list[type] = [
        TextBlock,
        # Multimodal
        ImageBlock,
        AudioBlock,
        # Tool use
        ToolUseBlock,
        ToolResultBlock,
    ]
    """The list of supported message blocks"""

    def __init__(
        self,
        conversation_history_prompt: str = (
            "# Conversation History\n"
            "The content between <history></history> tags contains "
            "your conversation history\n"
        ),
        token_counter: TokenCounterBase | None = None,
        max_tokens: int | None = None,
    ) -> None:
        """Initialize the DashScope multi-agent formatter.

        Args:
            conversation_history_prompt (`str`):
                The prompt to use for the conversation history section.
            token_counter (`TokenCounterBase | None`, optional):
                The token counter used for truncation.
            max_tokens (`int | None`, optional):
                The maximum number of tokens allowed in the formatted
                messages. If `None`, no truncation will be applied.
        """
        super().__init__(token_counter=token_counter, max_tokens=max_tokens)
        self.conversation_history_prompt = conversation_history_prompt

    async def _format_tool_sequence(
        self,
        msgs: list[Msg],
    ) -> list[dict[str, Any]]:
        """Given a sequence of tool call/result messages, format them into
        the required format for the DashScope API.

        The work is delegated to a fresh ``DashScopeChatFormatter``.

        Args:
            msgs (`list[Msg]`):
                The list of messages containing tool calls/results to format.

        Returns:
            `list[dict[str, Any]]`:
                A list of dictionaries formatted for the DashScope API.
        """
        return await DashScopeChatFormatter().format(msgs)

    async def _format_agent_message(
        self,
        msgs: list[Msg],
        is_first: bool = True,
    ) -> list[dict[str, Any]]:
        """Given a sequence of messages without tool calls/results, format
        them into a user message with conversation history tags. For the
        first agent message, it will include the conversation history prompt.

        Consecutive text blocks are merged into one ``{"text": ...}`` part,
        each line prefixed with the speaker's name. Image/audio blocks are
        kept as separate parts between the text parts. Blocks that cannot
        be represented (unsupported block type, or an image/audio block
        with an unsupported source type) are skipped with a warning; this
        method does not raise for them.

        Args:
            msgs (`list[Msg]`):
                A list of Msg objects to be formatted.
            is_first (`bool`, defaults to `True`):
                Whether this is the first agent message in the conversation.
                If `True`, the conversation history prompt will be included.

        Returns:
            `list[dict[str, Any]]`:
                A list with at most one ``"user"`` message formatted for
                the DashScope API; empty if no block produced any content.
        """
        if is_first:
            conversation_history_prompt = self.conversation_history_prompt
        else:
            conversation_history_prompt = ""

        # Format into required DashScope format
        formatted_msgs: list[dict] = []

        # Content parts of the history message, and the text lines that
        # have not yet been flushed into a part.
        conversation_blocks = []
        accumulated_text = []

        for msg in msgs:
            for block in msg.get_content_blocks():
                block_type = block["type"]

                if block_type == "text":
                    accumulated_text.append(f"{msg.name}: {block['text']}")

                elif block_type in ["image", "audio"]:
                    # Flush the accumulated text as a single part so that
                    # the media keeps its position in the conversation.
                    if accumulated_text:
                        conversation_blocks.append(
                            {"text": "\n".join(accumulated_text)},
                        )
                        accumulated_text.clear()

                    source = block["source"]
                    if source["type"] == "url":
                        url = source["url"]
                        if _is_accessible_local_file(url):
                            conversation_blocks.append(
                                {
                                    block_type: "file://"
                                    + os.path.abspath(url),
                                },
                            )
                        else:
                            # treat as web url
                            conversation_blocks.append({block_type: url})

                    elif source["type"] == "base64":
                        media_type = source["media_type"]
                        base64_data = source["data"]
                        conversation_blocks.append(
                            {
                                block_type: (
                                    f"data:{media_type};base64,{base64_data}"
                                ),
                            },
                        )

                    else:
                        # Best effort: report what was actually wrong (the
                        # source type, not the block type) and move on.
                        logger.warning(
                            "Unsupported source type %s for %s block in "
                            "the message, skipped.",
                            source["type"],
                            block_type,
                        )

                else:
                    # Same reporting as DashScopeChatFormatter, so that
                    # dropped blocks never disappear silently.
                    logger.warning(
                        "Unsupported block type %s in the message, skipped.",
                        block_type,
                    )

        if accumulated_text:
            conversation_blocks.append({"text": "\n".join(accumulated_text)})

        if conversation_blocks:
            # Open the history section: reuse the first part when it is a
            # non-empty text part, otherwise put a new text part in front.
            if conversation_blocks[0].get("text"):
                conversation_blocks[0]["text"] = (
                    conversation_history_prompt
                    + "<history>\n"
                    + conversation_blocks[0]["text"]
                )
            else:
                conversation_blocks.insert(
                    0,
                    {
                        "text": conversation_history_prompt + "<history>\n",
                    },
                )

            # Close the history section in the same manner at the end.
            if conversation_blocks[-1].get("text"):
                conversation_blocks[-1]["text"] += "\n</history>"
            else:
                conversation_blocks.append({"text": "</history>"})

            formatted_msgs.append(
                {
                    "role": "user",
                    "content": conversation_blocks,
                },
            )

        return _reformat_messages(formatted_msgs)

    async def _format_system_message(
        self,
        msg: Msg,
    ) -> dict[str, Any]:
        """Format system message for DashScope API.

        Args:
            msg (`Msg`):
                The system message to format.

        Returns:
            `dict[str, Any]`:
                A ``"system"`` role message whose content is the value of
                ``msg.get_text_content()``.
        """
        return {
            "role": "system",
            "content": msg.get_text_content(),
        }