# -*- coding: utf-8 -*-
"""The OpenAI text embedding model class."""
import time
from datetime import datetime
from typing import Any, List

from ._embedding_response import EmbeddingResponse
from ._embedding_usage import EmbeddingUsage
from ._cache_base import EmbeddingCacheBase
from ._embedding_base import EmbeddingModelBase
from ..message import TextBlock
class OpenAITextEmbedding(EmbeddingModelBase):
    """OpenAI text embedding model class."""

    supported_modalities: list[str] = ["text"]
    """This class only supports text input."""

    def __init__(
        self,
        api_key: str,
        model_name: str,
        dimensions: int = 1024,
        embedding_cache: EmbeddingCacheBase | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the OpenAI text embedding model class.

        Args:
            api_key (`str`):
                The OpenAI API key.
            model_name (`str`):
                The name of the embedding model.
            dimensions (`int`, defaults to 1024):
                The dimension of the embedding vector.
            embedding_cache (`EmbeddingCacheBase | None`, defaults to `None`):
                The embedding cache class instance, used to cache the
                embedding results to avoid repeated API calls.

        # TODO: handle batch size limit and token limit
        """
        # Imported lazily so the package is only required when this
        # embedding backend is actually instantiated.
        import openai

        super().__init__(model_name, dimensions)
        self.client = openai.AsyncClient(api_key=api_key, **kwargs)
        self.embedding_cache = embedding_cache

    async def __call__(
        self,
        text: List[str | TextBlock],
        **kwargs: Any,
    ) -> EmbeddingResponse:
        """Call the OpenAI embedding API.

        Args:
            text (`List[str | TextBlock]`):
                The input text to be embedded. It can be a list of strings
                or TextBlock dicts carrying a ``"text"`` field.

        Raises:
            `ValueError`:
                If an input item is neither a string nor a TextBlock dict.
        """
        # Normalize the mixed str / TextBlock input into a flat list of
        # plain strings for the API call.
        gather_text = []
        for _ in text:
            if isinstance(_, dict) and "text" in _:
                gather_text.append(_["text"])
            elif isinstance(_, str):
                gather_text.append(_)
            else:
                raise ValueError(
                    "Input text must be a list of strings or TextBlock "
                    "dicts.",
                )

        # The full request payload doubles as the cache identifier, so any
        # change in model, dimensions or input yields a distinct cache key.
        kwargs = {
            "input": gather_text,
            "model": self.model_name,
            "dimensions": self.dimensions,
            "encoding_format": "float",
            **kwargs,
        }

        if self.embedding_cache:
            cached_embeddings = await self.embedding_cache.retrieve(
                identifier=kwargs,
            )
            if cached_embeddings:
                # Cache hit: no tokens consumed, no API latency.
                return EmbeddingResponse(
                    embeddings=cached_embeddings,
                    usage=EmbeddingUsage(
                        tokens=0,
                        time=0,
                    ),
                    source="cache",
                )

        # Use the monotonic perf_counter for elapsed time: wall-clock
        # datetime.now() can jump (NTP/DST) and produce wrong durations.
        start_time = time.perf_counter()
        response = await self.client.embeddings.create(**kwargs)
        elapsed = time.perf_counter() - start_time

        if self.embedding_cache:
            await self.embedding_cache.store(
                identifier=kwargs,
                embeddings=[_.embedding for _ in response.data],
            )

        return EmbeddingResponse(
            embeddings=[_.embedding for _ in response.data],
            usage=EmbeddingUsage(
                tokens=response.usage.total_tokens,
                time=elapsed,
            ),
        )