Source code for agentscope.formatter._a2a_formatter
# -*- coding: utf-8 -*-
"""The A2A message formatter class."""
import mimetypes
import uuid
from typing import Literal, TYPE_CHECKING
from .._logging import logger
from ._formatter_base import FormatterBase
from ..message import (
Msg,
TextBlock,
URLSource,
Base64Source,
ContentBlock,
)
if TYPE_CHECKING:
from a2a.types import (
Message,
Task,
Part,
)
else:
Message = "a2a.types.Message"
Task = "a2a.types.Task"
Part = "a2a.types.Part"
[docs]
class A2AChatFormatter(FormatterBase):
"""A2A message formatter class, which convert AgentScope messages into
A2A message format."""
[docs]
async def format(self, msgs: list[Msg]) -> Message:
"""Convert AgentScope messages into a A2A message object. Note that
A2A server only supports single request message, so the input msgs
list will be merged into a single A2A Message.
.. note:: Note the A2A protocol receives a single message per request,
so multi-message inputs will be merged into one A2A Message with role
'user'.
Args:
msgs (`list[Msg]`):
List of AgentScope Msg objects to be converted.
Returns:
`Message`:
The converted A2A Message object.
"""
from a2a.types import (
Part,
TextPart,
FilePart,
FileWithUri,
FileWithBytes,
DataPart,
Role,
Message,
)
self.assert_list_of_msgs(msgs)
parts = []
for msg in msgs:
for block in msg.get_content_blocks():
block_type = block.get("type")
if block_type == "text" and block.get("text"):
parts.append(
Part(
root=TextPart(
text=block.get("text"),
),
),
)
elif block_type == "thinking" and block.get("thinking"):
parts.append(
Part(
root=TextPart(
text=block.get("thinking"),
),
),
)
elif block_type in [
"image",
"video",
"audio",
] and block.get("source"):
source = block.get("source", {})
source_type = source.get("type")
if source_type == "url":
parts.append(
Part(
root=FilePart(
file=FileWithUri(
uri=source.get("url"),
),
),
),
)
elif source_type == "base64":
parts.append(
Part(
root=FilePart(
file=FileWithBytes(
bytes=source.get("data"),
mime_type=source.get("media_type"),
),
),
),
)
else:
raise ValueError(
f"Unsupported source type: {source_type}",
)
elif block_type in ["tool_use", "tool_result"]:
parts.append(
Part(
root=DataPart(
data=block,
),
),
)
else:
logger.error(
"Unsupported block type %s in A2AFormatter.",
block_type,
)
a2a_message = Message(
message_id=str(uuid.uuid4()),
role=Role.user,
parts=parts,
)
return a2a_message
[docs]
async def format_a2a_message(self, name: str, message: Message) -> Msg:
"""Convert A2A Message object back to AgentScope Msg format.
Args:
name (`str`):
The name of the message sender.
message (`Message`):
The A2A Message object to be converted.
Returns:
`list[Msg]`:
List of converted AgentScope Msg objects.
"""
from a2a.types import Role
content = []
metadata = None
for part in message.parts:
content.append(
await self._format_a2a_part(part),
)
if message.role == Role.user:
role: Literal["user", "assistant"] = "user"
elif message.role == Role.agent:
role = "assistant"
else:
raise ValueError(
f"Unsupported role: {message.role} in A2A message.",
)
return Msg(
name=name,
role=role,
content=content,
metadata=metadata,
)
@staticmethod
def _guess_type(
uri: str | None = None,
mime_type: str | None = None,
) -> Literal["image", "video", "audio", "unknown"]:
"""Guess the content type from the uri or mime type.
Args:
uri (`str | None`, optional):
The uri of the content.
mime_type (`str | None`, optional):
The mime type of the content.
Returns:
`Literal["image", "video", "audio", "unknown"]`:
The guessed content type.
"""
if mime_type is None and uri is None:
raise ValueError(
"Either uri or mime_type must be provided to guess the"
" content type.",
)
if mime_type is None:
mime_type, _encoding = mimetypes.guess_type(uri or "")
if isinstance(mime_type, str):
if mime_type.startswith("image/"):
return "image"
if mime_type.startswith("video/"):
return "video"
if mime_type.startswith("audio/"):
return "audio"
return "unknown"
[docs]
async def format_a2a_task(self, name: str, task: Task) -> list[Msg]:
"""Convert A2A Task object back to AgentScope Msg format.
Args:
name (`str`):
The name of the message sender.
task (`Task`):
The A2A Task object to be converted.
Returns:
`list[Msg]`:
Converted AgentScope Msg objects.
"""
msgs = []
if task.status and task.status.message:
msgs.append(
await self.format_a2a_message(name, task.status.message),
)
merged_msgs = []
for msg in msgs:
if merged_msgs and merged_msgs[-1].role == msg.role:
merged_msgs[-1].content.extend(msg.content)
else:
merged_msgs.append(msg)
if task.artifacts:
for artifact in task.artifacts:
artifact_content = [
await self._format_a2a_part(_) for _ in artifact.parts
]
if merged_msgs and merged_msgs[-1].role == "assistant":
merged_msgs[-1].content.extend(artifact_content)
merged_msgs[-1].metadata = artifact.metadata
else:
merged_msgs.append(
Msg(
name=name,
role="assistant",
content=artifact_content,
metadata=artifact.metadata,
),
)
return merged_msgs
async def _format_a2a_part(self, part: Part) -> ContentBlock:
"""Convert a single A2A Part object into AgentScope ContentBlock.
.. note:: We will try to convert the `DataPart` into tool use and tool
result blocks if possible.
Args:
part (`Part`):
The A2A Part object to be converted.
Returns:
`ContentBlock`:
The converted AgentScope ContentBlock.
"""
from a2a.types import (
TextPart,
FilePart,
FileWithUri,
FileWithBytes,
DataPart,
)
if isinstance(part.root, TextPart):
return TextBlock(
type="text",
text=part.root.text,
)
if isinstance(part.root, FilePart):
if isinstance(part.root.file, FileWithUri):
return { # type: ignore[return-value, misc]
"type": self._guess_type(
part.root.file.uri,
part.root.file.mime_type,
),
"source": URLSource(
type="url",
url=part.root.file.uri,
),
}
if isinstance(part.root.file, FileWithBytes):
return { # type: ignore[return-value, misc]
"type": self._guess_type(
mime_type=part.root.file.mime_type,
),
"source": Base64Source(
type="base64",
media_type=part.root.file.mime_type
or "application/octet-stream",
data=part.root.file.bytes,
),
}
raise ValueError(
f"Unsupported File type: {type(part.root.file)} in A2A"
"message.",
)
if isinstance(part.root, DataPart):
# Maybe the tool use and tool result blocks
if {
"type",
"name",
"input",
"id",
} <= part.root.data.keys() and part.root.data[
"type"
] == "tool_use":
return part.root.data
if {
"type",
"name",
"output",
"id",
} <= part.root.data.keys() and part.root.data[
"type"
] == "tool_result":
return part.root.data
# TODO: what about the other data parts?
return TextBlock(
type="text",
text=str(part.root.data),
)
raise ValueError(
f"Unsupported Part type: {type(part.root)} in A2A message"
f": {part.root}",
)