agentscope.formatter._gemini_formatter 源代码

# -*- coding: utf-8 -*-
# pylint: disable=too-many-branches
"""Google gemini API formatter in agentscope."""
import base64
import os
from typing import Any
from urllib.parse import urlparse

from ._truncated_formatter_base import TruncatedFormatterBase
from .._utils._common import _get_bytes_from_web_url
from ..message import (
    Msg,
    TextBlock,
    ImageBlock,
    AudioBlock,
    ToolUseBlock,
    ToolResultBlock,
    VideoBlock,
    URLSource,
)
from .._logging import logger
from ..token import TokenCounterBase


def _format_gemini_media_block(
    media_block: ImageBlock | AudioBlock | VideoBlock,
) -> dict[str, Any]:
    """Format an image/audio/video block for Gemini API.

    Args:
        media_block (`ImageBlock | AudioBlock | VideoBlock`):
            The media block to format.

    Returns:
        `dict[str, Any]`:
            A dictionary with "inline_data" key in Gemini format.

    Raises:
        `ValueError`:
            If the source type is not supported.
    """
    source = media_block["source"]
    if source["type"] == "base64":
        return {
            "inline_data": {
                "data": source["data"],
                "mime_type": source["media_type"],
            },
        }
    elif source["type"] == "url":
        return {
            "inline_data": _to_gemini_inline_data(source["url"]),
        }
    else:
        raise ValueError(
            f"Unsupported source type: {source['type']}",
        )


def _to_gemini_inline_data(url: str) -> dict:
    """Convert url into the Gemini API required format."""
    parsed_url = urlparse(url)
    extension = url.split(".")[-1].lower()

    # Pre-calculate media type from extension (image/audio/video).
    typ = None
    for k, v in GeminiChatFormatter.supported_extensions.items():
        if extension in v:
            typ = k
            break

    if not os.path.exists(url) and parsed_url.scheme != "":
        # Web url
        if typ is None:
            raise TypeError(
                f"Unsupported file extension: {extension}, expected "
                f"{GeminiChatFormatter.supported_extensions}",
            )

        data = _get_bytes_from_web_url(url)
        return {
            "data": data,
            "mime_type": f"{typ}/{extension}",
        }

    elif os.path.exists(url):
        # Local file
        if typ is None:
            raise TypeError(
                f"Unsupported file extension: {extension}, expected "
                f"{GeminiChatFormatter.supported_extensions}",
            )

        with open(url, "rb") as f:
            data = base64.b64encode(f.read()).decode("utf-8")

        return {
            "data": data,
            "mime_type": f"{typ}/{extension}",
        }

    raise ValueError(
        f"The URL `{url}` is not a valid image URL or local file.",
    )


[文档] class GeminiChatFormatter(TruncatedFormatterBase): """The Gemini formatter class for chatbot scenario, where only a user and an agent are involved. We use the `role` field to identify different entities in the conversation. """ support_tools_api: bool = True """Whether support tools API""" support_multiagent: bool = False """Whether support multi-agent conversations""" support_vision: bool = True """Whether support vision data""" supported_blocks: list[type] = [ TextBlock, # Multimodal ImageBlock, VideoBlock, AudioBlock, # Tool use ToolUseBlock, ToolResultBlock, ] """The list of supported message blocks""" supported_extensions: dict[str, list[str]] = { "image": ["png", "jpeg", "webp", "heic", "heif"], "video": [ "mp4", "mpeg", "mov", "avi", "x-flv", "mpg", "webm", "wmv", "3gpp", ], "audio": ["mp3", "wav", "aiff", "aac", "ogg", "flac"], }
[文档] def __init__( self, promote_tool_result_images: bool = False, token_counter: TokenCounterBase | None = None, max_tokens: int | None = None, ) -> None: """Initialize the Gemini chat formatter. Args: promote_tool_result_images (`bool`, defaults to `False`): Whether to promote images from tool results to user messages. Most LLM APIs don't support images in tool result blocks, but do support them in user message blocks. When `True`, images are extracted and appended as a separate user message with explanatory text indicating their source. token_counter (`TokenCounterBase | None`, optional): A token counter instance used to count tokens in the messages. If not provided, the formatter will format the messages without considering token limits. max_tokens (`int | None`, optional): The maximum number of tokens allowed in the formatted messages. If not provided, the formatter will not truncate the messages. """ super().__init__(token_counter, max_tokens) self.promote_tool_result_images = promote_tool_result_images
[文档] async def _format( self, msgs: list[Msg], ) -> list[dict]: """Format message objects into Gemini API required format.""" self.assert_list_of_msgs(msgs) messages: list = [] i = 0 while i < len(msgs): msg = msgs[i] parts = [] for block in msg.get_content_blocks(): typ = block.get("type") if typ == "text": parts.append( { "text": block.get("text"), }, ) elif typ == "tool_use": parts.append( { "function_call": { "id": block["id"], "name": block["name"], "args": block["input"], }, }, ) elif typ == "tool_result": ( textual_output, multimodal_data, ) = self.convert_tool_result_to_string(block["output"]) # First add the tool result message in DashScope API format messages.append( { "role": "user", "parts": [ { "function_response": { "id": block["id"], "name": block["name"], "response": { "output": textual_output, }, }, }, ], }, ) promoted_blocks: list = [] for url, multimodal_block in multimodal_data: if ( multimodal_block["type"] == "image" and self.promote_tool_result_images ): promoted_blocks.extend( [ TextBlock( type="text", text=f"\n- The image from '{url}': ", ), ImageBlock( type="image", source=URLSource( type="url", url=url, ), ), ], ) if promoted_blocks: # Insert promoted blocks as new user message(s) promoted_blocks = [ TextBlock( type="text", text="<system-info>The following are " "the image contents from the tool " f"result of '{block['name']}':", ), *promoted_blocks, TextBlock( type="text", text="</system-info>", ), ] msgs.insert( i + 1, Msg( name="user", content=promoted_blocks, role="user", ), ) elif typ in ["image", "audio", "video"]: parts.append( _format_gemini_media_block( block, # type: ignore[arg-type] ), ) else: logger.warning( "Unsupported block type: %s in the message, skipped. ", typ, ) role = "model" if msg.role == "assistant" else "user" if parts: messages.append( { "role": role, "parts": parts, }, ) # Move to next message (including inserted messages, which will # be processed in subsequent iterations) i += 1 return messages
[文档] class GeminiMultiAgentFormatter(TruncatedFormatterBase): """The multi-agent formatter for Google Gemini API, where more than a user and an agent are involved. .. note:: This formatter will combine previous messages (except tool calls/results) into a history section in the first system message with the conversation history prompt. .. note:: For tool calls/results, they will be presented as separate messages as required by the Gemini API. Therefore, the tool calls/ results messages are expected to be placed at the end of the input messages. .. tip:: Telling the assistant's name in the system prompt is very important in multi-agent conversations. So that LLM can know who it is playing as. """ support_tools_api: bool = True """Whether support tools API""" support_multiagent: bool = True """Whether support multi-agent conversations""" support_vision: bool = True """Whether support vision data""" supported_blocks: list[type] = [ TextBlock, # Multimodal ImageBlock, VideoBlock, AudioBlock, # Tool use ToolUseBlock, ToolResultBlock, ] """The list of supported message blocks"""
[文档] def __init__( self, conversation_history_prompt: str = ( "# Conversation History\n" "The content between <history></history> tags contains " "your conversation history\n" ), promote_tool_result_images: bool = False, token_counter: TokenCounterBase | None = None, max_tokens: int | None = None, ) -> None: """Initialize the Gemini multi-agent formatter. Args: conversation_history_prompt (`str`): The prompt to be used for the conversation history section. promote_tool_result_images (`bool`, defaults to `False`): Whether to promote images from tool results to user messages. Most LLM APIs don't support images in tool result blocks, but do support them in user message blocks. When `True`, images are extracted and appended as a separate user message with explanatory text indicating their source. token_counter (`TokenCounterBase | None`, optional): The token counter used for truncation. max_tokens (`int | None`, optional): The maximum number of tokens allowed in the formatted messages. If `None`, no truncation will be applied. """ super().__init__(token_counter=token_counter, max_tokens=max_tokens) self.conversation_history_prompt = conversation_history_prompt self.promote_tool_result_images = promote_tool_result_images
async def _format_system_message( self, msg: Msg, ) -> dict[str, Any]: """Format system message for the Gemini API.""" return { "role": "user", "parts": [ { "text": msg.get_text_content(), }, ], }
[文档] async def _format_tool_sequence( self, msgs: list[Msg], ) -> list[dict[str, Any]]: """Given a sequence of tool call/result messages, format them into the required format for the Gemini API. Args: msgs (`list[Msg]`): The list of messages containing tool calls/results to format. Returns: `list[dict[str, Any]]`: A list of dictionaries formatted for the Gemini API. """ return await GeminiChatFormatter( promote_tool_result_images=self.promote_tool_result_images, ).format(msgs)
[文档] async def _format_agent_message( self, msgs: list[Msg], is_first: bool = True, ) -> list[dict[str, Any]]: """Given a sequence of messages without tool calls/results, format them into the required format for the Gemini API. Args: msgs (`list[Msg]`): A list of Msg objects to be formatted. is_first (`bool`, defaults to `True`): Whether this is the first agent message in the conversation. If `True`, the conversation history prompt will be included. Returns: `list[dict[str, Any]]`: A list of dictionaries formatted for the Gemini API. """ if is_first: conversation_history_prompt = self.conversation_history_prompt else: conversation_history_prompt = "" # Format into Gemini API required format formatted_msgs: list = [] # Collect the multimodal files conversation_parts: list = [] accumulated_text = [] for msg in msgs: for block in msg.get_content_blocks(): if block["type"] == "text": accumulated_text.append(f"{msg.name}: {block['text']}") elif block["type"] in ["image", "video", "audio"]: # handle the accumulated text as a single part if exists if accumulated_text: conversation_parts.append( { "text": "\n".join(accumulated_text), }, ) accumulated_text.clear() # handle the multimodal data conversation_parts.append( _format_gemini_media_block( block, # type: ignore[arg-type] ), ) if accumulated_text: conversation_parts.append( { "text": "\n".join(accumulated_text), }, ) # Add prompt and <history></history> tags around conversation history if conversation_parts: if conversation_parts[0].get("text"): conversation_parts[0]["text"] = ( conversation_history_prompt + "<history>" + conversation_parts[0]["text"] ) else: conversation_parts.insert( 0, {"text": conversation_history_prompt + "<history>"}, ) if conversation_parts[-1].get("text"): conversation_parts[-1]["text"] += "\n</history>" else: conversation_parts.append( {"text": "</history>"}, ) formatted_msgs.append( { "role": "user", "parts": conversation_parts, }, ) return formatted_msgs