Source code for agentscope.formatter._dashscope_formatter
# -*- coding: utf-8 -*-
# pylint: disable=too-many-branches
"""The dashscope formatter module."""
import json
import os.path
from typing import Any
from ._truncated_formatter_base import TruncatedFormatterBase
from .._logging import logger
from .._utils._common import _is_accessible_local_file
from ..message import (
Msg,
TextBlock,
ImageBlock,
AudioBlock,
VideoBlock,
ToolUseBlock,
ToolResultBlock,
URLSource,
)
from ..token import TokenCounterBase
def _format_dashscope_media_block(
block: ImageBlock | AudioBlock,
) -> dict[str, str]:
"""Format an image or audio block for DashScope API.
Args:
block (`ImageBlock` | `AudioBlock`):
The image or audio block to format.
Returns:
`dict[str, str]`:
A dictionary with "image" or "audio" key and the formatted URL or
data URI as value.
Raises:
`NotImplementedError`:
If the source type is not supported.
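Example:
A minimal sketch of the URL pass-through case; a local path would
instead gain a `file://` prefix, and a base64 source becomes a data URI:
.. code-block:: python

    block = ImageBlock(
        type="image",
        source=URLSource(type="url", url="https://example.com/cat.png"),
    )
    print(_format_dashscope_media_block(block))
    # Output: {'image': 'https://example.com/cat.png'}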
"""
typ = block["type"]
source = block["source"]
if source["type"] == "url":
url = source["url"]
if _is_accessible_local_file(url):
return {typ: "file://" + os.path.abspath(url)}
else:
# treat as web url
return {typ: url}
elif source["type"] == "base64":
media_type = source["media_type"]
base64_data = source["data"]
return {
typ: f"data:{media_type};base64,{base64_data}",
}
else:
raise NotImplementedError(
f"Unsupported source type '{source.get('type')}' "
f"for {typ} block.",
)
def _reformat_messages(
messages: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Reformat the content to be compatible with HuggingFaceTokenCounter.
This function processes a list of messages and converts multi-part
text content into single string content when all parts are plain text.
This is necessary for compatibility with HuggingFaceTokenCounter which
expects simple string content rather than structured content with
multiple parts.
Args:
messages (list[dict[str, Any]]):
A list of message dictionaries where each message may contain a
"content" field. The content can be either:
- A string (unchanged)
- A list of content items, where each item is a dict that may
contain "text", "type", and other fields
Returns:
list[dict[str, Any]]:
A list of reformatted messages. For messages where all content
items are plain text (have "text" field and either no "type"
field or "type" == "text"), the content list is converted to a
single newline-joined string. Other messages remain unchanged.
Example:
.. code-block:: python
    # Case 1: All text content - will be converted
    messages = [
        {
            "role": "user",
            "content": [
                {"text": "Hello", "type": "text"},
                {"text": "World", "type": "text"},
            ],
        },
    ]
    result = _reformat_messages(messages)
    print(result[0]["content"])
    # Output: "Hello\nWorld"

    # Case 2: Mixed content - will remain unchanged
    messages = [
        {
            "role": "user",
            "content": [
                {"text": "Hello", "type": "text"},
                {"image_url": "...", "type": "image"},
            ],
        },
    ]
    result = _reformat_messages(messages)  # remains unchanged
    print(type(result[0]["content"]))
    # Output: <class 'list'>
"""
for message in messages:
content = message.get("content", [])
is_all_text = True
texts = []
for item in content:
if not isinstance(item, dict) or "text" not in item:
is_all_text = False
break
if "type" in item and item["type"] != "text":
is_all_text = False
break
if item["text"]:
texts.append(item["text"])
if is_all_text and texts:
message["content"] = "\n".join(texts)
return messages
class DashScopeChatFormatter(TruncatedFormatterBase):
"""The DashScope formatter class for chatbot scenario, where only a user
and an agent are involved. We use the `role` field to identify different
entities in the conversation.
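Example:
A sketch of a two-party exchange, awaited inside an async context:
.. code-block:: python

    msgs = [
        Msg(name="user", content="Hi!", role="user"),
        Msg(name="Friday", content="How can I help?", role="assistant"),
    ]
    formatted = await DashScopeChatFormatter().format(msgs)
    # Output: [{"role": "user", "content": "Hi!"},
    #          {"role": "assistant", "content": "How can I help?"}]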
"""
support_tools_api: bool = True
"""Whether the tools API is supported"""
support_multiagent: bool = False
"""Whether multi-agent conversations are supported"""
support_vision: bool = True
"""Whether vision data is supported"""
supported_blocks: list[type] = [
TextBlock,
ImageBlock,
AudioBlock,
VideoBlock,
ToolUseBlock,
ToolResultBlock,
]
def __init__(
self,
promote_tool_result_images: bool = False,
promote_tool_result_audios: bool = False,
promote_tool_result_videos: bool = False,
token_counter: TokenCounterBase | None = None,
max_tokens: int | None = None,
) -> None:
"""Initialize the DashScope chat formatter.
Args:
promote_tool_result_images (`bool`, defaults to `False`):
Whether to promote images from tool results to user messages.
Most LLM APIs don't support images in tool result blocks, but
do support them in user message blocks. When `True`, images are
extracted and appended as a separate user message with
explanatory text indicating their source.
promote_tool_result_audios (`bool`, defaults to `False`):
Whether to promote audios from tool results to user messages.
Most LLM APIs don't support audios in tool result blocks, but
do support them in user message blocks. When `True`, audios are
extracted and appended as a separate user message with
explanatory text indicating their source.
promote_tool_result_videos (`bool`, defaults to `False`):
Whether to promote videos from tool results to user messages.
Most LLM APIs don't support videos in tool result blocks, but
do support them in user message blocks. When `True`, videos are
extracted and appended as a separate user message with
explanatory text indicating their source.
token_counter (`TokenCounterBase | None`, optional):
A token counter instance used to count tokens in the messages.
If not provided, the formatter will format the messages
without considering token limits.
max_tokens (`int | None`, optional):
The maximum number of tokens allowed in the formatted
messages. If not provided, the formatter will not truncate
the messages.
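Example:
A minimal construction sketch; `my_counter` is a hypothetical
placeholder for any `TokenCounterBase` implementation:
.. code-block:: python

    formatter = DashScopeChatFormatter(
        promote_tool_result_images=True,
        token_counter=my_counter,  # any TokenCounterBase instance
        max_tokens=4096,
    )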
"""
super().__init__(token_counter, max_tokens)
self.promote_tool_result_images = promote_tool_result_images
self.promote_tool_result_audios = promote_tool_result_audios
self.promote_tool_result_videos = promote_tool_result_videos
async def _format(
self,
msgs: list[Msg],
) -> list[dict[str, Any]]:
"""Format message objects into DashScope API format.
Args:
msgs (`list[Msg]`):
The list of message objects to format.
Returns:
`list[dict[str, Any]]`:
The formatted messages as a list of dictionaries.
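Example:
A sketch of the text-only path; after `_reformat_messages` the
all-text content collapses into a single string:
.. code-block:: python

    formatter = DashScopeChatFormatter()
    msgs = [Msg(name="user", content="Hi!", role="user")]
    result = await formatter._format(msgs)
    # Output: [{"role": "user", "content": "Hi!"}]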
"""
self.assert_list_of_msgs(msgs)
formatted_msgs: list[dict] = []
i = 0
while i < len(msgs):
msg = msgs[i]
content_blocks: list[dict[str, Any]] = []
tool_calls = []
for block in msg.get_content_blocks():
typ = block.get("type")
if typ == "text":
content_blocks.append(
{
"text": block.get("text"),
},
)
elif typ in ["image", "audio", "video"]:
content_blocks.append(
_format_dashscope_media_block(
block, # type: ignore[arg-type]
),
)
elif typ == "tool_use":
tool_calls.append(
{
"id": block.get("id"),
"type": "function",
"function": {
"name": block.get("name"),
"arguments": json.dumps(
block.get("input", {}),
ensure_ascii=False,
),
},
},
)
elif typ == "tool_result":
(
textual_output,
multimodal_data,
) = self.convert_tool_result_to_string(block["output"])
# First add the tool result message in DashScope API format
formatted_msgs.append(
{
"role": "tool",
"tool_call_id": block.get("id"),
"content": textual_output,
"name": block.get("name"),
},
)
# Then, handle the multimodal data if any
promoted_blocks: list = []
for url, multimodal_block in multimodal_data:
if (
multimodal_block["type"] == "image"
and self.promote_tool_result_images
):
promoted_blocks.extend(
[
TextBlock(
type="text",
text=f"\n- The image from '{url}': ",
),
ImageBlock(
type="image",
source=URLSource(
type="url",
url=url,
),
),
],
)
elif (
multimodal_block["type"] == "audio"
and self.promote_tool_result_audios
):
promoted_blocks.extend(
[
TextBlock(
type="text",
text=f"\n- The audio from '{url}': ",
),
AudioBlock(
type="audio",
source=URLSource(
type="url",
url=url,
),
),
],
)
elif (
multimodal_block["type"] == "video"
and self.promote_tool_result_videos
):
promoted_blocks.extend(
[
TextBlock(
type="text",
text=f"\n- The video from '{url}': ",
),
VideoBlock(
type="video",
source=URLSource(
type="url",
url=url,
),
),
],
)
if promoted_blocks:
# Insert promoted blocks as new user message(s)
promoted_blocks = [
TextBlock(
type="text",
text="<system-info>The following are "
f"the media contents from the tool "
f"result of '{block['name']}':",
),
*promoted_blocks,
TextBlock(
type="text",
text="</system-info>",
),
]
msgs.insert(
i + 1,
Msg(
name="user",
content=promoted_blocks,
role="user",
),
)
else:
logger.warning(
"Unsupported block type %s in the message, skipped.",
typ,
)
msg_dashscope = {
"role": msg.role,
"content": content_blocks or [{"text": None}],
}
if tool_calls:
msg_dashscope["tool_calls"] = tool_calls
if msg_dashscope["content"] != [
{"text": None},
] or msg_dashscope.get(
"tool_calls",
):
formatted_msgs.append(msg_dashscope)
# Move to next message
i += 1
return _reformat_messages(formatted_msgs)
class DashScopeMultiAgentFormatter(TruncatedFormatterBase):
"""DashScope formatter for multi-agent conversations, where more than
a user and an agent are involved.
.. note:: This formatter will combine previous messages (except tool
calls/results) into a history section in the first system message,
using the conversation history prompt.
.. note:: For tool calls/results, they will be presented as separate
messages as required by the DashScope API. Therefore, the tool calls/
results messages are expected to be placed at the end of the input
messages.
.. tip:: Stating the assistant's name in the system prompt is very
important in multi-agent conversations, so that the LLM knows which
role it is playing.
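Example:
A sketch of the merged history (awaited inside an async context);
speaker names are prefixed so the model can tell the entities apart:
.. code-block:: python

    msgs = [
        Msg(name="Alice", content="Hi!", role="user"),
        Msg(name="Bob", content="Nice to meet you!", role="assistant"),
    ]
    formatted = await DashScopeMultiAgentFormatter().format(msgs)
    # The result is a single user message whose text reads roughly:
    # "# Conversation History\n...<history>\nAlice: Hi!\n
    # Bob: Nice to meet you!\n</history>"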
"""
support_tools_api: bool = True
"""Whether the tools API is supported"""
support_multiagent: bool = True
"""Whether multi-agent conversations are supported"""
support_vision: bool = True
"""Whether vision data is supported"""
supported_blocks: list[type] = [
TextBlock,
# Multimodal
ImageBlock,
AudioBlock,
VideoBlock,
# Tool use
ToolUseBlock,
ToolResultBlock,
]
"""The list of supported message blocks"""
def __init__(
self,
conversation_history_prompt: str = (
"# Conversation History\n"
"The content between <history></history> tags contains "
"your conversation history\n"
),
promote_tool_result_images: bool = False,
promote_tool_result_audios: bool = False,
promote_tool_result_videos: bool = False,
token_counter: TokenCounterBase | None = None,
max_tokens: int | None = None,
) -> None:
"""Initialize the DashScope multi-agent formatter.
Args:
conversation_history_prompt (`str`):
The prompt to use for the conversation history section.
promote_tool_result_images (`bool`, defaults to `False`):
Whether to promote images from tool results to user messages.
Most LLM APIs don't support images in tool result blocks, but
do support them in user message blocks. When `True`, images are
extracted and appended as a separate user message with
explanatory text indicating their source.
promote_tool_result_audios (`bool`, defaults to `False`):
Whether to promote audios from tool results to user messages.
Most LLM APIs don't support audios in tool result blocks, but
do support them in user message blocks. When `True`, audios are
extracted and appended as a separate user message with
explanatory text indicating their source.
promote_tool_result_videos (`bool`, defaults to `False`):
Whether to promote videos from tool results to user messages.
Most LLM APIs don't support videos in tool result blocks, but
do support them in user message blocks. When `True`, videos are
extracted and appended as a separate user message with
explanatory text indicating their source.
token_counter (`TokenCounterBase | None`, optional):
The token counter used for truncation.
max_tokens (`int | None`, optional):
The maximum number of tokens allowed in the formatted
messages. If `None`, no truncation will be applied.
"""
super().__init__(token_counter=token_counter, max_tokens=max_tokens)
self.conversation_history_prompt = conversation_history_prompt
self.promote_tool_result_images = promote_tool_result_images
self.promote_tool_result_audios = promote_tool_result_audios
self.promote_tool_result_videos = promote_tool_result_videos
async def _format_tool_sequence(
self,
msgs: list[Msg],
) -> list[dict[str, Any]]:
"""Given a sequence of tool call/result messages, format them into
the required format for the DashScope API.
Args:
msgs (`list[Msg]`):
The list of messages containing tool calls/results to format.
Returns:
`list[dict[str, Any]]`:
A list of dictionaries formatted for the DashScope API.
"""
return await DashScopeChatFormatter(
promote_tool_result_images=self.promote_tool_result_images,
promote_tool_result_audios=self.promote_tool_result_audios,
promote_tool_result_videos=self.promote_tool_result_videos,
).format(msgs)
async def _format_agent_message(
self,
msgs: list[Msg],
is_first: bool = True,
) -> list[dict[str, Any]]:
"""Given a sequence of messages without tool calls/results, format
them into a user message with conversation history tags. For the
first agent message, it will include the conversation history prompt.
Args:
msgs (`list[Msg]`):
A list of Msg objects to be formatted.
is_first (`bool`, defaults to `True`):
Whether this is the first agent message in the conversation.
If `True`, the conversation history prompt will be included.
Returns:
`list[dict[str, Any]]`:
A list of dictionaries formatted for the DashScope API.
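Example:
A sketch of the flushing behavior: text accumulates until a media
block appears, so text and media keep their conversation order:
.. code-block:: python

    formatter = DashScopeMultiAgentFormatter()
    msgs = [
        Msg(
            name="Alice",
            content=[
                TextBlock(type="text", text="Look:"),
                ImageBlock(
                    type="image",
                    source=URLSource(
                        type="url",
                        url="https://example.com/cat.png",
                    ),
                ),
            ],
            role="user",
        ),
    ]
    result = await formatter._format_agent_message(msgs)
    # result[0]["content"] is roughly:
    # [{"text": "...<history>\nAlice: Look:"},
    #  {"image": "https://example.com/cat.png"},
    #  {"text": "</history>"}]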
"""
if is_first:
conversation_history_prompt = self.conversation_history_prompt
else:
conversation_history_prompt = ""
# Format into the required DashScope format
formatted_msgs: list[dict] = []
# Collect the conversation blocks (text and multimodal data)
conversation_blocks = []
accumulated_text = []
for msg in msgs:
for block in msg.get_content_blocks():
if block["type"] == "text":
accumulated_text.append(f"{msg.name}: {block['text']}")
elif block["type"] in ["image", "audio", "video"]:
# Handle the accumulated text as a single block
if accumulated_text:
conversation_blocks.append(
{"text": "\n".join(accumulated_text)},
)
accumulated_text.clear()
if block["source"]["type"] == "url":
url = block["source"]["url"]
if _is_accessible_local_file(url):
conversation_blocks.append(
{
block["type"]: "file://"
+ os.path.abspath(url),
},
)
else:
conversation_blocks.append({block["type"]: url})
elif block["source"]["type"] == "base64":
media_type = block["source"]["media_type"]
base64_data = block["source"]["data"]
conversation_blocks.append(
{
block[
"type"
]: f"data:{media_type};base64,{base64_data}",
},
)
else:
logger.warning(
"Unsupported block type %s in the message, "
"skipped.",
block["type"],
)
if accumulated_text:
conversation_blocks.append({"text": "\n".join(accumulated_text)})
if conversation_blocks:
if conversation_blocks[0].get("text"):
conversation_blocks[0]["text"] = (
conversation_history_prompt
+ "<history>\n"
+ conversation_blocks[0]["text"]
)
else:
conversation_blocks.insert(
0,
{
"text": conversation_history_prompt + "<history>\n",
},
)
if conversation_blocks[-1].get("text"):
conversation_blocks[-1]["text"] += "\n</history>"
else:
conversation_blocks.append({"text": "</history>"})
formatted_msgs.append(
{
"role": "user",
"content": conversation_blocks,
},
)
return _reformat_messages(formatted_msgs)
async def _format_system_message(
self,
msg: Msg,
) -> dict[str, Any]:
"""Format system message for DashScope API."""
return {
"role": "system",
"content": msg.get_text_content(),
}