# -*- coding: utf-8 -*-"""Async related modules."""fromtypingimportAnyfromconcurrent.futuresimportFuturefromloguruimportloggertry:importcloudpickleaspickleexceptImportErrorasimport_error:fromagentscope.utils.commonimportImportErrorReporterpickle=ImportErrorReporter(import_error,"distribute")from..messageimportMsgfrom.rpc_clientimportRpcClientfrom..utils.commonimport_is_web_urlfrom.retry_strategyimportRetryBase,_DEFAULT_RETRY_STRATEGY
class AsyncResult:
    """Use this class to get the async result from rpc server.

    The result is fetched lazily: nothing is requested from the server
    until `result()` (or `update_value()`) is called. Once fetched, the
    value is cached on the instance and returned by later `result()` calls.
    """

    def __init__(
        self,
        host: str,
        port: int,
        task_id: int = None,
        stub: Future = None,
        retry: RetryBase = _DEFAULT_RETRY_STRATEGY,
    ) -> None:
        """Initialize the async result handle.

        Args:
            host (`str`): Host passed to `RpcClient` when fetching.
            port (`int`): Port passed to `RpcClient` when fetching.
            task_id (`int`, defaults to `None`): Identifier of the task
                whose result should be fetched. When given, `stub` is
                ignored.
            stub (`Future`, defaults to `None`): Future whose `result()`
                yields the task id. Only used when `task_id` is `None`.
            retry (`RetryBase`): Retry strategy forwarded to
                `RpcClient.update_result`.
        """
        self._host = host
        self._port = port
        self._stub = None
        self._retry = retry
        self._task_id: int = None
        if task_id is not None:
            self._task_id = task_id
        else:
            # The task id is not known yet; it is resolved from the stub
            # on the first fetch (see `_get_task_id`).
            self._stub = stub
        self._ready = False
        self._data = None

    def _fetch_result(
        self,
    ) -> None:
        """Fetch result from the server.

        Resolves the task id from the stub if needed, requests the
        serialized result through `RpcClient.update_result`, unpickles it
        into `self._data` and marks the instance as ready.

        Raises:
            `ValueError`: If the task id cannot be obtained from the stub.
        """
        if self._task_id is None:
            self._task_id = self._get_task_id()
        # SECURITY NOTE(review): the payload returned by the server is
        # unpickled as-is. Unpickling can execute arbitrary code, so this
        # is only safe when the rpc server is trusted.
        self._data = pickle.loads(
            RpcClient(self._host, self._port).update_result(
                self._task_id,
                retry=self._retry,
            ),
        )
        # NOTE: its a hack here to download files
        # TODO: opt this
        self._check_and_download_files()
        self._ready = True

    def update_value(self) -> None:
        """Update the value. For compatibility with old version."""
        self._fetch_result()

    def _get_task_id(self) -> str:
        """Get the task_id from the stub.

        Returns:
            The value produced by `self._stub.result()`.

        Raises:
            `ValueError`: If reading the stub fails for any reason (the
                future raised, or no stub was provided). The original
                exception is chained as the cause.
        """
        try:
            return self._stub.result()
        except Exception as e:
            # Format the caught exception, not `self._stub.result()`:
            # calling `result()` again here would simply raise again from
            # inside the handler, so the error would never be logged and
            # the ValueError below would never be raised.
            logger.error(f"Failed to get task_id: {e}")
            raise ValueError(f"Failed to get task_id: {e}") from e

    def _download(self, url: str) -> str:
        """Return `url` unchanged if it is a web url, else download it.

        Args:
            url (`str`): A web url, or a path to request from the rpc
                server.

        Returns:
            `str`: `url` itself when `_is_web_url(url)` is true; otherwise
            the value returned by `RpcClient.download_file(path=url)`
            (presumably the local path of the downloaded file -- confirm
            against `RpcClient`).
        """
        if not _is_web_url(url):
            client = RpcClient(self._host, self._port)
            return client.download_file(path=url)
        else:
            return url

    def _check_and_download_files(self) -> None:
        """Check whether the urls are accessible. If not, download them
        from rpc server.

        Only applies when the fetched data is a `Msg` whose `content` is a
        list: every block of type "image", "audio", "video" or "file" gets
        its "url" entry replaced in place by the result of `_download`.
        """
        if isinstance(self._data, Msg):
            if isinstance(self._data.content, list):
                for i, block in enumerate(self._data.content):
                    if block["type"] in ["image", "audio", "video", "file"]:
                        self._data.content[i]["url"] = self._download(
                            block["url"],
                        )

    def result(self) -> Any:
        """Get the result.

        Fetches the result from the server on the first call and returns
        the cached value afterwards.
        """
        if not self._ready:
            self._fetch_result()
        return self._data