Source code for agentscope.tts._dashscope_realtime_tts_model

# -*- coding: utf-8 -*-
"""DashScope Realtime TTS model implementation."""

import base64
import threading
import traceback
from typing import Any, Literal, TYPE_CHECKING, AsyncGenerator

from ._tts_base import TTSModelBase
from ._tts_response import TTSResponse
from ..message import Msg, AudioBlock, Base64Source
from ..types import JSONSerializableObject

if TYPE_CHECKING:
    from dashscope.audio.qwen_tts_realtime import (
        QwenTtsRealtime,
        QwenTtsRealtimeCallback,
    )
else:
    QwenTtsRealtime = "dashscope.audio.qwen_tts_realtime.QwenTtsRealtime"
    QwenTtsRealtimeCallback = (
        "dashscope.audio.qwen_tts_realtime.QwenTtsRealtimeCallback"
    )


def _get_qwen_tts_realtime_callback_class() -> type["QwenTtsRealtimeCallback"]:
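    """Lazily import the DashScope SDK and build the realtime TTS callback
    class, so that this module can be imported without the `dashscope`
    package installed."""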
    from dashscope.audio.qwen_tts_realtime import QwenTtsRealtimeCallback

    class _DashScopeRealtimeTTSCallback(QwenTtsRealtimeCallback):
        """DashScope Realtime TTS callback."""

        def __init__(self) -> None:
            """Initialize the DashScope Realtime TTS callback."""
            super().__init__()

            # The event that will be set when a new audio chunk is received
            self.chunk_event = threading.Event()
            # The event that will be set when the TTS synthesis is finished
            self.finish_event = threading.Event()
            # Cache the audio data
            self._audio_data: str = ""

        def on_event(self, response: dict[str, Any]) -> None:
            """Called when a TTS event is received (DashScope SDK callback).

            Args:
                response (`dict[str, Any]`):
                    The event response dictionary.
            """
            try:
                event_type = response.get("type")

                if event_type == "session.created":
                    self._audio_data = ""
                    self.finish_event.clear()

                elif event_type == "response.audio.delta":
                    audio_data = response.get("delta")
                    if audio_data:
                        # The SDK may deliver raw bytes; normalize them to
                        # a base64-encoded string
                        if isinstance(audio_data, bytes):
                            audio_data = base64.b64encode(audio_data).decode()

                        # Accumulate audio data
                        self._audio_data += audio_data

                        # Signal that a new audio chunk is available
                        if not self.chunk_event.is_set():
                            self.chunk_event.set()

                elif event_type == "response.done":
                    # Response completed, can be used for metrics
                    pass

                elif event_type == "session.finished":
                    self.chunk_event.set()
                    self.finish_event.set()

            except Exception:
                traceback.print_exc()
                self.finish_event.set()

        async def get_audio_data(self, block: bool) -> TTSResponse:
            """Get the current accumulated audio data as base64 string so far.

            Returns:
                `str`:
                    The base64-encoded audio data.
            """
            # Block until synthesis is finished
            if block:
                self.finish_event.wait()

            # Return the accumulated audio data
            if self._audio_data:
                audio_data = self._audio_data
                if block:
                    # The request has finished; reset the state so that the
                    # next request does not accumulate onto this audio
                    await self._reset()
                return TTSResponse(
                    content=AudioBlock(
                        type="audio",
                        source=Base64Source(
                            type="base64",
                            data=audio_data,
                            media_type="audio/pcm;rate=24000",
                        ),
                    ),
                )

            # Reset for the next TTS request
            await self._reset()

            # Return an empty response if no audio data was received
            return TTSResponse(content=None)

        async def get_audio_chunk(self) -> AsyncGenerator[TTSResponse, None]:
            """Get the audio data chunk as an async generator of `TTSResponse`
            objects.

            Returns:
                `AsyncGenerator[TTSResponse, None]`:
                    The async generator yielding TTSResponse with audio chunks.
            """
            while True:
                if self.finish_event.is_set():
                    yield TTSResponse(
                        content=AudioBlock(
                            type="audio",
                            source=Base64Source(
                                type="base64",
                                data=self._audio_data,
                                media_type="audio/pcm;rate=24000",
                            ),
                        ),
                        is_last=True,
                    )

                    # Reset for the next TTS request
                    await self._reset()

                    break

                if self.chunk_event.is_set():
                    # Clear the event for next chunk
                    self.chunk_event.clear()
                else:
                    # Wait for the next chunk
                    self.chunk_event.wait()

                yield TTSResponse(
                    content=AudioBlock(
                        type="audio",
                        source=Base64Source(
                            type="base64",
                            data=self._audio_data,
                            media_type="audio/pcm;rate=24000",
                        ),
                    ),
                    is_last=False,
                )

        async def _reset(self) -> None:
            """Reset the callback state for a new TTS request."""
            self.finish_event.clear()
            self.chunk_event.clear()
            self._audio_data = ""

    return _DashScopeRealtimeTTSCallback


class DashScopeRealtimeTTSModel(TTSModelBase):
    """TTS implementation for the DashScope Qwen Realtime TTS API, which
    supports streaming input. The supported models include
    "qwen3-tts-flash-realtime", "qwen-tts-realtime", etc. For more details,
    please refer to the `official document
    <https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2938790>`_.

    .. note:: The DashScopeRealtimeTTSModel can only handle one streaming
        input request at a time, and cannot process multiple streaming input
        requests concurrently. For example, it cannot handle input sequences
        like `[msg_1_chunk0, msg_1_chunk1, msg_2_chunk0]`, where the
        prefixes "msg_x" indicate different streaming input requests.
    """

    supports_streaming_input: bool = True
    """Whether the model supports streaming input."""

    def __init__(
        self,
        api_key: str,
        model_name: str = "qwen3-tts-flash-realtime",
        voice: (
            Literal["Cherry", "Nofish", "Ethan", "Jennifer"] | str
        ) = "Cherry",
        stream: bool = True,
        cold_start_length: int | None = None,
        cold_start_words: int | None = None,
        client_kwargs: dict[str, JSONSerializableObject] | None = None,
        generate_kwargs: dict[str, JSONSerializableObject] | None = None,
    ) -> None:
        """Initialize the DashScope TTS model by specifying the model,
        voice, and other parameters.

        .. note:: More details about the parameters, such as `model_name`,
            `voice`, and `mode`, can be found in the `official document
            <https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2938790>`_.

        .. note:: You can use `cold_start_length` and `cold_start_words`
            simultaneously to set both character and word thresholds for the
            first TTS request. For Chinese text, word segmentation (based on
            spaces) may not be effective.

        Args:
            api_key (`str`):
                The DashScope API key.
            model_name (`str`, defaults to `"qwen3-tts-flash-realtime"`):
                The TTS model name, e.g. "qwen3-tts-flash-realtime",
                "qwen-tts-realtime", etc.
            voice (`Literal["Cherry", "Nofish", "Ethan", "Jennifer"] | str`, \
                defaults to `"Cherry"`):
                The voice to use for synthesis. Refer to the `official
                document
                <https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2938790>`_
                for the supported voices of each model.
            stream (`bool`, defaults to `True`):
                Whether to use streaming synthesis.
            cold_start_length (`int | None`, optional):
                The minimum number of characters to accumulate before the
                first TTS request is sent, so that very short input text
                does not cause pauses in the synthesized speech.
            cold_start_words (`int | None`, optional):
                The minimum number of words to accumulate before the first
                TTS request is sent, so that very short input text does not
                cause pauses in the synthesized speech. Words are identified
                by spaces in the text.
            client_kwargs (`dict[str, JSONSerializableObject] | None`, \
                optional):
                The extra keyword arguments used to initialize the DashScope
                realtime TTS client.
            generate_kwargs (`dict[str, JSONSerializableObject] | None`, \
                optional):
                The extra keyword arguments used in the DashScope realtime
                TTS API generation.
        """
        super().__init__(model_name=model_name, stream=stream)

        import dashscope
        from dashscope.audio.qwen_tts_realtime import QwenTtsRealtime

        dashscope.api_key = api_key

        # Store the configuration
        self.voice = voice
        self.mode = "server_commit"
        self.cold_start_length = cold_start_length
        self.cold_start_words = cold_start_words
        self.client_kwargs = client_kwargs or {}
        self.generate_kwargs = generate_kwargs or {}

        # Initialize the TTS client, keeping a reference to the callback
        # object used by the DashScope SDK
        self._dashscope_callback = _get_qwen_tts_realtime_callback_class()()
        self._tts_client: QwenTtsRealtime = QwenTtsRealtime(
            model=self.model_name,
            callback=self._dashscope_callback,
            **self.client_kwargs,
        )
        self._connected = False

        # The variables for tracking streaming input messages
        # Whether the next send is the first one for the current message
        self._first_send: bool = True
        # The current message ID being processed
        self._current_msg_id: str | None = None
        # The prefix of the current message that has already been sent
        self._current_prefix: str = ""

    async def connect(self) -> None:
        """Initialize the DashScope TTS model and establish the
        connection."""
        if self._connected:
            return

        self._tts_client.connect()

        # Update the session with the voice and mode settings
        self._tts_client.update_session(
            voice=self.voice,
            mode=self.mode,
            **self.generate_kwargs,
        )
        self._connected = True

    async def close(self) -> None:
        """Close the TTS model and clean up resources."""
        if not self._connected:
            return

        self._connected = False
        self._tts_client.finish()
        self._tts_client.close()

    async def push(
        self,
        msg: Msg,
        **kwargs: Any,
    ) -> TTSResponse:
        """Append text to be synthesized and return the received TTS
        response. Note this method is non-blocking, and may return an empty
        response if no audio has been received yet. To receive all the
        synthesized speech, call the `synthesize` method after pushing all
        the text chunks.

        Args:
            msg (`Msg`):
                The message to be synthesized. The `msg.id` identifies the
                streaming input request.
            **kwargs (`Any`):
                Additional keyword arguments to pass to the TTS API call.

        Returns:
            `TTSResponse`:
                The TTSResponse containing audio blocks.
        """
        if not self._connected:
            raise RuntimeError(
                "TTS model is not connected. Call `connect()` first.",
            )

        if (
            self._current_msg_id is not None
            and self._current_msg_id != msg.id
        ):
            raise RuntimeError(
                "DashScopeRealtimeTTSModel can only handle one streaming "
                "input request at a time. Please ensure that all chunks "
                "belong to the same message ID.",
            )

        # Record the current message ID
        self._current_msg_id = msg.id

        text = msg.get_text_content()

        # Apply the cold start thresholds only to the first input chunk
        if text:
            if self._first_send:
                if (
                    self.cold_start_length
                    and len(text) < self.cold_start_length
                ):
                    # Hold the text back until it is long enough
                    delta_to_send = ""
                else:
                    delta_to_send = text

                if delta_to_send and self.cold_start_words:
                    if len(delta_to_send.split()) < self.cold_start_words:
                        delta_to_send = ""
            else:
                # Remove the prefix that has already been sent
                delta_to_send = text.removeprefix(self._current_prefix)

            if delta_to_send:
                self._tts_client.append_text(delta_to_send)
                # Record the sent prefix
                self._current_prefix += delta_to_send
                self._first_send = False

            # Collect the audio data received so far without blocking
            res = await self._dashscope_callback.get_audio_data(block=False)
            return res

        # Return an empty response if there is no text to send
        return TTSResponse(content=None)

    async def synthesize(
        self,
        msg: Msg | None = None,
        **kwargs: Any,
    ) -> TTSResponse | AsyncGenerator[TTSResponse, None]:
        """Append the final text chunk (if any), finish the current
        streaming input request, and return the TTS response.

        Args:
            msg (`Msg | None`, optional):
                The message to be synthesized.
            **kwargs (`Any`):
                Additional keyword arguments to pass to the TTS API call.

        Returns:
            `TTSResponse | AsyncGenerator[TTSResponse, None]`:
                The TTSResponse object in non-streaming mode, or an async
                generator yielding TTSResponse objects in streaming mode.
        """
        if not self._connected:
            raise RuntimeError(
                "TTS model is not connected. Call `connect()` first.",
            )

        # `msg` may be None, so check it before accessing `msg.id`
        if (
            msg is not None
            and self._current_msg_id is not None
            and self._current_msg_id != msg.id
        ):
            raise RuntimeError(
                "DashScopeRealtimeTTSModel can only handle one streaming "
                "input request at a time. Please ensure that all chunks "
                "belong to the same message ID.",
            )

        if msg is None:
            delta_to_send = ""
        else:
            # Record the current message ID
            self._current_msg_id = msg.id
            delta_to_send = (msg.get_text_content() or "").removeprefix(
                self._current_prefix,
            )

        if delta_to_send:
            self._tts_client.append_text(delta_to_send)
            # Keep the prefix tracking consistent
            self._current_prefix += delta_to_send
            self._first_send = False

        # Commit the buffered text and finish the session so that all the
        # audio can be collected
        self._tts_client.commit()
        self._tts_client.finish()

        if self.stream:
            # Return an async generator over the audio chunks
            res = self._dashscope_callback.get_audio_chunk()
        else:
            # Block and wait for all the audio data to be available
            res = await self._dashscope_callback.get_audio_data(block=True)

        # Update the state for the next message
        self._current_msg_id = None
        self._first_send = True
        self._current_prefix = ""

        return res
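

# A minimal usage sketch added for illustration; it is not part of the
# original module. It assumes a valid API key in the DASHSCOPE_API_KEY
# environment variable (an assumption; adjust as needed), and simulates
# streaming input by growing the content of a single `Msg`, so that
# `msg.id` stays constant across chunks as `push` requires.
if __name__ == "__main__":
    import asyncio
    import os

    async def _demo() -> None:
        tts = DashScopeRealtimeTTSModel(
            api_key=os.environ["DASHSCOPE_API_KEY"],
            model_name="qwen3-tts-flash-realtime",
            voice="Cherry",
            stream=True,
        )
        await tts.connect()
        try:
            # Each push sends only the new suffix of the accumulated text;
            # mutating `content` keeps the same message ID (this assumes
            # `Msg.content` is assignable)
            msg = Msg(name="assistant", content="Hello, ", role="assistant")
            await tts.push(msg)
            msg.content = "Hello, world! Nice to meet you."
            await tts.push(msg)

            # Flush the request; in streaming mode `synthesize` returns an
            # async generator of TTSResponse objects with accumulated audio
            res = await tts.synthesize(msg)
            async for chunk in res:
                if chunk.content is not None:
                    pcm_b64 = chunk.content["source"]["data"]
                    print(
                        f"received {len(pcm_b64)} base64 chars, "
                        f"is_last={chunk.is_last}",
                    )
        finally:
            await tts.close()

    asyncio.run(_demo())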