# -*- coding: utf-8 -*-
"""The dashscope API model classes."""
import collections
from datetime import datetime
from http import HTTPStatus
from typing import (
Any,
AsyncGenerator,
Generator,
Union,
TYPE_CHECKING,
List,
Literal,
Type,
)
from pydantic import BaseModel
from aioitertools import iter as giter
from ._model_base import ChatModelBase
from ._model_response import ChatResponse
from ._model_usage import ChatUsage
from .._utils._common import (
_json_loads_with_repair,
_create_tool_from_base_model,
)
from ..message import TextBlock, ToolUseBlock, ThinkingBlock
from ..tracing import trace_llm
from ..types import JSONSerializableObject
from .._logging import logger
if TYPE_CHECKING:
from dashscope.api_entities.dashscope_response import GenerationResponse
from dashscope.api_entities.dashscope_response import (
MultiModalConversationResponse,
)
else:
GenerationResponse = (
"dashscope.api_entities.dashscope_response.GenerationResponse"
)
MultiModalConversationResponse = (
"dashscope.api_entities.dashscope_response."
"MultiModalConversationResponse"
)
class DashScopeChatModel(ChatModelBase):
"""The DashScope chat model class, which unifies the Generation and
MultimodalConversation APIs into one method."""
def __init__(
self,
model_name: str,
api_key: str,
stream: bool = True,
enable_thinking: bool | None = None,
generate_kwargs: dict[str, JSONSerializableObject] | None = None,
base_http_api_url: str | None = None,
) -> None:
"""Initialize the DashScope chat model.
Args:
model_name (`str`):
The model names.
api_key (`str`):
The dashscope API key.
stream (`bool`):
The streaming output or not
enable_thinking (`bool | None`, optional):
Enable thinking or not, only support Qwen3, QwQ, DeepSeek-R1.
Refer to `DashScope documentation
<https://help.aliyun.com/zh/model-studio/deep-thinking>`_
for more details.
generate_kwargs (`dict[str, JSONSerializableObject] | None`, \
optional):
The extra keyword arguments used in DashScope API generation,
e.g. `temperature`, `seed`.
base_http_api_url (`str | None`, optional):
The base URL for DashScope API requests. If not provided,
the default base URL from the DashScope SDK will be used.
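
Example:
A minimal construction sketch; the model name and API key below are
placeholders rather than values taken from this module:

.. code-block:: python

    model = DashScopeChatModel(
        model_name="qwen-max",
        api_key="YOUR_DASHSCOPE_API_KEY",
        stream=True,
    )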
"""
if enable_thinking and not stream:
logger.info(
"In the DashScope API, `stream` must be True when "
"`enable_thinking` is True. Forcing `stream` to True.",
)
stream = True
super().__init__(model_name, stream)
self.api_key = api_key
self.enable_thinking = enable_thinking
self.generate_kwargs = generate_kwargs or {}
if base_http_api_url is not None:
import dashscope
dashscope.base_http_api_url = base_http_api_url
@trace_llm
async def __call__(
self,
messages: list[dict[str, Any]],
tools: list[dict] | None = None,
tool_choice: Literal["auto", "none", "any", "required"]
| str
| None = None,
structured_model: Type[BaseModel] | None = None,
**kwargs: Any,
) -> ChatResponse | AsyncGenerator[ChatResponse, None]:
"""Get the response from the dashscope
Generation/MultimodalConversation API by the given arguments.
.. note:: We unify the dashscope generation and multimodal conversation
APIs into one method, since they support similar arguments and share
the same functionality.
Args:
messages (`list[dict[str, Any]]`):
A list of dictionaries, where `role` and `content` fields are
required.
tools (`list[dict] | None`, default `None`):
The tools JSON schemas that the model can use.
tool_choice (`Literal["auto", "none", "any", "required"] | str \
| None`, default `None`):
Controls which (if any) tool is called by the model.
Can be "auto", "none", or specific tool name.
For more details, please refer to
https://help.aliyun.com/zh/model-studio/qwen-function-calling
structured_model (`Type[BaseModel] | None`, default `None`):
A Pydantic BaseModel class that defines the expected structure
for the model's output. When provided, the model will be forced
to return data that conforms to this schema by automatically
converting the BaseModel to a tool function and setting
`tool_choice` to enforce its usage. This enables structured
output generation.

.. note:: When `structured_model` is specified,
both `tools` and `tool_choice` parameters are ignored,
and the model will only perform structured output
generation without calling any other tools.
**kwargs (`Any`):
The keyword arguments for DashScope chat completions API,
e.g. `temperature`, `max_tokens`, `top_p`, etc. Please
refer to `DashScope documentation
<https://help.aliyun.com/zh/dashscope/developer-reference/api-details>`_
for more detailed arguments.
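
Example:
A usage sketch; ``model`` is assumed to be a ``DashScopeChatModel``
instance, and the messages and the ``Answer`` schema are illustrative
placeholders:

.. code-block:: python

    messages = [{"role": "user", "content": "What is the capital of France?"}]

    # With stream=True (the default) an async generator of
    # ChatResponse chunks is returned.
    async for chunk in await model(messages):
        print(chunk.content)

    # For a non-streaming model (stream=False), a single ChatResponse is
    # returned; with a Pydantic schema, the structured result is stored
    # in its metadata.
    from pydantic import BaseModel

    class Answer(BaseModel):
        city: str

    response = await model(messages, structured_model=Answer)
    print(response.metadata)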
"""
import dashscope
# For qvq and qwen-vl models, the content field cannot be `None` or
# `[{"text": None}]`, so we need to convert it to an empty list.
if self.model_name.startswith("qvq") or "-vl" in self.model_name:
for msg in messages:
if msg["content"] is None or msg["content"] == [
{"text": None},
]:
msg["content"] = []
kwargs = {
"messages": messages,
"model": self.model_name,
"stream": self.stream,
**self.generate_kwargs,
**kwargs,
"result_format": "message",
# In agentscope, the `incremental_output` must be `True` when
# `self.stream` is True
"incremental_output": self.stream,
}
if tools:
kwargs["tools"] = self._format_tools_json_schemas(tools)
if tool_choice:
self._validate_tool_choice(tool_choice, tools)
kwargs["tool_choice"] = self._format_tool_choice(tool_choice)
if self.enable_thinking and "enable_thinking" not in kwargs:
kwargs["enable_thinking"] = self.enable_thinking
if structured_model:
if tools or tool_choice:
logger.warning(
"structured_model is provided. Both 'tools' and "
"'tool_choice' parameters will be overridden and "
"ignored. The model will only perform structured output "
"generation without calling any other tools.",
)
format_tool = _create_tool_from_base_model(structured_model)
kwargs["tools"] = self._format_tools_json_schemas(
[format_tool],
)
kwargs["tool_choice"] = self._format_tool_choice(
format_tool["function"]["name"],
)
start_datetime = datetime.now()
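# Route vision models (qvq-* and *-vl*) to the MultiModalConversation API;
# all other models go through the async Generation API.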
if self.model_name.startswith("qvq") or "-vl" in self.model_name:
response = dashscope.MultiModalConversation.call(
api_key=self.api_key,
**kwargs,
)
else:
response = await dashscope.aigc.generation.AioGeneration.call(
api_key=self.api_key,
**kwargs,
)
if self.stream:
return self._parse_dashscope_stream_response(
start_datetime,
response,
structured_model,
)
parsed_response = await self._parse_dashscope_generation_response(
start_datetime,
response,
structured_model,
)
return parsed_response
# pylint: disable=too-many-branches
async def _parse_dashscope_stream_response(
self,
start_datetime: datetime,
response: Union[
AsyncGenerator[GenerationResponse, None],
Generator[MultiModalConversationResponse, None, None],
],
structured_model: Type[BaseModel] | None = None,
) -> AsyncGenerator[ChatResponse, Any]:
"""Given a DashScope streaming response generator, extract the content
blocks and usages from it and yield ChatResponse objects.

Args:
start_datetime (`datetime`):
The start datetime of the response generation.
response (
`Union[AsyncGenerator[GenerationResponse, None], Generator[ \
MultiModalConversationResponse, None, None]]`
):
DashScope streaming response generator (GenerationResponse or
MultiModalConversationResponse) to parse.
structured_model (`Type[BaseModel] | None`, default `None`):
A Pydantic BaseModel class that defines the expected structure
for the model's output.

Returns:
`AsyncGenerator[ChatResponse, Any]`:
An async generator that yields ChatResponse objects containing
the content blocks and usage information for each chunk in the
streaming response.

.. note::
If `structured_model` is not `None`, the expected structured output
will be stored in the metadata of the `ChatResponse`.
"""
acc_content, acc_thinking_content = "", ""
acc_tool_calls = collections.defaultdict(dict)
metadata = None
async for chunk in giter(response):
if chunk.status_code != HTTPStatus.OK:
raise RuntimeError(
f"Failed to get response from _ API: {chunk}",
)
message = chunk.output.choices[0].message
# Update reasoning content
if isinstance(message.get("reasoning_content"), str):
acc_thinking_content += message["reasoning_content"]
# Update text content
if isinstance(message.content, str):
acc_content += message.content
elif isinstance(message.content, list):
for item in message.content:
if isinstance(item, dict) and "text" in item:
acc_content += item["text"]
# Update tool calls
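# Tool-call chunks arrive incrementally: the id, function name, and
# arguments fragments are concatenated across chunks, keyed by the
# tool call index, until the full call is assembled.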
for tool_call in message.get("tool_calls", []):
index = tool_call.get("index", 0)
if "id" in tool_call and tool_call["id"] != acc_tool_calls[
index
].get("id"):
acc_tool_calls[index]["id"] = (
acc_tool_calls[index].get("id", "") + tool_call["id"]
)
if "function" in tool_call:
func = tool_call["function"]
if "name" in func:
acc_tool_calls[index]["name"] = (
acc_tool_calls[index].get("name", "")
+ func["name"]
)
if "arguments" in func:
acc_tool_calls[index]["arguments"] = (
acc_tool_calls[index].get("arguments", "")
+ func["arguments"]
)
# Convert the accumulated thinking, text, and tool-call state into
# content blocks; each yielded chunk carries the full accumulated
# content so far rather than a delta.
content_blocks: list[TextBlock | ToolUseBlock | ThinkingBlock] = []
if acc_thinking_content:
content_blocks.append(
ThinkingBlock(
type="thinking",
thinking=acc_thinking_content,
),
)
if acc_content:
content_blocks.append(
TextBlock(
type="text",
text=acc_content,
),
)
for tool_call in acc_tool_calls.values():
repaired_input = _json_loads_with_repair(
tool_call.get("arguments", "{}") or "{}",
)
if not isinstance(repaired_input, dict):
repaired_input = {}
content_blocks.append(
ToolUseBlock(
type="tool_use",
id=tool_call.get("id", ""),
name=tool_call.get("name", ""),
input=repaired_input,
),
)
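# When structured output is requested, the repaired tool-call arguments
# are the structured result; surface them via the response metadata.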
if structured_model:
metadata = repaired_input
usage = None
if chunk.usage:
usage = ChatUsage(
input_tokens=chunk.usage.input_tokens,
output_tokens=chunk.usage.output_tokens,
time=(datetime.now() - start_datetime).total_seconds(),
)
parsed_chunk = ChatResponse(
content=content_blocks,
usage=usage,
metadata=metadata,
)
yield parsed_chunk
async def _parse_dashscope_generation_response(
self,
start_datetime: datetime,
response: Union[
GenerationResponse,
MultiModalConversationResponse,
],
structured_model: Type[BaseModel] | None = None,
) -> ChatResponse:
"""Given a DashScope GenerationResponse object, extract the content
blocks and usages from it.
Args:
start_datetime (`datetime`):
The start datetime of the response generation.
response (
`Union[GenerationResponse, MultiModalConversationResponse]`
):
Dashscope GenerationResponse | MultiModalConversationResponse
object to parse.
structured_model (`Type[BaseModel] | None`, default `None`):
A Pydantic BaseModel class that defines the expected structure
for the model's output.

Returns:
`ChatResponse`:
A ChatResponse object containing the content blocks and usage.

.. note::
If `structured_model` is not `None`, the expected structured output
will be stored in the metadata of the `ChatResponse`.
"""
# Collect the content blocks from the response.
if response.status_code != HTTPStatus.OK:
raise RuntimeError(
f"Failed to get response from DashScope API: {response}",
)
content_blocks: List[TextBlock | ToolUseBlock] = []
metadata = None
message = response.output.choices[0].message
content = message.get("content")
if content not in [None, "", []]:
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and "text" in item:
content_blocks.append(
TextBlock(
type="text",
text=item["text"],
),
)
else:
content_blocks.append(
TextBlock(
type="text",
text=content,
),
)
if message.get("tool_calls"):
for tool_call in message["tool_calls"]:
input_ = _json_loads_with_repair(
tool_call["function"].get(
"arguments",
"{}",
)
or "{}",
)
content_blocks.append(
ToolUseBlock(
type="tool_use",
name=tool_call["function"]["name"],
input=input_,
id=tool_call["id"],
),
)
if structured_model:
metadata = input_
# Usage information
usage = None
if response.usage:
usage = ChatUsage(
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
time=(datetime.now() - start_datetime).total_seconds(),
)
parsed_response = ChatResponse(
content=content_blocks,
usage=usage,
metadata=metadata,
)
return parsed_response
def _format_tools_json_schemas(
self,
schemas: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Format the tools JSON schema into required format for DashScope API.
Args:
schemas (`dict[str, dict[str, Any]]`):
The tools JSON schemas.
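
Example:
The expected input format; ``query_weather`` is a hypothetical tool:

.. code-block:: python

    schemas = [
        {
            "type": "function",
            "function": {
                "name": "query_weather",
                "description": "Query the weather of a city.",
                "parameters": {
                    "type": "object",
                    "properties": {"city": {"type": "string"}},
                    "required": ["city"],
                },
            },
        },
    ]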
"""
# Check schemas format
for value in schemas:
if (
not isinstance(value, dict)
or "type" not in value
or value["type"] != "function"
or "function" not in value
):
raise ValueError(
f"Each schema must be a dict with 'type' as 'function' "
f"and 'function' key, got {value}",
)
return schemas
def _format_tool_choice(
self,
tool_choice: Literal["auto", "none", "any", "required"] | str | None,
) -> str | dict | None:
"""Format tool_choice parameter for API compatibility.
Args:
tool_choice (`Literal["auto", "none", "any", "required"] | str \
| None`, default `None`):
Controls which (if any) tool is called by the model.
Can be "auto", "none", or specific tool name.
For more details, please refer to
https://help.aliyun.com/zh/model-studio/qwen-function-calling

Returns:
`str | dict | None`:
The formatted tool choice: "auto" and "none" are passed through,
"any"/"required" are mapped to "auto" with a warning, a specific
tool name is wrapped into a function-call dict, and `None` is
returned when `tool_choice` is `None`.
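
Example:
Illustrative mappings; ``query_weather`` is a hypothetical tool name:

.. code-block:: python

    self._format_tool_choice("auto")      # -> "auto"
    self._format_tool_choice("required")  # -> "auto", with a warning
    self._format_tool_choice("query_weather")
    # -> {"type": "function", "function": {"name": "query_weather"}}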
"""
if tool_choice is None:
return None
if tool_choice in ["auto", "none"]:
return tool_choice
if tool_choice in ["any", "required"]:
logger.warning(
"tool_choice '%s' is not supported by DashScope API. "
"Supported options are 'auto', 'none', or specific function "
"name. Automatically using 'auto' instead.",
tool_choice,
)
return "auto"
return {"type": "function", "function": {"name": tool_choice}}